feat: add voicemail detection in realtime branch

This commit is contained in:
Abhishek Kumar 2026-05-06 17:50:02 +05:30
parent 4634e1c2ed
commit 025bc14392
8 changed files with 106 additions and 63 deletions

View file

@ -15,6 +15,7 @@ concurrency:
jobs:
pytest:
if: ${{ !startsWith(github.head_ref, 'release-please--') }}
runs-on: ubuntu-latest
timeout-minutes: 30

View file

@ -18,6 +18,7 @@ concurrency:
jobs:
drift-check:
if: ${{ !startsWith(github.head_ref, 'release-please--') }}
runs-on: ubuntu-latest
timeout-minutes: 25
steps:

View file

@ -97,6 +97,12 @@ def register_event_handlers(
"initial_response_triggered": False,
}
async def queue_initial_llm_context():
# Queue LLMContextFrame after the VoicemailDetector since the detector
# gates LLMContextFrames until voicemail detection completes. We also
# don't want to trigger the Voicemail LLM with this initial frame.
await engine.llm.queue_frame(LLMContextFrame(engine.context))
async def maybe_trigger_initial_response():
"""Start the conversation after both pipeline_started and client_connected events.
@ -185,7 +191,7 @@ def register_event_handlers(
f"Failed to fetch audio greeting {greeting_value}, "
"falling back to LLM generation"
)
await engine.llm.queue_frame(LLMContextFrame(engine.context))
await queue_initial_llm_context()
else:
logger.debug("Playing text greeting via TTS")
# append_to_context=True so the assistant aggregator commits
@ -198,7 +204,7 @@ def register_event_handlers(
logger.debug(
"Both pipeline_started and client_connected received - triggering initial LLM generation"
)
await engine.llm.queue_frame(LLMContextFrame(engine.context))
await queue_initial_llm_context()
@transport.event_handler("on_client_connected")
async def on_client_connected(_transport, _participant):

View file

@ -105,23 +105,53 @@ def build_realtime_pipeline(
assistant_context_aggregator,
pipeline_engine_callback_processor,
pipeline_metrics_aggregator,
voicemail_detector=None,
):
"""Build a pipeline for realtime (speech-to-speech) LLM services.
Realtime services (e.g. OpenAI Realtime, Gemini Live) handle STT+LLM+TTS
internally, so no separate STT or TTS processors are needed.
Args:
voicemail_detector: Optional VoicemailDetector. Placed *below* the
realtime LLM. This is asymmetric with the non-realtime layout
(where the detector sits between STT and the main user aggregator)
because the realtime LLM is both the source of TranscriptionFrame
(broadcast downstream) and the sink of LLMContextFrame (consumed
by _handle_context without forwarding). Placing the detector below
the realtime LLM means: downstream TranscriptionFrames reach the
classifier branch, UserStartedSpeakingFrame /
UserStoppedSpeakingFrame are forwarded through by the LLM, and the
main aggregator's LLMContextFrame is absorbed by the realtime LLM
and never leaks into the classifier (which would otherwise run a
voicemail completion on the workflow's main context).
The TTS gate and LLM gate are intentionally not used: the realtime
LLM reacts to audio directly, not to LLMContextFrames. On voicemail
detection we drop the call via end_call_with_reason; the detector's
ConversationGate also blocks downstream audio output until the call
ends.
"""
processors = [
transport.input(),
user_context_aggregator,
realtime_llm,
pipeline_engine_callback_processor,
transport.output(),
audio_buffer,
assistant_context_aggregator,
pipeline_metrics_aggregator,
]
if voicemail_detector:
logger.info("Adding native voicemail detector to realtime pipeline")
processors.append(voicemail_detector.detector())
processors.extend(
[
pipeline_engine_callback_processor,
transport.output(),
audio_buffer,
assistant_context_aggregator,
pipeline_metrics_aggregator,
]
)
return Pipeline(processors)

View file

@ -427,11 +427,13 @@ async def _run_pipeline(
# Configure turn strategies based on STT provider, model, and workflow configuration
if is_realtime:
# Realtime services have server-side VAD/turn detection.
# For stop strategy, lets rely on SmartTurnAnalyzer which is
# enabled by default
# Realtime services do server-side turn detection for response generation,
# but we still need a client-side stop strategy so the user aggregator emits
# UserStoppedSpeakingFrame. Without it, downstream consumers (e.g. voicemail
# detector) and Gemini Live's _finalize_pending flag never see a turn end.
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()], stop=[]
start=[VADUserTurnStartStrategy()],
stop=[SpeechTimeoutUserTurnStopStrategy()],
)
# Lets not start the pipeline as muted for Realtime
@ -521,7 +523,6 @@ async def _run_pipeline(
async def on_user_turn_started(aggregator, strategy):
user_idle_handler.reset()
# Voicemail detection and recording router are not supported in realtime mode
voicemail_detector = None
recording_router = None
@ -533,58 +534,61 @@ async def _run_pipeline(
)
engine.set_fetch_recording_audio(fetch_audio)
if not is_realtime:
# Create voicemail detector if enabled in workflow configurations
voicemail_config = (workflow.workflow_configurations or {}).get(
"voicemail_detection", {}
# Voicemail detection works in both modes. In realtime mode the detector sits
# after the realtime LLM and consumes the TranscriptionFrames it broadcasts;
# the LLM gate / TTS gate are not used (the realtime LLM responds to audio
# directly, not LLMContextFrames), so on detection we rely on
# end_call_with_reason to drop the call.
voicemail_config = (workflow.workflow_configurations or {}).get(
"voicemail_detection", {}
)
if voicemail_config.get("enabled", False):
logger.info(f"Voicemail detection enabled for workflow run {workflow_run_id}")
# Create a separate LLM instance for the voicemail sub-pipeline
# (can't share with main pipeline as it would mess up frame linking)
if voicemail_config.get("use_workflow_llm", True):
voicemail_llm = create_llm_service(user_config)
else:
voicemail_llm = create_llm_service_from_provider(
provider=voicemail_config.get("provider", "openai"),
model=voicemail_config.get("model", "gpt-4.1"),
api_key=voicemail_config.get("api_key", ""),
)
long_speech_timeout = voicemail_config.get("long_speech_timeout", 8.0)
custom_system_prompt = voicemail_config.get("system_prompt") or None
voicemail_detector = VoicemailDetector(
llm=voicemail_llm,
long_speech_timeout=long_speech_timeout,
custom_system_prompt=custom_system_prompt,
)
if voicemail_config.get("enabled", False):
logger.info(
f"Voicemail detection enabled for workflow run {workflow_run_id}"
)
# Create a separate LLM instance for the voicemail sub-pipeline
# (can't share with main pipeline as it would mess up frame linking)
if voicemail_config.get("use_workflow_llm", True):
voicemail_llm = create_llm_service(user_config)
else:
voicemail_llm = create_llm_service_from_provider(
provider=voicemail_config.get("provider", "openai"),
model=voicemail_config.get("model", "gpt-4.1"),
api_key=voicemail_config.get("api_key", ""),
)
long_speech_timeout = voicemail_config.get("long_speech_timeout", 8.0)
custom_system_prompt = voicemail_config.get("system_prompt") or None
voicemail_detector = VoicemailDetector(
llm=voicemail_llm,
long_speech_timeout=long_speech_timeout,
custom_system_prompt=custom_system_prompt,
# Register event handler to end task when voicemail is detected
@voicemail_detector.event_handler("on_voicemail_detected")
async def _on_voicemail_detected(_processor):
logger.info(f"Voicemail detected for workflow run {workflow_run_id}")
await engine.end_call_with_reason(
reason=EndTaskReason.VOICEMAIL_DETECTED.value,
abort_immediately=True,
)
# Register event handler to end task when voicemail is detected
@voicemail_detector.event_handler("on_voicemail_detected")
async def _on_voicemail_detected(_processor):
logger.info(f"Voicemail detected for workflow run {workflow_run_id}")
await engine.end_call_with_reason(
reason=EndTaskReason.VOICEMAIL_DETECTED.value,
abort_immediately=True,
)
# Create recording router if workflow has active recordings
if has_recordings:
recording_router = RecordingRouterProcessor(
audio_sample_rate=audio_config.pipeline_sample_rate,
fetch_recording_audio=fetch_audio,
)
# Warm the recording cache in the background so audio is ready
# before the first playback request.
asyncio.create_task(
warm_recording_cache(
organization_id=workflow.organization_id,
pipeline_sample_rate=audio_config.pipeline_sample_rate,
)
# Recording router is only meaningful in non-realtime mode (it routes between
# pre-recorded audio playback and dynamic TTS; realtime LLMs produce audio
# directly).
if not is_realtime and has_recordings:
recording_router = RecordingRouterProcessor(
audio_sample_rate=audio_config.pipeline_sample_rate,
fetch_recording_audio=fetch_audio,
)
# Warm the recording cache in the background so audio is ready
# before the first playback request.
asyncio.create_task(
warm_recording_cache(
organization_id=workflow.organization_id,
pipeline_sample_rate=audio_config.pipeline_sample_rate,
)
)
# Build the pipeline
if is_realtime:
@ -596,6 +600,7 @@ async def _run_pipeline(
assistant_context_aggregator,
pipeline_engine_callback_processor,
pipeline_metrics_aggregator,
voicemail_detector=voicemail_detector,
)
else:
pipeline = build_pipeline(

File diff suppressed because one or more lines are too long

@ -1 +1 @@
Subproject commit 05e06e14c317841b68d35f70055882cd5e24623d
Subproject commit e8f59a6a58977f4f5598e8de7bfbbf9b6d474eb8

View file

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: dograh-openapi-XXXXXX.json.r9ocBCl93O
# timestamp: 2026-05-05T13:22:04+00:00
# filename: dograh-openapi-XXXXXX.json.r8rR0xozEB
# timestamp: 2026-05-06T12:17:39+00:00
from __future__ import annotations