dograh/api/services/pipecat/run_pipeline.py
2026-03-07 12:41:24 +05:30

769 lines
28 KiB
Python

import asyncio
from typing import Optional
from fastapi import HTTPException, WebSocket
from loguru import logger
from api.db import db_client
from api.db.models import WorkflowModel
from api.enums import WorkflowRunMode
from api.services.configuration.registry import ServiceProviders
from api.services.pipecat.audio_config import AudioConfig, create_audio_config
from api.services.pipecat.event_handlers import (
register_audio_data_handler,
register_event_handlers,
)
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from api.services.pipecat.pipeline_builder import (
build_pipeline,
create_pipeline_components,
create_pipeline_task,
)
from api.services.pipecat.pipeline_engine_callbacks_processor import (
PipelineEngineCallbacksProcessor,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.realtime_feedback_observer import (
RealtimeFeedbackObserver,
register_turn_log_handlers,
)
from api.services.pipecat.service_factory import (
create_llm_service,
create_stt_service,
create_tts_service,
)
from api.services.pipecat.tracing_config import setup_tracing_exporter
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
create_twilio_transport,
create_vobiz_transport,
create_vonage_transport,
create_webrtc_transport,
)
from api.services.pipecat.ws_sender_registry import get_ws_sender
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.turns.user_mute import (
CallbackUserMuteStrategy,
FunctionCallUserMuteStrategy,
MuteUntilFirstBotCompleteUserMuteStrategy,
)
from pipecat.turns.user_start import (
ExternalUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import (
VADUserTurnStartStrategy,
)
from pipecat.turns.user_stop import (
ExternalUserTurnStopStrategy,
SpeechTimeoutUserTurnStopStrategy,
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType
from pipecat.utils.run_context import set_current_run_id
# Setup tracing if enabled
setup_tracing_exporter()
async def run_pipeline_twilio(
websocket_client: WebSocket,
stream_sid: str,
call_sid: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Twilio connections"""
logger.debug(
f"Running pipeline for Twilio connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}"
)
set_current_run_id(workflow_run_id)
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_sid}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
# Create audio configuration for Twilio
audio_config = create_audio_config(WorkflowRunMode.TWILIO.value)
transport = await create_twilio_transport(
websocket_client,
stream_sid,
call_sid,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
async def run_pipeline_vonage(
websocket_client,
call_uuid: str,
workflow: WorkflowModel,
organization_id: int,
workflow_id: int,
workflow_run_id: int,
user_id: int,
):
"""Run pipeline for Vonage WebSocket connections.
Vonage uses raw PCM audio over WebSocket instead of base64-encoded μ-law.
The audio is transmitted as binary frames at 16kHz by default.
"""
logger.info(f"Starting Vonage pipeline for workflow run {workflow_run_id}")
set_current_run_id(workflow_run_id)
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_uuid}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Extract VAD and ambient noise config from workflow
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
# Setup audio config for Vonage using the centralized config
audio_config = create_audio_config(WorkflowRunMode.VONAGE.value)
# Create Vonage transport
transport = await create_vonage_transport(
websocket_client,
call_uuid,
workflow_run_id,
audio_config,
organization_id,
vad_config,
ambient_noise_config,
)
# No special handshake needed for Vonage
# Audio streaming starts immediately
# Run the pipeline (same as Twilio/WebRTC)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
call_context_vars={},
audio_config=audio_config,
)
except Exception as e:
logger.error(f"Error in Vonage pipeline: {e}")
raise
async def run_pipeline_ari(
websocket_client: WebSocket,
channel_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Asterisk ARI WebSocket connections.
ARI uses raw 16-bit signed linear PCM (SLIN16) at 16kHz
transmitted as binary WebSocket frames via chan_websocket.
"""
logger.info(f"Starting ARI pipeline for workflow run {workflow_run_id}")
set_current_run_id(workflow_run_id)
# Store call ID (channel_id) in cost_info
cost_info = {"call_id": channel_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.ARI.value)
transport = await create_ari_transport(
websocket_client,
channel_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
except Exception as e:
logger.error(f"Error in ARI pipeline: {e}")
raise
async def run_pipeline_vobiz(
websocket_client: WebSocket,
stream_id: str,
call_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Vobiz using Plivo-compatible WebSocket protocol."""
logger.info(
f"[run {workflow_run_id}] Starting Vobiz pipeline - "
f"stream_id={stream_id}, call_id={call_id}, workflow_id={workflow_id}"
)
set_current_run_id(workflow_run_id)
cost_info = {"call_id": call_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.VOBIZ.value)
logger.info(
f"[run {workflow_run_id}] Vobiz audio config: "
f"sample_rate={audio_config.transport_in_sample_rate}Hz, format=MULAW"
)
transport = await create_vobiz_transport(
websocket_client,
stream_id,
call_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
logger.info(f"[run {workflow_run_id}] Starting Vobiz pipeline execution")
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
logger.info(f"[run {workflow_run_id}] Vobiz pipeline completed successfully")
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Error in Vobiz pipeline: {e}", exc_info=True
)
raise
async def run_pipeline_cloudonix(
websocket_client: WebSocket,
stream_sid: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Cloudonix connections"""
logger.debug(
f"Running pipeline for Cloudonix connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}"
)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
call_id = workflow_run.gathered_context.get("call_id")
if not call_id:
logger.warning("call_id not found in gathered_context")
raise Exception()
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
# Create audio configuration for Cloudonix
audio_config = create_audio_config(WorkflowRunMode.CLOUDONIX.value)
transport = await create_cloudonix_transport(
websocket_client,
call_id,
stream_sid,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
async def run_pipeline_smallwebrtc(
webrtc_connection: SmallWebRTCConnection,
workflow_id: int,
workflow_run_id: int,
user_id: int,
call_context_vars: dict = {},
) -> None:
"""Run pipeline for WebRTC connections"""
logger.debug(
f"Running pipeline for WebRTC connection with workflow_id: {workflow_id} and workflow_run_id: {workflow_run_id}"
)
set_current_run_id(workflow_run_id)
# Get workflow to extract all pipeline configurations
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
# Create audio configuration for WebRTC
audio_config = create_audio_config(WorkflowRunMode.SMALLWEBRTC.value)
transport = create_webrtc_transport(
webrtc_connection,
workflow_run_id,
audio_config,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
call_context_vars=call_context_vars,
audio_config=audio_config,
)
async def _run_pipeline(
transport,
workflow_id: int,
workflow_run_id: int,
user_id: int,
call_context_vars: dict = {},
audio_config: AudioConfig = None,
) -> None:
"""
Run the pipeline with the given transport and configuration
Args:
transport: The transport to use for the pipeline
workflow_id: The ID of the workflow
workflow_run_id: The ID of the workflow run
user_id: The ID of the user
mode: The mode of the pipeline (twilio or smallwebrtc)
"""
workflow_run = await db_client.get_workflow_run(workflow_run_id, user_id)
# If the workflow run is already completed, we don't need to run it again
if workflow_run.is_completed:
raise HTTPException(status_code=400, detail="Workflow run already completed")
merged_call_context_vars = workflow_run.initial_context
# If there is some extra call_context_vars, update them
if call_context_vars:
merged_call_context_vars = {**merged_call_context_vars, **call_context_vars}
await db_client.update_workflow_run(
workflow_run_id, initial_context=merged_call_context_vars
)
# Get user configuration
user_config = await db_client.get_user_configurations(user_id)
# Get workflow first so we can extract configurations before creating services
workflow = await db_client.get_workflow(workflow_id, user_id)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
# Extract configurations from workflow configurations
max_call_duration_seconds = 300 # Default 5 minutes
max_user_idle_timeout = 10.0 # Default 10 seconds
smart_turn_stop_secs = 2.0 # Default 2 seconds for incomplete turn timeout
turn_stop_strategy = "transcription" # Default to transcription-based detection
keyterms = None # Dictionary words for STT boosting
if workflow.workflow_configurations:
# Use workflow-specific max call duration if provided
if "max_call_duration" in workflow.workflow_configurations:
max_call_duration_seconds = workflow.workflow_configurations[
"max_call_duration"
]
# Use workflow-specific max user idle timeout if provided
if "max_user_idle_timeout" in workflow.workflow_configurations:
max_user_idle_timeout = workflow.workflow_configurations[
"max_user_idle_timeout"
]
# Use workflow-specific smart turn stop timeout if provided
if "smart_turn_stop_secs" in workflow.workflow_configurations:
smart_turn_stop_secs = workflow.workflow_configurations[
"smart_turn_stop_secs"
]
# Use workflow-specific turn stop strategy if provided
if "turn_stop_strategy" in workflow.workflow_configurations:
turn_stop_strategy = workflow.workflow_configurations["turn_stop_strategy"]
# Extract dictionary words and convert to keyterms list
if "dictionary" in workflow.workflow_configurations:
dictionary = workflow.workflow_configurations["dictionary"]
if dictionary and isinstance(dictionary, str):
# Split by comma and strip whitespace from each term
keyterms = [
term.strip() for term in dictionary.split(",") if term.strip()
]
# Create services based on user configuration
stt = create_stt_service(user_config, audio_config, keyterms=keyterms)
tts = create_tts_service(user_config, audio_config)
llm = create_llm_service(user_config)
workflow_graph = WorkflowGraph(
ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback)
)
# Create in-memory logs buffer early so it can be used by engine callbacks
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
# Create node transition callback (always logs to buffer, optionally streams to WS)
ws_sender = get_ws_sender(workflow_run_id)
async def send_node_transition(
node_id: str,
node_name: str,
previous_node_id: Optional[str],
previous_node_name: Optional[str],
) -> None:
"""Send node transition event to logs buffer and optionally via WebSocket."""
# Update current node on the buffer so subsequent events are tagged
in_memory_logs_buffer.set_current_node(node_id, node_name)
message = {
"type": RealtimeFeedbackType.NODE_TRANSITION.value,
"payload": {
"node_id": node_id,
"node_name": node_name,
"previous_node_id": previous_node_id,
"previous_node_name": previous_node_name,
},
}
# Send via WebSocket if available
if ws_sender:
try:
await ws_sender({**message, "node_id": node_id, "node_name": node_name})
except Exception as e:
logger.debug(f"Failed to send node transition via WebSocket: {e}")
# Always log to in-memory buffer (node_id/node_name injected by buffer's append)
try:
await in_memory_logs_buffer.append(message)
except Exception as e:
logger.error(f"Failed to append node transition to logs buffer: {e}")
node_transition_callback = send_node_transition
# Extract embeddings configuration from user config
embeddings_api_key = None
embeddings_model = None
embeddings_base_url = None
if user_config and user_config.embeddings:
embeddings_api_key = user_config.embeddings.api_key
embeddings_model = user_config.embeddings.model
embeddings_base_url = getattr(user_config.embeddings, "base_url", None)
engine = PipecatEngine(
llm=llm,
workflow=workflow_graph,
call_context_vars=merged_call_context_vars,
workflow_run_id=workflow_run_id,
node_transition_callback=node_transition_callback,
embeddings_api_key=embeddings_api_key,
embeddings_model=embeddings_model,
embeddings_base_url=embeddings_base_url,
)
# Create pipeline components
audio_buffer, context = create_pipeline_components(audio_config)
# Set the context, audio_config, and audio_buffer after creation
engine.set_context(context)
engine.set_audio_config(audio_config)
assistant_params = LLMAssistantAggregatorParams(
expect_stripped_words=True,
correct_aggregation_callback=engine.create_aggregation_correction_callback(),
)
# Configure turn strategies based on STT provider, model, and workflow configuration
# Deepgram Flux uses external turn detection (VAD + External start/stop)
# Other models use configurable turn detection strategy
is_deepgram_flux = (
user_config.stt.provider == ServiceProviders.DEEPGRAM.value
and user_config.stt.model == "flux-general-en"
)
if is_deepgram_flux:
user_turn_strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
ExternalUserTurnStartStrategy(enable_interruptions=True),
],
stop=[ExternalUserTurnStopStrategy()],
)
elif turn_stop_strategy == "turn_analyzer":
# Smart Turn Analyzer: best for longer responses with natural pauses
smart_turn_params = SmartTurnParams(stop_secs=smart_turn_stop_secs)
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=LocalSmartTurnAnalyzerV3(params=smart_turn_params)
)
],
)
else:
# Transcription-based (default): best for short 1-2 word responses
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy()],
)
# Create user mute strategies
# - CallbackUserMuteStrategy: mutes based on engine's _mute_pipeline state
user_mute_strategies = [
MuteUntilFirstBotCompleteUserMuteStrategy(),
FunctionCallUserMuteStrategy(),
CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user),
]
user_params = LLMUserAggregatorParams(
user_turn_strategies=user_turn_strategies,
user_mute_strategies=user_mute_strategies,
user_idle_timeout=max_user_idle_timeout,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params, user_params=user_params
)
# Create usage metrics aggregator with engine's callback
pipeline_engine_callback_processor = PipelineEngineCallbacksProcessor(
max_call_duration_seconds=max_call_duration_seconds,
max_duration_end_task_callback=engine.create_max_duration_callback(),
generation_started_callback=engine.create_generation_started_callback(),
llm_text_frame_callback=engine.handle_llm_text_frame,
)
pipeline_metrics_aggregator = PipelineMetricsAggregator()
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
# Register user idle event handlers
user_idle_handler = engine.create_user_idle_handler()
@user_context_aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
await user_idle_handler.handle_idle(aggregator)
@user_context_aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy):
user_idle_handler.reset()
# Create voicemail detector if enabled in the workflow's start node
voicemail_detector = None
start_node = workflow_graph.nodes.get(workflow_graph.start_node_id)
if start_node and start_node.detect_voicemail:
logger.info(f"Voicemail detection enabled for workflow run {workflow_run_id}")
# Create a separate LLM instance for the voicemail sub-pipeline
# (can't share with main pipeline as it would mess up frame linking)
voicemail_llm = create_llm_service(user_config)
voicemail_detector = VoicemailDetector(
llm=voicemail_llm,
voicemail_response_delay=1.0,
long_speech_timeout=8.0,
)
# Register event handler to end task when voicemail is detected
@voicemail_detector.event_handler("on_voicemail_detected")
async def _on_voicemail_detected(_processor):
logger.info(f"Voicemail detected for workflow run {workflow_run_id}")
await engine.end_call_with_reason(
reason=EndTaskReason.VOICEMAIL_DETECTED.value,
abort_immediately=True,
)
# Build the pipeline with the STT mute filter and context controller
pipeline = build_pipeline(
transport,
stt,
audio_buffer,
llm,
tts,
user_context_aggregator,
assistant_context_aggregator,
pipeline_engine_callback_processor,
pipeline_metrics_aggregator,
voicemail_detector=voicemail_detector,
)
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
# Now set the task on the engine
engine.set_task(task)
# Initialize the engine to set the initial context
await engine.initialize()
# Add real-time feedback observer (always logs to buffer, streams to WS if available)
feedback_observer = RealtimeFeedbackObserver(
ws_sender=ws_sender,
logs_buffer=in_memory_logs_buffer,
)
task.add_observer(feedback_observer)
# Register latency observer to log user-to-bot response latency
if task.user_bot_latency_observer:
@task.user_bot_latency_observer.event_handler("on_latency_measured")
async def on_latency_measured(observer, latency_seconds):
message = {
"type": RealtimeFeedbackType.LATENCY_MEASURED.value,
"payload": {
"latency_seconds": latency_seconds,
},
}
if ws_sender:
try:
ws_message = message
if in_memory_logs_buffer.current_node_id:
ws_message = {
**message,
"node_id": in_memory_logs_buffer.current_node_id,
"node_name": in_memory_logs_buffer.current_node_name,
}
await ws_sender(ws_message)
except Exception as e:
logger.debug(f"Failed to send latency via WebSocket: {e}")
try:
await in_memory_logs_buffer.append(message)
except Exception as e:
logger.error(f"Failed to append latency to logs buffer: {e}")
# Register turn log handlers for all call types (WebRTC and telephony)
register_turn_log_handlers(
in_memory_logs_buffer, user_context_aggregator, assistant_context_aggregator
)
# Register event handlers
in_memory_audio_buffer = register_event_handlers(
task,
transport,
workflow_run_id,
engine=engine,
audio_buffer=audio_buffer,
in_memory_logs_buffer=in_memory_logs_buffer,
pipeline_metrics_aggregator=pipeline_metrics_aggregator,
audio_config=audio_config,
)
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)
try:
# Run the pipeline
loop = asyncio.get_running_loop()
params = PipelineTaskParams(loop=loop)
await task.run(params)
logger.info(f"Task completed for run {workflow_run_id}")
except asyncio.CancelledError:
logger.warning("Received CancelledError in _run_pipeline")
finally:
logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}")