From 216a678f1a743061801ed6d39068cc3dabe90669 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Mon, 4 May 2026 21:32:42 +0200 Subject: [PATCH] Address LLM review findings; trim comments. --- .../config.py | 16 +++++---- .../resume.py | 6 ++-- .../task_tool.py | 24 ++++++++++++-- .../graph/middleware/deepagent_stack.py | 33 +++++++++++++------ .../subagents/registry.py | 7 +--- .../app/tasks/chat/stream_new_chat.py | 19 ++++------- 6 files changed, 65 insertions(+), 40 deletions(-) diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py index 0d4a3e4e2..16211686c 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/config.py @@ -27,14 +27,18 @@ def subagent_invoke_config(runtime: ToolRuntime) -> dict[str, Any]: def consume_surfsense_resume(runtime: ToolRuntime) -> Any: - """Pop the resume payload so only the first matching subagent applies it. - - Sibling/nested ``task`` calls in the same parent run share the same - ``configurable`` dict by reference; leaving the value would replay decisions - onto unrelated subagent interrupts. - """ + """Pop the resume payload; siblings share ``configurable`` by reference.""" cfg = runtime.config or {} configurable = cfg.get("configurable") if isinstance(cfg, dict) else None if not isinstance(configurable, dict): return None return configurable.pop("surfsense_resume_value", None) + + +def has_surfsense_resume(runtime: ToolRuntime) -> bool: + """True iff a resume payload is queued on this runtime (non-destructive).""" + cfg = runtime.config or {} + configurable = cfg.get("configurable") if isinstance(cfg, dict) else None + if not isinstance(configurable, dict): + return False + return "surfsense_resume_value" in configurable diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/resume.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/resume.py index c9b8b01e6..d09eec6af 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/resume.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/resume.py @@ -37,10 +37,10 @@ def fan_out_decisions_to_match(resume_value: Any, expected_count: int) -> Any: def get_first_pending_subagent_interrupt(state: Any) -> tuple[str | None, Any]: - """First pending ``(interrupt_id, value)`` in the snapshot, else ``(None, None)``. + """First pending ``(interrupt_id, value)``; ``(None, None)`` if no interrupt. - The ``id`` lets the caller target ``Command(resume={id: value})`` so the - payload is not broadcast to a later fresh interrupt in the same run. + Assumes at most one pending interrupt per snapshot (sequential tool nodes). + Parallel tool nodes would need an id-aware lookup instead of first-wins. """ if state is None: return None, None diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py index 57e01d791..e7458dde9 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/checkpointed_subagent_middleware/task_tool.py @@ -18,7 +18,11 @@ from langchain_core.runnables import Runnable from langchain_core.tools import StructuredTool from langgraph.types import Command -from .config import consume_surfsense_resume, subagent_invoke_config +from .config import ( + consume_surfsense_resume, + has_surfsense_resume, + subagent_invoke_config, +) from .constants import EXCLUDED_STATE_KEYS from .propagation import ( amaybe_propagate_subagent_interrupt, @@ -116,7 +120,15 @@ def build_task_tool_with_parent_config( try: snapshot = get_state(sub_config) pending_id, pending_value = get_first_pending_subagent_interrupt(snapshot) - except Exception: # pragma: no cover - defensive + except Exception: + # Fail loud if a resume is queued: silent fallback would + # replay the original interrupt to the user. + if has_surfsense_resume(runtime): + logger.exception( + "Subagent %r get_state raised with resume queued; re-raising.", + subagent_type, + ) + raise logger.debug( "Subagent get_state failed; falling back to fresh invoke", exc_info=True, @@ -182,7 +194,13 @@ def build_task_tool_with_parent_config( try: snapshot = await aget_state(sub_config) pending_id, pending_value = get_first_pending_subagent_interrupt(snapshot) - except Exception: # pragma: no cover - defensive + except Exception: + if has_surfsense_resume(runtime): + logger.exception( + "Subagent %r aget_state raised with resume queued; re-raising.", + subagent_type, + ) + raise logger.debug( "Subagent aget_state failed; falling back to fresh ainvoke", exc_info=True, diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py index 8dcac512c..57fa3b34a 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/main_agent/graph/middleware/deepagent_stack.py @@ -143,6 +143,25 @@ def build_main_agent_deepagent_middleware( # Tools that self-prompt via ``request_approval`` must not also appear # as ``ask`` rules — that would double-prompt the user for one call. _tool_names_in_use = {t.name for t in tools} + + # Deny parent-bound tools whose ``required_connector`` is missing. + # No-op today (connector subagents are pruned upstream); guards future + # additions to the parent's tool list. + if permission_enabled: + _available_set = set(available_connectors or []) + _synthesized: list[Rule] = [] + for tool_def in BUILTIN_TOOLS: + if tool_def.name not in _tool_names_in_use: + continue + rc = tool_def.required_connector + if rc and rc not in _available_set: + _synthesized.append( + Rule(permission=tool_def.name, pattern="*", action="deny") + ) + if _synthesized: + permission_rulesets.append( + Ruleset(rules=_synthesized, origin="connector_synthesized") + ) gp_interrupt_on: dict[str, bool] = { rule.permission: True for rs in permission_rulesets @@ -159,13 +178,8 @@ def build_main_agent_deepagent_middleware( if gp_interrupt_on: general_purpose_spec["interrupt_on"] = gp_interrupt_on - # ``deny`` rules must apply on every tool call, including those emitted - # from ``task`` runs that never reach the parent's ``PermissionMiddleware``. - # Stripping ``allow``/``ask`` keeps the bucket-based ask gates (per-tool - # ``interrupt_on`` for ``mcp`` rows + ``request_approval`` in native tool - # bodies) as the single ask path — no double-prompt — and ensures the - # ``runtime_ruleset`` mutation in ``_persist_always`` is unreachable, so a - # shared instance across subagents stays read-only. + # Deny-only on subagents: ``task`` runs bypass the parent's + # PermissionMiddleware, while bucket-based ask gates own the ask path. subagent_deny_rulesets: list[Ruleset] = [ Ruleset( rules=[r for r in rs.rules if r.action == "deny"], @@ -182,9 +196,8 @@ def build_main_agent_deepagent_middleware( ) if subagent_deny_permission_mw is not None: - # Match new_chat ordering: deny check runs on already-repaired tool - # calls. Insert just before ``PatchToolCallsMiddleware`` (and fall back - # to append if the slot moves). + # Run deny check on already-repaired tool calls; insert before + # PatchToolCallsMiddleware (append if the slot moves). _patch_idx = next( ( i diff --git a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py index 6e2859b0f..dde98018e 100644 --- a/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py +++ b/surfsense_backend/app/agents/multi_agent_with_deepagents/subagents/registry.py @@ -149,12 +149,7 @@ def _filter_disabled_tools_in_place( spec: SubAgent, disabled_names: frozenset[str], ) -> None: - """Drop UI-disabled tools from ``spec["tools"]`` and ``spec["interrupt_on"]``. - - Single funnel for both native (loaded by the route's ``load_tools``) and MCP - (passed via ``extra_tools_bucket``) — by post-processing the packed spec we - avoid touching every per-route ``build_subagent``. - """ + """Drop UI-disabled tools from ``spec["tools"]`` and ``spec["interrupt_on"]``.""" if not disabled_names: return tools = spec.get("tools") # type: ignore[typeddict-item] diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 286b13312..03a039054 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -1978,9 +1978,8 @@ async def stream_new_chat( _premium_reserved = 0 _premium_request_id: str | None = None - # ``BusyMutexMiddleware.abefore_agent`` raises ``BusyError`` *before* - # acquiring the lock, so a concurrent caller must not release the - # in-flight caller's lock from its own ``finally`` block. + # ``BusyError`` fires before the lock is acquired; the ``finally`` must + # not release the in-flight caller's lock. _busy_error_raised = False session = async_session_maker() @@ -2704,10 +2703,8 @@ async def stream_new_chat( chat_id, stream_result.sandbox_files ) - # Release the busy lock here too: ``aafter_agent`` does not fire if the - # graph paused on ``interrupt()`` or the stream bailed out early. - # Skip on ``BusyError``: this caller never acquired the lock, so a - # release here would steal the in-flight caller's lock. + # ``aafter_agent`` doesn't fire on ``interrupt()`` or early bailout. + # Skip on ``BusyError`` (caller never acquired the lock). if not _busy_error_raised: with contextlib.suppress(Exception): if _release_busy_lock(str(chat_id)): @@ -2766,8 +2763,7 @@ async def stream_resume_chat( accumulator = start_turn() - # See ``stream_new_chat``: skip the finally release when ``BusyError`` - # short-circuited before this caller acquired the lock. + # Skip the finally release on ``BusyError`` (caller never acquired the lock). _busy_error_raised = False session = async_session_maker() @@ -3107,9 +3103,8 @@ async def stream_resume_chat( with contextlib.suppress(Exception): await session.close() - # Release the busy lock left held by the originally-interrupted turn, - # and any re-interrupt or early bailout from this resume. - # Skip on ``BusyError``: this caller never acquired the lock. + # Release the lock from the original interrupted turn or any + # re-interrupt/bailout. Skip on ``BusyError`` (lock not held here). if not _busy_error_raised: with contextlib.suppress(Exception): if _release_busy_lock(str(chat_id)):