diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 00744f0..4d4ea9d 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -46,6 +46,8 @@ from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow import WorkflowGraph from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector from pipecat.pipeline.base_task import PipelineTaskParams from pipecat.processors.aggregators.llm_response_universal import ( @@ -608,6 +610,7 @@ async def _run_pipeline( user_turn_strategies=user_turn_strategies, user_mute_strategies=user_mute_strategies, user_idle_timeout=max_user_idle_timeout, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.3)), ) context_aggregator = LLMContextAggregatorPair( context, assistant_params=assistant_params, user_params=user_params diff --git a/api/tests/test_user_idle_handler.py b/api/tests/test_user_idle_handler.py index b6cf542..3b01df0 100644 --- a/api/tests/test_user_idle_handler.py +++ b/api/tests/test_user_idle_handler.py @@ -1,10 +1,12 @@ """ -Simulates a user idle condition and tests the behaviour -of the user idle handler. +Simulates a realistic conversation and tests the user idle handler behavior. -This module tests the behavior when the user becomes idle during a conversation, -ensuring the user_idle_timeout in LLMUserAggregatorParams properly triggers -the on_user_turn_idle event and the engine handles it correctly. +This module tests the user idle handler in a natural back-and-forth conversation +where bot and user take turns speaking, verifying that: +1. The idle handler does not trigger while the bot is speaking (even when + TTS duration exceeds the idle timeout) +2. User speech properly resets the idle timer +3. The conversation flows naturally through node transitions to completion """ import asyncio @@ -14,7 +16,15 @@ import pytest from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow import WorkflowGraph -from pipecat.frames.frames import LLMContextFrame +from pipecat.frames.frames import ( + BotStoppedSpeakingFrame, + Frame, + LLMContextFrame, + TranscriptionFrame, + UserSpeakingFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -24,44 +34,96 @@ from pipecat.processors.aggregators.llm_response_universal import ( LLMContextAggregatorPair, LLMUserAggregatorParams, ) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.tests import MockLLMService, MockTTSService from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from pipecat.turns.user_mute import ( + CallbackUserMuteStrategy, + MuteUntilFirstBotCompleteUserMuteStrategy, +) +from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy +from pipecat.turns.user_stop import ExternalUserTurnStopStrategy +from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.utils.time import time_now_iso8601 -async def run_pipeline_with_user_idle( +class UserSpeechInjector(FrameProcessor): + """Processor that injects user speaking frames after the bot finishes speaking. + + When this processor sees a BotStoppedSpeakingFrame flowing upstream, + it injects UserStartedSpeakingFrame, TranscriptionFrame, and + UserStoppedSpeakingFrame downstream to simulate user speech. Each + BotStoppedSpeakingFrame triggers the next speech from the provided list. + """ + + def __init__(self, *, speeches: list[str], **kwargs): + """Initialize the user speech injector. + + Args: + speeches: List of transcription texts to inject, one per bot utterance. + **kwargs: Additional arguments passed to parent class. + """ + super().__init__(**kwargs) + self._speeches = speeches + self._bot_stopped_count = 0 + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, BotStoppedSpeakingFrame): + self._bot_stopped_count += 1 + if self._bot_stopped_count <= len(self._speeches): + speech_text = self._speeches[self._bot_stopped_count - 1] + await asyncio.sleep(0.01) + await self.push_frame(UserStartedSpeakingFrame()) + + await asyncio.sleep(0) + + await self.broadcast_frame(UserSpeakingFrame) + + await asyncio.sleep(0) + + await self.push_frame( + TranscriptionFrame(speech_text, "user", time_now_iso8601()) + ) + + await asyncio.sleep(0) + await self.push_frame(UserStoppedSpeakingFrame()) + + await self.push_frame(frame, direction) + + +async def create_pipeline_with_speech_injection( workflow: WorkflowGraph, + mock_llm: MockLLMService, + speeches: list[str], user_idle_timeout: float = 0.2, - mock_steps: list | None = None, -) -> tuple[MockLLMService, LLMContext]: - """Run a pipeline with user_idle_timeout and simulate user idle condition. + mock_audio_duration_ms: int = 400, +) -> tuple[PipecatEngine, PipelineTask, object]: + """Create a pipeline with user speech injection and idle handling. + + Sets up a realistic pipeline with: + - MockTransport for audio I/O simulation + - UserSpeechInjector that injects user speech after each bot utterance + - User idle handler with configurable timeout + - User turn and mute strategies matching production setup Args: workflow: The workflow graph to use. - user_idle_timeout: Timeout in seconds before considering user idle. - mock_steps: Optional list of mock step chunks for the LLM. If not provided, - defaults to a simple greeting followed by text responses. + mock_llm: The mock LLM service with pre-configured steps. + speeches: List of user speech texts to inject after each bot utterance. + user_idle_timeout: Timeout in seconds for user idle detection. + mock_audio_duration_ms: TTS audio duration in milliseconds. Returns: - Tuple of (MockLLMService, LLMContext) for assertions. + Tuple of (engine, task, user_idle_handler). """ - # Create mock responses - bot will speak first, then respond to idle prompts - # Step 1: Initial greeting - # Step 2: Response to first idle (asking if user is still there) - # Step 3: Response to second idle (goodbye message) + tts = MockTTSService( + mock_audio_duration_ms=mock_audio_duration_ms, frame_delay=0.001 + ) - if mock_steps is None: - mock_steps = MockLLMService.create_multi_step_responses( - MockLLMService.create_text_chunks("Hello, how can I help you today?"), - num_text_steps=4, # Initial + 2 idle responses + 1 variable extraction - step_prefix="Response", - ) - - llm = MockLLMService(mock_steps=mock_steps, chunk_delay=0.001) - tts = MockTTSService(mock_audio_duration_ms=40) - - # Create MockTransport for simulating transport behavior - mock_transport = MockTransport( + transport = MockTransport( params=TransportParams( audio_in_enabled=True, audio_out_enabled=True, @@ -70,27 +132,43 @@ async def run_pipeline_with_user_idle( ), ) - # Create LLM context + user_speech_injector = UserSpeechInjector(speeches=speeches) + context = LLMContext() - # Create context aggregator with user_idle_timeout in user_params - assistant_params = LLMAssistantAggregatorParams(expect_stripped_words=True) - user_params = LLMUserAggregatorParams(user_idle_timeout=user_idle_timeout) - context_aggregator = LLMContextAggregatorPair( - context, assistant_params=assistant_params, user_params=user_params - ) - user_context_aggregator = context_aggregator.user() - assistant_context_aggregator = context_aggregator.assistant() - - # Create PipecatEngine with the workflow engine = PipecatEngine( - llm=llm, + llm=mock_llm, context=context, workflow=workflow, call_context_vars={"customer_name": "Test User"}, workflow_run_id=1, ) + # User turn strategies matching production setup + user_turn_strategies = UserTurnStrategies( + start=[TranscriptionUserTurnStartStrategy()], + stop=[ExternalUserTurnStopStrategy()], + ) + + user_mute_strategies = [ + MuteUntilFirstBotCompleteUserMuteStrategy(), + CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user), + ] + + user_params = LLMUserAggregatorParams( + user_turn_strategies=user_turn_strategies, + user_mute_strategies=user_mute_strategies, + user_idle_timeout=user_idle_timeout, + ) + + assistant_params = LLMAssistantAggregatorParams(expect_stripped_words=True) + + context_aggregator = LLMContextAggregatorPair( + context, assistant_params=assistant_params, user_params=user_params + ) + user_context_aggregator = context_aggregator.user() + assistant_context_aggregator = context_aggregator.assistant() + # Register user idle event handlers user_idle_handler = engine.create_user_idle_handler() @@ -102,144 +180,108 @@ async def run_pipeline_with_user_idle( async def on_user_turn_started(aggregator, strategy): user_idle_handler.reset() - # Build the pipeline + # Build pipeline: + # transport.input → speech_injector → user_aggregator → LLM → TTS → transport.output → assistant_aggregator pipeline = Pipeline( [ + transport.input(), + user_speech_injector, user_context_aggregator, - llm, + mock_llm, tts, - mock_transport.output(), + transport.output(), assistant_context_aggregator, ] ) - # Create pipeline task task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) - engine.set_task(task) - # Patch DB calls to avoid actual database access - with patch( - "api.services.workflow.pipecat_engine.get_organization_id_from_workflow_run", - new_callable=AsyncMock, - return_value=1, - ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ): - runner = PipelineRunner() - - async def run_pipeline(): - await runner.run(task) - - async def initialize_engine(): - # Small delay to let runner start - await asyncio.sleep(0.01) - await engine.initialize() - await engine.llm.queue_frame(LLMContextFrame(engine.context)) - - # Calculate total wait time: - # - Initial bot speech - # - First idle timeout (user_idle_timeout) - # - First idle callback + LLM generation - # - Second idle timeout (user_idle_timeout) - # - Second idle callback (ends the task) - # Add buffer for processing time - total_wait_time = (user_idle_timeout * 3) + 1.0 - - async def wait_for_idle_to_trigger(): - # Wait long enough for idle timeouts to trigger - await asyncio.sleep(total_wait_time) - - # Run all concurrently - await asyncio.gather( - run_pipeline(), - initialize_engine(), - wait_for_idle_to_trigger(), - return_exceptions=True, - ) - - return llm, context + return engine, task, user_idle_handler class TestUserIdleHandler: - """Test user idle handling through PipecatEngine and UserIdleHandler.""" + """Test user idle handling with realistic conversation flows.""" @pytest.mark.asyncio - async def test_user_idle_triggers_callback(self, simple_workflow: WorkflowGraph): - """Test that user idle condition properly triggers the callback. - - This test verifies that when: - 1. The bot starts speaking (triggers conversation tracking) - 2. No user input is received for the timeout period - 3. The on_user_turn_idle event triggers the idle handler - - The engine's user idle handler should: - - First retry: Send a message asking if user is still there - - Second retry: Send goodbye message and end the call - """ - llm, context = await run_pipeline_with_user_idle( - workflow=simple_workflow, - user_idle_timeout=0.2, # Short timeout for faster test - ) - - # Final message in the context should be from the bot - assert len(context.messages) == 6, "Total 6 messages" - assert context.messages[-1]["content"] == "Response 2", ( - "Final message in the context should be from LLM" - ) - - @pytest.mark.asyncio - async def test_user_idle_with_node_transition( - self, three_node_workflow: WorkflowGraph + async def test_idle_does_not_trigger_during_active_conversation( + self, three_node_workflow_no_variable_extraction: WorkflowGraph ): - """Test user idle handling with node transition via tool call. + """Test that idle handler does not fire when users actively converse. - This test verifies that when: - 1. The bot starts speaking with initial greeting - 2. The LLM generates a tool call to transition to the next node - 3. The pipeline correctly handles the transition + Conversation flow: + 1. Bot: "Hello" (short greeting) + 2. User: "Hello" (injected after bot finishes speaking) + 3. Bot: longer response (TTS duration 400ms > idle timeout 200ms) + 4. User: "I need help with my account" (injected after bot finishes) + 5. Bot: collect_info function call (Start → Agent transition) + 6. Bot: end_call function call (Agent → End, ends conversation) - The mock steps are: - - Step 1: Text "Hello, how can I help you today?" - - Step 2: Tool call "collect_info" to transition to agent node - - Step 3+: Text responses after transition + Verifies: + - User idle handler never triggers during active conversation + - TTS duration exceeding idle timeout doesn't cause false idle triggers + - Pipeline completes all 4 LLM steps """ - # Create custom mock steps with tool call for node transition - # For three_node_workflow: - # - Edge from node 1 -> node 2 has label "Collect Info" -> function: "collect_info" - # - Edge from node 2 -> node 3 has label "End Call" -> function: "end_call" + user_idle_timeout = 0.8 + mock_steps = [ - # Step 1: Initial greeting (text) - MockLLMService.create_text_chunks("Hello, how can I help you today?"), - # Step 2: Transition to Collect Info node (tool call) + # Step 0: Short greeting on Start node + MockLLMService.create_text_chunks("Hello"), + # Step 1: Longer response (TTS 400ms > idle timeout 200ms) + MockLLMService.create_text_chunks( + "I can help you with your account. Let me look into that for you. " + "Please hold on while I pull up your information." + ), + # Step 2: Transition from Start → Agent node MockLLMService.create_function_call_chunks( function_name="collect_info", arguments={}, tool_call_id="call_collect_info", ), - # Step 3: Response after transition (text) - MockLLMService.create_text_chunks("Response after transition"), - # Step 4+: Additional responses for idle handling - MockLLMService.create_text_chunks("Response 2"), - MockLLMService.create_text_chunks("Response 3"), + # Step 3: Transition from Agent → End node (ends call) + MockLLMService.create_function_call_chunks( + function_name="end_call", + arguments={}, + tool_call_id="call_end_call", + ), ] - llm, context = await run_pipeline_with_user_idle( - workflow=three_node_workflow, - user_idle_timeout=0.2, - mock_steps=mock_steps, + llm = MockLLMService(mock_steps=mock_steps, chunk_delay=0.001) + + engine, task, user_idle_handler = await create_pipeline_with_speech_injection( + workflow=three_node_workflow_no_variable_extraction, + mock_llm=llm, + speeches=["Hello", "I need help with my account"], + user_idle_timeout=user_idle_timeout, + mock_audio_duration_ms=400, ) - # Verify the LLM was called multiple times (initial + after transition) - assert llm.get_current_step() >= 2, ( - "LLM should have been called at least twice (initial + after transition)" - ) + with patch( + "api.services.workflow.pipecat_engine.get_organization_id_from_workflow_run", + new_callable=AsyncMock, + return_value=1, + ): + with patch( + "api.services.workflow.pipecat_engine.apply_disposition_mapping", + new_callable=AsyncMock, + return_value="completed", + ): + runner = PipelineRunner() - # This should be the message that we inserted from user_idle_handler - assert context.messages[2]["role"] == "system", ( - "The system message should be in the context" + async def run_pipeline(): + await runner.run(task) + + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.llm.queue_frame(LLMContextFrame(engine.context)) + + await asyncio.gather(run_pipeline(), initialize_engine()) + + # All 5 LLM steps should have been consumed + assert llm.get_current_step() == 5 + + # Idle handler should never have triggered + assert user_idle_handler._retry_count == 0, ( + "User idle handler should not trigger during active conversation" ) - assert "The user has been quiet." in context.messages[2]["content"] diff --git a/pipecat b/pipecat index e618bb9..5313e8c 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit e618bb98dfde6224ef9f4e15769580790719b269 +Subproject commit 5313e8cd94443f220cc56c10cc2fc2aa98d8b6ba