mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-07 14:52:39 +02:00
Harden multi-agent for production: resume cleanup, busy-mutex race, deny propagation, disabled-tools.
This commit is contained in:
parent
7735becd02
commit
65f1f8f73c
7 changed files with 128 additions and 29 deletions
|
|
@ -41,6 +41,7 @@ def build_compiled_agent_graph_sync(
|
||||||
checkpointer: Checkpointer,
|
checkpointer: Checkpointer,
|
||||||
subagent_dependencies: dict[str, Any],
|
subagent_dependencies: dict[str, Any],
|
||||||
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
|
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
|
||||||
|
disabled_tools: list[str] | None = None,
|
||||||
):
|
):
|
||||||
"""Sync compile: middleware + ``create_agent`` (run via ``asyncio.to_thread``)."""
|
"""Sync compile: middleware + ``create_agent`` (run via ``asyncio.to_thread``)."""
|
||||||
main_agent_middleware = build_main_agent_deepagent_middleware(
|
main_agent_middleware = build_main_agent_deepagent_middleware(
|
||||||
|
|
@ -61,6 +62,7 @@ def build_compiled_agent_graph_sync(
|
||||||
subagent_dependencies=subagent_dependencies,
|
subagent_dependencies=subagent_dependencies,
|
||||||
checkpointer=checkpointer,
|
checkpointer=checkpointer,
|
||||||
mcp_tools_by_agent=mcp_tools_by_agent,
|
mcp_tools_by_agent=mcp_tools_by_agent,
|
||||||
|
disabled_tools=disabled_tools,
|
||||||
)
|
)
|
||||||
|
|
||||||
agent = create_agent(
|
agent = create_agent(
|
||||||
|
|
|
||||||
|
|
@ -26,10 +26,15 @@ def subagent_invoke_config(runtime: ToolRuntime) -> dict[str, Any]:
|
||||||
return merged
|
return merged
|
||||||
|
|
||||||
|
|
||||||
def extract_surfsense_resume(runtime: ToolRuntime) -> Any:
|
def consume_surfsense_resume(runtime: ToolRuntime) -> Any:
|
||||||
"""Resume payload stashed by ``stream_resume_chat``; ``None`` on a first-time call."""
|
"""Pop the resume payload so only the first matching subagent applies it.
|
||||||
|
|
||||||
|
Sibling/nested ``task`` calls in the same parent run share the same
|
||||||
|
``configurable`` dict by reference; leaving the value would replay decisions
|
||||||
|
onto unrelated subagent interrupts.
|
||||||
|
"""
|
||||||
cfg = runtime.config or {}
|
cfg = runtime.config or {}
|
||||||
configurable = cfg.get("configurable") if isinstance(cfg, dict) else None
|
configurable = cfg.get("configurable") if isinstance(cfg, dict) else None
|
||||||
if not isinstance(configurable, dict):
|
if not isinstance(configurable, dict):
|
||||||
return None
|
return None
|
||||||
return configurable.get("surfsense_resume_value")
|
return configurable.pop("surfsense_resume_value", None)
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ from langchain_core.runnables import Runnable
|
||||||
from langchain_core.tools import StructuredTool
|
from langchain_core.tools import StructuredTool
|
||||||
from langgraph.types import Command
|
from langgraph.types import Command
|
||||||
|
|
||||||
from .config import extract_surfsense_resume, subagent_invoke_config
|
from .config import consume_surfsense_resume, subagent_invoke_config
|
||||||
from .constants import EXCLUDED_STATE_KEYS
|
from .constants import EXCLUDED_STATE_KEYS
|
||||||
from .propagation import (
|
from .propagation import (
|
||||||
amaybe_propagate_subagent_interrupt,
|
amaybe_propagate_subagent_interrupt,
|
||||||
|
|
@ -123,7 +123,7 @@ def build_task_tool_with_parent_config(
|
||||||
)
|
)
|
||||||
|
|
||||||
if pending_value is not None:
|
if pending_value is not None:
|
||||||
resume_value = extract_surfsense_resume(runtime)
|
resume_value = consume_surfsense_resume(runtime)
|
||||||
if resume_value is not None:
|
if resume_value is not None:
|
||||||
expected = hitlrequest_action_count(pending_value)
|
expected = hitlrequest_action_count(pending_value)
|
||||||
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
||||||
|
|
@ -189,7 +189,7 @@ def build_task_tool_with_parent_config(
|
||||||
)
|
)
|
||||||
|
|
||||||
if pending_value is not None:
|
if pending_value is not None:
|
||||||
resume_value = extract_surfsense_resume(runtime)
|
resume_value = consume_surfsense_resume(runtime)
|
||||||
if resume_value is not None:
|
if resume_value is not None:
|
||||||
expected = hitlrequest_action_count(pending_value)
|
expected = hitlrequest_action_count(pending_value)
|
||||||
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
||||||
|
|
|
||||||
|
|
@ -88,6 +88,7 @@ def build_main_agent_deepagent_middleware(
|
||||||
subagent_dependencies: dict[str, Any],
|
subagent_dependencies: dict[str, Any],
|
||||||
checkpointer: Checkpointer,
|
checkpointer: Checkpointer,
|
||||||
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
|
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
|
||||||
|
disabled_tools: list[str] | None = None,
|
||||||
) -> list[Any]:
|
) -> list[Any]:
|
||||||
"""Build ordered middleware for ``create_agent`` (Nones already stripped)."""
|
"""Build ordered middleware for ``create_agent`` (Nones already stripped)."""
|
||||||
_memory_middleware = MemoryInjectionMiddleware(
|
_memory_middleware = MemoryInjectionMiddleware(
|
||||||
|
|
@ -158,6 +159,42 @@ def build_main_agent_deepagent_middleware(
|
||||||
if gp_interrupt_on:
|
if gp_interrupt_on:
|
||||||
general_purpose_spec["interrupt_on"] = gp_interrupt_on
|
general_purpose_spec["interrupt_on"] = gp_interrupt_on
|
||||||
|
|
||||||
|
# ``deny`` rules must apply on every tool call, including those emitted
|
||||||
|
# from ``task`` runs that never reach the parent's ``PermissionMiddleware``.
|
||||||
|
# Stripping ``allow``/``ask`` keeps the bucket-based ask gates (per-tool
|
||||||
|
# ``interrupt_on`` for ``mcp`` rows + ``request_approval`` in native tool
|
||||||
|
# bodies) as the single ask path — no double-prompt — and ensures the
|
||||||
|
# ``runtime_ruleset`` mutation in ``_persist_always`` is unreachable, so a
|
||||||
|
# shared instance across subagents stays read-only.
|
||||||
|
subagent_deny_rulesets: list[Ruleset] = [
|
||||||
|
Ruleset(
|
||||||
|
rules=[r for r in rs.rules if r.action == "deny"],
|
||||||
|
origin=rs.origin,
|
||||||
|
)
|
||||||
|
for rs in permission_rulesets
|
||||||
|
]
|
||||||
|
subagent_deny_rulesets = [rs for rs in subagent_deny_rulesets if rs.rules]
|
||||||
|
|
||||||
|
subagent_deny_permission_mw: PermissionMiddleware | None = (
|
||||||
|
PermissionMiddleware(rulesets=subagent_deny_rulesets)
|
||||||
|
if subagent_deny_rulesets
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
|
||||||
|
if subagent_deny_permission_mw is not None:
|
||||||
|
# Match new_chat ordering: deny check runs on already-repaired tool
|
||||||
|
# calls. Insert just before ``PatchToolCallsMiddleware`` (and fall back
|
||||||
|
# to append if the slot moves).
|
||||||
|
_patch_idx = next(
|
||||||
|
(
|
||||||
|
i
|
||||||
|
for i, m in enumerate(gp_middleware)
|
||||||
|
if isinstance(m, PatchToolCallsMiddleware)
|
||||||
|
),
|
||||||
|
len(gp_middleware),
|
||||||
|
)
|
||||||
|
gp_middleware.insert(_patch_idx, subagent_deny_permission_mw)
|
||||||
|
|
||||||
registry_subagents: list[SubAgent] = []
|
registry_subagents: list[SubAgent] = []
|
||||||
try:
|
try:
|
||||||
subagent_extra_middleware: list[Any] = [
|
subagent_extra_middleware: list[Any] = [
|
||||||
|
|
@ -170,12 +207,15 @@ def build_main_agent_deepagent_middleware(
|
||||||
thread_id=thread_id,
|
thread_id=thread_id,
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
if subagent_deny_permission_mw is not None:
|
||||||
|
subagent_extra_middleware.append(subagent_deny_permission_mw)
|
||||||
registry_subagents = build_subagents(
|
registry_subagents = build_subagents(
|
||||||
dependencies=subagent_dependencies,
|
dependencies=subagent_dependencies,
|
||||||
model=llm,
|
model=llm,
|
||||||
extra_middleware=subagent_extra_middleware,
|
extra_middleware=subagent_extra_middleware,
|
||||||
mcp_tools_by_agent=mcp_tools_by_agent or {},
|
mcp_tools_by_agent=mcp_tools_by_agent or {},
|
||||||
exclude=get_subagents_to_exclude(available_connectors),
|
exclude=get_subagents_to_exclude(available_connectors),
|
||||||
|
disabled_tools=disabled_tools,
|
||||||
)
|
)
|
||||||
logging.info(
|
logging.info(
|
||||||
"Registry subagents: %s",
|
"Registry subagents: %s",
|
||||||
|
|
|
||||||
|
|
@ -212,6 +212,7 @@ async def create_surfsense_deep_agent(
|
||||||
checkpointer=checkpointer,
|
checkpointer=checkpointer,
|
||||||
subagent_dependencies=dependencies,
|
subagent_dependencies=dependencies,
|
||||||
mcp_tools_by_agent=mcp_tools_by_agent,
|
mcp_tools_by_agent=mcp_tools_by_agent,
|
||||||
|
disabled_tools=disabled_tools,
|
||||||
)
|
)
|
||||||
_perf_log.info(
|
_perf_log.info(
|
||||||
"[create_agent] Middleware stack + graph compiled in %.3fs",
|
"[create_agent] Middleware stack + graph compiled in %.3fs",
|
||||||
|
|
|
||||||
|
|
@ -145,6 +145,30 @@ def get_subagents_to_exclude(
|
||||||
return sorted(excluded_names)
|
return sorted(excluded_names)
|
||||||
|
|
||||||
|
|
||||||
|
def _filter_disabled_tools_in_place(
|
||||||
|
spec: SubAgent,
|
||||||
|
disabled_names: frozenset[str],
|
||||||
|
) -> None:
|
||||||
|
"""Drop UI-disabled tools from ``spec["tools"]`` and ``spec["interrupt_on"]``.
|
||||||
|
|
||||||
|
Single funnel for both native (loaded by the route's ``load_tools``) and MCP
|
||||||
|
(passed via ``extra_tools_bucket``) — by post-processing the packed spec we
|
||||||
|
avoid touching every per-route ``build_subagent``.
|
||||||
|
"""
|
||||||
|
if not disabled_names:
|
||||||
|
return
|
||||||
|
tools = spec.get("tools") # type: ignore[typeddict-item]
|
||||||
|
if isinstance(tools, list):
|
||||||
|
spec["tools"] = [ # type: ignore[typeddict-unknown-key]
|
||||||
|
t for t in tools if getattr(t, "name", None) not in disabled_names
|
||||||
|
]
|
||||||
|
interrupt_on = spec.get("interrupt_on") # type: ignore[typeddict-item]
|
||||||
|
if isinstance(interrupt_on, dict):
|
||||||
|
spec["interrupt_on"] = { # type: ignore[typeddict-unknown-key]
|
||||||
|
k: v for k, v in interrupt_on.items() if k not in disabled_names
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def build_subagents(
|
def build_subagents(
|
||||||
*,
|
*,
|
||||||
dependencies: dict[str, Any],
|
dependencies: dict[str, Any],
|
||||||
|
|
@ -152,6 +176,7 @@ def build_subagents(
|
||||||
extra_middleware: Sequence[Any] | None = None,
|
extra_middleware: Sequence[Any] | None = None,
|
||||||
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
|
mcp_tools_by_agent: dict[str, ToolsPermissions] | None = None,
|
||||||
exclude: list[str] | None = None,
|
exclude: list[str] | None = None,
|
||||||
|
disabled_tools: list[str] | None = None,
|
||||||
) -> list[SubAgent]:
|
) -> list[SubAgent]:
|
||||||
"""Build registry subagents; skip memory/research; skip names in exclude."""
|
"""Build registry subagents; skip memory/research; skip names in exclude."""
|
||||||
mcp = mcp_tools_by_agent or {}
|
mcp = mcp_tools_by_agent or {}
|
||||||
|
|
@ -159,16 +184,17 @@ def build_subagents(
|
||||||
excluded = ["memory", "research"]
|
excluded = ["memory", "research"]
|
||||||
if exclude:
|
if exclude:
|
||||||
excluded.extend(exclude)
|
excluded.extend(exclude)
|
||||||
|
disabled_names = frozenset(disabled_tools or ())
|
||||||
for name in sorted(SUBAGENT_BUILDERS_BY_NAME):
|
for name in sorted(SUBAGENT_BUILDERS_BY_NAME):
|
||||||
if name in excluded:
|
if name in excluded:
|
||||||
continue
|
continue
|
||||||
builder = SUBAGENT_BUILDERS_BY_NAME[name]
|
builder = SUBAGENT_BUILDERS_BY_NAME[name]
|
||||||
specs.append(
|
spec = builder(
|
||||||
builder(
|
dependencies=dependencies,
|
||||||
dependencies=dependencies,
|
model=model,
|
||||||
model=model,
|
extra_middleware=extra_middleware,
|
||||||
extra_middleware=extra_middleware,
|
extra_tools_bucket=mcp.get(name),
|
||||||
extra_tools_bucket=mcp.get(name),
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
_filter_disabled_tools_in_place(spec, disabled_names)
|
||||||
|
specs.append(spec)
|
||||||
return specs
|
return specs
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ from app.agents.new_chat.memory_extraction import (
|
||||||
extract_and_save_memory,
|
extract_and_save_memory,
|
||||||
extract_and_save_team_memory,
|
extract_and_save_team_memory,
|
||||||
)
|
)
|
||||||
|
from app.agents.new_chat.errors import BusyError
|
||||||
from app.agents.new_chat.middleware.busy_mutex import release_lock as _release_busy_lock
|
from app.agents.new_chat.middleware.busy_mutex import release_lock as _release_busy_lock
|
||||||
from app.agents.new_chat.middleware.kb_persistence import (
|
from app.agents.new_chat.middleware.kb_persistence import (
|
||||||
commit_staged_filesystem_state,
|
commit_staged_filesystem_state,
|
||||||
|
|
@ -1977,6 +1978,11 @@ async def stream_new_chat(
|
||||||
_premium_reserved = 0
|
_premium_reserved = 0
|
||||||
_premium_request_id: str | None = None
|
_premium_request_id: str | None = None
|
||||||
|
|
||||||
|
# ``BusyMutexMiddleware.abefore_agent`` raises ``BusyError`` *before*
|
||||||
|
# acquiring the lock, so a concurrent caller must not release the
|
||||||
|
# in-flight caller's lock from its own ``finally`` block.
|
||||||
|
_busy_error_raised = False
|
||||||
|
|
||||||
session = async_session_maker()
|
session = async_session_maker()
|
||||||
try:
|
try:
|
||||||
# Mark AI as responding to this user for live collaboration
|
# Mark AI as responding to this user for live collaboration
|
||||||
|
|
@ -2094,10 +2100,6 @@ async def stream_new_chat(
|
||||||
|
|
||||||
_t0 = time.perf_counter()
|
_t0 = time.perf_counter()
|
||||||
if use_multi_agent:
|
if use_multi_agent:
|
||||||
# TODO: Propagate ``disabled_tools`` into registry subagents. Today only the main
|
|
||||||
# agent honors UI disables; ``task`` delegates still get full specialist tool sets.
|
|
||||||
# Deliverables (and similar) are user-disableable but implemented on subagents, so
|
|
||||||
# disabling them in the UI does not fully apply until subagents filter too.
|
|
||||||
agent = await create_registry_deep_agent(
|
agent = await create_registry_deep_agent(
|
||||||
llm=llm,
|
llm=llm,
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
|
|
@ -2620,6 +2622,13 @@ async def stream_new_chat(
|
||||||
yield streaming_service.format_finish()
|
yield streaming_service.format_finish()
|
||||||
yield streaming_service.format_done()
|
yield streaming_service.format_done()
|
||||||
|
|
||||||
|
except BusyError as e:
|
||||||
|
_busy_error_raised = True
|
||||||
|
yield streaming_service.format_error(str(e))
|
||||||
|
yield streaming_service.format_finish_step()
|
||||||
|
yield streaming_service.format_finish()
|
||||||
|
yield streaming_service.format_done()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Handle any errors
|
# Handle any errors
|
||||||
import traceback
|
import traceback
|
||||||
|
|
@ -2697,12 +2706,15 @@ async def stream_new_chat(
|
||||||
|
|
||||||
# Release the busy lock here too: ``aafter_agent`` does not fire if the
|
# Release the busy lock here too: ``aafter_agent`` does not fire if the
|
||||||
# graph paused on ``interrupt()`` or the stream bailed out early.
|
# graph paused on ``interrupt()`` or the stream bailed out early.
|
||||||
with contextlib.suppress(Exception):
|
# Skip on ``BusyError``: this caller never acquired the lock, so a
|
||||||
if _release_busy_lock(str(chat_id)):
|
# release here would steal the in-flight caller's lock.
|
||||||
_perf_log.info(
|
if not _busy_error_raised:
|
||||||
"[stream_new_chat] released stale busy lock (chat_id=%s)",
|
with contextlib.suppress(Exception):
|
||||||
chat_id,
|
if _release_busy_lock(str(chat_id)):
|
||||||
)
|
_perf_log.info(
|
||||||
|
"[stream_new_chat] released stale busy lock (chat_id=%s)",
|
||||||
|
chat_id,
|
||||||
|
)
|
||||||
|
|
||||||
# Break circular refs held by the agent graph, tools, and LLM
|
# Break circular refs held by the agent graph, tools, and LLM
|
||||||
# wrappers so the GC can reclaim them in a single pass.
|
# wrappers so the GC can reclaim them in a single pass.
|
||||||
|
|
@ -2754,6 +2766,10 @@ async def stream_resume_chat(
|
||||||
|
|
||||||
accumulator = start_turn()
|
accumulator = start_turn()
|
||||||
|
|
||||||
|
# See ``stream_new_chat``: skip the finally release when ``BusyError``
|
||||||
|
# short-circuited before this caller acquired the lock.
|
||||||
|
_busy_error_raised = False
|
||||||
|
|
||||||
session = async_session_maker()
|
session = async_session_maker()
|
||||||
try:
|
try:
|
||||||
if user_id:
|
if user_id:
|
||||||
|
|
@ -3036,6 +3052,13 @@ async def stream_resume_chat(
|
||||||
yield streaming_service.format_finish()
|
yield streaming_service.format_finish()
|
||||||
yield streaming_service.format_done()
|
yield streaming_service.format_done()
|
||||||
|
|
||||||
|
except BusyError as e:
|
||||||
|
_busy_error_raised = True
|
||||||
|
yield streaming_service.format_error(str(e))
|
||||||
|
yield streaming_service.format_finish_step()
|
||||||
|
yield streaming_service.format_finish()
|
||||||
|
yield streaming_service.format_done()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
|
@ -3086,12 +3109,14 @@ async def stream_resume_chat(
|
||||||
|
|
||||||
# Release the busy lock left held by the originally-interrupted turn,
|
# Release the busy lock left held by the originally-interrupted turn,
|
||||||
# and any re-interrupt or early bailout from this resume.
|
# and any re-interrupt or early bailout from this resume.
|
||||||
with contextlib.suppress(Exception):
|
# Skip on ``BusyError``: this caller never acquired the lock.
|
||||||
if _release_busy_lock(str(chat_id)):
|
if not _busy_error_raised:
|
||||||
_perf_log.info(
|
with contextlib.suppress(Exception):
|
||||||
"[stream_resume] released stale busy lock (chat_id=%s)",
|
if _release_busy_lock(str(chat_id)):
|
||||||
chat_id,
|
_perf_log.info(
|
||||||
)
|
"[stream_resume] released stale busy lock (chat_id=%s)",
|
||||||
|
chat_id,
|
||||||
|
)
|
||||||
|
|
||||||
agent = llm = connector_service = None
|
agent = llm = connector_service = None
|
||||||
stream_result = None
|
stream_result = None
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue