fix: add vad_analyzer in user aggregator

This commit is contained in:
Abhishek Kumar 2026-02-07 12:44:05 +05:30
parent 964a778194
commit 6711dcb3ea
3 changed files with 197 additions and 152 deletions

View file

@ -46,6 +46,8 @@ from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.processors.aggregators.llm_response_universal import (
@ -608,6 +610,7 @@ async def _run_pipeline(
user_turn_strategies=user_turn_strategies,
user_mute_strategies=user_mute_strategies,
user_idle_timeout=max_user_idle_timeout,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.3)),
)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params, user_params=user_params

View file

@ -1,10 +1,12 @@
"""
Simulates a user idle condition and tests the behaviour
of the user idle handler.
Simulates a realistic conversation and tests the user idle handler behavior.
This module tests the behavior when the user becomes idle during a conversation,
ensuring the user_idle_timeout in LLMUserAggregatorParams properly triggers
the on_user_turn_idle event and the engine handles it correctly.
This module tests the user idle handler in a natural back-and-forth conversation
where bot and user take turns speaking, verifying that:
1. The idle handler does not trigger while the bot is speaking (even when
TTS duration exceeds the idle timeout)
2. User speech properly resets the idle timer
3. The conversation flows naturally through node transitions to completion
"""
import asyncio
@ -14,7 +16,15 @@ import pytest
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from pipecat.frames.frames import LLMContextFrame
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
Frame,
LLMContextFrame,
TranscriptionFrame,
UserSpeakingFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@ -24,44 +34,96 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests import MockLLMService, MockTTSService
from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from pipecat.turns.user_mute import (
CallbackUserMuteStrategy,
MuteUntilFirstBotCompleteUserMuteStrategy,
)
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy
from pipecat.turns.user_stop import ExternalUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
async def run_pipeline_with_user_idle(
class UserSpeechInjector(FrameProcessor):
    """Test helper that fakes a user turn each time the bot stops speaking.

    Every BotStoppedSpeakingFrame handled by this processor (the frame's
    direction is not inspected) consumes the next entry of ``speeches`` and
    emits, in order: UserStartedSpeakingFrame, a broadcast UserSpeakingFrame,
    a TranscriptionFrame carrying that text, and UserStoppedSpeakingFrame.
    Once the list is exhausted, later BotStoppedSpeakingFrames are simply
    forwarded like any other frame.
    """

    def __init__(self, *, speeches: list[str], **kwargs):
        """Store the scripted user utterances.

        Args:
            speeches: Transcription texts to inject, one per bot utterance.
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self._bot_stopped_count = 0
        self._speeches = speeches

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Inject scripted speech on bot-stop frames, then forward the frame.

        Args:
            frame: The frame being processed.
            direction: The direction the frame is travelling; it is reused
                when the frame is forwarded.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStoppedSpeakingFrame):
            await self._inject_next_speech()

        # Every frame, including the triggering one, continues on its way.
        await self.push_frame(frame, direction)

    async def _inject_next_speech(self):
        """Count one bot utterance and emit the matching scripted user turn."""
        self._bot_stopped_count += 1
        position = self._bot_stopped_count - 1
        if position >= len(self._speeches):
            # Script exhausted: nothing left to say for this bot utterance.
            return

        utterance = self._speeches[position]

        # Short pause before the simulated user starts talking.
        await asyncio.sleep(0.01)
        await self.push_frame(UserStartedSpeakingFrame())

        # sleep(0) hands control back to the event loop between frames.
        await asyncio.sleep(0)
        await self.broadcast_frame(UserSpeakingFrame)

        await asyncio.sleep(0)
        transcription = TranscriptionFrame(utterance, "user", time_now_iso8601())
        await self.push_frame(transcription)

        await asyncio.sleep(0)
        await self.push_frame(UserStoppedSpeakingFrame())
async def create_pipeline_with_speech_injection(
workflow: WorkflowGraph,
mock_llm: MockLLMService,
speeches: list[str],
user_idle_timeout: float = 0.2,
mock_steps: list | None = None,
) -> tuple[MockLLMService, LLMContext]:
"""Run a pipeline with user_idle_timeout and simulate user idle condition.
mock_audio_duration_ms: int = 400,
) -> tuple[PipecatEngine, PipelineTask, object]:
"""Create a pipeline with user speech injection and idle handling.
Sets up a realistic pipeline with:
- MockTransport for audio I/O simulation
- UserSpeechInjector that injects user speech after each bot utterance
- User idle handler with configurable timeout
- User turn and mute strategies matching production setup
Args:
workflow: The workflow graph to use.
user_idle_timeout: Timeout in seconds before considering user idle.
mock_steps: Optional list of mock step chunks for the LLM. If not provided,
defaults to a simple greeting followed by text responses.
mock_llm: The mock LLM service with pre-configured steps.
speeches: List of user speech texts to inject after each bot utterance.
user_idle_timeout: Timeout in seconds for user idle detection.
mock_audio_duration_ms: TTS audio duration in milliseconds.
Returns:
Tuple of (MockLLMService, LLMContext) for assertions.
Tuple of (engine, task, user_idle_handler).
"""
# Create mock responses - bot will speak first, then respond to idle prompts
# Step 1: Initial greeting
# Step 2: Response to first idle (asking if user is still there)
# Step 3: Response to second idle (goodbye message)
tts = MockTTSService(
mock_audio_duration_ms=mock_audio_duration_ms, frame_delay=0.001
)
if mock_steps is None:
mock_steps = MockLLMService.create_multi_step_responses(
MockLLMService.create_text_chunks("Hello, how can I help you today?"),
num_text_steps=4, # Initial + 2 idle responses + 1 variable extraction
step_prefix="Response",
)
llm = MockLLMService(mock_steps=mock_steps, chunk_delay=0.001)
tts = MockTTSService(mock_audio_duration_ms=40)
# Create MockTransport for simulating transport behavior
mock_transport = MockTransport(
transport = MockTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
@ -70,27 +132,43 @@ async def run_pipeline_with_user_idle(
),
)
# Create LLM context
user_speech_injector = UserSpeechInjector(speeches=speeches)
context = LLMContext()
# Create context aggregator with user_idle_timeout in user_params
assistant_params = LLMAssistantAggregatorParams(expect_stripped_words=True)
user_params = LLMUserAggregatorParams(user_idle_timeout=user_idle_timeout)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params, user_params=user_params
)
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
# Create PipecatEngine with the workflow
engine = PipecatEngine(
llm=llm,
llm=mock_llm,
context=context,
workflow=workflow,
call_context_vars={"customer_name": "Test User"},
workflow_run_id=1,
)
# User turn strategies matching production setup
user_turn_strategies = UserTurnStrategies(
start=[TranscriptionUserTurnStartStrategy()],
stop=[ExternalUserTurnStopStrategy()],
)
user_mute_strategies = [
MuteUntilFirstBotCompleteUserMuteStrategy(),
CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user),
]
user_params = LLMUserAggregatorParams(
user_turn_strategies=user_turn_strategies,
user_mute_strategies=user_mute_strategies,
user_idle_timeout=user_idle_timeout,
)
assistant_params = LLMAssistantAggregatorParams(expect_stripped_words=True)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params, user_params=user_params
)
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
# Register user idle event handlers
user_idle_handler = engine.create_user_idle_handler()
@ -102,144 +180,108 @@ async def run_pipeline_with_user_idle(
async def on_user_turn_started(aggregator, strategy):
user_idle_handler.reset()
# Build the pipeline
# Build pipeline:
# transport.input → speech_injector → user_aggregator → LLM → TTS → transport.output → assistant_aggregator
pipeline = Pipeline(
[
transport.input(),
user_speech_injector,
user_context_aggregator,
llm,
mock_llm,
tts,
mock_transport.output(),
transport.output(),
assistant_context_aggregator,
]
)
# Create pipeline task
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)
# Patch DB calls to avoid actual database access
with patch(
"api.services.workflow.pipecat_engine.get_organization_id_from_workflow_run",
new_callable=AsyncMock,
return_value=1,
):
with patch(
"api.services.workflow.pipecat_engine.apply_disposition_mapping",
new_callable=AsyncMock,
return_value="completed",
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
async def initialize_engine():
# Small delay to let runner start
await asyncio.sleep(0.01)
await engine.initialize()
await engine.llm.queue_frame(LLMContextFrame(engine.context))
# Calculate total wait time:
# - Initial bot speech
# - First idle timeout (user_idle_timeout)
# - First idle callback + LLM generation
# - Second idle timeout (user_idle_timeout)
# - Second idle callback (ends the task)
# Add buffer for processing time
total_wait_time = (user_idle_timeout * 3) + 1.0
async def wait_for_idle_to_trigger():
# Wait long enough for idle timeouts to trigger
await asyncio.sleep(total_wait_time)
# Run all concurrently
await asyncio.gather(
run_pipeline(),
initialize_engine(),
wait_for_idle_to_trigger(),
return_exceptions=True,
)
return llm, context
return engine, task, user_idle_handler
class TestUserIdleHandler:
"""Test user idle handling through PipecatEngine and UserIdleHandler."""
"""Test user idle handling with realistic conversation flows."""
@pytest.mark.asyncio
async def test_user_idle_triggers_callback(self, simple_workflow: WorkflowGraph):
"""Test that user idle condition properly triggers the callback.
This test verifies that when:
1. The bot starts speaking (triggers conversation tracking)
2. No user input is received for the timeout period
3. The on_user_turn_idle event triggers the idle handler
The engine's user idle handler should:
- First retry: Send a message asking if user is still there
- Second retry: Send goodbye message and end the call
"""
llm, context = await run_pipeline_with_user_idle(
workflow=simple_workflow,
user_idle_timeout=0.2, # Short timeout for faster test
)
# Final message in the context should be from the bot
assert len(context.messages) == 6, "Total 6 messages"
assert context.messages[-1]["content"] == "Response 2", (
"Final message in the context should be from LLM"
)
@pytest.mark.asyncio
async def test_user_idle_with_node_transition(
self, three_node_workflow: WorkflowGraph
async def test_idle_does_not_trigger_during_active_conversation(
self, three_node_workflow_no_variable_extraction: WorkflowGraph
):
"""Test user idle handling with node transition via tool call.
"""Test that idle handler does not fire when users actively converse.
This test verifies that when:
1. The bot starts speaking with initial greeting
2. The LLM generates a tool call to transition to the next node
3. The pipeline correctly handles the transition
Conversation flow:
1. Bot: "Hello" (short greeting)
2. User: "Hello" (injected after bot finishes speaking)
3. Bot: longer response (mock TTS audio duration 400ms; idle timeout is 0.8s in this test)
4. User: "I need help with my account" (injected after bot finishes)
5. Bot: collect_info function call (Start Agent transition)
6. Bot: end_call function call (Agent End, ends conversation)
The mock steps are:
- Step 1: Text "Hello, how can I help you today?"
- Step 2: Tool call "collect_info" to transition to agent node
- Step 3+: Text responses after transition
Verifies:
- User idle handler never triggers during active conversation
- TTS duration exceeding idle timeout doesn't cause false idle triggers
- Pipeline completes all 4 LLM steps
"""
# Create custom mock steps with tool call for node transition
# For three_node_workflow:
# - Edge from node 1 -> node 2 has label "Collect Info" -> function: "collect_info"
# - Edge from node 2 -> node 3 has label "End Call" -> function: "end_call"
user_idle_timeout = 0.8
mock_steps = [
# Step 1: Initial greeting (text)
MockLLMService.create_text_chunks("Hello, how can I help you today?"),
# Step 2: Transition to Collect Info node (tool call)
# Step 0: Short greeting on Start node
MockLLMService.create_text_chunks("Hello"),
# Step 1: Longer response (mock TTS audio duration 400ms; idle timeout is 0.8s in this test)
MockLLMService.create_text_chunks(
"I can help you with your account. Let me look into that for you. "
"Please hold on while I pull up your information."
),
# Step 2: Transition from Start → Agent node
MockLLMService.create_function_call_chunks(
function_name="collect_info",
arguments={},
tool_call_id="call_collect_info",
),
# Step 3: Response after transition (text)
MockLLMService.create_text_chunks("Response after transition"),
# Step 4+: Additional responses for idle handling
MockLLMService.create_text_chunks("Response 2"),
MockLLMService.create_text_chunks("Response 3"),
# Step 3: Transition from Agent → End node (ends call)
MockLLMService.create_function_call_chunks(
function_name="end_call",
arguments={},
tool_call_id="call_end_call",
),
]
llm, context = await run_pipeline_with_user_idle(
workflow=three_node_workflow,
user_idle_timeout=0.2,
mock_steps=mock_steps,
llm = MockLLMService(mock_steps=mock_steps, chunk_delay=0.001)
engine, task, user_idle_handler = await create_pipeline_with_speech_injection(
workflow=three_node_workflow_no_variable_extraction,
mock_llm=llm,
speeches=["Hello", "I need help with my account"],
user_idle_timeout=user_idle_timeout,
mock_audio_duration_ms=400,
)
# Verify the LLM was called multiple times (initial + after transition)
assert llm.get_current_step() >= 2, (
"LLM should have been called at least twice (initial + after transition)"
)
with patch(
"api.services.workflow.pipecat_engine.get_organization_id_from_workflow_run",
new_callable=AsyncMock,
return_value=1,
):
with patch(
"api.services.workflow.pipecat_engine.apply_disposition_mapping",
new_callable=AsyncMock,
return_value="completed",
):
runner = PipelineRunner()
# This should be the message that we inserted from user_idle_handler
assert context.messages[2]["role"] == "system", (
"The system message should be in the context"
async def run_pipeline():
await runner.run(task)
async def initialize_engine():
await asyncio.sleep(0.01)
await engine.initialize()
await engine.llm.queue_frame(LLMContextFrame(engine.context))
await asyncio.gather(run_pipeline(), initialize_engine())
# All 5 LLM steps should have been consumed
assert llm.get_current_step() == 5
# Idle handler should never have triggered
assert user_idle_handler._retry_count == 0, (
"User idle handler should not trigger during active conversation"
)
assert "The user has been quiet." in context.messages[2]["content"]

@ -1 +1 @@
Subproject commit e618bb98dfde6224ef9f4e15769580790719b269
Subproject commit 5313e8cd94443f220cc56c10cc2fc2aa98d8b6ba