2026-01-03 16:22:38 +05:30
|
|
|
"""
|
2026-02-07 12:44:05 +05:30
|
|
|
Simulates a realistic conversation and tests the user idle handler behavior.
|
|
|
|
|
|
|
|
|
|
This module tests the user idle handler in a natural back-and-forth conversation
|
|
|
|
|
where bot and user take turns speaking, verifying that:
|
|
|
|
|
1. The idle handler does not trigger while the bot is speaking (even when
|
|
|
|
|
TTS duration exceeds the idle timeout)
|
|
|
|
|
2. User speech properly resets the idle timer
|
|
|
|
|
3. The conversation flows naturally through node transitions to completion
|
2026-01-03 16:22:38 +05:30
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
|
|
|
|
|
|
import pytest
|
2026-02-07 12:44:05 +05:30
|
|
|
from pipecat.frames.frames import (
|
|
|
|
|
BotStoppedSpeakingFrame,
|
|
|
|
|
Frame,
|
|
|
|
|
LLMContextFrame,
|
|
|
|
|
TranscriptionFrame,
|
|
|
|
|
UserSpeakingFrame,
|
|
|
|
|
UserStartedSpeakingFrame,
|
|
|
|
|
UserStoppedSpeakingFrame,
|
|
|
|
|
)
|
2026-01-03 16:22:38 +05:30
|
|
|
from pipecat.pipeline.pipeline import Pipeline
|
|
|
|
|
from pipecat.pipeline.runner import PipelineRunner
|
|
|
|
|
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
|
|
|
|
from pipecat.processors.aggregators.llm_context import LLMContext
|
|
|
|
|
from pipecat.processors.aggregators.llm_response_universal import (
|
2026-02-11 18:18:32 +05:30
|
|
|
LLMAssistantAggregatorParams,
|
2026-01-03 16:22:38 +05:30
|
|
|
LLMContextAggregatorPair,
|
2026-01-23 18:53:59 +05:30
|
|
|
LLMUserAggregatorParams,
|
2026-01-03 16:22:38 +05:30
|
|
|
)
|
2026-02-07 12:44:05 +05:30
|
|
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
2026-01-27 18:20:23 +05:30
|
|
|
from pipecat.tests.mock_transport import MockTransport
|
2026-02-02 18:33:05 +05:30
|
|
|
from pipecat.transports.base_transport import TransportParams
|
2026-02-07 12:44:05 +05:30
|
|
|
from pipecat.turns.user_mute import (
|
|
|
|
|
CallbackUserMuteStrategy,
|
|
|
|
|
MuteUntilFirstBotCompleteUserMuteStrategy,
|
|
|
|
|
)
|
|
|
|
|
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy
|
|
|
|
|
from pipecat.turns.user_stop import ExternalUserTurnStopStrategy
|
|
|
|
|
from pipecat.turns.user_turn_strategies import UserTurnStrategies
|
|
|
|
|
from pipecat.utils.time import time_now_iso8601
|
|
|
|
|
|
2026-05-07 12:23:41 +05:30
|
|
|
from api.services.workflow.pipecat_engine import PipecatEngine
|
2026-05-08 16:02:51 +05:30
|
|
|
from api.services.workflow.workflow_graph import WorkflowGraph
|
2026-05-07 12:23:41 +05:30
|
|
|
from pipecat.tests import MockLLMService, MockTTSService
|
|
|
|
|
|
2026-02-07 12:44:05 +05:30
|
|
|
|
|
|
|
|
class UserSpeechInjector(FrameProcessor):
    """Frame processor that plays the user's side of a scripted conversation.

    Every ``BotStoppedSpeakingFrame`` that reaches this processor is counted
    (the frame's direction is not inspected). While scripted speeches remain,
    the N-th such frame causes the N-th entry of ``speeches`` to be emitted as
    a simulated user turn: ``UserStartedSpeakingFrame``, ``UserSpeakingFrame``,
    ``TranscriptionFrame`` and ``UserStoppedSpeakingFrame``, in that order.
    All frames, including the triggering one, are then forwarded unchanged.
    """

    def __init__(self, *, speeches: list[str], **kwargs):
        """Store the script and reset the bot-utterance counter.

        Args:
            speeches: Transcription texts to inject, consumed in order, one
                per observed ``BotStoppedSpeakingFrame``.
            **kwargs: Forwarded untouched to ``FrameProcessor.__init__``.
        """
        super().__init__(**kwargs)
        self._speeches = speeches
        # Number of BotStoppedSpeakingFrame instances seen so far.
        self._bot_stopped_count = 0

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Inject the next scripted user turn when the bot stops speaking.

        Args:
            frame: The frame being processed.
            direction: Direction the frame is travelling; used only when the
                frame is forwarded at the end.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStoppedSpeakingFrame):
            # Zero-based position of the speech that belongs to this utterance.
            speech_index = self._bot_stopped_count
            self._bot_stopped_count += 1

            if speech_index < len(self._speeches):
                transcript = self._speeches[speech_index]

                # Short pause before the simulated user starts talking.
                await asyncio.sleep(0.01)
                await self.push_frame(UserStartedSpeakingFrame())

                # sleep(0) yields to the event loop between injected frames.
                await asyncio.sleep(0)
                # broadcast_frame is handed the frame class, not an instance.
                await self.broadcast_frame(UserSpeakingFrame)

                await asyncio.sleep(0)
                transcription = TranscriptionFrame(
                    transcript, "user", time_now_iso8601()
                )
                await self.push_frame(transcription)

                await asyncio.sleep(0)
                await self.push_frame(UserStoppedSpeakingFrame())

        # Forward the original frame only after any injected speech was pushed.
        await self.push_frame(frame, direction)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def create_pipeline_with_speech_injection(
    workflow: WorkflowGraph,
    mock_llm: MockLLMService,
    speeches: list[str],
    user_idle_timeout: float = 0.2,
    mock_audio_duration_ms: int = 400,
) -> tuple[PipecatEngine, PipelineTask, object]:
    """Assemble a test pipeline with scripted user speech and idle handling.

    The pipeline is built from:
    - a ``MockTransport`` with audio input and output enabled at 16 kHz
    - a ``UserSpeechInjector`` fed with ``speeches``
    - a user/assistant context aggregator pair, where the user side carries
      the turn strategies, mute strategies and ``user_idle_timeout``
    - the supplied mock LLM and a ``MockTTSService``

    The engine's idle handler is wired to the user aggregator's
    ``on_user_turn_idle`` event and is reset on ``on_user_turn_started``.

    Args:
        workflow: Workflow graph handed to the engine.
        mock_llm: Mock LLM service; used both by the engine and as a
            pipeline processor.
        speeches: User utterances passed to ``UserSpeechInjector``.
        user_idle_timeout: Idle timeout (seconds) given to the user aggregator.
        mock_audio_duration_ms: Audio duration (milliseconds) passed to
            ``MockTTSService``.

    Returns:
        ``(engine, task, user_idle_handler)``.
    """
    tts_service = MockTTSService(
        mock_audio_duration_ms=mock_audio_duration_ms, frame_delay=0.001
    )

    transport_params = TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=16000,
        audio_out_sample_rate=16000,
    )
    mock_transport = MockTransport(params=transport_params)

    speech_injector = UserSpeechInjector(speeches=speeches)

    llm_context = LLMContext()

    engine = PipecatEngine(
        llm=mock_llm,
        context=llm_context,
        workflow=workflow,
        call_context_vars={"customer_name": "Test User"},
        workflow_run_id=1,
    )

    # Turn and mute strategies (intended to mirror the production setup);
    # the callback mute strategy defers to the engine's should_mute_user.
    user_params = LLMUserAggregatorParams(
        user_turn_strategies=UserTurnStrategies(
            start=[TranscriptionUserTurnStartStrategy()],
            stop=[ExternalUserTurnStopStrategy()],
        ),
        user_mute_strategies=[
            MuteUntilFirstBotCompleteUserMuteStrategy(),
            CallbackUserMuteStrategy(should_mute_callback=engine.should_mute_user),
        ],
        user_idle_timeout=user_idle_timeout,
    )

    aggregator_pair = LLMContextAggregatorPair(
        llm_context,
        assistant_params=LLMAssistantAggregatorParams(),
        user_params=user_params,
    )
    user_context_aggregator = aggregator_pair.user()
    assistant_context_aggregator = aggregator_pair.assistant()

    # Hook the engine's idle handler into the user aggregator's turn events.
    user_idle_handler = engine.create_user_idle_handler()

    @user_context_aggregator.event_handler("on_user_turn_idle")
    async def on_user_turn_idle(aggregator):
        await user_idle_handler.handle_idle(aggregator)

    @user_context_aggregator.event_handler("on_user_turn_started")
    async def on_user_turn_started(aggregator, strategy):
        user_idle_handler.reset()

    # Processor order:
    # transport.input -> speech injector -> user aggregator -> LLM -> TTS
    #   -> transport.output -> assistant aggregator
    processors = [
        mock_transport.input(),
        speech_injector,
        user_context_aggregator,
        mock_llm,
        tts_service,
        mock_transport.output(),
        assistant_context_aggregator,
    ]

    task = PipelineTask(
        Pipeline(processors), params=PipelineParams(), enable_rtvi=False
    )
    engine.set_task(task)

    return engine, task, user_idle_handler
|
2026-01-03 16:22:38 +05:30
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestUserIdleHandler:
    """User idle handling exercised through a scripted bot/user conversation."""

    @pytest.mark.asyncio
    async def test_idle_does_not_trigger_during_active_conversation(
        self, three_node_workflow_no_variable_extraction: WorkflowGraph
    ):
        """Check that the idle handler stays quiet while the user keeps talking.

        Scripted conversation:
        1. Bot: "Hello" (short greeting)
        2. User: "Hello" (injected once the bot stops speaking)
        3. Bot: a longer, three-sentence response
        4. User: "I need help with my account" (injected once the bot stops)
        5. Bot: ``collect_info`` function call (Start -> Agent transition)
        6. Bot: ``end_call`` function call (Agent -> End, ends the conversation)

        Configuration: ``user_idle_timeout`` is 0.8 s and the mock TTS is
        created with ``mock_audio_duration_ms=400``.

        Verifies:
        - ``llm.get_current_step()`` equals 5 once the pipeline finishes
        - the idle handler's retry counter is still 0, i.e. it never fired

        NOTE(review): four mock steps are configured while the assertion
        expects a step counter of 5 - confirm how MockLLMService counts steps.
        """
        user_idle_timeout = 0.8

        # Step 0: short greeting on the Start node.
        greeting_step = MockLLMService.create_text_chunks("Hello")
        # Step 1: longer response.
        # NOTE(review): earlier comments quoted "TTS 400ms > idle timeout
        # 200ms"; the timeout configured above is 0.8 s.
        long_reply_step = MockLLMService.create_text_chunks(
            "I can help you with your account. Let me look into that for you. "
            "Please hold on while I pull up your information."
        )
        # Step 2: transition from Start -> Agent node.
        collect_info_step = MockLLMService.create_function_call_chunks(
            function_name="collect_info",
            arguments={},
            tool_call_id="call_collect_info",
        )
        # Step 3: transition from Agent -> End node (ends the call).
        end_call_step = MockLLMService.create_function_call_chunks(
            function_name="end_call",
            arguments={},
            tool_call_id="call_end_call",
        )
        mock_steps = [
            greeting_step,
            long_reply_step,
            collect_info_step,
            end_call_step,
        ]

        llm = MockLLMService(mock_steps=mock_steps, chunk_delay=0.001)

        engine, task, user_idle_handler = await create_pipeline_with_speech_injection(
            workflow=three_node_workflow_no_variable_extraction,
            mock_llm=llm,
            speeches=["Hello", "I need help with my account"],
            user_idle_timeout=user_idle_timeout,
            mock_audio_duration_ms=400,
        )

        # Replace the organization lookup and the disposition mapping with
        # async mocks for the duration of the run.
        organization_lookup_patch = patch(
            "api.db:db_client.get_organization_id_by_workflow_run_id",
            new_callable=AsyncMock,
            return_value=1,
        )
        disposition_mapping_patch = patch(
            "api.services.workflow.pipecat_engine.apply_disposition_mapping",
            new_callable=AsyncMock,
            return_value="completed",
        )

        with organization_lookup_patch, disposition_mapping_patch:
            runner = PipelineRunner()

            async def drive_pipeline():
                await runner.run(task)

            async def start_conversation():
                # Brief delay before initializing, so the runner coroutine
                # (listed first in gather) gets to start.
                await asyncio.sleep(0.01)
                await engine.initialize()
                await engine.set_node(engine.workflow.start_node_id)
                # Queue the context on the LLM to trigger the first generation.
                await engine.llm.queue_frame(LLMContextFrame(engine.context))

            await asyncio.gather(drive_pipeline(), start_conversation())

        # The LLM step counter is expected to read 5 after the conversation.
        assert llm.get_current_step() == 5

        # Idle handler should never have triggered.
        assert user_idle_handler._retry_count == 0, (
            "User idle handler should not trigger during active conversation"
        )
|