fix: fix anyio same task cancellation scope

This commit is contained in:
Abhishek Kumar 2026-05-29 16:17:09 +05:30
parent 3194ff1883
commit 97957db201
4 changed files with 31 additions and 13 deletions

View file

@ -826,5 +826,10 @@ async def _run_pipeline(
except asyncio.CancelledError:
logger.warning("Received CancelledError in _run_pipeline")
finally:
# Close MCP sessions here, not in engine.cleanup(). The anyio cancel
# scopes opened by MCPClient.start() in engine.initialize() are
# task-affine; this finally runs in the same task as initialize(),
# whereas engine.cleanup() runs in a pipecat event-handler task.
await engine.close_mcp_sessions()
await feedback_observer.cleanup()
logger.debug(f"Cleaned up context providers for workflow run {workflow_run_id}")

View file

@ -79,8 +79,12 @@ class McpToolSession:
self.available: bool = False
async def start(self) -> None:
"""Connect, initialize, and cache the tool list. Never raises —
on any failure the session is marked unavailable."""
"""Connect, initialize, and cache the tool list.
Never raises on a connect failure a dead/unreachable MCP server
leaves the session marked unavailable (``available = False``). Genuine
external cancellation, KeyboardInterrupt, and SystemExit are re-raised
(see the CancelledError handling below and ``_degrade``)."""
try:
params = build_streamable_http_params(
url=self._url,

View file

@ -955,7 +955,15 @@ class PipecatEngine:
exc_info=True,
)
async def _close_mcp_sessions(self) -> None:
async def close_mcp_sessions(self) -> None:
"""Close all open MCP tool sessions.
Must run in the same task that ran initialize() (which opened the
sessions via _open_mcp_sessions). The MCP client's underlying anyio
cancel scopes are task-affine they must be exited from the task that
entered them so this is invoked from _run_pipeline's finally, not
from cleanup() (which runs in a pipecat event-handler task).
"""
for tool_uuid, session in list(self._mcp_sessions.items()):
try:
await session.close()
@ -964,7 +972,14 @@ class PipecatEngine:
self._mcp_sessions = {}
async def cleanup(self):
"""Clean up engine resources on disconnect."""
"""Clean up engine resources on disconnect.
MCP tool sessions are intentionally NOT closed here see
close_mcp_sessions(). This method runs in a pipecat event-handler task
(on_pipeline_finished), a different task than the one that opened the
MCP sessions; closing them here raises "Attempted to exit cancel scope
in a different task than it was entered in".
"""
# Cancel any pending timeout tasks
if (
self._user_response_timeout_task
@ -973,11 +988,5 @@ class PipecatEngine:
self._user_response_timeout_task.cancel()
# Cancel any in-flight background summarization.
# MCP sessions are closed in a finally block so they are guaranteed to
# run even if the summarization cleanup raises an exception.
try:
if self._context_summarization_manager:
await self._context_summarization_manager.cleanup()
finally:
# Close any open MCP tool sessions
await self._close_mcp_sessions()
if self._context_summarization_manager:
await self._context_summarization_manager.cleanup()

View file

@ -51,7 +51,7 @@ async def test_engine_opens_and_closes_mcp_sessions(monkeypatch):
assert sess.available is True
assert len(sess.function_schemas()) == 2
finally:
await engine._close_mcp_sessions()
await engine.close_mcp_sessions()
assert engine._mcp_sessions == {}