Address LLM review findings; trim comments.

This commit is contained in:
CREDO23 2026-05-04 21:32:42 +02:00
parent 65f1f8f73c
commit 216a678f1a
6 changed files with 65 additions and 40 deletions

View file

@@ -27,14 +27,18 @@ def subagent_invoke_config(runtime: ToolRuntime) -> dict[str, Any]:
def consume_surfsense_resume(runtime: ToolRuntime) -> Any:
"""Pop the resume payload so only the first matching subagent applies it.
Sibling/nested ``task`` calls in the same parent run share the same
``configurable`` dict by reference; leaving the value would replay decisions
onto unrelated subagent interrupts.
"""
"""Pop the resume payload; siblings share ``configurable`` by reference."""
cfg = runtime.config or {}
configurable = cfg.get("configurable") if isinstance(cfg, dict) else None
if not isinstance(configurable, dict):
return None
return configurable.pop("surfsense_resume_value", None)
def has_surfsense_resume(runtime: ToolRuntime) -> bool:
"""True iff a resume payload is queued on this runtime (non-destructive)."""
cfg = runtime.config or {}
configurable = cfg.get("configurable") if isinstance(cfg, dict) else None
if not isinstance(configurable, dict):
return False
return "surfsense_resume_value" in configurable

View file

@@ -37,10 +37,10 @@ def fan_out_decisions_to_match(resume_value: Any, expected_count: int) -> Any:
def get_first_pending_subagent_interrupt(state: Any) -> tuple[str | None, Any]:
"""First pending ``(interrupt_id, value)`` in the snapshot, else ``(None, None)``.
"""First pending ``(interrupt_id, value)``; ``(None, None)`` if no interrupt.
The ``id`` lets the caller target ``Command(resume={id: value})`` so the
payload is not broadcast to a later fresh interrupt in the same run.
Assumes at most one pending interrupt per snapshot (sequential tool nodes).
Parallel tool nodes would need an id-aware lookup instead of first-wins.
"""
if state is None:
return None, None

View file

@@ -18,7 +18,11 @@ from langchain_core.runnables import Runnable
from langchain_core.tools import StructuredTool
from langgraph.types import Command
from .config import consume_surfsense_resume, subagent_invoke_config
from .config import (
consume_surfsense_resume,
has_surfsense_resume,
subagent_invoke_config,
)
from .constants import EXCLUDED_STATE_KEYS
from .propagation import (
amaybe_propagate_subagent_interrupt,
@@ -116,7 +120,15 @@ def build_task_tool_with_parent_config(
try:
snapshot = get_state(sub_config)
pending_id, pending_value = get_first_pending_subagent_interrupt(snapshot)
except Exception: # pragma: no cover - defensive
except Exception:
# Fail loud if a resume is queued: silent fallback would
# replay the original interrupt to the user.
if has_surfsense_resume(runtime):
logger.exception(
"Subagent %r get_state raised with resume queued; re-raising.",
subagent_type,
)
raise
logger.debug(
"Subagent get_state failed; falling back to fresh invoke",
exc_info=True,
@@ -182,7 +194,13 @@ def build_task_tool_with_parent_config(
try:
snapshot = await aget_state(sub_config)
pending_id, pending_value = get_first_pending_subagent_interrupt(snapshot)
except Exception: # pragma: no cover - defensive
except Exception:
if has_surfsense_resume(runtime):
logger.exception(
"Subagent %r aget_state raised with resume queued; re-raising.",
subagent_type,
)
raise
logger.debug(
"Subagent aget_state failed; falling back to fresh ainvoke",
exc_info=True,

View file

@@ -143,6 +143,25 @@ def build_main_agent_deepagent_middleware(
# Tools that self-prompt via ``request_approval`` must not also appear
# as ``ask`` rules — that would double-prompt the user for one call.
_tool_names_in_use = {t.name for t in tools}
# Deny parent-bound tools whose ``required_connector`` is missing.
# No-op today (connector subagents are pruned upstream); guards future
# additions to the parent's tool list.
if permission_enabled:
_available_set = set(available_connectors or [])
_synthesized: list[Rule] = []
for tool_def in BUILTIN_TOOLS:
if tool_def.name not in _tool_names_in_use:
continue
rc = tool_def.required_connector
if rc and rc not in _available_set:
_synthesized.append(
Rule(permission=tool_def.name, pattern="*", action="deny")
)
if _synthesized:
permission_rulesets.append(
Ruleset(rules=_synthesized, origin="connector_synthesized")
)
gp_interrupt_on: dict[str, bool] = {
rule.permission: True
for rs in permission_rulesets
@@ -159,13 +178,8 @@ def build_main_agent_deepagent_middleware(
if gp_interrupt_on:
general_purpose_spec["interrupt_on"] = gp_interrupt_on
# ``deny`` rules must apply on every tool call, including those emitted
# from ``task`` runs that never reach the parent's ``PermissionMiddleware``.
# Stripping ``allow``/``ask`` keeps the bucket-based ask gates (per-tool
# ``interrupt_on`` for ``mcp`` rows + ``request_approval`` in native tool
# bodies) as the single ask path — no double-prompt — and ensures the
# ``runtime_ruleset`` mutation in ``_persist_always`` is unreachable, so a
# shared instance across subagents stays read-only.
# Deny-only on subagents: ``task`` runs bypass the parent's
# PermissionMiddleware, while bucket-based ask gates own the ask path.
subagent_deny_rulesets: list[Ruleset] = [
Ruleset(
rules=[r for r in rs.rules if r.action == "deny"],
@@ -182,9 +196,8 @@ def build_main_agent_deepagent_middleware(
)
if subagent_deny_permission_mw is not None:
# Match new_chat ordering: deny check runs on already-repaired tool
# calls. Insert just before ``PatchToolCallsMiddleware`` (and fall back
# to append if the slot moves).
# Run deny check on already-repaired tool calls; insert before
# PatchToolCallsMiddleware (append if the slot moves).
_patch_idx = next(
(
i

View file

@@ -149,12 +149,7 @@ def _filter_disabled_tools_in_place(
spec: SubAgent,
disabled_names: frozenset[str],
) -> None:
"""Drop UI-disabled tools from ``spec["tools"]`` and ``spec["interrupt_on"]``.
Single funnel for both native (loaded by the route's ``load_tools``) and MCP
(passed via ``extra_tools_bucket``) by post-processing the packed spec we
avoid touching every per-route ``build_subagent``.
"""
"""Drop UI-disabled tools from ``spec["tools"]`` and ``spec["interrupt_on"]``."""
if not disabled_names:
return
tools = spec.get("tools") # type: ignore[typeddict-item]

View file

@@ -1978,9 +1978,8 @@ async def stream_new_chat(
_premium_reserved = 0
_premium_request_id: str | None = None
# ``BusyMutexMiddleware.abefore_agent`` raises ``BusyError`` *before*
# acquiring the lock, so a concurrent caller must not release the
# in-flight caller's lock from its own ``finally`` block.
# ``BusyError`` fires before the lock is acquired; the ``finally`` must
# not release the in-flight caller's lock.
_busy_error_raised = False
session = async_session_maker()
@@ -2704,10 +2703,8 @@ async def stream_new_chat(
chat_id, stream_result.sandbox_files
)
# Release the busy lock here too: ``aafter_agent`` does not fire if the
# graph paused on ``interrupt()`` or the stream bailed out early.
# Skip on ``BusyError``: this caller never acquired the lock, so a
# release here would steal the in-flight caller's lock.
# ``aafter_agent`` doesn't fire on ``interrupt()`` or early bailout.
# Skip on ``BusyError`` (caller never acquired the lock).
if not _busy_error_raised:
with contextlib.suppress(Exception):
if _release_busy_lock(str(chat_id)):
@@ -2766,8 +2763,7 @@ async def stream_resume_chat(
accumulator = start_turn()
# See ``stream_new_chat``: skip the finally release when ``BusyError``
# short-circuited before this caller acquired the lock.
# Skip the finally release on ``BusyError`` (caller never acquired the lock).
_busy_error_raised = False
session = async_session_maker()
@@ -3107,9 +3103,8 @@ async def stream_resume_chat(
with contextlib.suppress(Exception):
await session.close()
# Release the busy lock left held by the originally-interrupted turn,
# and any re-interrupt or early bailout from this resume.
# Skip on ``BusyError``: this caller never acquired the lock.
# Release the lock from the original interrupted turn or any
# re-interrupt/bailout. Skip on ``BusyError`` (lock not held here).
if not _busy_error_raised:
with contextlib.suppress(Exception):
if _release_busy_lock(str(chat_id)):