Forward HITL decisions from the streaming layer to subagents via the config side-channel.

This commit is contained in:
CREDO23 2026-05-04 18:42:58 +02:00
parent ba2138c164
commit 4ac3f0b304
4 changed files with 75 additions and 20 deletions

View file

@ -85,6 +85,23 @@ class _ThreadLockManager:
if event is not None: if event is not None:
event.clear() event.clear()
def release(self, thread_id: str) -> bool:
    """Force-release the per-thread lock; safety-net for turns that end before ``__end__``.

    ``BusyMutexMiddleware.aafter_agent`` only releases on graph completion, so
    an ``interrupt()`` pause or an early streaming bail-out would otherwise
    leak the lock and block the next request with :class:`BusyError`. Returns
    ``True`` when a held lock was released, ``False`` otherwise.
    """
    held = self._locks.get(thread_id)
    if held is None:
        return False
    if not held.locked():
        return False
    try:
        held.release()
    except RuntimeError:
        # Lost the race: released elsewhere between the check and here.
        return False
    return True
# Module-level singleton — process-local but reused across all agent # Module-level singleton — process-local but reused across all agent
# instances built in this process. Subagents created in nested # instances built in this process. Subagents created in nested
@ -107,6 +124,11 @@ def reset_cancel(thread_id: str) -> None:
manager.reset(thread_id) manager.reset(thread_id)
def release_lock(thread_id: str) -> bool:
    """Force-release the per-thread busy lock; safe to call when not held."""
    was_released = manager.release(thread_id)
    return was_released
class BusyMutexMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]): class BusyMutexMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]):
"""Block concurrent prompts on the same thread. """Block concurrent prompts on the same thread.
@ -231,6 +253,7 @@ __all__ = [
"BusyMutexMiddleware", "BusyMutexMiddleware",
"get_cancel_event", "get_cancel_event",
"manager", "manager",
"release_lock",
"request_cancel", "request_cancel",
"reset_cancel", "reset_cancel",
] ]

View file

@ -2046,6 +2046,7 @@ async def resume_chat(
thread_visibility=thread.visibility, thread_visibility=thread.visibility,
filesystem_selection=filesystem_selection, filesystem_selection=filesystem_selection,
request_id=getattr(http_request.state, "request_id", "unknown"), request_id=getattr(http_request.state, "request_id", "unknown"),
disabled_tools=request.disabled_tools,
), ),
media_type="text/event-stream", media_type="text/event-stream",
headers={ headers={

View file

@ -330,6 +330,9 @@ class ResumeDecision(BaseModel):
class ResumeRequest(BaseModel): class ResumeRequest(BaseModel):
search_space_id: int search_space_id: int
decisions: list[ResumeDecision] decisions: list[ResumeDecision]
# Mirrors ``NewChatRequest.disabled_tools`` so the resumed run sees the
# same tool surface as the originating turn.
disabled_tools: list[str] | None = None
filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
client_platform: Literal["web", "desktop"] = "web" client_platform: Literal["web", "desktop"] = "web"
local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None

View file

@ -28,7 +28,9 @@ from sqlalchemy import func
from sqlalchemy.future import select from sqlalchemy.future import select
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from app.agents.multi_agent_chat.integration import create_multi_agent_chat from app.agents.multi_agent_with_deepagents import (
create_surfsense_deep_agent as create_registry_deep_agent,
)
from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent
from app.agents.new_chat.checkpointer import get_checkpointer from app.agents.new_chat.checkpointer import get_checkpointer
from app.agents.new_chat.feature_flags import get_flags from app.agents.new_chat.feature_flags import get_flags
@ -44,6 +46,7 @@ from app.agents.new_chat.memory_extraction import (
extract_and_save_memory, extract_and_save_memory,
extract_and_save_team_memory, extract_and_save_team_memory,
) )
from app.agents.new_chat.middleware.busy_mutex import release_lock as _release_busy_lock
from app.agents.new_chat.middleware.kb_persistence import ( from app.agents.new_chat.middleware.kb_persistence import (
commit_staged_filesystem_state, commit_staged_filesystem_state,
) )
@ -2087,27 +2090,28 @@ async def stream_new_chat(
visibility = thread_visibility or ChatVisibility.PRIVATE visibility = thread_visibility or ChatVisibility.PRIVATE
from app.config import config as _app_config from app.config import config as _app_config
use_multi_agent = bool(_app_config.MULTI_AGENT_CHAT_ENABLED and not disabled_tools) use_multi_agent = bool(_app_config.MULTI_AGENT_CHAT_ENABLED)
if _app_config.MULTI_AGENT_CHAT_ENABLED and disabled_tools:
logger.info(
"MULTI_AGENT_CHAT_ENABLED is on, but falling back to new_chat because disabled_tools are requested."
)
_t0 = time.perf_counter() _t0 = time.perf_counter()
if use_multi_agent: if use_multi_agent:
agent = await create_multi_agent_chat( # TODO: Propagate ``disabled_tools`` into registry subagents. Today only the main
# agent honors UI disables; ``task`` delegates still get full specialist tool sets.
# Deliverables (and similar) are user-disableable but implemented on subagents, so
# disabling them in the UI does not fully apply until subagents filter too.
agent = await create_registry_deep_agent(
llm=llm, llm=llm,
db_session=session,
search_space_id=search_space_id, search_space_id=search_space_id,
user_id=str(user_id), db_session=session,
checkpointer=checkpointer,
thread_id=str(chat_id),
firecrawl_api_key=firecrawl_api_key,
connector_service=connector_service, connector_service=connector_service,
checkpointer=checkpointer,
user_id=user_id,
thread_id=chat_id,
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility, thread_visibility=visibility,
filesystem_selection=filesystem_selection, filesystem_selection=filesystem_selection,
mentioned_document_ids=mentioned_document_ids, mentioned_document_ids=mentioned_document_ids,
citations_enabled=agent_config.citations_enabled, disabled_tools=disabled_tools,
) )
else: else:
agent = await create_surfsense_deep_agent( agent = await create_surfsense_deep_agent(
@ -2691,6 +2695,15 @@ async def stream_new_chat(
chat_id, stream_result.sandbox_files chat_id, stream_result.sandbox_files
) )
# Release the busy lock here too: ``aafter_agent`` does not fire if the
# graph paused on ``interrupt()`` or the stream bailed out early.
with contextlib.suppress(Exception):
if _release_busy_lock(str(chat_id)):
_perf_log.info(
"[stream_new_chat] released stale busy lock (chat_id=%s)",
chat_id,
)
# Break circular refs held by the agent graph, tools, and LLM # Break circular refs held by the agent graph, tools, and LLM
# wrappers so the GC can reclaim them in a single pass. # wrappers so the GC can reclaim them in a single pass.
agent = llm = connector_service = None agent = llm = connector_service = None
@ -2717,6 +2730,7 @@ async def stream_resume_chat(
thread_visibility: ChatVisibility | None = None, thread_visibility: ChatVisibility | None = None,
filesystem_selection: FilesystemSelection | None = None, filesystem_selection: FilesystemSelection | None = None,
request_id: str | None = None, request_id: str | None = None,
disabled_tools: list[str] | None = None,
) -> AsyncGenerator[str, None]: ) -> AsyncGenerator[str, None]:
streaming_service = VercelStreamingService() streaming_service = VercelStreamingService()
stream_result = StreamResult() stream_result = StreamResult()
@ -2842,18 +2856,19 @@ async def stream_resume_chat(
_t0 = time.perf_counter() _t0 = time.perf_counter()
if _app_config.MULTI_AGENT_CHAT_ENABLED: if _app_config.MULTI_AGENT_CHAT_ENABLED:
agent = await create_multi_agent_chat( agent = await create_registry_deep_agent(
llm=llm, llm=llm,
db_session=session,
search_space_id=search_space_id, search_space_id=search_space_id,
user_id=str(user_id), db_session=session,
checkpointer=checkpointer,
thread_id=str(chat_id),
firecrawl_api_key=firecrawl_api_key,
connector_service=connector_service, connector_service=connector_service,
checkpointer=checkpointer,
user_id=user_id,
thread_id=chat_id,
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility, thread_visibility=visibility,
filesystem_selection=filesystem_selection, filesystem_selection=filesystem_selection,
citations_enabled=agent_config.citations_enabled, disabled_tools=disabled_tools,
) )
else: else:
agent = await create_surfsense_deep_agent( agent = await create_surfsense_deep_agent(
@ -2868,6 +2883,7 @@ async def stream_resume_chat(
firecrawl_api_key=firecrawl_api_key, firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility, thread_visibility=visibility,
filesystem_selection=filesystem_selection, filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
) )
_perf_log.info( _perf_log.info(
"[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0 "[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0
@ -2890,6 +2906,9 @@ async def stream_resume_chat(
"thread_id": str(chat_id), "thread_id": str(chat_id),
"request_id": request_id or "unknown", "request_id": request_id or "unknown",
"turn_id": stream_result.turn_id, "turn_id": stream_result.turn_id,
# Side-channel consumed by ``SurfSenseCheckpointedSubAgentMiddleware``
# to forward the resume into a subagent's pending ``interrupt()``.
"surfsense_resume_value": {"decisions": decisions},
}, },
# See ``stream_new_chat`` above for rationale: effectively # See ``stream_new_chat`` above for rationale: effectively
# uncapped to mirror the agent default and OpenCode's # uncapped to mirror the agent default and OpenCode's
@ -3065,6 +3084,15 @@ async def stream_resume_chat(
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await session.close() await session.close()
# Release the busy lock left held by the originally-interrupted turn,
# and any re-interrupt or early bailout from this resume.
with contextlib.suppress(Exception):
if _release_busy_lock(str(chat_id)):
_perf_log.info(
"[stream_resume] released stale busy lock (chat_id=%s)",
chat_id,
)
agent = llm = connector_service = None agent = llm = connector_service = None
stream_result = None stream_result = None
session = None session = None