diff --git a/api/Dockerfile b/api/Dockerfile index ef39d7a..1e1d27e 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -20,7 +20,7 @@ RUN pip install --user --no-cache-dir -r requirements.txt && \ # Copy and install pipecat from local submodule COPY pipecat /tmp/pipecat -RUN pip install --user --no-cache-dir '/tmp/pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3]' && \ +RUN pip install --user --no-cache-dir '/tmp/pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3,speechmatics]' && \ # Clean up pip cache and temporary pipecat directory rm -rf /root/.cache/pip /tmp/pipecat diff --git a/api/services/configuration/check_validity.py b/api/services/configuration/check_validity.py index 6af329f..4c3f9cf 100644 --- a/api/services/configuration/check_validity.py +++ b/api/services/configuration/check_validity.py @@ -39,6 +39,7 @@ class UserConfigurationValidator: ServiceProviders.CARTESIA.value: self._check_cartesia_api_key, ServiceProviders.DOGRAH.value: self._check_dograh_api_key, ServiceProviders.SARVAM.value: self._check_sarvam_api_key, + ServiceProviders.SPEECHMATICS.value: self._check_speechmatics_api_key, } async def validate(self, configuration: UserConfiguration) -> APIKeyStatusResponse: @@ -137,3 +138,6 @@ class UserConfigurationValidator: def _check_sarvam_api_key(self, model: str, api_key: str) -> bool: return True + + def _check_speechmatics_api_key(self, model: str, api_key: str) -> bool: + return True diff --git a/api/services/configuration/registry.py b/api/services/configuration/registry.py index 3b347ca..6809060 100644 --- a/api/services/configuration/registry.py +++ b/api/services/configuration/registry.py @@ -21,6 +21,7 @@ class ServiceProviders(str, Enum): AZURE = "azure" DOGRAH = "dograh" SARVAM = "sarvam" + SPEECHMATICS = "speechmatics" class BaseServiceConfiguration(BaseModel): @@ -240,6 +241,7 @@ class DograhTTSService(BaseTTSConfiguration): default="default", json_schema_extra={"examples": DOGRAH_TTS_MODELS} ) voice: str = "default" + speed: float = Field(default=1.0, ge=0.5, le=2.0, description="Speed of the voice") api_key: str @@ -375,11 +377,50 @@ SARVAM_STT_MODELS = ["saarika:v2.5", "saaras:v2"] # api_key: str +# Speechmatics STT Service +SPEECHMATICS_STT_LANGUAGES = [ + "en", + "es", + "fr", + "de", + "it", + "pt", + "nl", + "ja", + "ko", + "zh", + "ru", + "ar", + "hi", + "pl", + "tr", + "vi", + "th", + "id", + "ms", + "sv", + "da", + "no", + "fi", +] + + +@register_stt +class SpeechmaticsSTTConfiguration(BaseSTTConfiguration): + provider: Literal[ServiceProviders.SPEECHMATICS] = ServiceProviders.SPEECHMATICS + model: str = Field(default="enhanced", description="Operating point: standard or enhanced") + language: str = Field( + default="en", json_schema_extra={"examples": SPEECHMATICS_STT_LANGUAGES} + ) + api_key: str + + STTConfig = Annotated[ Union[ DeepgramSTTConfiguration, OpenAISTTConfiguration, DograhSTTService, + SpeechmaticsSTTConfiguration, # SarvamSTTConfiguration, ], Field(discriminator="provider"), diff --git a/api/services/looptalk/core/pipeline_builder.py b/api/services/looptalk/core/pipeline_builder.py index 95ece81..b6c41a5 100644 --- a/api/services/looptalk/core/pipeline_builder.py +++ b/api/services/looptalk/core/pipeline_builder.py @@ -83,29 +83,31 @@ class LoopTalkPipelineBuilder: logger.debug(f"Created services for {role}: STT={stt}, LLM={llm}, TTS={tts}") - audio_buffer, audio_synchronizer, transcript, context = ( - create_pipeline_components(audio_config) - ) - - context_aggregator = LLMContextAggregatorPair(context) - # Get workflow graph workflow_graph = WorkflowGraph( ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback) ) - # Create engine + # Create engine first (needed for create_pipeline_components) engine = PipecatEngine( - task=None, # Will be set after creating the task llm=llm, - context=context, tts=tts, workflow=workflow_graph, call_context_vars={}, - audio_buffer=audio_buffer, workflow_run_id=None, # LoopTalk doesn't have workflow runs ) + # Create pipeline components with audio configuration and engine + audio_buffer, transcript, context = create_pipeline_components( + audio_config, engine + ) + + # Set the context and audio_buffer after creation + engine.set_context(context) + engine.set_audio_buffer(audio_buffer) + + context_aggregator = LLMContextAggregatorPair(context) + # Create STT mute filter stt_mute_filter = STTMuteFilter( config=STTMuteConfig( @@ -124,19 +126,13 @@ class LoopTalkPipelineBuilder: user_context_aggregator = context_aggregator.user() assistant_context_aggregator = context_aggregator.assistant() - # Register processors with synchronizer for merged audio - audio_synchronizer.register_processors( - audio_buffer.input(), audio_buffer.output() - ) - # Get audio streamer for real-time streaming audio_streamer = get_or_create_audio_streamer(str(test_session_id), role) - # Create pipeline + # Create pipeline with AudioBufferProcessor after transport.output() pipeline = Pipeline( [ transport.input(), - audio_buffer.input(), # Record input audio audio_streamer, # Stream audio to connected clients stt_mute_filter, stt, @@ -146,7 +142,7 @@ class LoopTalkPipelineBuilder: pipeline_engine_callback_processor, tts, transport.output(), - audio_buffer.output(), # Record output audio + audio_buffer, # AudioBufferProcessor - records both input and output audio transcript.assistant(), assistant_context_aggregator, ] @@ -157,13 +153,12 @@ class LoopTalkPipelineBuilder: task = create_pipeline_task(pipeline, conversation_id, audio_config) # Set the task on the engine - engine.task = task + engine.set_task(task) return { "task": task, "engine": engine, "audio_buffer": audio_buffer, - "audio_synchronizer": audio_synchronizer, "transcript": transcript, "assistant_context_aggregator": assistant_context_aggregator, "audio_streamer": audio_streamer, diff --git a/api/services/looptalk/orchestrator.py b/api/services/looptalk/orchestrator.py index c88e8b2..917774a 100644 --- a/api/services/looptalk/orchestrator.py +++ b/api/services/looptalk/orchestrator.py @@ -245,7 +245,6 @@ class LoopTalkTestOrchestrator: engine = pipeline_info["engine"] task = pipeline_info["task"] audio_buffer = pipeline_info["audio_buffer"] - audio_synchronizer = pipeline_info["audio_synchronizer"] transcript = pipeline_info["transcript"] assistant_context_aggregator = pipeline_info["assistant_context_aggregator"] @@ -255,7 +254,6 @@ class LoopTalkTestOrchestrator: logger.debug(f"LoopTalk {role} client connected - initializing workflow") # Start audio recording await audio_buffer.start_recording() - await audio_synchronizer.start_recording() await engine.initialize() @transport.event_handler("on_client_disconnected") @@ -263,7 +261,6 @@ class LoopTalkTestOrchestrator: logger.debug(f"LoopTalk {role} client disconnected") # Stop audio recording await audio_buffer.stop_recording() - await audio_synchronizer.stop_recording() # Handle disconnect propagation - stop the other agent too await self.session_manager.handle_agent_disconnect( @@ -274,11 +271,11 @@ class LoopTalkTestOrchestrator: # Register custom audio and transcript handlers for LoopTalk await self._register_looptalk_handlers( - audio_synchronizer, transcript, test_session_id, role + audio_buffer, transcript, test_session_id, role ) async def _register_looptalk_handlers( - self, audio_synchronizer, transcript, test_session_id: int, role: str + self, audio_buffer, transcript, test_session_id: int, role: str ): """Register LoopTalk-specific handlers for audio and transcript recording""" @@ -288,9 +285,9 @@ class LoopTalkTestOrchestrator: audio_metadata = {"sample_rate": None, "num_channels": None} # Audio handler - writes directly to PCM file - @audio_synchronizer.event_handler("on_merged_audio") - async def on_merged_audio(_, pcm, sample_rate, num_channels): - if not pcm: + @audio_buffer.event_handler("on_audio_data") + async def on_audio_data(buffer, audio, sample_rate, num_channels): + if not audio: return # Store metadata on first write @@ -301,7 +298,7 @@ class LoopTalkTestOrchestrator: # Append PCM data to temporary file try: with open(paths["temp_audio"], "ab") as f: - f.write(pcm) + f.write(audio) except Exception as e: logger.error( f"Failed to write audio for {role} in session {test_session_id}: {e}" diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 4c3ba33..c2804c2 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -16,10 +16,9 @@ from api.services.workflow.disposition_mapper import ( from api.services.workflow.pipecat_engine import PipecatEngine from api.tasks.arq import enqueue_job from api.tasks.function_names import FunctionNames -from pipecat.frames.frames import Frame +from pipecat.frames.frames import Frame, LLMContextFrame from pipecat.pipeline.task import PipelineTask -from pipecat.processors.audio.audio_buffer_processor import AudioBuffer -from pipecat.processors.audio.audio_synchronizer import AudioSynchronizer +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor def register_transport_event_handlers( @@ -27,8 +26,7 @@ def register_transport_event_handlers( transport, workflow_run_id, engine: PipecatEngine, - audio_buffer: AudioBuffer, - audio_synchronizer: AudioSynchronizer, + audio_buffer: AudioBufferProcessor, audio_config=AudioConfig, ): """Register event handlers for transport events""" @@ -53,8 +51,6 @@ def register_transport_event_handlers( async def on_client_connected(transport, participant): logger.debug("In on_client_connected callback handler - initializing workflow") await audio_buffer.start_recording() - if audio_synchronizer: - await audio_synchronizer.start_recording() await engine.initialize() @transport.event_handler("on_client_disconnected") @@ -68,8 +64,6 @@ def register_transport_event_handlers( # Stop recordings await audio_buffer.stop_recording() - if audio_synchronizer: - await audio_synchronizer.stop_recording() # Only cancel the task if the call is not already disposed by the engine if not call_disposed: @@ -84,12 +78,19 @@ def register_task_event_handler( engine: PipecatEngine, task: PipelineTask, transport, - audio_buffer: AudioBuffer, - audio_synchronizer: AudioSynchronizer, + audio_buffer: AudioBufferProcessor, in_memory_audio_buffer: InMemoryAudioBuffer, in_memory_transcript_buffer: InMemoryTranscriptBuffer, pipeline_metrics_aggregator: PipelineMetricsAggregator, ): + @task.event_handler("on_pipeline_started") + async def on_pipeline_started(task: PipelineTask, frame: Frame): + logger.debug( + "In on_pipeline_started callback handler - triggering initial LLM generation" + ) + # Trigger initial LLM generation after pipeline has started + await engine.llm.queue_frame(LLMContextFrame(engine.context)) + @task.event_handler("on_pipeline_finished") async def on_pipeline_finished( task: PipelineTask, @@ -101,8 +102,6 @@ def register_task_event_handler( # Stop recordings await audio_buffer.stop_recording() - if audio_synchronizer: - await audio_synchronizer.stop_recording() call_disposition = await engine.get_call_disposition() logger.debug(f"call disposition in on_pipeline_finished: {call_disposition}") @@ -224,19 +223,21 @@ def register_task_event_handler( def register_audio_data_handler( - audio_synchronizer, workflow_run_id, in_memory_buffer: InMemoryAudioBuffer + audio_buffer: AudioBufferProcessor, + workflow_run_id, + in_memory_buffer: InMemoryAudioBuffer, ): """Register event handler for audio data""" logger.info(f"Registering audio data handler for workflow run {workflow_run_id}") - @audio_synchronizer.event_handler("on_merged_audio") - async def on_merged_audio(_, pcm, sample_rate, num_channels): - if not pcm: + @audio_buffer.event_handler("on_audio_data") + async def on_audio_data(buffer, audio, sample_rate, num_channels): + if not audio: return # Use in-memory buffer try: - await in_memory_buffer.append(pcm) + await in_memory_buffer.append(audio) except MemoryError as e: logger.error(f"Memory buffer full: {e}") # Could implement overflow to disk here if needed diff --git a/api/services/pipecat/pipeline_builder.py b/api/services/pipecat/pipeline_builder.py index b22cbae..853f32d 100644 --- a/api/services/pipecat/pipeline_builder.py +++ b/api/services/pipecat/pipeline_builder.py @@ -10,8 +10,7 @@ from api.services.pipecat.audio_config import AudioConfig from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.audio.audio_buffer_processor import AudioBuffer -from pipecat.processors.audio.audio_synchronizer import AudioSynchronizer +from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.processors.transcript_processor import TranscriptProcessor from pipecat.utils.context import turn_var @@ -23,15 +22,8 @@ def create_pipeline_components(audio_config: AudioConfig, engine: "PipecatEngine """Create and return the main pipeline components with proper audio configuration""" logger.info(f"Creating pipeline components with audio config: {audio_config}") - # Use new split audio buffer for better performance - audio_buffer = AudioBuffer( - sample_rate=audio_config.pipeline_sample_rate, - buffer_size=audio_config.buffer_size_bytes, - max_recording_bytes=audio_config.max_recording_bytes, - ) - - # Create synchronizer for merged audio (outside pipeline) - audio_synchronizer = AudioSynchronizer( + # Use native AudioBufferProcessor for merged audio recording + audio_buffer = AudioBufferProcessor( sample_rate=audio_config.pipeline_sample_rate, buffer_size=audio_config.buffer_size_bytes, ) @@ -42,7 +34,7 @@ def create_pipeline_components(audio_config: AudioConfig, engine: "PipecatEngine context = LLMContext() - return audio_buffer, audio_synchronizer, transcript, context + return audio_buffer, transcript, context def build_pipeline( @@ -50,7 +42,6 @@ def build_pipeline( stt, transcript, audio_buffer, - audio_synchronizer, llm, tts, user_context_aggregator, @@ -59,30 +50,41 @@ def build_pipeline( stt_mute_filter, pipeline_metrics_aggregator, user_idle_disconnect, + voicemail_detector=None, ): - """Build the main pipeline with all components""" - # Register processors with synchronizer for merged audio - logger.info("Registering audio buffer processors with synchronizer") - audio_synchronizer.register_processors(audio_buffer.input(), audio_buffer.output()) + """Build the main pipeline with all components. - # Build processors list with optional context controller + Args: + audio_buffer: AudioBufferProcessor that handles both input and output audio recording. + voicemail_detector: Optional native pipecat VoicemailDetector. When provided, + inserts voicemail detection after STT. Note: We don't use the TTS gate + to avoid blocking TTS frames during classification. + """ + # Build processors list with optional voicemail detection processors = [ transport.input(), # Transport user input - audio_buffer.input(), # Record input audio (only processes InputAudioRawFrame) - stt, # STT can now have audio_passthrough=False - stt_mute_filter, # STTMuteFilters don't let VAD related events pass through if muted - user_idle_disconnect, - transcript.user(), + stt, # STT (audio_passthrough=True by default, passes InputAudioRawFrame) ] + # Insert voicemail detector after STT if enabled + # Note: We intentionally do NOT use voicemail_detector.gate() to allow TTS + # frames to continue flowing during classification (non-blocking detection) + if voicemail_detector: + logger.info("Adding native voicemail detector to pipeline") + processors.append(voicemail_detector.detector()) + + # Continue with the rest of the pipeline processors.extend( [ + stt_mute_filter, # STTMuteFilters don't let VAD related events pass through if muted + user_idle_disconnect, + transcript.user(), user_context_aggregator, llm, # LLM pipeline_engine_callback_processor, tts, # TTS transport.output(), # Transport bot output - audio_buffer.output(), # Record output audio (only processes OutputAudioRawFrame) + audio_buffer, # AudioBufferProcessor - records both input and output audio transcript.assistant(), assistant_context_aggregator, # Assistant spoken responses pipeline_metrics_aggregator, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index f8765ad..4d725a9 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -27,6 +27,7 @@ from api.services.pipecat.service_factory import ( create_llm_service, create_stt_service, create_tts_service, + create_voicemail_classification_llm, ) from api.services.pipecat.tracing_config import setup_pipeline_tracing from api.services.pipecat.transport_setup import ( @@ -41,8 +42,12 @@ from api.services.telephony.stasis_rtp_connection import StasisRTPConnection from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow import WorkflowGraph +from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector from pipecat.pipeline.base_task import PipelineTaskParams -from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantAggregatorParams, + LLMUserAggregatorParams, +) from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, ) @@ -54,6 +59,7 @@ from pipecat.processors.filters.stt_mute_filter import ( from pipecat.processors.user_idle_processor import UserIdleProcessor from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection from pipecat.utils.context import set_current_run_id +from pipecat.utils.enums import EndTaskReason from pipecat.utils.tracing.context_registry import ContextProviderRegistry # Setup tracing if enabled @@ -468,9 +474,7 @@ async def _run_pipeline( ) # Create pipeline components with audio configuration and engine - audio_buffer, audio_synchronizer, transcript, context = create_pipeline_components( - audio_config, engine - ) + audio_buffer, transcript, context = create_pipeline_components(audio_config, engine) # Set the context and audio_buffer after creation engine.set_context(context) @@ -484,8 +488,9 @@ async def _run_pipeline( expect_stripped_words=True, correct_aggregation_callback=engine.create_aggregation_correction_callback(), ) + user_params = LLMUserAggregatorParams(enable_emulated_vad_interruptions=True) context_aggregator = LLMContextAggregatorPair( - context, assistant_params=assistant_params + context, assistant_params=assistant_params, user_params=user_params ) # Create usage metrics aggregator with engine's callback @@ -517,13 +522,35 @@ async def _run_pipeline( user_context_aggregator = context_aggregator.user() assistant_context_aggregator = context_aggregator.assistant() + # Create voicemail detector if enabled in the workflow's start node + voicemail_detector = None + start_node = workflow_graph.nodes.get(workflow_graph.start_node_id) + if start_node and start_node.detect_voicemail: + classification_llm = create_voicemail_classification_llm() + if classification_llm: + logger.info( + f"Voicemail detection enabled for workflow run {workflow_run_id}" + ) + voicemail_detector = VoicemailDetector( + llm=classification_llm, + voicemail_response_delay=2.0, + ) + + # Register event handler to end task when voicemail is detected + @voicemail_detector.event_handler("on_voicemail_detected") + async def _on_voicemail_detected(_processor): + logger.info(f"Voicemail detected for workflow run {workflow_run_id}") + await engine.send_end_task_frame( + reason=EndTaskReason.VOICEMAIL_DETECTED.value, + abort_immediately=True, + ) + # Build the pipeline with the STT mute filter and context controller pipeline = build_pipeline( transport, stt, transcript, audio_buffer, - audio_synchronizer, llm, tts, user_context_aggregator, @@ -532,6 +559,7 @@ async def _run_pipeline( stt_mute_filter, pipeline_metrics_aggregator, user_idle_disconnect, + voicemail_detector=voicemail_detector, ) # Create pipeline task with audio configuration @@ -548,7 +576,6 @@ async def _run_pipeline( workflow_run_id, engine=engine, audio_buffer=audio_buffer, - audio_synchronizer=audio_synchronizer, audio_config=audio_config, ) ) @@ -559,15 +586,12 @@ async def _run_pipeline( task, transport, audio_buffer, - audio_synchronizer, in_memory_audio_buffer, in_memory_transcript_buffer, pipeline_metrics_aggregator, ) - register_audio_data_handler( - audio_synchronizer, workflow_run_id, in_memory_audio_buffer - ) + register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer) register_transcript_handler( transcript, workflow_run_id, in_memory_transcript_buffer ) diff --git a/api/services/pipecat/service_factory.py b/api/services/pipecat/service_factory.py index db19998..c0c39e6 100644 --- a/api/services/pipecat/service_factory.py +++ b/api/services/pipecat/service_factory.py @@ -1,3 +1,4 @@ +import os from typing import TYPE_CHECKING from fastapi import HTTPException @@ -20,6 +21,7 @@ from pipecat.services.openai.stt import OpenAISTTService from pipecat.services.openai.tts import OpenAITTSService from pipecat.services.sarvam.stt import SarvamSTTService from pipecat.services.sarvam.tts import SarvamTTSService +from pipecat.services.speechmatics.stt import SpeechmaticsSTTService from pipecat.transcriptions.language import Language from pipecat.utils.text.xml_function_tag_filter import XMLFunctionTagFilter @@ -40,28 +42,20 @@ def create_stt_service(user_config): ) logger.debug(f"Using DeepGram Model - {user_config.stt.model}") return DeepgramSTTService( - live_options=live_options, - api_key=user_config.stt.api_key, - audio_passthrough=False, # Disable passthrough since audio is buffered separately + live_options=live_options, api_key=user_config.stt.api_key ) elif user_config.stt.provider == ServiceProviders.OPENAI.value: return OpenAISTTService( - api_key=user_config.stt.api_key, - model=user_config.stt.model, - audio_passthrough=False, # Disable passthrough since audio is buffered separately + api_key=user_config.stt.api_key, model=user_config.stt.model ) elif user_config.stt.provider == ServiceProviders.CARTESIA.value: - return CartesiaSTTService( - api_key=user_config.stt.api_key, - audio_passthrough=False, # Disable passthrough since audio is buffered separately - ) + return CartesiaSTTService(api_key=user_config.stt.api_key) elif user_config.stt.provider == ServiceProviders.DOGRAH.value: base_url = MPS_API_URL.replace("http://", "ws://").replace("https://", "wss://") return DograhSTTService( base_url=base_url, api_key=user_config.stt.api_key, model=user_config.stt.model, - audio_passthrough=False, # Disable passthrough since audio is buffered separately ) elif user_config.stt.provider == ServiceProviders.SARVAM.value: # Map Sarvam language code to pipecat Language enum @@ -85,7 +79,23 @@ def create_stt_service(user_config): api_key=user_config.stt.api_key, model=user_config.stt.model, params=SarvamSTTService.InputParams(language=pipecat_language), - audio_passthrough=False, + ) + elif user_config.stt.provider == ServiceProviders.SPEECHMATICS.value: + from pipecat.services.speechmatics.stt import OperatingPoint + + language = getattr(user_config.stt, "language", None) or "en" + # Map model field to operating point (standard or enhanced) + operating_point = ( + OperatingPoint.ENHANCED + if user_config.stt.model == "enhanced" + else OperatingPoint.STANDARD + ) + return SpeechmaticsSTTService( + api_key=user_config.stt.api_key, + params=SpeechmaticsSTTService.InputParams( + language=language, + operating_point=operating_point, + ), ) else: raise HTTPException( @@ -138,6 +148,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): api_key=user_config.tts.api_key, model=user_config.tts.model, voice=user_config.tts.voice, + params=DograhTTSService.InputParams(speed=user_config.tts.speed), text_filters=[xml_function_tag_filter], ) elif user_config.tts.provider == ServiceProviders.SARVAM.value: @@ -222,3 +233,24 @@ def create_llm_service(user_config): ) else: raise HTTPException(status_code=400, detail="Invalid LLM provider") + + +def create_voicemail_classification_llm(): + """Create a fast, lightweight LLM service for voicemail classification. + + Uses gpt-4o-mini which is fast and cost-effective for simple classification tasks. + The model only needs to output "CONVERSATION" or "VOICEMAIL" based on transcriptions. + + Returns: + OpenAILLMService instance, or None if OPENAI_API_KEY is not set. + """ + api_key = os.environ.get("OPENAI_API_KEY") + if not api_key: + logger.warning("OPENAI_API_KEY not set - voicemail detection will be disabled") + return None + + return OpenAILLMService( + api_key=api_key, + model="gpt-4o-mini", + params=OpenAILLMService.InputParams(temperature=0.0), + ) diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index ffa068e..a7db110 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -1,19 +1,14 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Union -from api.constants import DEPLOYMENT_MODE, ENABLE_TRACING, VOICEMAIL_RECORDING_DURATION from api.services.workflow.disposition_mapper import ( apply_disposition_mapping, get_organization_id_from_workflow_run, ) -from api.services.workflow.pipecat_engine_voicemail_detector import ( - VoicemailDetector, -) from api.services.workflow.workflow import Node, WorkflowGraph from pipecat.frames.frames import ( CancelFrame, EndFrame, FunctionCallResultProperties, - LLMContextFrame, TTSSpeakFrame, ) from pipecat.pipeline.task import PipelineTask @@ -93,11 +88,6 @@ class PipecatEngine: # access to _context self._variable_extraction_manager = None - # Voicemail detection state - self._detect_voicemail = False - self._voicemail_detector = None - self._voicemail_detection_task: Optional[asyncio.Task] = None - # Lazy loaded built-in function schemas self._builtin_function_schemas: Optional[list[dict]] = None @@ -172,8 +162,6 @@ class PipecatEngine: await self.set_node(self.workflow.start_node_id) - # Trigger initial LLM generation - await self.task.queue_frame(LLMContextFrame(self.context)) logger.debug(f"{self.__class__.__name__} initialized") except Exception as e: logger.error(f"Error initializing {self.__class__.__name__}: {e}") @@ -388,43 +376,6 @@ class PipecatEngine: async def _handle_start_node(self, node: Node) -> None: """Handle start node execution.""" - # Handle voicemail detection setup (before any returns) - # Lets check ENABLE_TRACING to make sure we have prompt access from - # langfuse - if node.detect_voicemail and DEPLOYMENT_MODE == "saas" and ENABLE_TRACING: - if not self._audio_buffer: - logger.warning( - "Voicemail detection enabled but no audio buffer available - skipping detection" - ) - else: - logger.debug( - "Start node has detect_voicemail enabled - setting up audio-based detector" - ) - self._detect_voicemail = True - - self._voicemail_detector = VoicemailDetector( - detection_duration=VOICEMAIL_RECORDING_DURATION, - workflow_run_id=self._workflow_run_id, - ) - - # Register audio handler on the audio buffer input processor - audio_input = self._audio_buffer.input() - - @audio_input.event_handler("on_input_audio_data") - async def handle_voicemail_audio( - processor, pcm, sample_rate, num_channels - ): - if ( - self._voicemail_detector - and self._voicemail_detector.is_detecting - ): - await self._voicemail_detector.handle_audio_data( - processor, pcm, sample_rate, num_channels - ) - - # Start detection - await self._voicemail_detector.start_detection(self) - # Check if delayed start is enabled if node.delayed_start: # Use configured duration or default to 3 seconds @@ -745,8 +696,4 @@ class PipecatEngine: ): self._user_response_timeout_task.cancel() - # Stop voicemail detection if active - if self._voicemail_detector and hasattr( - self._voicemail_detector, "stop_detection" - ): - await self._voicemail_detector.stop_detection() + # Note: Native VoicemailDetector cleanup is handled by the pipeline diff --git a/api/services/workflow/pipecat_engine_voicemail_detector.py b/api/services/workflow/pipecat_engine_voicemail_detector.py deleted file mode 100644 index ff9efa8..0000000 --- a/api/services/workflow/pipecat_engine_voicemail_detector.py +++ /dev/null @@ -1,441 +0,0 @@ -from __future__ import annotations - -import asyncio -import io -import json -import os -import tempfile -import wave -from typing import TYPE_CHECKING, Optional - -from langfuse import get_client -from loguru import logger -from openai import AsyncOpenAI -from opentelemetry import context as otel_context - -from api.db import db_client -from api.services.pipecat.tracing_config import is_tracing_enabled -from api.tasks.arq import enqueue_job -from api.tasks.function_names import FunctionNames -from pipecat.utils.enums import EndTaskReason -from pipecat.utils.tracing.context_registry import get_current_turn_context - -if TYPE_CHECKING: - from api.services.workflow.pipecat_engine import PipecatEngine - - -DEFAULT_VOICEMAIL_PROMPT = """ -You are analyzing the beginning of a phone call to determine if it's a voicemail greeting. - -Common voicemail indicators: -- "You've reached the voicemail of..." -- "Please leave a message after the beep" -- "I'm not available right now" -- "Press 1 to leave a message" -- Robotic or pre-recorded voice quality mentioned -- Background music or hold music references - -Transcript: {transcript} - -Respond with a JSON object: -{ - "is_voicemail": true/false, - "confidence": 0.0-1.0, - "reasoning": "Brief explanation" -} -""" - - -class VoicemailDetector: - """ - Autonomous voicemail detection system that operates independently of the main pipeline. - """ - - def __init__(self, detection_duration: float = 15.0, workflow_run_id: int = None): - self.detection_duration = detection_duration - self.audio_buffer = bytearray() - self.is_detecting = False - self.workflow_run_id = workflow_run_id - self._langfuse_client = get_client() - - # We will set the sample rate when we receive the audio packet - self._sample_rate = None - - # Task management - self._detection_task: Optional[asyncio.Task] = None - self._is_cancelled = False - self._engine: Optional[PipecatEngine] = None - - # Event for audio collection completion - self._audio_collected_event = asyncio.Event() - - # ------------------------------------------------------------------ - # Utility helpers - # ------------------------------------------------------------------ - - def _current_duration_seconds(self) -> float: - """Return the duration (in seconds) of the audio currently in the buffer.""" - if self._sample_rate: - return len(self.audio_buffer) / (self._sample_rate * 2) - return 0.0 - - async def handle_audio_data( - self, processor, pcm: bytes, sample_rate: int, num_channels: int - ): - """Handle incoming audio data without affecting pipeline.""" - if not self.is_detecting or self._is_cancelled: - return - - # Store the actual sample rate from the first audio packet - if self._sample_rate is None: - self._sample_rate = sample_rate - logger.debug(f"Voicemail detector using sample rate: {sample_rate}") - - # Add to buffer without resampling - self.audio_buffer.extend(pcm) - - # Check if we've collected enough audio - current_duration = self._current_duration_seconds() - if current_duration >= self.detection_duration: - self._audio_collected_event.set() - - async def start_detection(self, engine: PipecatEngine): - """Start voicemail detection process.""" - logger.info("Starting voicemail detection") - self.is_detecting = True - self._is_cancelled = False - self._engine = engine - self._audio_collected_event.clear() - - # Start detection in background - self._detection_task = asyncio.create_task(self._run_detection_with_timeout()) - - async def stop_detection(self): - """Stop detection immediately (called on disconnect).""" - logger.info("Stopping voicemail detection due to disconnect") - self._is_cancelled = True - self.is_detecting = False - - # Set the event to unblock any waiting tasks - self._audio_collected_event.set() - - # Cancel ongoing detection task - if self._detection_task and not self._detection_task.done(): - self._detection_task.cancel() - - # Clear audio buffer - self.audio_buffer.clear() - - # Wait for tasks to complete cancellation - if self._detection_task: - try: - await self._detection_task - except asyncio.CancelledError: - pass - - async def _run_detection_with_timeout(self): - """Run detection with proper timeout and cancellation handling.""" - try: - # Wait for audio collection or cancellation directly - await self._wait_for_audio_collection() - - # Check if cancelled during collection - if self._is_cancelled: - logger.info("Detection cancelled during audio collection") - return - - # Process detection - await self._process_detection() - - except asyncio.CancelledError: - logger.info("Voicemail detection task cancelled") - except Exception as e: - logger.error(f"Error in voicemail detection: {e}") - finally: - self.is_detecting = False - - async def _wait_for_audio_collection(self): - """Wait for audio buffer to fill or timeout.""" - try: - # Wait for either audio collection completion or timeout - await asyncio.wait_for( - self._audio_collected_event.wait(), - timeout=self.detection_duration + 2.0, - ) - - if not self._is_cancelled: - current_duration = self._current_duration_seconds() - logger.info( - f"Collected {current_duration:.1f}s of audio for voicemail detection (sample rate: {self._sample_rate}Hz)" - ) - except asyncio.TimeoutError: - if not self._is_cancelled: - current_duration = self._current_duration_seconds() - logger.warning("Audio collection timeout exceeded") - logger.info( - f"Proceeding with {current_duration:.1f}s of audio (sample rate: {self._sample_rate}Hz)" - ) - - async def _process_detection(self): - """Process the collected audio to detect voicemail.""" - if not self.audio_buffer or not self._engine: - logger.warning("No audio buffer or engine available for detection") - return - - try: - # Convert PCM to WAV once for both transcription and storage - wav_data = self._create_wav_from_pcm(bytes(self.audio_buffer)) - - # Transcribe audio - logger.info("Transcribing audio for voicemail detection") - transcript = await self._transcribe_audio(wav_data) - - if not transcript: - logger.warning("No transcript obtained from audio") - - # Still upload the raw recording so data pipeline has it - if self.workflow_run_id: - await self._save_voicemail_audio(wav_data, 0.0, False) - - return - - logger.info( - f"Voicemail detection transcript obtained: {transcript[:100]}..." - ) - - # Analyze transcript - result = await self._analyze_transcript(transcript) - - # Extract common fields - confidence = result.get("confidence", 0.0) - reasoning = result.get("reasoning", "No reasoning provided") - - # Save voicemail audio to S3 once for data pipeline (include duration in filename) - s3_path = None - if self.workflow_run_id: - s3_path = await self._save_voicemail_audio( - wav_data, confidence, result.get("is_voicemail") - ) - - # Take action based on result - if result.get("is_voicemail", False): - logger.info( - f"Voicemail detected with confidence {confidence}: {reasoning}" - ) - - # Update workflow run with voicemail tags - if self.workflow_run_id: - # Fetch the workflow run from database - workflow_run = await db_client.get_workflow_run_by_id( - self.workflow_run_id - ) - if workflow_run: - call_tags = workflow_run.gathered_context.get("call_tags", []) - call_tags.extend(["voicemail_detected", "not_connected"]) - - await db_client.update_workflow_run( - run_id=workflow_run.id, - gathered_context={ - "call_tags": call_tags, - "voicemail_transcript": transcript, - "voicemail_confidence": confidence, - }, - ) - - # Send end task frame with metadata (including optional S3 path) - await self._engine.send_end_task_frame( - reason=EndTaskReason.VOICEMAIL_DETECTED.value, - abort_immediately=True, - ) - else: - logger.info("No voicemail detected, continuing normal conversation") - - except Exception as e: - logger.error(f"Error processing voicemail detection: {e}") - - async def _transcribe_audio(self, wav_data: bytes) -> str: - """Transcribe audio using OpenAI API directly. - - Args: - wav_data: WAV formatted audio data - """ - client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY")) - - # Direct API call - no pipeline involvement - response = await client.audio.transcriptions.create( - file=("audio.wav", wav_data, "audio/wav"), - model="whisper-1", # Using whisper-1 as it's more stable for transcription - language="en", - temperature=0.0, - ) - - return response.text.strip() - - def _create_wav_from_pcm(self, pcm_data: bytes) -> bytes: - """Convert raw PCM data to WAV format.""" - wav_buffer = io.BytesIO() - with wave.open(wav_buffer, "wb") as wav_file: - wav_file.setnchannels(1) # Mono - wav_file.setsampwidth(2) # 16-bit - wav_file.setframerate(self._sample_rate) - wav_file.writeframes(pcm_data) - - wav_buffer.seek(0) - return wav_buffer.read() - - async def _analyze_transcript(self, transcript: str) -> dict: - """Analyze transcript using independent OpenAI client.""" - # Capture the current turn context for proper span nesting - parent_context = get_current_turn_context() - - client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY")) - - langfuse_prompt = None - try: - langfuse_prompt = self._langfuse_client.get_prompt( - "production/voicemail_detection" - ) - prompt = langfuse_prompt.compile(transcript=transcript) - except Exception as e: - logger.warning(f"Error getting prompt from Langfuse: {e}") - prompt = DEFAULT_VOICEMAIL_PROMPT.replace("{transcript}", transcript) - - messages = [ - { - "role": "system", - "content": prompt, - } - ] - - # When we have a parent OpenTelemetry context, we need to activate it - # so that Langfuse's OTEL tracer will automatically pick it up - if parent_context and is_tracing_enabled(): - # Activate the parent context for this scope - token = otel_context.attach(parent_context) - try: - # Start Langfuse generation - it will automatically use the active OTEL context - langfuse_generation = None - try: - langfuse_generation = self._langfuse_client.start_generation( - name="voicemail_detection", - model="gpt-4o", - input=messages, - metadata={ - "temperature": 0.0, - "detection_duration": self.detection_duration, - "transcript_length": len(transcript), - }, - prompt=langfuse_prompt, - ) - except Exception as e: - logger.warning(f"Error starting Langfuse generation: {e}") - - # Direct API call - response = await client.chat.completions.create( - model="gpt-4o", - messages=messages, - temperature=0.0, - response_format={"type": "json_object"}, - ) - - llm_response = response.choices[0].message.content - - # Update and end Langfuse generation - if langfuse_generation: - try: - langfuse_generation.update( - output=llm_response, - usage_details={ - "prompt_tokens": response.usage.prompt_tokens - if response.usage - else 0, - "completion_tokens": response.usage.completion_tokens - if response.usage - else 0, - "total_tokens": response.usage.total_tokens - if response.usage - else 0, - }, - ) - langfuse_generation.end() - except Exception as e: - logger.warning(f"Error updating Langfuse generation: {e}") - finally: - # Detach the context - otel_context.detach(token) - else: - # No parent context or tracing disabled - just make the API call - response = await client.chat.completions.create( - model="gpt-4o", - messages=messages, - temperature=0.0, - response_format={"type": "json_object"}, - ) - llm_response = response.choices[0].message.content - - # Parse response - try: - return json.loads(llm_response) - except json.JSONDecodeError: - logger.warning("Invalid JSON response from voicemail detection") - return { - "is_voicemail": False, - "confidence": 0.0, - "reasoning": "Invalid response", - } - - async def _save_voicemail_audio( - self, wav_data: bytes, confidence: float, is_voicemail: bool - ) -> Optional[str]: - """Save voicemail audio to temp file and enqueue task to upload to S3. - - Args: - wav_data: WAV formatted audio data - confidence: Detection confidence score - is_voicemail: Whether it was detected as voicemail - - Returns: - The expected S3 object key (bucket path). The actual upload happens asynchronously. - """ - try: - # Create filename with prediction, confidence and duration - duration_seconds = self._current_duration_seconds() - prediction = "voicemail" if is_voicemail else "not_voicemail" - confidence_int = int(confidence * 100) - duration_int = int(duration_seconds) - s3_key = f"voicemail_detections/{self.workflow_run_id}_{prediction}_{confidence_int}_{duration_int}.wav" - - # Write WAV data to temp file - DO NOT delete it here, the async task will handle cleanup - with tempfile.NamedTemporaryFile( - suffix=".wav", - delete=False, # Important: don't delete immediately - prefix=f"voicemail_{self.workflow_run_id}_", - ) as tmp_file: - tmp_file.write(wav_data) - tmp_file.flush() - temp_file_path = tmp_file.name - - logger.info(f"Saved voicemail audio to temp file: {temp_file_path}") - - # Enqueue async task to upload to S3 - await enqueue_job( - FunctionNames.UPLOAD_VOICEMAIL_AUDIO_TO_S3, - self.workflow_run_id, - temp_file_path, - s3_key, - ) - - logger.info(f"Enqueued voicemail audio upload task for: {s3_key}") - return s3_key - - except Exception as e: - logger.error(f"Failed to save voicemail audio: {e}") - # Clean up temp file if task enqueue failed - if "temp_file_path" in locals() and os.path.exists(temp_file_path): - try: - os.remove(temp_file_path) - except Exception as cleanup_error: - logger.warning( - f"Failed to cleanup temp file after error: {cleanup_error}" - ) - return None diff --git a/scripts/setup_pipecat.sh b/scripts/setup_pipecat.sh index de115fc..3db9836 100755 --- a/scripts/setup_pipecat.sh +++ b/scripts/setup_pipecat.sh @@ -16,7 +16,7 @@ git submodule update --init --recursive # Install pipecat in editable mode with all extras echo "Installing pipecat dependencies..." -pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3] +pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3,speechmatics] # Install other requirements echo "Installing dograh API requirements..."