diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/compile_graph_sync.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/compile_graph_sync.py index 309d37a84..89d950c54 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/compile_graph_sync.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/compile_graph_sync.py @@ -41,6 +41,7 @@ def build_compiled_agent_graph_sync( checkpointer: Checkpointer, subagent_dependencies: dict[str, Any], mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None, + disabled_tools: list[str] | None = None, ): """Sync compile: middleware + ``create_agent`` (run via ``asyncio.to_thread``).""" main_agent_middleware = build_main_agent_deepagent_middleware( @@ -61,6 +62,7 @@ def build_compiled_agent_graph_sync( subagent_dependencies=subagent_dependencies, checkpointer=checkpointer, mcp_tools_by_agent=mcp_tools_by_agent, + disabled_tools=disabled_tools, ) agent = create_agent( diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py index 0312a2da5..0d4a3e4e2 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py @@ -26,10 +26,15 @@ def subagent_invoke_config(runtime: ToolRuntime) -> dict[str, Any]: return merged -def extract_surfsense_resume(runtime: ToolRuntime) -> Any: - """Resume payload stashed by ``stream_resume_chat``; ``None`` on a first-time call.""" +def consume_surfsense_resume(runtime: ToolRuntime) -> Any: + """Pop the resume payload so only the first matching subagent applies it. + + Sibling/nested ``task`` calls in the same parent run share the same + ``configurable`` dict by reference; leaving the value would replay decisions + onto unrelated subagent interrupts. + """ cfg = runtime.config or {} configurable = cfg.get("configurable") if isinstance(cfg, dict) else None if not isinstance(configurable, dict): return None - return configurable.get("surfsense_resume_value") + return configurable.pop("surfsense_resume_value", None) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py index 15145b1b8..57e01d791 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py @@ -18,7 +18,7 @@ from langchain_core.runnables import Runnable from langchain_core.tools import StructuredTool from langgraph.types import Command -from .config import extract_surfsense_resume, subagent_invoke_config +from .config import consume_surfsense_resume, subagent_invoke_config from .constants import EXCLUDED_STATE_KEYS from .propagation import ( amaybe_propagate_subagent_interrupt, @@ -123,7 +123,7 @@ def build_task_tool_with_parent_config( ) if pending_value is not None: - resume_value = extract_surfsense_resume(runtime) + resume_value = consume_surfsense_resume(runtime) if resume_value is not None: expected = hitlrequest_action_count(pending_value) resume_value = fan_out_decisions_to_match(resume_value, expected) @@ -189,7 +189,7 @@ def build_task_tool_with_parent_config( ) if pending_value is not None: - resume_value = extract_surfsense_resume(runtime) + resume_value = consume_surfsense_resume(runtime) if resume_value is not None: expected = hitlrequest_action_count(pending_value) resume_value = fan_out_decisions_to_match(resume_value, expected) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py index 31d4dbd40..8dcac512c 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py @@ -88,6 +88,7 @@ def build_main_agent_deepagent_middleware( subagent_dependencies: dict[str, Any], checkpointer: Checkpointer, mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None, + disabled_tools: list[str] | None = None, ) -> list[Any]: """Build ordered middleware for ``create_agent`` (Nones already stripped).""" _memory_middleware = MemoryInjectionMiddleware( @@ -158,6 +159,42 @@ def build_main_agent_deepagent_middleware( if gp_interrupt_on: general_purpose_spec["interrupt_on"] = gp_interrupt_on + # ``deny`` rules must apply on every tool call, including those emitted + # from ``task`` runs that never reach the parent's ``PermissionMiddleware``. + # Stripping ``allow``/``ask`` keeps the bucket-based ask gates (per-tool + # ``interrupt_on`` for ``mcp`` rows + ``request_approval`` in native tool + # bodies) as the single ask path — no double-prompt — and ensures the + # ``runtime_ruleset`` mutation in ``_persist_always`` is unreachable, so a + # shared instance across subagents stays read-only. + subagent_deny_rulesets: list[Ruleset] = [ + Ruleset( + rules=[r for r in rs.rules if r.action == "deny"], + origin=rs.origin, + ) + for rs in permission_rulesets + ] + subagent_deny_rulesets = [rs for rs in subagent_deny_rulesets if rs.rules] + + subagent_deny_permission_mw: PermissionMiddleware | None = ( + PermissionMiddleware(rulesets=subagent_deny_rulesets) + if subagent_deny_rulesets + else None + ) + + if subagent_deny_permission_mw is not None: + # Match new_chat ordering: deny check runs on already-repaired tool + # calls. Insert just before ``PatchToolCallsMiddleware`` (and fall back + # to append if the slot moves). + _patch_idx = next( + ( + i + for i, m in enumerate(gp_middleware) + if isinstance(m, PatchToolCallsMiddleware) + ), + len(gp_middleware), + ) + gp_middleware.insert(_patch_idx, subagent_deny_permission_mw) + registry_subagents: list[SubAgent] = [] try: subagent_extra_middleware: list[Any] = [ @@ -170,12 +207,15 @@ def build_main_agent_deepagent_middleware( thread_id=thread_id, ), ] + if subagent_deny_permission_mw is not None: + subagent_extra_middleware.append(subagent_deny_permission_mw) registry_subagents = build_subagents( dependencies=subagent_dependencies, model=llm, extra_middleware=subagent_extra_middleware, mcp_tools_by_agent=mcp_tools_by_agent or {}, exclude=get_subagents_to_exclude(available_connectors), + disabled_tools=disabled_tools, ) logging.info( "Registry subagents: %s", diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/runtime/factory.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/runtime/factory.py index 72e09edab..13d570832 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/runtime/factory.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/runtime/factory.py @@ -212,6 +212,7 @@ async def create_surfsense_deep_agent( checkpointer=checkpointer, subagent_dependencies=dependencies, mcp_tools_by_agent=mcp_tools_by_agent, + disabled_tools=disabled_tools, ) _perf_log.info( "[create_agent] Middleware stack + graph compiled in %.3fs", diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py index 85e23de84..6e2859b0f 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py @@ -145,6 +145,30 @@ def get_subagents_to_exclude( return sorted(excluded_names) +def _filter_disabled_tools_in_place( + spec: SubAgent, + disabled_names: frozenset[str], +) -> None: + """Drop UI-disabled tools from ``spec["tools"]`` and ``spec["interrupt_on"]``. + + Single funnel for both native (loaded by the route's ``load_tools``) and MCP + (passed via ``extra_tools_bucket``) — by post-processing the packed spec we + avoid touching every per-route ``build_subagent``. + """ + if not disabled_names: + return + tools = spec.get("tools") # type: ignore[typeddict-item] + if isinstance(tools, list): + spec["tools"] = [ # type: ignore[typeddict-unknown-key] + t for t in tools if getattr(t, "name", None) not in disabled_names + ] + interrupt_on = spec.get("interrupt_on") # type: ignore[typeddict-item] + if isinstance(interrupt_on, dict): + spec["interrupt_on"] = { # type: ignore[typeddict-unknown-key] + k: v for k, v in interrupt_on.items() if k not in disabled_names + } + + def build_subagents( *, dependencies: dict[str, Any], @@ -152,6 +176,7 @@ def build_subagents( extra_middleware: Sequence[Any] | None = None, mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None, exclude: list[str] | None = None, + disabled_tools: list[str] | None = None, ) -> list[SubAgent]: """Build registry subagents; skip memory/research; skip names in exclude.""" mcp = mcp_tools_by_agent or {} @@ -159,16 +184,17 @@ def build_subagents( excluded = ["memory", "research"] if exclude: excluded.extend(exclude) + disabled_names = frozenset(disabled_tools or ()) for name in sorted(SUBAGENT_BUILDERS_BY_NAME): if name in excluded: continue builder = SUBAGENT_BUILDERS_BY_NAME[name] - specs.append( - builder( - dependencies=dependencies, - model=model, - extra_middleware=extra_middleware, - extra_tools_bucket=mcp.get(name), - ), + spec = builder( + dependencies=dependencies, + model=model, + extra_middleware=extra_middleware, + extra_tools_bucket=mcp.get(name), ) + _filter_disabled_tools_in_place(spec, disabled_names) + specs.append(spec) return specs diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index e22aae8c4..286b13312 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -46,6 +46,7 @@ from app.agents.new_chat.memory_extraction import ( extract_and_save_memory, extract_and_save_team_memory, ) +from app.agents.new_chat.errors import BusyError from app.agents.new_chat.middleware.busy_mutex import release_lock as _release_busy_lock from app.agents.new_chat.middleware.kb_persistence import ( commit_staged_filesystem_state, @@ -1977,6 +1978,11 @@ async def stream_new_chat( _premium_reserved = 0 _premium_request_id: str | None = None + # ``BusyMutexMiddleware.abefore_agent`` raises ``BusyError`` *before* + # acquiring the lock, so a concurrent caller must not release the + # in-flight caller's lock from its own ``finally`` block. + _busy_error_raised = False + session = async_session_maker() try: # Mark AI as responding to this user for live collaboration @@ -2094,10 +2100,6 @@ async def stream_new_chat( _t0 = time.perf_counter() if use_multi_agent: - # TODO: Propagate ``disabled_tools`` into registry subagents. Today only the main - # agent honors UI disables; ``task`` delegates still get full specialist tool sets. - # Deliverables (and similar) are user-disableable but implemented on subagents, so - # disabling them in the UI does not fully apply until subagents filter too. agent = await create_registry_deep_agent( llm=llm, search_space_id=search_space_id, @@ -2620,6 +2622,13 @@ async def stream_new_chat( yield streaming_service.format_finish() yield streaming_service.format_done() + except BusyError as e: + _busy_error_raised = True + yield streaming_service.format_error(str(e)) + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + except Exception as e: # Handle any errors import traceback @@ -2697,12 +2706,15 @@ async def stream_new_chat( # Release the busy lock here too: ``aafter_agent`` does not fire if the # graph paused on ``interrupt()`` or the stream bailed out early. - with contextlib.suppress(Exception): - if _release_busy_lock(str(chat_id)): - _perf_log.info( - "[stream_new_chat] released stale busy lock (chat_id=%s)", - chat_id, - ) + # Skip on ``BusyError``: this caller never acquired the lock, so a + # release here would steal the in-flight caller's lock. + if not _busy_error_raised: + with contextlib.suppress(Exception): + if _release_busy_lock(str(chat_id)): + _perf_log.info( + "[stream_new_chat] released stale busy lock (chat_id=%s)", + chat_id, + ) # Break circular refs held by the agent graph, tools, and LLM # wrappers so the GC can reclaim them in a single pass. @@ -2754,6 +2766,10 @@ async def stream_resume_chat( accumulator = start_turn() + # See ``stream_new_chat``: skip the finally release when ``BusyError`` + # short-circuited before this caller acquired the lock. + _busy_error_raised = False + session = async_session_maker() try: if user_id: @@ -3036,6 +3052,13 @@ async def stream_resume_chat( yield streaming_service.format_finish() yield streaming_service.format_done() + except BusyError as e: + _busy_error_raised = True + yield streaming_service.format_error(str(e)) + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + except Exception as e: import traceback @@ -3086,12 +3109,14 @@ async def stream_resume_chat( # Release the busy lock left held by the originally-interrupted turn, # and any re-interrupt or early bailout from this resume. - with contextlib.suppress(Exception): - if _release_busy_lock(str(chat_id)): - _perf_log.info( - "[stream_resume] released stale busy lock (chat_id=%s)", - chat_id, - ) + # Skip on ``BusyError``: this caller never acquired the lock. + if not _busy_error_raised: + with contextlib.suppress(Exception): + if _release_busy_lock(str(chat_id)): + _perf_log.info( + "[stream_resume] released stale busy lock (chat_id=%s)", + chat_id, + ) agent = llm = connector_service = None stream_result = None