From a0e9b31b2062450ae213ef56875229811184afa1 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 29 May 2026 15:25:58 +0530 Subject: [PATCH] fix: fix tests --- api/services/pipecat/run_pipeline.py | 6 ++-- api/services/pipecat/worker_runner.py | 36 +++++++++++++++++++ api/services/workflow/text_chat_runner.py | 10 +++--- api/tests/integrations/test_run_pipeline.py | 5 ++- .../test_run_pipeline_text_greeting.py | 5 +-- api/tests/test_custom_tools.py | 4 ++- .../test_pipecat_engine_context_update.py | 5 ++- api/tests/test_pipecat_engine_end_call.py | 29 ++++++--------- ...cat_engine_node_switch_with_user_speech.py | 5 ++- api/tests/test_pipecat_engine_tool_calls.py | 5 ++- .../test_pipecat_engine_transition_mute.py | 8 ++--- ...test_pipecat_engine_variable_extraction.py | 5 ++- api/tests/test_pipeline_cancellation.py | 7 ++-- api/tests/test_text_and_audio_playback.py | 5 ++- ...t_tts_endframe_with_audio_write_failure.py | 8 ++--- api/tests/test_unregistered_function_call.py | 2 ++ api/tests/test_user_idle_handler.py | 5 ++- .../test_user_muting_during_bot_speech.py | 11 +++--- api/tests/test_voicemail_detector.py | 5 ++- pipecat | 2 +- 20 files changed, 94 insertions(+), 74 deletions(-) create mode 100644 api/services/pipecat/worker_runner.py diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index d6eb7857..f6ffce40 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -51,6 +51,7 @@ from api.services.pipecat.tracing_config import ( ensure_tracing, ) from api.services.pipecat.transport_setup import create_webrtc_transport +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.pipecat.ws_sender_registry import get_ws_sender from api.services.telephony import registry as telephony_registry from api.services.workflow.dto import ReactFlowDTO @@ -87,7 +88,6 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType from pipecat.utils.run_context import set_current_org_id, set_current_run_id -from pipecat.workers.base_worker import WorkerParams # Setup tracing if enabled ensure_tracing() @@ -821,9 +821,7 @@ async def _run_pipeline( try: # Run the pipeline - loop = asyncio.get_running_loop() - params = WorkerParams(loop=loop) - await task.run(params) + await run_pipeline_worker(task) logger.info(f"Task completed for run {workflow_run_id}") except asyncio.CancelledError: logger.warning("Received CancelledError in _run_pipeline") diff --git a/api/services/pipecat/worker_runner.py b/api/services/pipecat/worker_runner.py new file mode 100644 index 00000000..56937c8a --- /dev/null +++ b/api/services/pipecat/worker_runner.py @@ -0,0 +1,36 @@ +import asyncio + +from pipecat.pipeline.worker import PipelineWorker +from pipecat.workers.runner import WorkerRunner + + +async def run_pipeline_worker( + worker: PipelineWorker, + *, + handle_sigint: bool = False, + handle_sigterm: bool = False, + auto_end: bool = True, +) -> None: + """Run a pipeline worker through the v1.3 worker runner lifecycle.""" + runner = WorkerRunner(handle_sigint=handle_sigint, handle_sigterm=handle_sigterm) + await runner.add_workers(worker) + await runner.run(auto_end=auto_end) + + +async def wait_for_pipeline_worker_started( + worker: PipelineWorker, + *, + timeout: float = 3.0, + run_task: asyncio.Task | None = None, +) -> None: + """Wait until a pipeline worker has fired its stable start lifecycle.""" + + async def _wait_until_started(): + while worker.started_at is None: + if run_task and run_task.done(): + await run_task + if worker.has_finished(): + raise RuntimeError("PipelineWorker finished before starting") + await asyncio.sleep(0.01) + + await asyncio.wait_for(_wait_until_started(), timeout=timeout) diff --git a/api/services/workflow/text_chat_runner.py b/api/services/workflow/text_chat_runner.py index 577aac13..83a4ad15 100644 --- a/api/services/workflow/text_chat_runner.py +++ b/api/services/workflow/text_chat_runner.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -45,6 +44,10 @@ from api.services.pipecat.tracing_config import ( build_remote_parent_context, get_trace_url, ) +from api.services.pipecat.worker_runner import ( + run_pipeline_worker, + wait_for_pipeline_worker_started, +) from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph @@ -534,8 +537,7 @@ async def execute_text_chat_pending_turn( conversation_type="text", additional_span_attributes=trace_span_attributes, ) - runner = PipelineRunner(handle_sigint=False, handle_sigterm=False) - runner_task = asyncio.create_task(runner.run(task)) + runner_task = asyncio.create_task(run_pipeline_worker(task)) engine.set_task(task) engine.set_audio_config(audio_config) @@ -548,7 +550,7 @@ async def execute_text_chat_pending_turn( ) try: - await asyncio.wait_for(task._pipeline_start_event.wait(), timeout=5.0) + await wait_for_pipeline_worker_started(task, timeout=5.0, run_task=runner_task) await engine.initialize() diff --git a/api/tests/integrations/test_run_pipeline.py b/api/tests/integrations/test_run_pipeline.py index 5de60643..9806c509 100644 --- a/api/tests/integrations/test_run_pipeline.py +++ b/api/tests/integrations/test_run_pipeline.py @@ -23,6 +23,7 @@ from pipecat.transports.base_transport import TransportParams from api.enums import WorkflowRunMode, WorkflowRunState from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline +from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started from api.tests.integrations._run_pipeline_helpers import ( create_workflow_run_rows, patch_run_pipeline_externals, @@ -116,7 +117,9 @@ async def test_run_pipeline_fires_initial_response_and_completes_run( run_task.result() # re-raise the failure assert captured_task, "create_pipeline_task was never invoked" pipeline_task = captured_task[0] - await asyncio.wait_for(pipeline_task._pipeline_start_event.wait(), timeout=3.0) + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task + ) # Let the initial response handler (set_node, queue LLMContextFrame) # complete before tearing things down. await asyncio.sleep(0.1) diff --git a/api/tests/integrations/test_run_pipeline_text_greeting.py b/api/tests/integrations/test_run_pipeline_text_greeting.py index 02a163e4..eb34c411 100644 --- a/api/tests/integrations/test_run_pipeline_text_greeting.py +++ b/api/tests/integrations/test_run_pipeline_text_greeting.py @@ -36,6 +36,7 @@ from pipecat.utils.time import time_now_iso8601 from api.enums import WorkflowRunMode, WorkflowRunState from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline +from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started from api.tests.integrations._run_pipeline_helpers import ( create_workflow_run_rows, patch_run_pipeline_externals, @@ -186,8 +187,8 @@ async def _run_test_body(workflow_run_setup, db_session) -> None: assert captured_task, "create_pipeline_task was never invoked" pipeline_task = captured_task[0] - await asyncio.wait_for( - pipeline_task._pipeline_start_event.wait(), timeout=3.0 + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task ) # Locate the assistant aggregator's LLM context (downstream of TTS). diff --git a/api/tests/test_custom_tools.py b/api/tests/test_custom_tools.py index 703ae76e..71d26f55 100644 --- a/api/tests/test_custom_tools.py +++ b/api/tests/test_custom_tools.py @@ -21,6 +21,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext @@ -468,7 +469,7 @@ class TestExecuteHttpTool: mock_client.request.return_value = mock_response mock_client_class.return_value.__aenter__.return_value = mock_client - result = await execute_http_tool(tool, arguments) + await execute_http_tool(tool, arguments) call_kwargs = mock_client.request.call_args.kwargs assert call_kwargs["method"] == "DELETE" @@ -793,6 +794,7 @@ class TestCustomToolManagerIntegration: expected_down_frames=[ LLMFullResponseStartFrame, FunctionCallsFromLLMInfoFrame, + UserTurnInferenceCompletedFrame, FunctionCallsStartedFrame, LLMFullResponseEndFrame, FunctionCallInProgressFrame, diff --git a/api/tests/test_pipecat_engine_context_update.py b/api/tests/test_pipecat_engine_context_update.py index a8b4481f..b73c65ba 100644 --- a/api/tests/test_pipecat_engine_context_update.py +++ b/api/tests/test_pipecat_engine_context_update.py @@ -20,7 +20,6 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -30,6 +29,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import ( @@ -131,10 +131,9 @@ async def run_pipeline_and_capture_context( new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_end_call.py b/api/tests/test_pipecat_engine_end_call.py index 1f2b6e68..bc4fea8e 100644 --- a/api/tests/test_pipecat_engine_end_call.py +++ b/api/tests/test_pipecat_engine_end_call.py @@ -25,7 +25,6 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import Frame, LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -42,6 +41,7 @@ from pipecat.turns.user_mute import ( from pipecat.utils.enums import EndTaskReason from api.enums import ToolCategory +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.dto import ( EdgeDataDTO, EndCallNodeData, @@ -279,10 +279,9 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -383,10 +382,9 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value={"greeting_type": "formal", "user_name": "John"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -482,10 +480,9 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -574,10 +571,9 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -652,10 +648,9 @@ class TestEndCallViaClientDisconnect: new_callable=AsyncMock, return_value={"user_intent": "disconnected"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_disconnect(): await asyncio.sleep(0.01) @@ -743,10 +738,9 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_race(): await asyncio.sleep(0.01) @@ -855,10 +849,9 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_race_disconnect(): nonlocal disconnect_called @@ -950,10 +943,9 @@ class TestEndCallExtractionBehavior: "_perform_extraction", side_effect=mock_extraction, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end(): await asyncio.sleep(0.01) @@ -1076,10 +1068,9 @@ class TestEndCallExtractionBehavior: "_perform_extraction", extraction_mock, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py index 858a601f..d815a694 100644 --- a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py +++ b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py @@ -24,7 +24,6 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -48,6 +47,7 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService @@ -286,10 +286,9 @@ class TestNodeSwitchWithUserSpeech: new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_tool_calls.py b/api/tests/test_pipecat_engine_tool_calls.py index 37714501..5c71c09d 100644 --- a/api/tests/test_pipecat_engine_tool_calls.py +++ b/api/tests/test_pipecat_engine_tool_calls.py @@ -11,7 +11,6 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -21,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import END_CALL_SYSTEM_PROMPT @@ -122,10 +122,9 @@ async def run_pipeline_with_tool_calls( new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): # Small delay to let runner start diff --git a/api/tests/test_pipecat_engine_transition_mute.py b/api/tests/test_pipecat_engine_transition_mute.py index 6080ffe3..1bb37774 100644 --- a/api/tests/test_pipecat_engine_transition_mute.py +++ b/api/tests/test_pipecat_engine_transition_mute.py @@ -15,7 +15,6 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -31,6 +30,7 @@ from pipecat.turns.user_mute import ( MuteUntilFirstBotCompleteUserMuteStrategy, ) +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -182,10 +182,9 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -257,10 +256,9 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_variable_extraction.py b/api/tests/test_pipecat_engine_variable_extraction.py index b53780af..9adfd867 100644 --- a/api/tests/test_pipecat_engine_variable_extraction.py +++ b/api/tests/test_pipecat_engine_variable_extraction.py @@ -18,7 +18,6 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -28,6 +27,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -168,10 +168,9 @@ class TestVariableExtractionDuringTransitions: new_callable=AsyncMock, return_value={"user_name": "John Doe"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipeline_cancellation.py b/api/tests/test_pipeline_cancellation.py index bb1bfd32..67a339f1 100644 --- a/api/tests/test_pipeline_cancellation.py +++ b/api/tests/test_pipeline_cancellation.py @@ -11,7 +11,8 @@ from pipecat.frames.frames import ( from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.workers.base_worker import WorkerParams + +from api.services.pipecat.worker_runner import run_pipeline_worker class MockTransport(FrameProcessor): @@ -54,9 +55,7 @@ async def test_interruption_with_blocked_end_frame(): task = PipelineWorker(pipeline, enable_rtvi=False) async def run_pipeline(): - loop = asyncio.get_running_loop() - params = WorkerParams(loop=loop) - await task.run(params=params) + await run_pipeline_worker(task) async def queue_frame(): await task.queue_frames([LLMRunFrame()]) diff --git a/api/tests/test_text_and_audio_playback.py b/api/tests/test_text_and_audio_playback.py index a2d6cb0e..b46dc215 100644 --- a/api/tests/test_text_and_audio_playback.py +++ b/api/tests/test_text_and_audio_playback.py @@ -20,7 +20,6 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -31,6 +30,7 @@ from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams from api.services.pipecat.recording_audio_cache import RecordingAudio +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.dto import ( EdgeDataDTO, EndCallNodeData, @@ -247,10 +247,9 @@ async def run_pipeline_and_capture_frames( return_value="completed", ), ): - runner = PipelineRunner() async def run(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize(): await asyncio.sleep(0.01) diff --git a/api/tests/test_tts_endframe_with_audio_write_failure.py b/api/tests/test_tts_endframe_with_audio_write_failure.py index b945ae3e..cc34a797 100644 --- a/api/tests/test_tts_endframe_with_audio_write_failure.py +++ b/api/tests/test_tts_endframe_with_audio_write_failure.py @@ -34,7 +34,6 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -50,6 +49,7 @@ from pipecat.turns.user_mute import ( ) from pipecat.utils.enums import EndTaskReason +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -219,10 +219,9 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end_call(): await asyncio.sleep(0.01) @@ -339,10 +338,9 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_observe(): await asyncio.sleep(0.01) diff --git a/api/tests/test_unregistered_function_call.py b/api/tests/test_unregistered_function_call.py index 5229b64d..a5f31a04 100644 --- a/api/tests/test_unregistered_function_call.py +++ b/api/tests/test_unregistered_function_call.py @@ -9,6 +9,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext @@ -45,6 +46,7 @@ class TestUnregisteredFunctionCall: expected_down_frames=[ LLMFullResponseStartFrame, FunctionCallsFromLLMInfoFrame, + UserTurnInferenceCompletedFrame, FunctionCallsStartedFrame, LLMFullResponseEndFrame, FunctionCallInProgressFrame, diff --git a/api/tests/test_user_idle_handler.py b/api/tests/test_user_idle_handler.py index 122d7c33..5e3b622a 100644 --- a/api/tests/test_user_idle_handler.py +++ b/api/tests/test_user_idle_handler.py @@ -23,7 +23,6 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -43,6 +42,7 @@ from pipecat.turns.user_stop import ExternalUserTurnStopStrategy from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService @@ -266,10 +266,9 @@ class TestUserIdleHandler: new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_user_muting_during_bot_speech.py b/api/tests/test_user_muting_during_bot_speech.py index 3ea0fabb..6a6acf65 100644 --- a/api/tests/test_user_muting_during_bot_speech.py +++ b/api/tests/test_user_muting_during_bot_speech.py @@ -25,7 +25,6 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -44,6 +43,7 @@ from pipecat.turns.user_mute import ( from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -258,10 +258,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) @@ -349,10 +348,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) @@ -445,10 +443,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) diff --git a/api/tests/test_voicemail_detector.py b/api/tests/test_voicemail_detector.py index b2f26a0c..c9084fd4 100644 --- a/api/tests/test_voicemail_detector.py +++ b/api/tests/test_voicemail_detector.py @@ -17,7 +17,6 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( @@ -36,6 +35,7 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from pipecat.tests import MockLLMService @@ -162,10 +162,9 @@ class TestVoicemailDetectorWithUserAggregator: ) task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def inject_frames(): await asyncio.sleep(0.05) diff --git a/pipecat b/pipecat index 1e145e42..54c83d86 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 1e145e4267df535b64547ed229d6234a99bd8843 +Subproject commit 54c83d862a418082d9b33d90233a6d1b78ebcdf8