fix: migrate from custom audio recorder to native AudioBuffer (#115)

* fix: update to pipecat VM Detector

* fix: refactor to remove audio synchronizer

* feat: add speechmatics as STT
This commit is contained in:
Abhishek 2026-01-08 18:03:26 +05:30 committed by GitHub
parent 31521008cf
commit edf0fa4fbc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 193 additions and 591 deletions

View file

@ -20,7 +20,7 @@ RUN pip install --user --no-cache-dir -r requirements.txt && \
# Copy and install pipecat from local submodule
COPY pipecat /tmp/pipecat
RUN pip install --user --no-cache-dir '/tmp/pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3]' && \
RUN pip install --user --no-cache-dir '/tmp/pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3,speechmatics]' && \
# Clean up pip cache and temporary pipecat directory
rm -rf /root/.cache/pip /tmp/pipecat

View file

@ -39,6 +39,7 @@ class UserConfigurationValidator:
ServiceProviders.CARTESIA.value: self._check_cartesia_api_key,
ServiceProviders.DOGRAH.value: self._check_dograh_api_key,
ServiceProviders.SARVAM.value: self._check_sarvam_api_key,
ServiceProviders.SPEECHMATICS.value: self._check_speechmatics_api_key,
}
async def validate(self, configuration: UserConfiguration) -> APIKeyStatusResponse:
@ -137,3 +138,6 @@ class UserConfigurationValidator:
def _check_sarvam_api_key(self, model: str, api_key: str) -> bool:
    """Report the Sarvam key as valid without inspecting it.

    Neither ``model`` nor ``api_key`` is examined; the method always
    returns True.
    """
    return True
def _check_speechmatics_api_key(self, model: str, api_key: str) -> bool:
    """Report the Speechmatics key as valid without inspecting it.

    Neither ``model`` nor ``api_key`` is examined; the method always
    returns True.
    """
    return True

View file

@ -21,6 +21,7 @@ class ServiceProviders(str, Enum):
AZURE = "azure"
DOGRAH = "dograh"
SARVAM = "sarvam"
SPEECHMATICS = "speechmatics"
class BaseServiceConfiguration(BaseModel):
@ -240,6 +241,7 @@ class DograhTTSService(BaseTTSConfiguration):
default="default", json_schema_extra={"examples": DOGRAH_TTS_MODELS}
)
voice: str = "default"
speed: float = Field(default=1.0, ge=0.5, le=2.0, description="Speed of the voice")
api_key: str
@ -375,11 +377,50 @@ SARVAM_STT_MODELS = ["saarika:v2.5", "saaras:v2"]
# api_key: str
# Speechmatics STT Service
# Language codes surfaced as JSON-schema examples for the Speechmatics
# `language` field.
SPEECHMATICS_STT_LANGUAGES = [
    "en",
    "es",
    "fr",
    "de",
    "it",
    "pt",
    "nl",
    "ja",
    "ko",
    "zh",
    "ru",
    "ar",
    "hi",
    "pl",
    "tr",
    "vi",
    "th",
    "id",
    "ms",
    "sv",
    "da",
    "no",
    "fi",
]
@register_stt
class SpeechmaticsSTTConfiguration(BaseSTTConfiguration):
    """Speech-to-text settings for the Speechmatics provider."""

    # Fixed provider tag for this configuration.
    provider: Literal[ServiceProviders.SPEECHMATICS] = ServiceProviders.SPEECHMATICS
    # Operating point name; typed as plain `str`, so this field itself does
    # not restrict the value to "standard"/"enhanced".
    model: str = Field(default="enhanced", description="Operating point: standard or enhanced")
    # Language code; SPEECHMATICS_STT_LANGUAGES is attached as schema
    # examples only.
    language: str = Field(
        default="en", json_schema_extra={"examples": SPEECHMATICS_STT_LANGUAGES}
    )
    # Required: no default is provided.
    api_key: str
