dograh/api/services/pipecat/pipeline_builder.py

230 lines
8.6 KiB
Python
Raw Normal View History

2025-09-09 14:37:32 +05:30
import os
from loguru import logger
from api.services.pipecat.audio_config import AudioConfig
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
2026-02-11 18:18:32 +05:30
from pipecat.utils.run_context import turn_var
2025-09-09 14:37:32 +05:30
def create_pipeline_components(audio_config: AudioConfig):
2025-09-09 14:37:32 +05:30
"""Create and return the main pipeline components with proper audio configuration"""
logger.info(f"Creating pipeline components with audio config: {audio_config}")
# Use native AudioBufferProcessor for merged audio recording
audio_buffer = AudioBufferProcessor(
2025-09-09 14:37:32 +05:30
sample_rate=audio_config.pipeline_sample_rate,
buffer_size=audio_config.buffer_size_bytes,
)
context = LLMContext()
2025-09-09 14:37:32 +05:30
return audio_buffer, context
2025-09-09 14:37:32 +05:30
def build_pipeline(
transport,
stt,
audio_buffer,
llm,
tts,
user_context_aggregator,
assistant_context_aggregator,
pipeline_engine_callback_processor,
pipeline_metrics_aggregator,
voicemail_detector=None,
recording_router=None,
2025-09-09 14:37:32 +05:30
):
"""Build the main pipeline with all components.
Args:
audio_buffer: AudioBufferProcessor that handles both input and output audio recording.
voicemail_detector: Optional native pipecat VoicemailDetector. When provided,
inserts voicemail detection after STT. Note: We don't use the TTS gate
to avoid blocking TTS frames during classification.
recording_router: Optional RecordingRouterProcessor. When provided,
inserts between callback processor and TTS to route between
pre-recorded audio playback and dynamic TTS.
"""
# Build processors list with optional voicemail detection
2025-09-09 14:37:32 +05:30
processors = [
transport.input(), # Transport user input
stt,
2025-09-09 14:37:32 +05:30
]
# Insert voicemail detector after STT if enabled
# Note: We intentionally do NOT use voicemail_detector.gate() to allow TTS
# frames to continue flowing during classification (non-blocking detection)
# Note: We must keep user_context_aggregator after voicemail_detector
# or else, LLMContextFrames generated from user_context_aggregator will
# start generating LLM Completion from Voicemail Classifier
if voicemail_detector:
logger.info("Adding native voicemail detector to pipeline")
processors.append(voicemail_detector.detector())
# Continue with the rest of the pipeline
post_llm = [pipeline_engine_callback_processor]
if recording_router:
post_llm.append(recording_router)
2026-03-19 15:06:59 +05:30
processors.append(user_context_aggregator)
# Insert LLM gate before the main LLM when voicemail detection is enabled.
# This prevents the main LLM from being triggered until classification
# determines whether a human or voicemail answered the call.
if voicemail_detector:
processors.append(voicemail_detector.llm_gate())
2025-09-09 14:37:32 +05:30
processors.extend(
[
llm, # LLM
*post_llm,
2025-09-09 14:37:32 +05:30
tts, # TTS
transport.output(), # Transport bot output
audio_buffer, # AudioBufferProcessor - records both input and output audio
2025-09-09 14:37:32 +05:30
assistant_context_aggregator, # Assistant spoken responses
pipeline_metrics_aggregator,
]
)
return Pipeline(processors)
def build_realtime_pipeline(
transport,
realtime_llm,
audio_buffer,
user_context_aggregator,
assistant_context_aggregator,
pipeline_engine_callback_processor,
pipeline_metrics_aggregator,
voicemail_detector=None,
):
"""Build a pipeline for realtime (speech-to-speech) LLM services.
Realtime services (e.g. OpenAI Realtime, Gemini Live) handle STT+LLM+TTS
internally, so no separate STT or TTS processors are needed.
Args:
voicemail_detector: Optional VoicemailDetector. Placed *below* the
realtime LLM. This is asymmetric with the non-realtime layout
(where the detector sits between STT and the main user aggregator)
because the realtime LLM is both the source of TranscriptionFrame
(broadcast downstream) and the sink of LLMContextFrame (consumed
by _handle_context without forwarding). Placing the detector below
the realtime LLM means: downstream TranscriptionFrames reach the
classifier branch, UserStartedSpeakingFrame /
UserStoppedSpeakingFrame are forwarded through by the LLM, and the
main aggregator's LLMContextFrame is absorbed by the realtime LLM
and never leaks into the classifier (which would otherwise run a
voicemail completion on the workflow's main context).
The TTS gate and LLM gate are intentionally not used: the realtime
LLM reacts to audio directly, not to LLMContextFrames. On voicemail
detection we drop the call via end_call_with_reason; the detector's
ConversationGate also blocks downstream audio output until the call
ends.
"""
processors = [
transport.input(),
user_context_aggregator,
realtime_llm,
]
if voicemail_detector:
logger.info("Adding native voicemail detector to realtime pipeline")
processors.append(voicemail_detector.detector())
processors.extend(
[
pipeline_engine_callback_processor,
transport.output(),
audio_buffer,
assistant_context_aggregator,
pipeline_metrics_aggregator,
]
)
return Pipeline(processors)
def create_pipeline_task(
pipeline,
workflow_run_id,
audio_config: AudioConfig = None,
*,
conversation_parent_context=None,
conversation_type: str = "voice",
additional_span_attributes: dict | None = None,
):
"""Create a pipeline task with appropriate parameters.
Args:
pipeline: The pipeline to run.
workflow_run_id: Run id, used as the conversation id.
audio_config: Optional audio configuration.
conversation_parent_context: Optional OTEL context carrying a fixed
trace id. When provided, the conversation span attaches to that
trace instead of starting a new root trace (used by text chat to
stitch every per-turn pipeline into one trace).
conversation_type: ``conversation.type`` span attribute value.
additional_span_attributes: Extra attributes set on the conversation
span (e.g. ``langfuse.trace.name`` to name a stitched trace that
has no real root span).
"""
2025-09-09 14:37:32 +05:30
# Set up pipeline params with audio configuration if provided
pipeline_params = PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
send_initial_empty_metrics=False,
enable_heartbeats=True,
start_metadata={"workflow_run_id": workflow_run_id},
2025-09-09 14:37:32 +05:30
)
# If audio_config is provided, set the audio sample rates
if audio_config:
pipeline_params.audio_in_sample_rate = audio_config.transport_in_sample_rate
pipeline_params.audio_out_sample_rate = audio_config.transport_out_sample_rate
logger.debug(
f"Setting pipeline audio params - in: {audio_config.transport_in_sample_rate}Hz, "
f"out: {audio_config.transport_out_sample_rate}Hz"
)
task = PipelineWorker(
2025-09-09 14:37:32 +05:30
pipeline,
params=pipeline_params,
enable_tracing=True,
enable_rtvi=False,
2025-09-09 14:37:32 +05:30
conversation_id=f"{workflow_run_id}",
conversation_parent_context=conversation_parent_context,
conversation_type=conversation_type,
additional_span_attributes=additional_span_attributes,
2025-09-09 14:37:32 +05:30
)
# Check if turn logging is enabled
enable_turn_logging = os.getenv("ENABLE_TURN_LOGGING", "false").lower() == "true"
if enable_turn_logging:
# Attach event handlers to propagate turn information into the logging context
turn_observer = task.turn_tracking_observer
if turn_observer is not None:
# Import turn context manager only if needed
from api.services.pipecat.turn_context import get_turn_context_manager
async def _on_turn_started(observer, turn_number: int):
"""Set the current turn number into the context variable."""
# Set in both contextvar and turn context manager
turn_var.set(turn_number)
turn_manager = get_turn_context_manager()
turn_manager.set_turn(turn_number)
# Register the handlers with the observer
turn_observer.add_event_handler("on_turn_started", _on_turn_started)
return task