From 5ef3be92b5fa9441609454ac19d89dd5029fa792 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Fri, 29 May 2026 16:19:42 +0530 Subject: [PATCH 01/14] chore: update pipecat to 1.3.0 (#379) * chore: rename PipelineTask to PipelineWorker * fix: fix tests * chore: update pipecat submodule * fix: fix anyio same task cancellation scope --- api/services/pipecat/event_handlers.py | 10 +++--- api/services/pipecat/pipeline_builder.py | 4 +-- api/services/pipecat/run_pipeline.py | 11 +++--- api/services/pipecat/worker_runner.py | 36 +++++++++++++++++++ api/services/workflow/mcp_tool_session.py | 8 +++-- api/services/workflow/pipecat_engine.py | 35 +++++++++++------- api/services/workflow/text_chat_runner.py | 10 +++--- .../integrations/_run_pipeline_helpers.py | 8 ++--- api/tests/integrations/test_run_pipeline.py | 7 ++-- .../test_run_pipeline_text_greeting.py | 7 ++-- api/tests/test_custom_tools.py | 4 ++- api/tests/test_mcp_integration.py | 2 +- .../test_pipecat_engine_context_update.py | 9 +++-- api/tests/test_pipecat_engine_end_call.py | 35 +++++++----------- ...cat_engine_node_switch_with_user_speech.py | 11 +++--- api/tests/test_pipecat_engine_tool_calls.py | 9 +++-- .../test_pipecat_engine_transition_mute.py | 12 +++---- ...test_pipecat_engine_variable_extraction.py | 9 +++-- api/tests/test_pipeline_cancellation.py | 11 +++--- api/tests/test_text_and_audio_playback.py | 9 +++-- ...t_tts_endframe_with_audio_write_failure.py | 14 ++++---- api/tests/test_unregistered_function_call.py | 2 ++ api/tests/test_user_idle_handler.py | 11 +++--- .../test_user_muting_during_bot_speech.py | 17 ++++----- api/tests/test_voicemail_detector.py | 9 +++-- pipecat | 2 +- 26 files changed, 170 insertions(+), 132 deletions(-) create mode 100644 api/services/pipecat/worker_runner.py diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 390f6cc..5ce82a9 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -21,7 +21,7 @@ from api.tasks.function_names import FunctionNames from pipecat.frames.frames import ( Frame, ) -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.utils.enums import EndTaskReason @@ -58,7 +58,7 @@ async def _capture_call_event( def register_event_handlers( - task: PipelineTask, + task: PipelineWorker, transport, workflow_run_id: int, engine: PipecatEngine, @@ -184,13 +184,13 @@ def register_event_handlers( ) @task.event_handler("on_pipeline_started") - async def on_pipeline_started(_task: PipelineTask, _frame: Frame): + async def on_pipeline_started(_task: PipelineWorker, _frame: Frame): logger.debug("In on_pipeline_started callback handler") ready_state["pipeline_started"] = True await maybe_trigger_initial_response() @task.event_handler("on_pipeline_error") - async def on_pipeline_error(_task: PipelineTask, frame: Frame): + async def on_pipeline_error(_task: PipelineWorker, frame: Frame): logger.warning(f"Pipeline error for workflow run {workflow_run_id}: {frame}") try: workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) @@ -218,7 +218,7 @@ def register_event_handlers( @task.event_handler("on_pipeline_finished") async def on_pipeline_finished( - task: PipelineTask, + task: PipelineWorker, _frame: Frame, ): logger.debug(f"In on_pipeline_finished callback handler") diff --git a/api/services/pipecat/pipeline_builder.py b/api/services/pipecat/pipeline_builder.py index de9d48c..cefccd5 100644 --- a/api/services/pipecat/pipeline_builder.py +++ b/api/services/pipecat/pipeline_builder.py @@ -4,7 +4,7 @@ from loguru import logger from api.services.pipecat.audio_config import AudioConfig from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.utils.run_context import turn_var @@ -194,7 +194,7 @@ def create_pipeline_task( f"out: {audio_config.transport_out_sample_rate}Hz" ) - task = PipelineTask( + task = PipelineWorker( pipeline, params=pipeline_params, enable_tracing=True, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 6cae498..be71b86 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -51,6 +51,7 @@ from api.services.pipecat.tracing_config import ( ensure_tracing, ) from api.services.pipecat.transport_setup import create_webrtc_transport +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.pipecat.ws_sender_registry import get_ws_sender from api.services.telephony import registry as telephony_registry from api.services.workflow.dto import ReactFlowDTO @@ -61,7 +62,6 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector -from pipecat.pipeline.base_task import PipelineTaskParams from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, LLMContextAggregatorPair, @@ -821,12 +821,15 @@ async def _run_pipeline( try: # Run the pipeline - loop = asyncio.get_running_loop() - params = PipelineTaskParams(loop=loop) - await task.run(params) + await run_pipeline_worker(task) logger.info(f"Task completed for run {workflow_run_id}") except asyncio.CancelledError: logger.warning("Received CancelledError in _run_pipeline") finally: + # Close MCP sessions here, not in engine.cleanup(). The anyio cancel + # scopes opened by MCPClient.start() in engine.initialize() are + # task-affine; this finally runs in the same task as initialize(), + # whereas engine.cleanup() runs in a pipecat event-handler task. + await engine.close_mcp_sessions() await feedback_observer.cleanup() logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}") diff --git a/api/services/pipecat/worker_runner.py b/api/services/pipecat/worker_runner.py new file mode 100644 index 0000000..56937c8 --- /dev/null +++ b/api/services/pipecat/worker_runner.py @@ -0,0 +1,36 @@ +import asyncio + +from pipecat.pipeline.worker import PipelineWorker +from pipecat.workers.runner import WorkerRunner + + +async def run_pipeline_worker( + worker: PipelineWorker, + *, + handle_sigint: bool = False, + handle_sigterm: bool = False, + auto_end: bool = True, +) -> None: + """Run a pipeline worker through the v1.3 worker runner lifecycle.""" + runner = WorkerRunner(handle_sigint=handle_sigint, handle_sigterm=handle_sigterm) + await runner.add_workers(worker) + await runner.run(auto_end=auto_end) + + +async def wait_for_pipeline_worker_started( + worker: PipelineWorker, + *, + timeout: float = 3.0, + run_task: asyncio.Task | None = None, +) -> None: + """Wait until a pipeline worker has fired its stable start lifecycle.""" + + async def _wait_until_started(): + while worker.started_at is None: + if run_task and run_task.done(): + await run_task + if worker.has_finished(): + raise RuntimeError("PipelineWorker finished before starting") + await asyncio.sleep(0.01) + + await asyncio.wait_for(_wait_until_started(), timeout=timeout) diff --git a/api/services/workflow/mcp_tool_session.py b/api/services/workflow/mcp_tool_session.py index 0caa1b7..a25d2bd 100644 --- a/api/services/workflow/mcp_tool_session.py +++ b/api/services/workflow/mcp_tool_session.py @@ -79,8 +79,12 @@ class McpToolSession: self.available: bool = False async def start(self) -> None: - """Connect, initialize, and cache the tool list. Never raises — - on any failure the session is marked unavailable.""" + """Connect, initialize, and cache the tool list. + + Never raises on a connect failure — a dead/unreachable MCP server + leaves the session marked unavailable (``available = False``). Genuine + external cancellation, KeyboardInterrupt, and SystemExit are re-raised + (see the CancelledError handling below and ``_degrade``).""" try: params = build_streamable_http_params( url=self._url, diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index f056725..d72c3f4 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -10,7 +10,7 @@ from pipecat.frames.frames import ( LLMContextFrame, TTSSpeakFrame, ) -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.services.llm_service import FunctionCallParams from pipecat.services.settings import LLMSettings @@ -60,7 +60,7 @@ class PipecatEngine: def __init__( self, *, - task: Optional[PipelineTask] = None, + task: Optional[PipelineWorker] = None, llm: Optional["LLMService"] = None, inference_llm: Optional["LLMService"] = None, context: Optional[LLMContext] = None, @@ -842,7 +842,7 @@ class PipecatEngine: """ self.context = context - def set_task(self, task: PipelineTask) -> None: + def set_task(self, task: PipelineWorker) -> None: """Set the pipeline task. This allows setting the task after the engine has been created, @@ -955,7 +955,15 @@ class PipecatEngine: exc_info=True, ) - async def _close_mcp_sessions(self) -> None: + async def close_mcp_sessions(self) -> None: + """Close all open MCP tool sessions. + + Must run in the same task that ran initialize() (which opened the + sessions via _open_mcp_sessions). The MCP client's underlying anyio + cancel scopes are task-affine — they must be exited from the task that + entered them — so this is invoked from _run_pipeline's finally, not + from cleanup() (which runs in a pipecat event-handler task). + """ for tool_uuid, session in list(self._mcp_sessions.items()): try: await session.close() @@ -964,7 +972,14 @@ class PipecatEngine: self._mcp_sessions = {} async def cleanup(self): - """Clean up engine resources on disconnect.""" + """Clean up engine resources on disconnect. + + MCP tool sessions are intentionally NOT closed here — see + close_mcp_sessions(). This method runs in a pipecat event-handler task + (on_pipeline_finished), a different task than the one that opened the + MCP sessions; closing them here raises "Attempted to exit cancel scope + in a different task than it was entered in". + """ # Cancel any pending timeout tasks if ( self._user_response_timeout_task @@ -973,11 +988,5 @@ class PipecatEngine: self._user_response_timeout_task.cancel() # Cancel any in-flight background summarization. - # MCP sessions are closed in a finally block so they are guaranteed to - # run even if the summarization cleanup raises an exception. - try: - if self._context_summarization_manager: - await self._context_summarization_manager.cleanup() - finally: - # Close any open MCP tool sessions - await self._close_mcp_sessions() + if self._context_summarization_manager: + await self._context_summarization_manager.cleanup() diff --git a/api/services/workflow/text_chat_runner.py b/api/services/workflow/text_chat_runner.py index 577aac1..83a4ad1 100644 --- a/api/services/workflow/text_chat_runner.py +++ b/api/services/workflow/text_chat_runner.py @@ -22,7 +22,6 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -45,6 +44,10 @@ from api.services.pipecat.tracing_config import ( build_remote_parent_context, get_trace_url, ) +from api.services.pipecat.worker_runner import ( + run_pipeline_worker, + wait_for_pipeline_worker_started, +) from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph @@ -534,8 +537,7 @@ async def execute_text_chat_pending_turn( conversation_type="text", additional_span_attributes=trace_span_attributes, ) - runner = PipelineRunner(handle_sigint=False, handle_sigterm=False) - runner_task = asyncio.create_task(runner.run(task)) + runner_task = asyncio.create_task(run_pipeline_worker(task)) engine.set_task(task) engine.set_audio_config(audio_config) @@ -548,7 +550,7 @@ async def execute_text_chat_pending_turn( ) try: - await asyncio.wait_for(task._pipeline_start_event.wait(), timeout=5.0) + await wait_for_pipeline_worker_started(task, timeout=5.0, run_task=runner_task) await engine.initialize() diff --git a/api/tests/integrations/_run_pipeline_helpers.py b/api/tests/integrations/_run_pipeline_helpers.py index 0591c09..a1e19b0 100644 --- a/api/tests/integrations/_run_pipeline_helpers.py +++ b/api/tests/integrations/_run_pipeline_helpers.py @@ -15,7 +15,7 @@ Provided here: - ``NoopFeedbackObserver``: a ``RealtimeFeedbackObserver`` stand-in with no WebSocket / clock-task side effects. - ``patch_run_pipeline_externals``: ``contextmanager`` that applies the - full patch set and captures the constructed ``PipelineTask`` for the + full patch set and captures the constructed ``PipelineWorker`` for the caller. Optional ``llm`` / ``tts`` arguments inject preconfigured mocks; otherwise blank ``MockLLMService`` / ``MockTTSService`` instances are constructed per-call. @@ -84,10 +84,10 @@ def patch_run_pipeline_externals( tts: MockTTSService | None = None, ): """Patch the externally-talking pieces of ``_run_pipeline`` and capture - the constructed ``PipelineTask`` so tests can drive it from outside. + the constructed ``PipelineWorker`` so tests can drive it from outside. Args: - captured_task: A list the constructed ``PipelineTask`` is appended + captured_task: A list the constructed ``PipelineWorker`` is appended to. Tests read ``captured_task[0]`` to get a handle on the task (to wait on its start event, queue frames, cancel it, etc.). llm: Optional pre-built ``MockLLMService``. When given, every call @@ -168,7 +168,7 @@ def patch_run_pipeline_externals( return_value="completed", ) ) - # Capture the PipelineTask so the test can drive it from outside. + # Capture the PipelineWorker so the test can drive it from outside. stack.enter_context( patch( "api.services.pipecat.run_pipeline.create_pipeline_task", diff --git a/api/tests/integrations/test_run_pipeline.py b/api/tests/integrations/test_run_pipeline.py index 9a87aa1..9806c50 100644 --- a/api/tests/integrations/test_run_pipeline.py +++ b/api/tests/integrations/test_run_pipeline.py @@ -2,7 +2,7 @@ Drives the actual ``_run_pipeline`` against the test database with real DB rows (organization, user, user configuration, workflow, workflow run) -and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineTask``. +and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineWorker``. The only patches are for things that talk to genuinely external systems; those are applied via ``patch_run_pipeline_externals`` from the shared helpers module. @@ -23,6 +23,7 @@ from pipecat.transports.base_transport import TransportParams from api.enums import WorkflowRunMode, WorkflowRunState from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline +from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started from api.tests.integrations._run_pipeline_helpers import ( create_workflow_run_rows, patch_run_pipeline_externals, @@ -116,7 +117,9 @@ async def test_run_pipeline_fires_initial_response_and_completes_run( run_task.result() # re-raise the failure assert captured_task, "create_pipeline_task was never invoked" pipeline_task = captured_task[0] - await asyncio.wait_for(pipeline_task._pipeline_start_event.wait(), timeout=3.0) + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task + ) # Let the initial response handler (set_node, queue LLMContextFrame) # complete before tearing things down. await asyncio.sleep(0.1) diff --git a/api/tests/integrations/test_run_pipeline_text_greeting.py b/api/tests/integrations/test_run_pipeline_text_greeting.py index 0da7bf8..eb34c41 100644 --- a/api/tests/integrations/test_run_pipeline_text_greeting.py +++ b/api/tests/integrations/test_run_pipeline_text_greeting.py @@ -36,6 +36,7 @@ from pipecat.utils.time import time_now_iso8601 from api.enums import WorkflowRunMode, WorkflowRunState from api.services.pipecat.audio_config import create_audio_config from api.services.pipecat.run_pipeline import _run_pipeline +from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started from api.tests.integrations._run_pipeline_helpers import ( create_workflow_run_rows, patch_run_pipeline_externals, @@ -186,12 +187,12 @@ async def _run_test_body(workflow_run_setup, db_session) -> None: assert captured_task, "create_pipeline_task was never invoked" pipeline_task = captured_task[0] - await asyncio.wait_for( - pipeline_task._pipeline_start_event.wait(), timeout=3.0 + await wait_for_pipeline_worker_started( + pipeline_task, timeout=3.0, run_task=run_task ) # Locate the assistant aggregator's LLM context (downstream of TTS). - # The PipelineTask wraps the user's pipeline inside another Pipeline, + # The PipelineWorker wraps the user's pipeline inside another Pipeline, # so we walk the tree recursively. assistant_aggregator = _find_processor_by_class_name( pipeline_task, "LLMAssistantAggregator" diff --git a/api/tests/test_custom_tools.py b/api/tests/test_custom_tools.py index 703ae76..71d26f5 100644 --- a/api/tests/test_custom_tools.py +++ b/api/tests/test_custom_tools.py @@ -21,6 +21,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext @@ -468,7 +469,7 @@ class TestExecuteHttpTool: mock_client.request.return_value = mock_response mock_client_class.return_value.__aenter__.return_value = mock_client - result = await execute_http_tool(tool, arguments) + await execute_http_tool(tool, arguments) call_kwargs = mock_client.request.call_args.kwargs assert call_kwargs["method"] == "DELETE" @@ -793,6 +794,7 @@ class TestCustomToolManagerIntegration: expected_down_frames=[ LLMFullResponseStartFrame, FunctionCallsFromLLMInfoFrame, + UserTurnInferenceCompletedFrame, FunctionCallsStartedFrame, LLMFullResponseEndFrame, FunctionCallInProgressFrame, diff --git a/api/tests/test_mcp_integration.py b/api/tests/test_mcp_integration.py index 4cf01f0..095a0d1 100644 --- a/api/tests/test_mcp_integration.py +++ b/api/tests/test_mcp_integration.py @@ -51,7 +51,7 @@ async def test_engine_opens_and_closes_mcp_sessions(monkeypatch): assert sess.available is True assert len(sess.function_schemas()) == 2 finally: - await engine._close_mcp_sessions() + await engine.close_mcp_sessions() assert engine._mcp_sessions == {} diff --git a/api/tests/test_pipecat_engine_context_update.py b/api/tests/test_pipecat_engine_context_update.py index 9235b22..b73c65b 100644 --- a/api/tests/test_pipecat_engine_context_update.py +++ b/api/tests/test_pipecat_engine_context_update.py @@ -20,8 +20,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -30,6 +29,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import ( @@ -116,7 +116,7 @@ async def run_pipeline_and_capture_context( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -131,10 +131,9 @@ async def run_pipeline_and_capture_context( new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_end_call.py b/api/tests/test_pipecat_engine_end_call.py index 523ad54..bc4fea8 100644 --- a/api/tests/test_pipecat_engine_end_call.py +++ b/api/tests/test_pipecat_engine_end_call.py @@ -25,8 +25,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import Frame, LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -42,6 +41,7 @@ from pipecat.turns.user_mute import ( from pipecat.utils.enums import EndTaskReason from api.enums import ToolCategory +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.dto import ( EdgeDataDTO, EndCallNodeData, @@ -112,7 +112,7 @@ async def create_engine_with_tracking( mock_llm: MockLLMService, test_helper: EndCallTestHelper, generate_audio: bool = True, -) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineTask]: +) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineWorker]: """Create a PipecatEngine with tracking for end call behavior. Args: @@ -222,7 +222,7 @@ async def create_engine_with_tracking( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -279,10 +279,9 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -383,10 +382,9 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value={"greeting_type": "formal", "user_name": "John"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -482,10 +480,9 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -574,10 +571,9 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -652,10 +648,9 @@ class TestEndCallViaClientDisconnect: new_callable=AsyncMock, return_value={"user_intent": "disconnected"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_disconnect(): await asyncio.sleep(0.01) @@ -743,10 +738,9 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_race(): await asyncio.sleep(0.01) @@ -855,10 +849,9 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value={"user_intent": "end"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_race_disconnect(): nonlocal disconnect_called @@ -950,10 +943,9 @@ class TestEndCallExtractionBehavior: "_perform_extraction", side_effect=mock_extraction, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end(): await asyncio.sleep(0.01) @@ -1076,10 +1068,9 @@ class TestEndCallExtractionBehavior: "_perform_extraction", extraction_mock, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py index 82a6f55..d815a69 100644 --- a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py +++ b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py @@ -24,8 +24,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -48,6 +47,7 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService @@ -119,7 +119,7 @@ async def create_test_pipeline( workflow: WorkflowGraph, mock_llm: MockLLMService, user_speech_initial_delay: float = 0.01, -) -> tuple[PipecatEngine, MockTransport, PipelineTask]: +) -> tuple[PipecatEngine, MockTransport, PipelineWorker]: """Create a PipecatEngine with full pipeline for testing node switch scenarios. The pipeline includes a UserSpeechInjector processor that injects @@ -208,7 +208,7 @@ async def create_test_pipeline( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -286,10 +286,9 @@ class TestNodeSwitchWithUserSpeech: new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_tool_calls.py b/api/tests/test_pipecat_engine_tool_calls.py index ec04b49..5c71c09 100644 --- a/api/tests/test_pipecat_engine_tool_calls.py +++ b/api/tests/test_pipecat_engine_tool_calls.py @@ -11,8 +11,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -21,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import END_CALL_SYSTEM_PROMPT @@ -107,7 +107,7 @@ async def run_pipeline_with_tool_calls( ) # Create a real pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -122,10 +122,9 @@ async def run_pipeline_with_tool_calls( new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): # Small delay to let runner start diff --git a/api/tests/test_pipecat_engine_transition_mute.py b/api/tests/test_pipecat_engine_transition_mute.py index 9ce0271..1bb3777 100644 --- a/api/tests/test_pipecat_engine_transition_mute.py +++ b/api/tests/test_pipecat_engine_transition_mute.py @@ -15,8 +15,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -31,6 +30,7 @@ from pipecat.turns.user_mute import ( MuteUntilFirstBotCompleteUserMuteStrategy, ) +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -99,7 +99,7 @@ async def _build_engine_and_pipeline( ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) return engine, task, function_call_mute_strategy, user_context_aggregator @@ -182,10 +182,9 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) @@ -257,10 +256,9 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value={"user_intent": "end call"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipecat_engine_variable_extraction.py b/api/tests/test_pipecat_engine_variable_extraction.py index 823592c..9adfd86 100644 --- a/api/tests/test_pipecat_engine_variable_extraction.py +++ b/api/tests/test_pipecat_engine_variable_extraction.py @@ -18,8 +18,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -28,6 +27,7 @@ from pipecat.processors.aggregators.llm_response_universal import ( from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -142,7 +142,7 @@ class TestVariableExtractionDuringTransitions: ) # Create pipeline task - task = PipelineTask( + task = PipelineWorker( pipeline, params=PipelineParams(), enable_rtvi=False, @@ -168,10 +168,9 @@ class TestVariableExtractionDuringTransitions: new_callable=AsyncMock, return_value={"user_name": "John Doe"}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_pipeline_cancellation.py b/api/tests/test_pipeline_cancellation.py index 6ef0490..67a339f 100644 --- a/api/tests/test_pipeline_cancellation.py +++ b/api/tests/test_pipeline_cancellation.py @@ -8,11 +8,12 @@ from pipecat.frames.frames import ( InterruptionTaskFrame, LLMRunFrame, ) -from pipecat.pipeline.base_task import PipelineTaskParams from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.worker import PipelineWorker from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from api.services.pipecat.worker_runner import run_pipeline_worker + class MockTransport(FrameProcessor): def __init__(self, **kwargs): @@ -51,12 +52,10 @@ async def test_interruption_with_blocked_end_frame(): transport = MockTransport() pipeline = Pipeline([transport, busy_wait_processor]) - task = PipelineTask(pipeline, enable_rtvi=False) + task = PipelineWorker(pipeline, enable_rtvi=False) async def run_pipeline(): - loop = asyncio.get_running_loop() - params = PipelineTaskParams(loop=loop) - await task.run(params=params) + await run_pipeline_worker(task) async def queue_frame(): await task.queue_frames([LLMRunFrame()]) diff --git a/api/tests/test_text_and_audio_playback.py b/api/tests/test_text_and_audio_playback.py index 3c35af2..b46dc21 100644 --- a/api/tests/test_text_and_audio_playback.py +++ b/api/tests/test_text_and_audio_playback.py @@ -20,8 +20,7 @@ from pipecat.frames.frames import ( TTSStoppedFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -31,6 +30,7 @@ from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams from api.services.pipecat.recording_audio_cache import RecordingAudio +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.dto import ( EdgeDataDTO, EndCallNodeData, @@ -212,7 +212,7 @@ async def run_pipeline_and_capture_frames( engine.set_transport_output(transport_output) pipeline = Pipeline([llm, tts, transport_output, context_aggregator.assistant()]) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) # Spy on task.queue_frame and transport_output.queue_frame to capture @@ -247,10 +247,9 @@ async def run_pipeline_and_capture_frames( return_value="completed", ), ): - runner = PipelineRunner() async def run(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize(): await asyncio.sleep(0.01) diff --git a/api/tests/test_tts_endframe_with_audio_write_failure.py b/api/tests/test_tts_endframe_with_audio_write_failure.py index 4914f9b..cc34a79 100644 --- a/api/tests/test_tts_endframe_with_audio_write_failure.py +++ b/api/tests/test_tts_endframe_with_audio_write_failure.py @@ -34,8 +34,7 @@ from unittest.mock import AsyncMock, patch import pytest from pipecat.frames.frames import LLMContextFrame from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -50,6 +49,7 @@ from pipecat.turns.user_mute import ( ) from pipecat.utils.enums import EndTaskReason +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -62,7 +62,7 @@ async def create_test_pipeline_with_failing_transport( workflow: WorkflowGraph, mock_llm: MockLLMService, fail_after_n_frames: int = 0, -) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineTask]: +) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineWorker]: """Create a PipecatEngine with failing output transport for testing. Uses the real MockTransport which now extends BaseOutputTransport and uses @@ -152,7 +152,7 @@ async def create_test_pipeline_with_failing_transport( ) # Create pipeline task - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) @@ -219,10 +219,9 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_end_call(): await asyncio.sleep(0.01) @@ -339,10 +338,9 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_and_observe(): await asyncio.sleep(0.01) diff --git a/api/tests/test_unregistered_function_call.py b/api/tests/test_unregistered_function_call.py index 5229b64..a5f31a0 100644 --- a/api/tests/test_unregistered_function_call.py +++ b/api/tests/test_unregistered_function_call.py @@ -9,6 +9,7 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, + UserTurnInferenceCompletedFrame, ) from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext @@ -45,6 +46,7 @@ class TestUnregisteredFunctionCall: expected_down_frames=[ LLMFullResponseStartFrame, FunctionCallsFromLLMInfoFrame, + UserTurnInferenceCompletedFrame, FunctionCallsStartedFrame, LLMFullResponseEndFrame, FunctionCallInProgressFrame, diff --git a/api/tests/test_user_idle_handler.py b/api/tests/test_user_idle_handler.py index 77dfb2c..5e3b622 100644 --- a/api/tests/test_user_idle_handler.py +++ b/api/tests/test_user_idle_handler.py @@ -23,8 +23,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -43,6 +42,7 @@ from pipecat.turns.user_stop import ExternalUserTurnStopStrategy from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService @@ -100,7 +100,7 @@ async def create_pipeline_with_speech_injection( speeches: list[str], user_idle_timeout: float = 0.2, mock_audio_duration_ms: int = 400, -) -> tuple[PipecatEngine, PipelineTask, object]: +) -> tuple[PipecatEngine, PipelineWorker, object]: """Create a pipeline with user speech injection and idle handling. Sets up a realistic pipeline with: @@ -194,7 +194,7 @@ async def create_pipeline_with_speech_injection( ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) return engine, task, user_idle_handler @@ -266,10 +266,9 @@ class TestUserIdleHandler: new_callable=AsyncMock, return_value="completed", ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def initialize_engine(): await asyncio.sleep(0.01) diff --git a/api/tests/test_user_muting_during_bot_speech.py b/api/tests/test_user_muting_during_bot_speech.py index ac04299..6a6acf6 100644 --- a/api/tests/test_user_muting_during_bot_speech.py +++ b/api/tests/test_user_muting_during_bot_speech.py @@ -25,8 +25,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -44,6 +43,7 @@ from pipecat.turns.user_mute import ( from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, @@ -125,7 +125,7 @@ async def create_engine_for_mute_test( PipecatEngine, MockTTSService, MockTransport, - PipelineTask, + PipelineWorker, LLMUserAggregator, BotSpeakingObserverProcessor, ]: @@ -196,7 +196,7 @@ async def create_engine_for_mute_test( ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) engine.set_task(task) return engine, tts, mock_transport, task, user_context_aggregator, observer @@ -258,10 +258,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) @@ -349,10 +348,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) @@ -445,10 +443,9 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value={}, ): - runner = PipelineRunner() async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def run_test(): await asyncio.sleep(0.01) diff --git a/api/tests/test_voicemail_detector.py b/api/tests/test_voicemail_detector.py index 0677c29..c9084fd 100644 --- a/api/tests/test_voicemail_detector.py +++ b/api/tests/test_voicemail_detector.py @@ -17,8 +17,7 @@ from pipecat.frames.frames import ( UserStoppedSpeakingFrame, ) from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.worker import PipelineParams, PipelineWorker from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.llm_response_universal import ( LLMAssistantAggregatorParams, @@ -36,6 +35,7 @@ from pipecat.turns.user_stop import ( from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 +from api.services.pipecat.worker_runner import run_pipeline_worker from pipecat.tests import MockLLMService @@ -161,11 +161,10 @@ class TestVoicemailDetectorWithUserAggregator: ] ) - task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) - runner = PipelineRunner() + task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False) async def run_pipeline(): - await runner.run(task) + await run_pipeline_worker(task) async def inject_frames(): await asyncio.sleep(0.05) diff --git a/pipecat b/pipecat index a845887..b0ac013 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit a8458879e6c950007b0753a33652f744fca5cdb5 +Subproject commit b0ac013a08cf74131a93afc5213af6b4802e5871 From 8f10bcade32079af126e4e9d83061cd30936fcad Mon Sep 17 00:00:00 2001 From: Abhishek Date: Fri, 29 May 2026 17:07:58 +0000 Subject: [PATCH 02/14] fix: store channel id in gathered context for ARI outbound --- api/services/telephony/ari_manager.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index a9537ba..a10c05d 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -657,9 +657,17 @@ class ARIConnection: await self._mark_ext_channel(ext_channel_id) await self._set_channel_run(ext_channel_id, workflow_run_id) await self._set_pending_bridge(ext_channel_id, channel_id, workflow_run_id) + # Persist the caller channel id as call_id. Inbound runs already + # set this in create_workflow_run, but outbound runs never do, so + # without this the serializer hangup (provider reads + # gathered_context["call_id"]) and the StasisEnd teardown both get + # an empty channel id and fail to hang up the live caller channel. await db_client.update_workflow_run( run_id=int(workflow_run_id), - gathered_context={"ext_channel_id": ext_channel_id}, + gathered_context={ + "ext_channel_id": ext_channel_id, + "call_id": channel_id, + }, ) # 3. Create the ext media channel with the id we just registered. From ba342b66a76f3bf37c768f2d0010887562662a95 Mon Sep 17 00:00:00 2001 From: PK Date: Sun, 31 May 2026 09:18:33 +0530 Subject: [PATCH 03/14] Update README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 0a8a955..5b3c90b 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Self-host in 60s   - + Join Slack

@@ -149,7 +149,7 @@ You can go to [https://docs.dograh.com](https://docs.dograh.com/) for our docume - **GitHub Discussions** — share use cases, ask questions, swap workflow recipes. - **GitHub Issues** — report bugs or request features. -👉 Join us → [Dograh Community Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) +👉 Join us → [Dograh Community Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) ## 🙌 Contributing @@ -183,5 +183,5 @@ Founded by YC alumni and exit founders committed to keeping voice AI open and ac

⭐ Star us on GitHub | ☁️ Try Cloud Version | - 💬 Join Slack + 💬 Join Slack

From 78ba62e18558bb6d5407810807301cc611773d42 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sun, 31 May 2026 13:05:22 +0530 Subject: [PATCH 04/14] feat: banner if API is not reachable --- CONTRIBUTING.md | 6 +- README.zh-CN.md | 6 +- api/services/workflow/dto.py | 3 +- docs/deployment/heroku.mdx | 2 +- docs/integrations/overview.mdx | 2 +- pipecat | 2 +- ui/src/app/api/config/version/route.ts | 48 +++++++++++-- .../components/EndCallToolConfig.tsx | 9 +-- .../components/HttpApiToolConfig.tsx | 9 +-- .../components/TransferCallToolConfig.tsx | 8 +-- ui/src/components/flow/TextOrAudioInput.tsx | 21 +++++- ui/src/components/flow/edges/CustomEdge.tsx | 7 +- ui/src/components/layout/AppLayout.tsx | 47 ++++++++++++- ui/src/context/AppConfigContext.tsx | 70 +++++++++++++------ ui/src/lib/apiClient.ts | 6 +- 15 files changed, 181 insertions(+), 65 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 631f446..131db75 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -4,7 +4,7 @@ Welcome to Dograh AI! ❤️ Thank you for your interest in contributing to the Dograh AI is a comprehensive voice agent platform that helps developers build, test, and deploy conversational AI systems with minimal setup. This guide will help you understand the project structure, set up your development environment, and start contributing effectively. -👉 Join our community → [Dograh Community Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) +👉 Join our community → [Dograh Community Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) ## 🏗️ Project Overview @@ -40,7 +40,7 @@ Please refer to our [Development Setup documentation](https://docs.dograh.com/co **Before You Start** - Check existing [GitHub Issues](../../issues) for similar work -- Join our [Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) to discuss your plans +- Join our [Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) to discuss your plans - Look for issues tagged `good first issue` for beginner-friendly tasks **During Development** @@ -58,6 +58,6 @@ Our Slack community is the heart of Dograh AI development: - **Connect**: Meet other contributors and maintainers - **Stay Updated**: Learn about contribution opportunities and releases -👉 **Join us**: [Dograh Community Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) +👉 **Join us**: [Dograh Community Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) Thank you for helping us keep voice AI open and accessible! 🎉 diff --git a/README.zh-CN.md b/README.zh-CN.md index 0ab8448..3b2af0d 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -15,7 +15,7 @@ 60 秒自托管   - + 加入 Slack

@@ -144,7 +144,7 @@ curl -o docker-compose.yaml https://raw.githubusercontent.com/dograh-hq/dograh/m - **GitHub Discussions** —— 分享使用场景、提问、交流工作流配方。 - **GitHub Issues** —— 报告 bug 或提交功能请求。 -👉 加入我们 → [Dograh 社区 Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) +👉 加入我们 → [Dograh 社区 Slack](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) ## 🙌 参与贡献 @@ -178,5 +178,5 @@ Dograh AI 基于 [BSD 2-Clause 协议](LICENSE)开源 —— 与构建 Dograh AI

⭐ 给我们一个 Star | ☁️ 试用云端版本 | - 💬 加入 Slack + 💬 加入 Slack

diff --git a/api/services/workflow/dto.py b/api/services/workflow/dto.py index c22f804..60aad75 100644 --- a/api/services/workflow/dto.py +++ b/api/services/workflow/dto.py @@ -244,7 +244,8 @@ class _ToolDocumentRefsMixin(BaseModel): "display_name": "Greeting Text", "description": ( "Text spoken via TTS at the start of the call. Supports " - "{{template_variables}}. Leave empty to skip the greeting." + "{{template_variables}}. Leave empty to skip the greeting. " + "Not supported with realtime (speech-to-speech) models." ), "display_options": DisplayOptions(show={"greeting_type": ["text"]}), "placeholder": "Hi {{first_name}}, this is Sarah from Acme.", diff --git a/docs/deployment/heroku.mdx b/docs/deployment/heroku.mdx index 063e75d..f91721e 100644 --- a/docs/deployment/heroku.mdx +++ b/docs/deployment/heroku.mdx @@ -41,4 +41,4 @@ One-click Heroku deployment is in development. This will include: - Environment variable configuration guide - Scaling and monitoring instructions -For updates on Heroku deployment availability, please [join our Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) or watch our GitHub repository for announcements. \ No newline at end of file +For updates on Heroku deployment availability, please [join our Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) or watch our GitHub repository for announcements. \ No newline at end of file diff --git a/docs/integrations/overview.mdx b/docs/integrations/overview.mdx index 67696b2..35d7abb 100644 --- a/docs/integrations/overview.mdx +++ b/docs/integrations/overview.mdx @@ -47,4 +47,4 @@ Our integration system follows these core principles: - Check provider-specific documentation for detailed setup instructions - Visit our [GitHub Issues](https://github.com/dograh-hq/dograh/issues) for community support -- Join our [Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3czr47sw5-MSg1J0kJ7IMPOCHF~03auQ) for assistance \ No newline at end of file +- Join our [Slack community](https://join.slack.com/t/dograh-community/shared_invite/zt-3zjb5vwvl-j7hRz3_F1SOn5cH~jm5f5g) for assistance \ No newline at end of file diff --git a/pipecat b/pipecat index b0ac013..6e410e0 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit b0ac013a08cf74131a93afc5213af6b4802e5871 +Subproject commit 6e410e06cc9f71fbadfb3850f87848d2288d6651 diff --git a/ui/src/app/api/config/version/route.ts b/ui/src/app/api/config/version/route.ts index 37609df..05900db 100644 --- a/ui/src/app/api/config/version/route.ts +++ b/ui/src/app/api/config/version/route.ts @@ -1,32 +1,64 @@ import { NextResponse } from "next/server"; -import { healthApiV1HealthGet } from "@/client/sdk.gen"; import type { HealthResponse } from "@/client/types.gen"; +import { getServerBackendUrl } from "@/lib/apiClient"; // Import version from package.json at build time import packageJson from "../../../../../package.json"; +const HEALTHCHECK_TIMEOUT_MS = 3000; + +function trimTrailingSlash(url: string) { + return url.endsWith("/") ? url.slice(0, -1) : url; +} + +function getHealthcheckFailureMessage(error: unknown, backendUrl: string) { + const errorName = + error && typeof error === "object" && "name" in error + ? String((error as { name?: unknown }).name) + : ""; + + if (errorName === "AbortError" || errorName === "TimeoutError") { + return `Backend health check timed out after ${HEALTHCHECK_TIMEOUT_MS}ms while trying to reach ${backendUrl}.`; + } + + return `Backend is not reachable at ${backendUrl}.`; +} + export async function GET() { const uiVersion = packageJson.version || "dev"; + const backendUrl = trimTrailingSlash(getServerBackendUrl()); + const healthcheckUrl = `${backendUrl}/api/v1/health`; let apiVersion = "unknown"; let deploymentMode = "oss"; let authProvider = "local"; let turnEnabled = false; let forceTurnRelay = false; + let backendStatus: "reachable" | "unreachable" = "unreachable"; + let backendMessage: string | null = `Backend is not reachable at ${backendUrl}.`; try { - const response = await healthApiV1HealthGet(); - if (response.data) { - const data = response.data as HealthResponse; + const response = await fetch(healthcheckUrl, { + cache: "no-store", + signal: AbortSignal.timeout(HEALTHCHECK_TIMEOUT_MS), + }); + + if (!response.ok) { + backendMessage = `Backend health check at ${healthcheckUrl} returned HTTP ${response.status}.`; + } else { + const data = (await response.json()) as HealthResponse; apiVersion = data.version; deploymentMode = data.deployment_mode; authProvider = data.auth_provider; turnEnabled = Boolean(data.turn_enabled); forceTurnRelay = Boolean(data.force_turn_relay); + backendStatus = "reachable"; + backendMessage = null; } - } catch { + } catch (error) { apiVersion = "unavailable"; + backendMessage = getHealthcheckFailureMessage(error, backendUrl); } return NextResponse.json({ @@ -36,5 +68,11 @@ export async function GET() { authProvider, turnEnabled, forceTurnRelay, + backend: { + status: backendStatus, + url: backendUrl, + healthcheckUrl, + message: backendMessage, + }, }); } diff --git a/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx b/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx index a3717ca..d5c9b23 100644 --- a/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx +++ b/ui/src/app/tools/[toolUuid]/components/EndCallToolConfig.tsx @@ -1,9 +1,7 @@ "use client"; -import { AlertCircle } from "lucide-react"; - import type { RecordingResponseSchema } from "@/client/types.gen"; -import { RecordingSelect } from "@/components/flow/TextOrAudioInput"; +import { RecordingSelect, StaticTextWarning } from "@/components/flow/TextOrAudioInput"; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; @@ -144,10 +142,7 @@ export function EndCallToolConfig({ {messageType === "custom" && (
-
- - This text is spoken as-is. For multilingual workflows, choose your phrasing carefully. -
+