diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 390f6cc..5ce82a9 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -21,7 +21,7 @@ from api.tasks.function_names import FunctionNames from pipecat.frames.frames import ( Frame, ) -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.utils.enums import EndTaskReason @@ -58,7 +58,7 @@ async def _capture_call_event( def register_event_handlers( - task: PipelineTask, + task: PipelineWorker, transport, workflow_run_id: int, engine: PipecatEngine, @@ -184,13 +184,13 @@ def register_event_handlers( ) @task.event_handler("on_pipeline_started") - async def on_pipeline_started(_task: PipelineTask, _frame: Frame): + async def on_pipeline_started(_task: PipelineWorker, _frame: Frame): logger.debug("In on_pipeline_started callback handler") ready_state["pipeline_started"] = True await maybe_trigger_initial_response() @task.event_handler("on_pipeline_error") - async def on_pipeline_error(_task: PipelineTask, frame: Frame): + async def on_pipeline_error(_task: PipelineWorker, frame: Frame): logger.warning(f"Pipeline error for workflow run {workflow_run_id}: {frame}") try: workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) @@ -218,7 +218,7 @@ def register_event_handlers( @task.event_handler("on_pipeline_finished") async def on_pipeline_finished( - task: PipelineTask, + task: PipelineWorker, _frame: Frame, ): logger.debug(f"In on_pipeline_finished callback handler") diff --git a/api/services/pipecat/pipeline_builder.py b/api/services/pipecat/pipeline_builder.py index de9d48c..cefccd5 100644 --- a/api/services/pipecat/pipeline_builder.py +++ b/api/services/pipecat/pipeline_builder.py @@ -4,7 +4,7 @@ from loguru import logger from api.services.pipecat.audio_config import AudioConfig from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.utils.run_context import turn_var @@ -194,7 +194,7 @@ def create_pipeline_task( f"out: {audio_config.transport_out_sample_rate}Hz" ) - task = PipelineTask( + task = PipelineWorker( pipeline, params=pipeline_params, enable_tracing=True, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 6cae498..be71b86 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -51,6 +51,7 @@ from api.services.pipecat.tracing_config import ( ensure_tracing, ) from api.services.pipecat.transport_setup import create_webrtc_transport +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.pipecat.ws_sender_registry import get_ws_sender from api.services.telephony import registry as telephony_registry from api.services.workflow.dto import ReactFlowDTO @@ -61,7 +62,6 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector -from pipecat.pipeline.base_task import PipelineTaskParams from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, LLMContextAggregatorPair, @@ -821,12 +821,15 @@ async def _run_pipeline( try: # Run the pipeline - loop = asyncio.get_running_loop() - params = PipelineTaskParams(loop=loop) - await task.run(params) + await run_pipeline_worker(task) logger.info(f"Task completed for run {workflow_run_id}") except asyncio.CancelledError: logger.warning("Received CancelledError in _run_pipeline") finally: + # Close MCP sessions here, not in engine.cleanup(). The anyio cancel + # scopes opened by MCPClient.start() in engine.initialize() are + # task-affine; this finally runs in the same task as initialize(), + # whereas engine.cleanup() runs in a pipecat event-handler task. + await engine.close_mcp_sessions() await feedback_observer.cleanup() logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}") diff --git a/api/services/pipecat/worker_runner.py b/api/services/pipecat/worker_runner.py new file mode 100644 index 0000000..56937c8 --- /dev/null +++ b/api/services/pipecat/worker_runner.py @@ -0,0 +1,36 @@ +import asyncio + +from pipecat.pipeline.worker import PipelineWorker +from pipecat.workers.runner import WorkerRunner + + +async def run_pipeline_worker( + worker: PipelineWorker, + *, + handle_sigint: bool = False, + handle_sigterm: bool = False, + auto_end: bool = True, +) -> None: + """Run a pipeline worker through the v1.3 worker runner lifecycle.""" + runner = WorkerRunner(handle_sigint=handle_sigint, handle_sigterm=handle_sigterm) + await runner.add_workers(worker) + await runner.run(auto_end=auto_end) + + +async def wait_for_pipeline_worker_started( + worker: PipelineWorker, + *, + timeout: float = 3.0, + run_task: asyncio.Task | None = None, +) -> None: + """Wait until a pipeline worker has fired its stable start lifecycle.""" + + async def _wait_until_started(): + while worker.started_at is None: + if run_task and run_task.done(): + await run_task + if worker.has_finished(): + raise RuntimeError("PipelineWorker finished before starting") + await asyncio.sleep(0.01) + + await asyncio.wait_for(_wait_until_started(), timeout=timeout) diff --git a/api/services/workflow/mcp_tool_session.py b/api/services/workflow/mcp_tool_session.py index 0caa1b7..a25d2bd 100644 --- a/api/services/workflow/mcp_tool_session.py +++ b/api/services/workflow/mcp_tool_session.py @@ -79,8 +79,12 @@ class McpToolSession: self.available: bool = False async def start(self) -> None: - """Connect, initialize, and cache the tool list. Never raises — - on any failure the session is marked unavailable.""" + """Connect, initialize, and cache the tool list. + + Never raises on a connect failure — a dead/unreachable MCP server + leaves the session marked unavailable (``available = False``). Genuine + external cancellation, KeyboardInterrupt, and SystemExit are re-raised + (see the CancelledError handling below and ``_degrade``).""" try: params = build_streamable_http_params( url=self._url, diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index f056725..d72c3f4 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -10,7 +10,7 @@ from pipecat.frames.frames import ( LLMContextFrame, TTSSpeakFrame, ) -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.services.llm_service import FunctionCallParams from pipecat.services.settings import LLMSettings @@ -60,7 +60,7 @@ class PipecatEngine: def __init__( self, *, - task: Optional[PipelineTask] = None, + task: Optional[PipelineWorker] = None, llm: Optional["LLMService"] = None, inference_llm: Optional["LLMService"] = None, context: Optional[LLMContext] = None, @@ -842,7 +842,7 @@ class PipecatEngine: """ self.context = context - def set_task(self, task: PipelineTask) -> None: + def set_task(self, task: PipelineWorker) -> None: """Set the pipeline task. This allows setting the task after the engine has been created, @@ -955,7 +955,15 @@ class PipecatEngine: exc_info=True, ) - async def _close_mcp_sessions(self) -> None: + async def close_mcp_sessions(self) -> None: + """Close all open MCP tool sessions. + + Must run in the same task that ran initialize() (which opened the + sessions via _open_mcp_sessions). The MCP client's underlying anyio + cancel scopes are task-affine — they must be exited from the task that + entered them — so this is invoked from _run_pipeline's finally, not + from cleanup() (which runs in a pipecat event-handler task). + """ for tool_uuid, session in list(self._mcp_sessions.items()): try: await session.close() @@ -964,7 +972,14 @@ class PipecatEngine: self._mcp_sessions = {} async def cleanup(self): - """Clean up engine resources on disconnect.""" + """Clean up engine resources on disconnect. + + MCP tool sessions are intentionally NOT closed here — see + close_mcp_sessions(). This method runs in a pipecat event-handler task + (on_pipeline_finished), a different task than the one that opened the + MCP sessions; closing them here raises "Attempted to exit cancel scope + in a different task than it was entered in". + """ # Cancel any pending timeout tasks if ( self._user_response_timeout_task @@ -973,11 +988,5 @@ class PipecatEngine: self._user_response_timeout_task.cancel() # Cancel any in-flight background summarization. - # MCP sessions are closed in a finally block so they are guaranteed to - # run even if the summarization cleanup raises an exception. - try: - if self._context_summarization_manager: - await self._context_summarization_manager.cleanup() - finally: - # Close any open MCP tool sessions - await self._close_mcp_sessions() + if self._context_summarization_manager: + await self._context_summarization_manager.cleanup() diff --git a/api/services/workflow/text_chat_runner.py b/api/services/workflow/text_chat_runner.py index 577aac1..83a4ad1 100644 --- a/api/services/workflow/text_chat_runner.py +++ b/api/services/workflow/text_chat_runner.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -45,6 +44,10 @@ from api.services.pipecat.tracing_config import ( build_remote_parent_context, get_trace_url, ) +from api.services.pipecat.worker_runner import ( + run_pipeline_worker, + wait_for_pipeline_worker_started, +) from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph @@ -534,8 +537,7 @@ async def execute_text_chat_pending_turn( conversation_type="text", additional_span_attributes=trace_span_attributes, ) - runner = PipelineRunner(handle_sigint=False, handle_sigterm=False) - runner_task = asyncio.create_task(runner.run(task)) + runner_task = asyncio.create_task(run_pipeline_worker(task)) engine.set_task(task) engine.set_audio_config(audio_config) @@ -548,7 +550,7 @@ async def execute_text_chat_pending_turn( ) try: - await asyncio.wait_for(task._pipeline_start_event.wait(), timeout=5.0) + await wait_for_pipeline_worker_started(task, timeout=5.0, run_task=runner_task) await engine.initialize() diff --git a/api/tests/integrations/_run_pipeline_helpers.py b/api/tests/integrations/_run_pipeline_helpers.py index 0591c09..a1e19b0 100644 --- a/api/tests/integrations/_run_pipeline_helpers.py +++ b/api/tests/integrations/_run_pipeline_helpers.py @@ -15,7 +15,7 @@ Provided here: - ``NoopFeedbackObserver``: a ``RealtimeFeedbackObserver`` stand-in with no WebSocket / clock-task side effects. - ``patch_run_pipeline_externals``: ``contextmanager`` that applies the - full patch set and captures the constructed ``PipelineTask`` for the + full patch set and captures the constructed ``PipelineWorker`` for the caller. Optional ``llm`` / ``tts`` arguments inject preconfigured mocks; otherwise blank ``MockLLMService`` / ``MockTTSService`` instances are constructed per-call. @@ -84,10 +84,10 @@ def patch_run_pipeline_externals( tts: MockTTSService | None = None, ): """Patch the externally-talking pieces of ``_run_pipeline`` and capture - the constructed ``PipelineTask`` so tests can drive it from outside. + the constructed ``PipelineWorker`` so tests can drive it from outside. Args: - captured_task: A list the constructed ``PipelineTask`` is appended + captured_task: A list the constructed ``PipelineWorker`` is appended to. Tests read ``captured_task[0]`` to get a handle on the task (to wait on its start event, queue frames, cancel it, etc.). llm: Optional pre-built ``MockLLMService``. When given, every call @@ -168,7 +168,7 @@ def patch_run_pipeline_externals( return_value="completed", ) ) - # Capture the PipelineTask so the test can drive it from outside. + # Capture the PipelineWorker so the test can drive it from outside. stack.enter_context( patch( "api.services.pipecat.run_pipeline.create_pipeline_task", diff --git a/api/tests/integrations/test_run_pipeline.py b/api/tests/integrations/test_run_pipeline.py index 9a87aa1..9806c50 100644 --- a/api/tests/integrations/test_run_pipeline.py +++ b/api/tests/integrations/test_run_pipeline.py @@ -2,7 +2,7 @@ Drives the actual ``_run_pipeline`` against the test database with real DB rows (organization, user, user configuration, workflow, workflow run) -and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineTask``. +and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineWorker``. The only patches are for things that talk to genuinely external systems; those are applied via ``patch_run_pipeline_externals`` from the shared helpers module. @@ -23,6 +23,7 @@ from pipecat.transports.base_transport import TransportParams from api.enums import WorkflowRunMode, WorkflowRunState from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline +from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started from api.tests.integrations._run_pipeline_helpers import ( create_workflow_run_rows, patch_run_pipeline_externals, @@ -116,7 +117,9 @@ async def test_run_pipeline_fires_initial_response_and_completes_run( run_task.result() # re-raise the failure assert captured_task, "create_pipeline_task was never invoked" pipeline_task = captured_task[0] - await asyncio.wait_for(pipeline_task._pipeline_start_event.wait(), timeout=3.0) + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task + ) # Let the initial response handler (set_node, queue LLMContextFrame) # complete before tearing things down. await asyncio.sleep(0.1) diff --git a/api/tests/integrations/test_run_pipeline_text_greeting.py b/api/tests/integrations/test_run_pipeline_text_greeting.py index 0da7bf8..eb34c41 100644 --- a/api/tests/integrations/test_run_pipeline_text_greeting.py +++ b/api/tests/integrations/test_run_pipeline_text_greeting.py @@ -36,6 +36,7 @@ from pipecat.utils.time import time_now_iso8601 from api.enums import WorkflowRunMode, WorkflowRunState from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline +from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started from api.tests.integrations._run_pipeline_helpers import ( create_workflow_run_rows, patch_run_pipeline_externals, @@ -186,12 +187,12 @@ async def _run_test_body(workflow_run_setup, db_session) -> None: assert captured_task, "create_pipeline_task was never invoked" pipeline_task = captured_task[0] - await asyncio.wait_for( - pipeline_task._pipeline_start_event.wait(), timeout=3.0 + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task ) # Locate the assistant aggregator's LLM context (downstream of TTS). - # The PipelineTask wraps the user's pipeline inside another Pipeline, + # The PipelineWorker wraps the user's pipeline inside another Pipeline, # so we walk the tree recursively. assistant_aggregator = _find_processor_by_class_name( pipeline_task, "LLMAssistantAggregator" diff --git a/api/tests/test_custom_tools.py b/api/tests/test_custom_tools.py index 703ae76..71d26f5 100644 --- a/api/tests/test_custom_tools.py +++ b/api/tests/test_custom_tools.py @@ -21,6 +21,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext @@ -468,7 +469,7 @@ class TestExecuteHttpTool: mock_client.request.return_value = mock_response mock_client_class.return_value.__aenter__.return_value = mock_client - result = await execute_http_tool(tool, arguments) + await execute_http_tool(tool, arguments) call_kwargs = mock_client.request.call_args.kwargs assert call_kwargs["method"] == "DELETE" @@ -793,6 +794,7 @@ class TestCustomToolManagerIntegration: expected_down_frames=[ LLMFullResponseStartFrame, FunctionCallsFromLLMInfoFrame, + UserTurnInferenceCompletedFrame, FunctionCallsStartedFrame, LLMFullResponseEndFrame, FunctionCallInProgressFrame, diff --git a/api/tests/test_mcp_integration.py b/api/tests/test_mcp_integration.py index 4cf01f0..095a0d1 100644 --- a/api/tests/test_mcp_integration.py +++ b/api/tests/test_mcp_integration.py @@ -51,7 +51,7 @@ async def test_engine_opens_and_closes_mcp_sessions(monkeypatch): assert sess.available is True assert len(sess.function_schemas()) == 2 finally: - await engine._close_mcp_sessions() + await engine.close_mcp_sessions() assert engine._mcp_sessions == {} diff --git a/api/tests/test_pipecat_engine_context_update.py b/api/tests/test_pipecat_engine_context_update.py index 9235b22..b73c65b 100644 --- a/api/tests/test_pipecat_engine_context_update.py +++ b/api/tests/test_pipecat_engine_context_update.py @@ -20,8 +20,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -30,6 +29,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import ( @@ -116,7 +116,7 @@ async def run_pipeline_and_capture_context( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -131,10 +131,9 @@ async def run_pipeline_and_capture_context( new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_end_call.py b/api/tests/test_pipecat_engine_end_call.py index 523ad54..bc4fea8 100644 --- a/api/tests/test_pipecat_engine_end_call.py +++ b/api/tests/test_pipecat_engine_end_call.py @@ -25,8 +25,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import Frame, LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -42,6 +41,7 @@ from pipecat.turns.user_mute import ( from pipecat.utils.enums import EndTaskReason from api.enums import ToolCategory +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.dto import ( EdgeDataDTO, EndCallNodeData, @@ -112,7 +112,7 @@ async def create_engine_with_tracking( mock_llm: MockLLMService, test_helper: EndCallTestHelper, generate_audio: bool = True, -) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineTask]: +) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineWorker]: """Create a PipecatEngine with tracking for end call behavior. Args: @@ -222,7 +222,7 @@ async def create_engine_with_tracking( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -279,10 +279,9 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -383,10 +382,9 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value={"greeting_type": "formal", "user_name": "John"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -482,10 +480,9 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -574,10 +571,9 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -652,10 +648,9 @@ class TestEndCallViaClientDisconnect: new_callable=AsyncMock, return_value={"user_intent": "disconnected"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_disconnect(): await asyncio.sleep(0.01) @@ -743,10 +738,9 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_race(): await asyncio.sleep(0.01) @@ -855,10 +849,9 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_race_disconnect(): nonlocal disconnect_called @@ -950,10 +943,9 @@ class TestEndCallExtractionBehavior: "_perform_extraction", side_effect=mock_extraction, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end(): await asyncio.sleep(0.01) @@ -1076,10 +1068,9 @@ class TestEndCallExtractionBehavior: "_perform_extraction", extraction_mock, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py index 82a6f55..d815a69 100644 --- a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py +++ b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py @@ -24,8 +24,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -48,6 +47,7 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService @@ -119,7 +119,7 @@ async def create_test_pipeline( workflow: WorkflowGraph, mock_llm: MockLLMService, user_speech_initial_delay: float = 0.01, -) -> tuple[PipecatEngine, MockTransport, PipelineTask]: +) -> tuple[PipecatEngine, MockTransport, PipelineWorker]: """Create a PipecatEngine with full pipeline for testing node switch scenarios. The pipeline includes a UserSpeechInjector processor that injects @@ -208,7 +208,7 @@ async def create_test_pipeline( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -286,10 +286,9 @@ class TestNodeSwitchWithUserSpeech: new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_tool_calls.py b/api/tests/test_pipecat_engine_tool_calls.py index ec04b49..5c71c09 100644 --- a/api/tests/test_pipecat_engine_tool_calls.py +++ b/api/tests/test_pipecat_engine_tool_calls.py @@ -11,8 +11,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -21,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import END_CALL_SYSTEM_PROMPT @@ -107,7 +107,7 @@ async def run_pipeline_with_tool_calls( ) # Create a real pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -122,10 +122,9 @@ async def run_pipeline_with_tool_calls( new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): # Small delay to let runner start diff --git a/api/tests/test_pipecat_engine_transition_mute.py b/api/tests/test_pipecat_engine_transition_mute.py index 9ce0271..1bb3777 100644 --- a/api/tests/test_pipecat_engine_transition_mute.py +++ b/api/tests/test_pipecat_engine_transition_mute.py @@ -15,8 +15,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -31,6 +30,7 @@ from pipecat.turns.user_mute import ( MuteUntilFirstBotCompleteUserMuteStrategy, ) +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -99,7 +99,7 @@ async def _build_engine_and_pipeline( ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) return engine, task, function_call_mute_strategy, user_context_aggregator @@ -182,10 +182,9 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -257,10 +256,9 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_variable_extraction.py b/api/tests/test_pipecat_engine_variable_extraction.py index 823592c..9adfd86 100644 --- a/api/tests/test_pipecat_engine_variable_extraction.py +++ b/api/tests/test_pipecat_engine_variable_extraction.py @@ -18,8 +18,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -28,6 +27,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -142,7 +142,7 @@ class TestVariableExtractionDuringTransitions: ) # Create pipeline task - task = PipelineTask( + task = PipelineWorker( pipeline, params=PipelineParams(), enable_rtvi=False, @@ -168,10 +168,9 @@ class TestVariableExtractionDuringTransitions: new_callable=AsyncMock, return_value={"user_name": "John Doe"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipeline_cancellation.py b/api/tests/test_pipeline_cancellation.py index 6ef0490..67a339f 100644 --- a/api/tests/test_pipeline_cancellation.py +++ b/api/tests/test_pipeline_cancellation.py @@ -8,11 +8,12 @@ from pipecat.frames.frames import ( InterruptionTaskFrame, LLMRunFrame, ) -from pipecat.pipeline.base_task import PipelineTaskParams from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from api.services.pipecat.worker_runner import run_pipeline_worker + class MockTransport(FrameProcessor): def __init__(self, **kwargs): @@ -51,12 +52,10 @@ async def test_interruption_with_blocked_end_frame(): transport = MockTransport() pipeline = Pipeline([transport, busy_wait_processor]) - task = PipelineTask(pipeline, enable_rtvi=False) + task = PipelineWorker(pipeline, enable_rtvi=False) async def run_pipeline(): - loop = asyncio.get_running_loop() - params = PipelineTaskParams(loop=loop) - await task.run(params=params) + await run_pipeline_worker(task) async def queue_frame(): await task.queue_frames([LLMRunFrame()]) diff --git a/api/tests/test_text_and_audio_playback.py b/api/tests/test_text_and_audio_playback.py index 3c35af2..b46dc21 100644 --- a/api/tests/test_text_and_audio_playback.py +++ b/api/tests/test_text_and_audio_playback.py @@ -20,8 +20,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -31,6 +30,7 @@ from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams from api.services.pipecat.recording_audio_cache import RecordingAudio +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.dto import ( EdgeDataDTO, EndCallNodeData, @@ -212,7 +212,7 @@ async def run_pipeline_and_capture_frames( engine.set_transport_output(transport_output) pipeline = Pipeline([llm, tts, transport_output, context_aggregator.assistant()]) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) # Spy on task.queue_frame and transport_output.queue_frame to capture @@ -247,10 +247,9 @@ async def run_pipeline_and_capture_frames( return_value="completed", ), ): - runner = PipelineRunner() async def run(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize(): await asyncio.sleep(0.01) diff --git a/api/tests/test_tts_endframe_with_audio_write_failure.py b/api/tests/test_tts_endframe_with_audio_write_failure.py index 4914f9b..cc34a79 100644 --- a/api/tests/test_tts_endframe_with_audio_write_failure.py +++ b/api/tests/test_tts_endframe_with_audio_write_failure.py @@ -34,8 +34,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -50,6 +49,7 @@ from pipecat.turns.user_mute import ( ) from pipecat.utils.enums import EndTaskReason +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -62,7 +62,7 @@ async def create_test_pipeline_with_failing_transport( workflow: WorkflowGraph, mock_llm: MockLLMService, fail_after_n_frames: int = 0, -) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineTask]: +) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineWorker]: """Create a PipecatEngine with failing output transport for testing. Uses the real MockTransport which now extends BaseOutputTransport and uses @@ -152,7 +152,7 @@ async def create_test_pipeline_with_failing_transport( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -219,10 +219,9 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end_call(): await asyncio.sleep(0.01) @@ -339,10 +338,9 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_observe(): await asyncio.sleep(0.01) diff --git a/api/tests/test_unregistered_function_call.py b/api/tests/test_unregistered_function_call.py index 5229b64..a5f31a0 100644 --- a/api/tests/test_unregistered_function_call.py +++ b/api/tests/test_unregistered_function_call.py @@ -9,6 +9,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext @@ -45,6 +46,7 @@ class TestUnregisteredFunctionCall: expected_down_frames=[ LLMFullResponseStartFrame, FunctionCallsFromLLMInfoFrame, + UserTurnInferenceCompletedFrame, FunctionCallsStartedFrame, LLMFullResponseEndFrame, FunctionCallInProgressFrame, diff --git a/api/tests/test_user_idle_handler.py b/api/tests/test_user_idle_handler.py index 77dfb2c..5e3b622 100644 --- a/api/tests/test_user_idle_handler.py +++ b/api/tests/test_user_idle_handler.py @@ -23,8 +23,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -43,6 +42,7 @@ from pipecat.turns.user_stop import ExternalUserTurnStopStrategy from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService @@ -100,7 +100,7 @@ async def create_pipeline_with_speech_injection( speeches: list[str], user_idle_timeout: float = 0.2, mock_audio_duration_ms: int = 400, -) -> tuple[PipecatEngine, PipelineTask, object]: +) -> tuple[PipecatEngine, PipelineWorker, object]: """Create a pipeline with user speech injection and idle handling. Sets up a realistic pipeline with: @@ -194,7 +194,7 @@ async def create_pipeline_with_speech_injection( ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) return engine, task, user_idle_handler @@ -266,10 +266,9 @@ class TestUserIdleHandler: new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_user_muting_during_bot_speech.py b/api/tests/test_user_muting_during_bot_speech.py index ac04299..6a6acf6 100644 --- a/api/tests/test_user_muting_during_bot_speech.py +++ b/api/tests/test_user_muting_during_bot_speech.py @@ -25,8 +25,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -44,6 +43,7 @@ from pipecat.turns.user_mute import ( from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -125,7 +125,7 @@ async def create_engine_for_mute_test( PipecatEngine, MockTTSService, MockTransport, - PipelineTask, + PipelineWorker, LLMUserAggregator, BotSpeakingObserverProcessor, ]: @@ -196,7 +196,7 @@ async def create_engine_for_mute_test( ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) return engine, tts, mock_transport, task, user_context_aggregator, observer @@ -258,10 +258,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) @@ -349,10 +348,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) @@ -445,10 +443,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) diff --git a/api/tests/test_voicemail_detector.py b/api/tests/test_voicemail_detector.py index 0677c29..c9084fd 100644 --- a/api/tests/test_voicemail_detector.py +++ b/api/tests/test_voicemail_detector.py @@ -17,8 +17,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -36,6 +35,7 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from pipecat.tests import MockLLMService @@ -161,11 +161,10 @@ class TestVoicemailDetectorWithUserAggregator: ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) - runner = PipelineRunner() + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def inject_frames(): await asyncio.sleep(0.05) diff --git a/pipecat b/pipecat index a845887..b0ac013 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit a8458879e6c950007b0753a33652f744fca5cdb5 +Subproject commit b0ac013a08cf74131a93afc5213af6b4802e5871