diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index f6ffce4..be71b86 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -826,5 +826,10 @@ async def _run_pipeline( except asyncio.CancelledError: logger.warning("Received CancelledError in _run_pipeline") finally: + # Close MCP sessions here, not in engine.cleanup(). The anyio cancel + # scopes opened by MCPClient.start() in engine.initialize() are + # task-affine; this finally runs in the same task as initialize(), + # whereas engine.cleanup() runs in a pipecat event-handler task. + await engine.close_mcp_sessions() await feedback_observer.cleanup() logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}") diff --git a/api/services/workflow/mcp_tool_session.py b/api/services/workflow/mcp_tool_session.py index 0caa1b7..a25d2bd 100644 --- a/api/services/workflow/mcp_tool_session.py +++ b/api/services/workflow/mcp_tool_session.py @@ -79,8 +79,12 @@ class McpToolSession: self.available: bool = False async def start(self) -> None: - """Connect, initialize, and cache the tool list. Never raises — - on any failure the session is marked unavailable.""" + """Connect, initialize, and cache the tool list. + + Never raises on a connect failure — a dead/unreachable MCP server + leaves the session marked unavailable (``available = False``). Genuine + external cancellation, KeyboardInterrupt, and SystemExit are re-raised + (see the CancelledError handling below and ``_degrade``).""" try: params = build_streamable_http_params( url=self._url, diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index 27827e2..d72c3f4 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -955,7 +955,15 @@ class PipecatEngine: exc_info=True, ) - async def _close_mcp_sessions(self) -> None: + async def close_mcp_sessions(self) -> None: + """Close all open MCP tool sessions. + + Must run in the same task that ran initialize() (which opened the + sessions via _open_mcp_sessions). The MCP client's underlying anyio + cancel scopes are task-affine — they must be exited from the task that + entered them — so this is invoked from _run_pipeline's finally, not + from cleanup() (which runs in a pipecat event-handler task). + """ for tool_uuid, session in list(self._mcp_sessions.items()): try: await session.close() @@ -964,7 +972,14 @@ class PipecatEngine: self._mcp_sessions = {} async def cleanup(self): - """Clean up engine resources on disconnect.""" + """Clean up engine resources on disconnect. + + MCP tool sessions are intentionally NOT closed here — see + close_mcp_sessions(). This method runs in a pipecat event-handler task + (on_pipeline_finished), a different task than the one that opened the + MCP sessions; closing them here raises "Attempted to exit cancel scope + in a different task than it was entered in". + """ # Cancel any pending timeout tasks if ( self._user_response_timeout_task @@ -973,11 +988,5 @@ class PipecatEngine: self._user_response_timeout_task.cancel() # Cancel any in-flight background summarization. - # MCP sessions are closed in a finally block so they are guaranteed to - # run even if the summarization cleanup raises an exception. - try: - if self._context_summarization_manager: - await self._context_summarization_manager.cleanup() - finally: - # Close any open MCP tool sessions - await self._close_mcp_sessions() + if self._context_summarization_manager: + await self._context_summarization_manager.cleanup() diff --git a/api/tests/test_mcp_integration.py b/api/tests/test_mcp_integration.py index 4cf01f0..095a0d1 100644 --- a/api/tests/test_mcp_integration.py +++ b/api/tests/test_mcp_integration.py @@ -51,7 +51,7 @@ async def test_engine_opens_and_closes_mcp_sessions(monkeypatch): assert sess.available is True assert len(sess.function_schemas()) == 2 finally: - await engine._close_mcp_sessions() + await engine.close_mcp_sessions() assert engine._mcp_sessions == {}