STTConfig = Annotated[
Union[
DeepgramSTTConfiguration,
OpenAISTTConfiguration,
DograhSTTService,
SpeechmaticsSTTConfiguration,
# SarvamSTTConfiguration,
],
Field(discriminator="provider"),

View file

@ -83,29 +83,31 @@ class LoopTalkPipelineBuilder:
logger.debug(f"Created services for {role}: STT={stt}, LLM={llm}, TTS={tts}")
audio_buffer, audio_synchronizer, transcript, context = (
create_pipeline_components(audio_config)
)
context_aggregator = LLMContextAggregatorPair(context)
# Get workflow graph
workflow_graph = WorkflowGraph(
ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback)
)
# Create engine
# Create engine first (needed for create_pipeline_components)
engine = PipecatEngine(
task=None, # Will be set after creating the task
llm=llm,
context=context,
tts=tts,
workflow=workflow_graph,
call_context_vars={},
audio_buffer=audio_buffer,
workflow_run_id=None, # LoopTalk doesn't have workflow runs
)
# Create pipeline components with audio configuration and engine
audio_buffer, transcript, context = create_pipeline_components(
audio_config, engine
)
# Set the context and audio_buffer after creation
engine.set_context(context)
engine.set_audio_buffer(audio_buffer)
context_aggregator = LLMContextAggregatorPair(context)
# Create STT mute filter
stt_mute_filter = STTMuteFilter(
config=STTMuteConfig(
@ -124,19 +126,13 @@ class LoopTalkPipelineBuilder:
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
# Register processors with synchronizer for merged audio
audio_synchronizer.register_processors(
audio_buffer.input(), audio_buffer.output()
)
# Get audio streamer for real-time streaming
audio_streamer = get_or_create_audio_streamer(str(test_session_id), role)
# Create pipeline
# Create pipeline with AudioBufferProcessor after transport.output()
pipeline = Pipeline(
[
transport.input(),
audio_buffer.input(), # Record input audio
audio_streamer, # Stream audio to connected clients
stt_mute_filter,
stt,
@ -146,7 +142,7 @@ class LoopTalkPipelineBuilder:
pipeline_engine_callback_processor,
tts,
transport.output(),
audio_buffer.output(), # Record output audio
audio_buffer, # AudioBufferProcessor - records both input and output audio
transcript.assistant(),
assistant_context_aggregator,
]
@ -157,13 +153,12 @@ class LoopTalkPipelineBuilder:
task = create_pipeline_task(pipeline, conversation_id, audio_config)
# Set the task on the engine
engine.task = task
engine.set_task(task)
return {
"task": task,
"engine": engine,
"audio_buffer": audio_buffer,
"audio_synchronizer": audio_synchronizer,
"transcript": transcript,
"assistant_context_aggregator": assistant_context_aggregator,
"audio_streamer": audio_streamer,

View file

@ -245,7 +245,6 @@ class LoopTalkTestOrchestrator:
engine = pipeline_info["engine"]
task = pipeline_info["task"]
audio_buffer = pipeline_info["audio_buffer"]
audio_synchronizer = pipeline_info["audio_synchronizer"]
transcript = pipeline_info["transcript"]
assistant_context_aggregator = pipeline_info["assistant_context_aggregator"]
@ -255,7 +254,6 @@ class LoopTalkTestOrchestrator:
logger.debug(f"LoopTalk {role} client connected - initializing workflow")
# Start audio recording
await audio_buffer.start_recording()
await audio_synchronizer.start_recording()
await engine.initialize()
@transport.event_handler("on_client_disconnected")
@ -263,7 +261,6 @@ class LoopTalkTestOrchestrator:
logger.debug(f"LoopTalk {role} client disconnected")
# Stop audio recording
await audio_buffer.stop_recording()
await audio_synchronizer.stop_recording()
# Handle disconnect propagation - stop the other agent too
await self.session_manager.handle_agent_disconnect(
@ -274,11 +271,11 @@ class LoopTalkTestOrchestrator:
# Register custom audio and transcript handlers for LoopTalk
await self._register_looptalk_handlers(
audio_synchronizer, transcript, test_session_id, role
audio_buffer, transcript, test_session_id, role
)
async def _register_looptalk_handlers(
self, audio_synchronizer, transcript, test_session_id: int, role: str
self, audio_buffer, transcript, test_session_id: int, role: str
):
"""Register LoopTalk-specific handlers for audio and transcript recording"""
@ -288,9 +285,9 @@ class LoopTalkTestOrchestrator:
audio_metadata = {"sample_rate": None, "num_channels": None}
# Audio handler - writes directly to PCM file
@audio_synchronizer.event_handler("on_merged_audio")
async def on_merged_audio(_, pcm, sample_rate, num_channels):
if not pcm:
@audio_buffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
if not audio:
return
# Store metadata on first write
@ -301,7 +298,7 @@ class LoopTalkTestOrchestrator:
# Append PCM data to temporary file
try:
with open(paths["temp_audio"], "ab") as f:
f.write(pcm)
f.write(audio)
except Exception as e:
logger.error(
f"Failed to write audio for {role} in session {test_session_id}: {e}"

View file

@ -16,10 +16,9 @@ from api.services.workflow.disposition_mapper import (
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
from pipecat.frames.frames import Frame
from pipecat.frames.frames import Frame, LLMContextFrame
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.audio.audio_buffer_processor import AudioBuffer
from pipecat.processors.audio.audio_synchronizer import AudioSynchronizer
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
def register_transport_event_handlers(
@ -27,8 +26,7 @@ def register_transport_event_handlers(
transport,
workflow_run_id,
engine: PipecatEngine,
audio_buffer: AudioBuffer,
audio_synchronizer: AudioSynchronizer,
audio_buffer: AudioBufferProcessor,
audio_config=AudioConfig,
):
"""Register event handlers for transport events"""
@ -53,8 +51,6 @@ def register_transport_event_handlers(
async def on_client_connected(transport, participant):
logger.debug("In on_client_connected callback handler - initializing workflow")
await audio_buffer.start_recording()
if audio_synchronizer:
await audio_synchronizer.start_recording()
await engine.initialize()
@transport.event_handler("on_client_disconnected")
@ -68,8 +64,6 @@ def register_transport_event_handlers(
# Stop recordings
await audio_buffer.stop_recording()
if audio_synchronizer:
await audio_synchronizer.stop_recording()
# Only cancel the task if the call is not already disposed by the engine
if not call_disposed:
@ -84,12 +78,19 @@ def register_task_event_handler(
engine: PipecatEngine,
task: PipelineTask,
transport,
audio_buffer: AudioBuffer,
audio_synchronizer: AudioSynchronizer,
audio_buffer: AudioBufferProcessor,
in_memory_audio_buffer: InMemoryAudioBuffer,
in_memory_transcript_buffer: InMemoryTranscriptBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
):
@task.event_handler("on_pipeline_started")
async def on_pipeline_started(task: PipelineTask, frame: Frame):
logger.debug(
"In on_pipeline_started callback handler - triggering initial LLM generation"
)
# Trigger initial LLM generation after pipeline has started
await engine.llm.queue_frame(LLMContextFrame(engine.context))
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(
task: PipelineTask,
@ -101,8 +102,6 @@ def register_task_event_handler(
# Stop recordings
await audio_buffer.stop_recording()
if audio_synchronizer:
await audio_synchronizer.stop_recording()
call_disposition = await engine.get_call_disposition()
logger.debug(f"call disposition in on_pipeline_finished: {call_disposition}")
@ -224,19 +223,21 @@ def register_task_event_handler(
def register_audio_data_handler(
audio_synchronizer, workflow_run_id, in_memory_buffer: InMemoryAudioBuffer
audio_buffer: AudioBufferProcessor,
workflow_run_id,
in_memory_buffer: InMemoryAudioBuffer,
):
"""Register event handler for audio data"""
logger.info(f"Registering audio data handler for workflow run {workflow_run_id}")
@audio_synchronizer.event_handler("on_merged_audio")
async def on_merged_audio(_, pcm, sample_rate, num_channels):
if not pcm:
@audio_buffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
if not audio:
return
# Use in-memory buffer
try:
await in_memory_buffer.append(pcm)
await in_memory_buffer.append(audio)
except MemoryError as e:
logger.error(f"Memory buffer full: {e}")
# Could implement overflow to disk here if needed

View file

@ -10,8 +10,7 @@ from api.services.pipecat.audio_config import AudioConfig
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBuffer
from pipecat.processors.audio.audio_synchronizer import AudioSynchronizer
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.utils.context import turn_var
@ -23,15 +22,8 @@ def create_pipeline_components(audio_config: AudioConfig, engine: "PipecatEngine
"""Create and return the main pipeline components with proper audio configuration"""
logger.info(f"Creating pipeline components with audio config: {audio_config}")
# Use new split audio buffer for better performance
audio_buffer = AudioBuffer(
sample_rate=audio_config.pipeline_sample_rate,
buffer_size=audio_config.buffer_size_bytes,
max_recording_bytes=audio_config.max_recording_bytes,
)
# Create synchronizer for merged audio (outside pipeline)
audio_synchronizer = AudioSynchronizer(
# Use native AudioBufferProcessor for merged audio recording
audio_buffer = AudioBufferProcessor(
sample_rate=audio_config.pipeline_sample_rate,
buffer_size=audio_config.buffer_size_bytes,
)
@ -42,7 +34,7 @@ def create_pipeline_components(audio_config: AudioConfig, engine: "PipecatEngine
context = LLMContext()
return audio_buffer, audio_synchronizer, transcript, context
return audio_buffer, transcript, context
def build_pipeline(
@ -50,7 +42,6 @@ def build_pipeline(
stt,
transcript,
audio_buffer,
audio_synchronizer,
llm,
tts,
user_context_aggregator,
@ -59,30 +50,41 @@ def build_pipeline(
stt_mute_filter,
pipeline_metrics_aggregator,
user_idle_disconnect,
voicemail_detector=None,
):
"""Build the main pipeline with all components"""
# Register processors with synchronizer for merged audio
logger.info("Registering audio buffer processors with synchronizer")
audio_synchronizer.register_processors(audio_buffer.input(), audio_buffer.output())
"""Build the main pipeline with all components.
# Build processors list with optional context controller
Args:
audio_buffer: AudioBufferProcessor that handles both input and output audio recording.
voicemail_detector: Optional native pipecat VoicemailDetector. When provided,
inserts voicemail detection after STT. Note: We don't use the TTS gate
to avoid blocking TTS frames during classification.
"""
# Build processors list with optional voicemail detection
processors = [
transport.input(), # Transport user input
audio_buffer.input(), # Record input audio (only processes InputAudioRawFrame)
stt, # STT can now have audio_passthrough=False
stt_mute_filter, # STTMuteFilters don't let VAD related events pass through if muted
user_idle_disconnect,
transcript.user(),
stt, # STT (audio_passthrough=True by default, passes InputAudioRawFrame)
]
# Insert voicemail detector after STT if enabled
# Note: We intentionally do NOT use voicemail_detector.gate() to allow TTS
# frames to continue flowing during classification (non-blocking detection)
if voicemail_detector:
logger.info("Adding native voicemail detector to pipeline")
processors.append(voicemail_detector.detector())
# Continue with the rest of the pipeline
processors.extend(
[
stt_mute_filter, # STTMuteFilters don't let VAD related events pass through if muted
user_idle_disconnect,
transcript.user(),
user_context_aggregator,
llm, # LLM
pipeline_engine_callback_processor,
tts, # TTS
transport.output(), # Transport bot output
audio_buffer.output(), # Record output audio (only processes OutputAudioRawFrame)
audio_buffer, # AudioBufferProcessor - records both input and output audio
transcript.assistant(),
assistant_context_aggregator, # Assistant spoken responses
pipeline_metrics_aggregator,

View file

@ -27,6 +27,7 @@ from api.services.pipecat.service_factory import (
create_llm_service,
create_stt_service,
create_tts_service,
create_voicemail_classification_llm,
)
from api.services.pipecat.tracing_config import setup_pipeline_tracing
from api.services.pipecat.transport_setup import (
@ -41,8 +42,12 @@ from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response import (
LLMAssistantAggregatorParams,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
)
@ -54,6 +59,7 @@ from pipecat.processors.filters.stt_mute_filter import (
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.utils.context import set_current_run_id
from pipecat.utils.enums import EndTaskReason
from pipecat.utils.tracing.context_registry import ContextProviderRegistry
# Setup tracing if enabled
@ -468,9 +474,7 @@ async def _run_pipeline(
)
# Create pipeline components with audio configuration and engine
audio_buffer, audio_synchronizer, transcript, context = create_pipeline_components(
audio_config, engine
)
audio_buffer, transcript, context = create_pipeline_components(audio_config, engine)
# Set the context and audio_buffer after creation
engine.set_context(context)
@ -484,8 +488,9 @@ async def _run_pipeline(
expect_stripped_words=True,
correct_aggregation_callback=engine.create_aggregation_correction_callback(),
)
user_params = LLMUserAggregatorParams(enable_emulated_vad_interruptions=True)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params
context, assistant_params=assistant_params, user_params=user_params
)
# Create usage metrics aggregator with engine's callback
@ -517,13 +522,35 @@ async def _run_pipeline(
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
# Create voicemail detector if enabled in the workflow's start node
voicemail_detector = None
start_node = workflow_graph.nodes.get(workflow_graph.start_node_id)
if start_node and start_node.detect_voicemail:
classification_llm = create_voicemail_classification_llm()
if classification_llm:
logger.info(
f"Voicemail detection enabled for workflow run {workflow_run_id}"
)
voicemail_detector = VoicemailDetector(
llm=classification_llm,
voicemail_response_delay=2.0,
)
# Register event handler to end task when voicemail is detected
@voicemail_detector.event_handler("on_voicemail_detected")
async def _on_voicemail_detected(_processor):
logger.info(f"Voicemail detected for workflow run {workflow_run_id}")
await engine.send_end_task_frame(
reason=EndTaskReason.VOICEMAIL_DETECTED.value,
abort_immediately=True,
)
# Build the pipeline with the STT mute filter and context controller
pipeline = build_pipeline(
transport,
stt,
transcript,
audio_buffer,
audio_synchronizer,
llm,
tts,
user_context_aggregator,
@ -532,6 +559,7 @@ async def _run_pipeline(
stt_mute_filter,
pipeline_metrics_aggregator,
user_idle_disconnect,
voicemail_detector=voicemail_detector,
)
# Create pipeline task with audio configuration
@ -548,7 +576,6 @@ async def _run_pipeline(
workflow_run_id,
engine=engine,
audio_buffer=audio_buffer,
audio_synchronizer=audio_synchronizer,
audio_config=audio_config,
)
)
@ -559,15 +586,12 @@ async def _run_pipeline(
task,
transport,
audio_buffer,
audio_synchronizer,
in_memory_audio_buffer,
in_memory_transcript_buffer,
pipeline_metrics_aggregator,
)
register_audio_data_handler(
audio_synchronizer, workflow_run_id, in_memory_audio_buffer
)
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)
register_transcript_handler(
transcript, workflow_run_id, in_memory_transcript_buffer
)

View file

@ -1,3 +1,4 @@
import os
from typing import TYPE_CHECKING
from fastapi import HTTPException
@ -20,6 +21,7 @@ from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.services.sarvam.stt import SarvamSTTService
from pipecat.services.sarvam.tts import SarvamTTSService
from pipecat.services.speechmatics.stt import SpeechmaticsSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.text.xml_function_tag_filter import XMLFunctionTagFilter
@ -40,28 +42,20 @@ def create_stt_service(user_config):
)
logger.debug(f"Using DeepGram Model - {user_config.stt.model}")
return DeepgramSTTService(
live_options=live_options,
api_key=user_config.stt.api_key,
audio_passthrough=False, # Disable passthrough since audio is buffered separately
live_options=live_options, api_key=user_config.stt.api_key
)
elif user_config.stt.provider == ServiceProviders.OPENAI.value:
return OpenAISTTService(
api_key=user_config.stt.api_key,
model=user_config.stt.model,
audio_passthrough=False, # Disable passthrough since audio is buffered separately
api_key=user_config.stt.api_key, model=user_config.stt.model
)
elif user_config.stt.provider == ServiceProviders.CARTESIA.value:
return CartesiaSTTService(
api_key=user_config.stt.api_key,
audio_passthrough=False, # Disable passthrough since audio is buffered separately
)
return CartesiaSTTService(api_key=user_config.stt.api_key)
elif user_config.stt.provider == ServiceProviders.DOGRAH.value:
base_url = MPS_API_URL.replace("http://", "ws://").replace("https://", "wss://")
return DograhSTTService(
base_url=base_url,
api_key=user_config.stt.api_key,
model=user_config.stt.model,
audio_passthrough=False, # Disable passthrough since audio is buffered separately
)
elif user_config.stt.provider == ServiceProviders.SARVAM.value:
# Map Sarvam language code to pipecat Language enum
@ -85,7 +79,23 @@ def create_stt_service(user_config):
api_key=user_config.stt.api_key,
model=user_config.stt.model,
params=SarvamSTTService.InputParams(language=pipecat_language),
audio_passthrough=False,
)
elif user_config.stt.provider == ServiceProviders.SPEECHMATICS.value:
from pipecat.services.speechmatics.stt import OperatingPoint
language = getattr(user_config.stt, "language", None) or "en"
# Map model field to operating point (standard or enhanced)
operating_point = (
OperatingPoint.ENHANCED
if user_config.stt.model == "enhanced"
else OperatingPoint.STANDARD
)
return SpeechmaticsSTTService(
api_key=user_config.stt.api_key,
params=SpeechmaticsSTTService.InputParams(
language=language,
operating_point=operating_point,
),
)
else:
raise HTTPException(
@ -138,6 +148,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
model=user_config.tts.model,
voice=user_config.tts.voice,
params=DograhTTSService.InputParams(speed=user_config.tts.speed),
text_filters=[xml_function_tag_filter],
)
elif user_config.tts.provider == ServiceProviders.SARVAM.value:
@ -222,3 +233,24 @@ def create_llm_service(user_config):
)
else:
raise HTTPException(status_code=400, detail="Invalid LLM provider")
def create_voicemail_classification_llm():
    """Build the LLM service used for voicemail classification.

    The service is an ``OpenAILLMService`` fixed to the ``gpt-4o-mini``
    model with temperature 0.0, authenticated with the value of the
    ``OPENAI_API_KEY`` environment variable.

    Returns:
        The configured ``OpenAILLMService``, or ``None`` (after logging a
        warning) when ``OPENAI_API_KEY`` is unset or empty.
    """
    openai_key = os.environ.get("OPENAI_API_KEY")
    if openai_key:
        return OpenAILLMService(
            api_key=openai_key,
            model="gpt-4o-mini",
            params=OpenAILLMService.InputParams(temperature=0.0),
        )

    # Without a key there is nothing to classify with; callers get None.
    logger.warning("OPENAI_API_KEY not set - voicemail detection will be disabled")
    return None

View file

@ -1,19 +1,14 @@
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Union
from api.constants import DEPLOYMENT_MODE, ENABLE_TRACING, VOICEMAIL_RECORDING_DURATION
from api.services.workflow.disposition_mapper import (
apply_disposition_mapping,
get_organization_id_from_workflow_run,
)
from api.services.workflow.pipecat_engine_voicemail_detector import (
VoicemailDetector,
)
from api.services.workflow.workflow import Node, WorkflowGraph
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
FunctionCallResultProperties,
LLMContextFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.task import PipelineTask
@ -93,11 +88,6 @@ class PipecatEngine:
# access to _context
self._variable_extraction_manager = None
# Voicemail detection state
self._detect_voicemail = False
self._voicemail_detector = None
self._voicemail_detection_task: Optional[asyncio.Task] = None
# Lazy loaded built-in function schemas
self._builtin_function_schemas: Optional[list[dict]] = None
@ -172,8 +162,6 @@ class PipecatEngine:
await self.set_node(self.workflow.start_node_id)
# Trigger initial LLM generation
await self.task.queue_frame(LLMContextFrame(self.context))
logger.debug(f"{self.__class__.__name__} initialized")
except Exception as e:
logger.error(f"Error initializing {self.__class__.__name__}: {e}")
@ -388,43 +376,6 @@ class PipecatEngine:
async def _handle_start_node(self, node: Node) -> None:
"""Handle start node execution."""
# Handle voicemail detection setup (before any returns)
# Lets check ENABLE_TRACING to make sure we have prompt access from
# langfuse
if node.detect_voicemail and DEPLOYMENT_MODE == "saas" and ENABLE_TRACING:
if not self._audio_buffer:
logger.warning(
"Voicemail detection enabled but no audio buffer available - skipping detection"
)
else:
logger.debug(
"Start node has detect_voicemail enabled - setting up audio-based detector"
)
self._detect_voicemail = True
self._voicemail_detector = VoicemailDetector(
detection_duration=VOICEMAIL_RECORDING_DURATION,
workflow_run_id=self._workflow_run_id,
)
# Register audio handler on the audio buffer input processor
audio_input = self._audio_buffer.input()
@audio_input.event_handler("on_input_audio_data")
async def handle_voicemail_audio(
processor, pcm, sample_rate, num_channels
):
if (
self._voicemail_detector
and self._voicemail_detector.is_detecting
):
await self._voicemail_detector.handle_audio_data(
processor, pcm, sample_rate, num_channels
)
# Start detection
await self._voicemail_detector.start_detection(self)
# Check if delayed start is enabled
if node.delayed_start:
# Use configured duration or default to 3 seconds
@ -745,8 +696,4 @@ class PipecatEngine:
):
self._user_response_timeout_task.cancel()
# Stop voicemail detection if active
if self._voicemail_detector and hasattr(
self._voicemail_detector, "stop_detection"
):
await self._voicemail_detector.stop_detection()
# Note: Native VoicemailDetector cleanup is handled by the pipeline

View file

@ -1,441 +0,0 @@
from __future__ import annotations
import asyncio
import io
import json
import os
import tempfile
import wave
from typing import TYPE_CHECKING, Optional
from langfuse import get_client
from loguru import logger
from openai import AsyncOpenAI
from opentelemetry import context as otel_context
from api.db import db_client
from api.services.pipecat.tracing_config import is_tracing_enabled
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
from pipecat.utils.enums import EndTaskReason
from pipecat.utils.tracing.context_registry import get_current_turn_context
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine import PipecatEngine
DEFAULT_VOICEMAIL_PROMPT = """
You are analyzing the beginning of a phone call to determine if it's a voicemail greeting.
Common voicemail indicators:
- "You've reached the voicemail of..."
- "Please leave a message after the beep"
- "I'm not available right now"
- "Press 1 to leave a message"
- Robotic or pre-recorded voice quality mentioned
- Background music or hold music references
Transcript: {transcript}
Respond with a JSON object:
{
"is_voicemail": true/false,
"confidence": 0.0-1.0,
"reasoning": "Brief explanation"
}
"""
class VoicemailDetector:
"""
Autonomous voicemail detection system that operates independently of the main pipeline.
"""
def __init__(self, detection_duration: float = 15.0, workflow_run_id: Optional[int] = None) -> None:
    """Create an idle detector with an empty audio buffer.

    Args:
        detection_duration: Seconds of buffered audio that mark collection
            as complete.
        workflow_run_id: Identifier of the associated workflow run, if any.
    """
    self.detection_duration = detection_duration
    self.audio_buffer = bytearray()
    self.is_detecting = False
    self.workflow_run_id = workflow_run_id
    self._langfuse_client = get_client()
    # We will set the sample rate when we receive the audio packet
    self._sample_rate = None
    # Task management
    self._detection_task: Optional[asyncio.Task] = None
    self._is_cancelled = False
    self._engine: Optional[PipecatEngine] = None
    # Event for audio collection completion
    self._audio_collected_event = asyncio.Event()
# ------------------------------------------------------------------
# Utility helpers
# ------------------------------------------------------------------
def _current_duration_seconds(self) -> float:
"""Return the duration (in seconds) of the audio currently in the buffer."""
if self._sample_rate:
return len(self.audio_buffer) / (self._sample_rate * 2)
return 0.0
async def handle_audio_data(
    self, processor, pcm: bytes, sample_rate: int, num_channels: int
):
    """Buffer one chunk of incoming PCM while detection is active.

    Chunks are ignored when detection is not running or has been cancelled.
    The sample rate of the first accepted chunk is remembered and later
    chunks are appended as-is. ``processor`` and ``num_channels`` are
    accepted but not used. Once the buffered duration reaches
    ``detection_duration`` the audio-collected event is set.
    """
    accepting = self.is_detecting and not self._is_cancelled
    if not accepting:
        return

    # Only the first accepted packet decides the sample rate.
    first_chunk = self._sample_rate is None
    if first_chunk:
        self._sample_rate = sample_rate
        logger.debug(f"Voicemail detector using sample rate: {sample_rate}")

    # Raw bytes are appended untouched (no resampling).
    self.audio_buffer.extend(pcm)

    # Signal the waiter as soon as enough audio has accumulated.
    if self._current_duration_seconds() >= self.detection_duration:
        self._audio_collected_event.set()
async def start_detection(self, engine: PipecatEngine):
    """Start voicemail detection process.

    Resets the detector state, remembers ``engine`` for later use, and
    launches ``_run_detection_with_timeout`` as a background task.

    Args:
        engine: The engine stored on the detector as ``_engine``.
    """
    logger.info("Starting voicemail detection")
    # State is reset before the task is created so the task sees fresh flags
    self.is_detecting = True
    self._is_cancelled = False
    self._engine = engine
    self._audio_collected_event.clear()
    # Start detection in background
    self._detection_task = asyncio.create_task(self._run_detection_with_timeout())
async def stop_detection(self):
    """Stop detection immediately (called on disconnect).

    Marks the detector as cancelled, wakes anything waiting on the
    audio-collected event, cancels the background detection task if it is
    still running, discards the buffered audio, and finally awaits the task
    so that its cancellation has completed before this method returns.
    """
    logger.info("Stopping voicemail detection due to disconnect")
    # Flip the flags first so handle_audio_data stops buffering right away
    self._is_cancelled = True
    self.is_detecting = False
    # Set the event to unblock any waiting tasks
    self._audio_collected_event.set()
    # Cancel ongoing detection task
    if self._detection_task and not self._detection_task.done():
        self._detection_task.cancel()
    # Clear audio buffer
    self.audio_buffer.clear()
    # Wait for tasks to complete cancellation
    if self._detection_task:
        try:
            await self._detection_task
        except asyncio.CancelledError:
            # Expected when the task was cancelled above
            pass
async def _run_detection_with_timeout(self):
    """Run detection with proper timeout and cancellation handling.

    Waits for audio collection, then runs ``_process_detection`` unless the
    detector was cancelled in the meantime. Cancellation and any other
    exception are logged rather than propagated, and ``is_detecting`` is
    always reset to False on exit.
    """
    try:
        # Wait for audio collection or cancellation directly
        await self._wait_for_audio_collection()
        # Check if cancelled during collection
        if self._is_cancelled:
            logger.info("Detection cancelled during audio collection")
            return
        # Process detection
        await self._process_detection()
    except asyncio.CancelledError:
        # Logged and not re-raised: the coroutine finishes normally
        logger.info("Voicemail detection task cancelled")
    except Exception as e:
        logger.error(f"Error in voicemail detection: {e}")
    finally:
        # Runs on every exit path, including the early return above
        self.is_detecting = False
async def _wait_for_audio_collection(self):
"""Wait for audio buffer to fill or timeout."""
try:
# Wait for either audio collection completion or timeout
await asyncio.wait_for(
self._audio_collected_event.wait(),
timeout=self.detection_duration + 2.0,
)
if not self._is_cancelled:
current_duration = self._current_duration_seconds()
logger.info(
f"Collected {current_duration:.1f}s of audio for voicemail detection (sample rate: {self._sample_rate}Hz)"
)
except asyncio.TimeoutError:
if not self._is_cancelled:
current_duration = self._current_duration_seconds()
logger.warning("Audio collection timeout exceeded")
logger.info(
f"Proceeding with {current_duration:.1f}s of audio (sample rate: {self._sample_rate}Hz)"
)
async def _process_detection(self):
    """Transcribe the collected audio, classify it and act on the result.

    Steps:
      1. Wrap the buffered PCM audio in a WAV container.
      2. Transcribe it; if no transcript comes back, still enqueue the
         recording for upload (when a workflow run id is set) and stop.
      3. Analyse the transcript and enqueue the recording for upload,
         labelled with the prediction and confidence.
      4. If a voicemail was detected, tag the workflow run (best effort)
         and send the end-task frame to the engine.

    Any unexpected error is logged and swallowed so that a detection
    failure never propagates to the caller.
    """
    if not self.audio_buffer or not self._engine:
        logger.warning("No audio buffer or engine available for detection")
        return

    try:
        # Convert PCM to WAV once for both transcription and storage
        wav_data = self._create_wav_from_pcm(bytes(self.audio_buffer))

        logger.info("Transcribing audio for voicemail detection")
        transcript = await self._transcribe_audio(wav_data)

        if not transcript:
            logger.warning("No transcript obtained from audio")
            # Still upload the raw recording so data pipeline has it
            if self.workflow_run_id:
                await self._save_voicemail_audio(wav_data, 0.0, False)
            return

        logger.info(
            f"Voicemail detection transcript obtained: {transcript[:100]}..."
        )

        result = await self._analyze_transcript(transcript)
        if not isinstance(result, dict):
            # Valid JSON that is not an object carries no usable verdict.
            result = {}

        confidence = result.get("confidence", 0.0)
        reasoning = result.get("reasoning", "No reasoning provided")
        is_voicemail = bool(result.get("is_voicemail", False))

        # Enqueue the recording for upload once, for the data pipeline
        if self.workflow_run_id:
            await self._save_voicemail_audio(wav_data, confidence, is_voicemail)

        if not is_voicemail:
            logger.info("No voicemail detected, continuing normal conversation")
            return

        logger.info(
            f"Voicemail detected with confidence {confidence}: {reasoning}"
        )

        # Tagging is best effort: a database problem must not prevent the
        # call from being ended below.
        if self.workflow_run_id:
            await self._tag_workflow_run_as_voicemail(transcript, confidence)

        await self._engine.send_end_task_frame(
            reason=EndTaskReason.VOICEMAIL_DETECTED.value,
            abort_immediately=True,
        )
    except Exception as e:
        logger.error(f"Error processing voicemail detection: {e}")


async def _tag_workflow_run_as_voicemail(self, transcript: str, confidence) -> None:
    """Record the voicemail verdict on the workflow run, logging any failure.

    Args:
        transcript: Transcript of the analysed audio.
        confidence: Confidence value reported by the transcript analysis.
    """
    try:
        workflow_run = await db_client.get_workflow_run_by_id(self.workflow_run_id)
        if not workflow_run:
            return

        # Tolerate a missing context / tag list and build a new list rather
        # than mutating the one held by the fetched run.
        existing_tags = (workflow_run.gathered_context or {}).get("call_tags") or []
        call_tags = [*existing_tags, "voicemail_detected", "not_connected"]

        await db_client.update_workflow_run(
            run_id=workflow_run.id,
            gathered_context={
                "call_tags": call_tags,
                "voicemail_transcript": transcript,
                "voicemail_confidence": confidence,
            },
        )
    except Exception as e:
        logger.error(
            f"Failed to tag workflow run {self.workflow_run_id} as voicemail: {e}"
        )
async def _transcribe_audio(self, wav_data: bytes) -> str:
    """Send the WAV recording to OpenAI's transcription endpoint.

    A fresh ``AsyncOpenAI`` client is created for the request, keyed by the
    ``OPENAI_API_KEY`` environment variable; no pipeline is involved.

    Args:
        wav_data: WAV formatted audio data.

    Returns:
        The transcribed text with surrounding whitespace removed.
    """
    openai_client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    # (filename, content, MIME type) triple describing the upload.
    upload = ("audio.wav", wav_data, "audio/wav")

    transcription = await openai_client.audio.transcriptions.create(
        file=upload,
        model="whisper-1",  # Using whisper-1 as it's more stable for transcription
        language="en",
        temperature=0.0,
    )
    text = transcription.text
    return text.strip()
def _create_wav_from_pcm(self, pcm_data: bytes) -> bytes:
    """Wrap raw PCM samples in a WAV container.

    The output is written as mono, 2 bytes per sample, at the detector's
    ``_sample_rate``.

    Args:
        pcm_data: Raw PCM audio bytes.

    Returns:
        The complete WAV file contents.
    """
    sink = io.BytesIO()
    writer = wave.open(sink, "wb")
    with writer:
        writer.setnchannels(1)  # Mono
        writer.setsampwidth(2)  # 16-bit
        writer.setframerate(self._sample_rate)
        writer.writeframes(pcm_data)
    # The header is finalised when the writer closes; grab everything written.
    return sink.getvalue()
async def _analyze_transcript(self, transcript: str) -> dict:
    """Classify a transcript as voicemail or not with an OpenAI chat model.

    The prompt is fetched from Langfuse (``production/voicemail_detection``)
    and falls back to ``DEFAULT_VOICEMAIL_PROMPT`` if that fails. When a
    parent turn context exists and tracing is enabled, the call is recorded
    as a Langfuse generation nested under that context; tracing problems are
    logged and never block the classification itself.

    Args:
        transcript: Text transcribed from the collected audio.

    Returns:
        The JSON object produced by the model. If the response is missing,
        is not valid JSON, or is not a JSON object, a fallback dict with
        ``is_voicemail=False`` and ``confidence=0.0`` is returned.
    """
    # Capture the current turn context for proper span nesting
    parent_context = get_current_turn_context()
    client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    langfuse_prompt = None
    try:
        langfuse_prompt = self._langfuse_client.get_prompt(
            "production/voicemail_detection"
        )
        prompt = langfuse_prompt.compile(transcript=transcript)
    except Exception as e:
        logger.warning(f"Error getting prompt from Langfuse: {e}")
        # The fallback text is not the Langfuse-managed prompt, so do not
        # link the generation to it.
        langfuse_prompt = None
        prompt = DEFAULT_VOICEMAIL_PROMPT.replace("{transcript}", transcript)

    messages = [{"role": "system", "content": prompt}]
    model = "gpt-4o"
    temperature = 0.0

    # With a parent OpenTelemetry context, activate it so that Langfuse's
    # OTEL tracer picks it up automatically.
    tracing = bool(parent_context) and is_tracing_enabled()
    token = otel_context.attach(parent_context) if tracing else None
    langfuse_generation = None
    try:
        if tracing:
            try:
                langfuse_generation = self._langfuse_client.start_generation(
                    name="voicemail_detection",
                    model=model,
                    input=messages,
                    metadata={
                        "temperature": temperature,
                        "detection_duration": self.detection_duration,
                        "transcript_length": len(transcript),
                    },
                    prompt=langfuse_prompt,
                )
            except Exception as e:
                logger.warning(f"Error starting Langfuse generation: {e}")

        # Direct API call
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            response_format={"type": "json_object"},
        )
        llm_response = response.choices[0].message.content

        if langfuse_generation:
            usage = response.usage
            try:
                langfuse_generation.update(
                    output=llm_response,
                    usage_details={
                        "prompt_tokens": usage.prompt_tokens if usage else 0,
                        "completion_tokens": usage.completion_tokens if usage else 0,
                        "total_tokens": usage.total_tokens if usage else 0,
                    },
                )
            except Exception as e:
                logger.warning(f"Error updating Langfuse generation: {e}")
    finally:
        # Always close the generation and restore the previous context,
        # including when the API call itself failed.
        if langfuse_generation:
            try:
                langfuse_generation.end()
            except Exception as e:
                logger.warning(f"Error ending Langfuse generation: {e}")
        if tracing:
            otel_context.detach(token)

    # Parse response; TypeError covers a missing (None) message content.
    try:
        parsed = json.loads(llm_response)
    except (json.JSONDecodeError, TypeError):
        parsed = None

    if not isinstance(parsed, dict):
        logger.warning("Invalid JSON response from voicemail detection")
        return {
            "is_voicemail": False,
            "confidence": 0.0,
            "reasoning": "Invalid response",
        }
    return parsed
async def _save_voicemail_audio(
    self, wav_data: bytes, confidence: float, is_voicemail: bool
) -> Optional[str]:
    """Save voicemail audio to a temp file and enqueue its upload to S3.

    The temp file is intentionally left on disk on success; the comments in
    this file state that the enqueued upload task is responsible for
    cleaning it up. If anything fails, the temp file is removed here.

    Args:
        wav_data: WAV formatted audio data.
        confidence: Detection confidence score; a missing or non-numeric
            value is recorded as 0 in the object key.
        is_voicemail: Whether it was detected as voicemail.

    Returns:
        The expected S3 object key (the actual upload happens
        asynchronously), or None if saving or enqueueing failed.
    """
    temp_file_path = None
    try:
        # Build the object key from prediction, confidence and duration
        duration_seconds = self._current_duration_seconds()
        prediction = "voicemail" if is_voicemail else "not_voicemail"
        try:
            # round() rather than int(): 0.57 * 100 is 56.999..., which
            # truncation would record as 56.
            confidence_int = round(float(confidence) * 100)
        except (TypeError, ValueError, OverflowError):
            confidence_int = 0
        duration_int = int(duration_seconds)

        s3_key = f"voicemail_detections/{self.workflow_run_id}_{prediction}_{confidence_int}_{duration_int}.wav"

        # delete=False: the file must outlive this function for the upload
        with tempfile.NamedTemporaryFile(
            suffix=".wav",
            delete=False,
            prefix=f"voicemail_{self.workflow_run_id}_",
        ) as tmp_file:
            # Record the path first so a failed write is still cleaned up.
            temp_file_path = tmp_file.name
            tmp_file.write(wav_data)
            tmp_file.flush()

        logger.info(f"Saved voicemail audio to temp file: {temp_file_path}")

        # Enqueue async task to upload to S3
        await enqueue_job(
            FunctionNames.UPLOAD_VOICEMAIL_AUDIO_TO_S3,
            self.workflow_run_id,
            temp_file_path,
            s3_key,
        )

        logger.info(f"Enqueued voicemail audio upload task for: {s3_key}")
        return s3_key

    except Exception as e:
        logger.error(f"Failed to save voicemail audio: {e}")
        # Nothing will upload the file now, so remove it ourselves.
        if temp_file_path is not None:
            try:
                os.remove(temp_file_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(
                    f"Failed to cleanup temp file after error: {cleanup_error}"
                )
        return None

View file

@ -16,7 +16,7 @@ git submodule update --init --recursive
# Install pipecat in editable mode with all extras
echo "Installing pipecat dependencies..."
pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3]
pip install -e ./pipecat[cartesia,deepgram,openai,elevenlabs,groq,google,azure,sarvam,soundfile,silero,webrtc,local-smart-turn-v3,speechmatics]
# Install other requirements
echo "Installing dograh API requirements..."