diff --git a/surfsense_backend/app/agents/new_chat/middleware/busy_mutex.py b/surfsense_backend/app/agents/new_chat/middleware/busy_mutex.py index c57d85004..4b5ad546d 100644 --- a/surfsense_backend/app/agents/new_chat/middleware/busy_mutex.py +++ b/surfsense_backend/app/agents/new_chat/middleware/busy_mutex.py @@ -85,6 +85,23 @@ class _ThreadLockManager: if event is not None: event.clear() + def release(self, thread_id: str) -> bool: + """Force-release the per-thread lock; safety-net for turns that end before ``__end__``. + + ``BusyMutexMiddleware.aafter_agent`` only releases on graph completion, so + an ``interrupt()`` pause or an early streaming bail-out would otherwise + leak the lock and block the next request with :class:`BusyError`. Returns + ``True`` when a held lock was released, ``False`` otherwise. + """ + lock = self._locks.get(thread_id) + if lock is None or not lock.locked(): + return False + try: + lock.release() + except RuntimeError: + return False + return True + # Module-level singleton — process-local but reused across all agent # instances built in this process. Subagents created in nested @@ -107,6 +124,11 @@ def reset_cancel(thread_id: str) -> None: manager.reset(thread_id) +def release_lock(thread_id: str) -> bool: + """Force-release the per-thread busy lock; safe to call when not held.""" + return manager.release(thread_id) + + class BusyMutexMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]): """Block concurrent prompts on the same thread. @@ -231,6 +253,7 @@ __all__ = [ "BusyMutexMiddleware", "get_cancel_event", "manager", + "release_lock", "request_cancel", "reset_cancel", ] diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 26c72bd45..c95553fce 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -2046,6 +2046,7 @@ async def resume_chat( thread_visibility=thread.visibility, filesystem_selection=filesystem_selection, request_id=getattr(http_request.state, "request_id", "unknown"), + disabled_tools=request.disabled_tools, ), media_type="text/event-stream", headers={ diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index c7284e901..cfb4b8b37 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -330,6 +330,9 @@ class ResumeDecision(BaseModel): class ResumeRequest(BaseModel): search_space_id: int decisions: list[ResumeDecision] + # Mirrors ``NewChatRequest.disabled_tools`` so the resumed run sees the + # same tool surface as the originating turn. + disabled_tools: list[str] | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 45612bf56..e22aae8c4 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -28,7 +28,9 @@ from sqlalchemy import func from sqlalchemy.future import select from sqlalchemy.orm import selectinload -from app.agents.multi_agent_chat.integration import create_multi_agent_chat +from app.agents.multi_agent_with_deepagents import ( + create_surfsense_deep_agent as create_registry_deep_agent, +) from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent from app.agents.new_chat.checkpointer import get_checkpointer from app.agents.new_chat.feature_flags import get_flags @@ -44,6 +46,7 @@ from app.agents.new_chat.memory_extraction import ( extract_and_save_memory, extract_and_save_team_memory, ) +from app.agents.new_chat.middleware.busy_mutex import release_lock as _release_busy_lock from app.agents.new_chat.middleware.kb_persistence import ( commit_staged_filesystem_state, ) @@ -2087,27 +2090,28 @@ async def stream_new_chat( visibility = thread_visibility or ChatVisibility.PRIVATE from app.config import config as _app_config - use_multi_agent = bool(_app_config.MULTI_AGENT_CHAT_ENABLED and not disabled_tools) - if _app_config.MULTI_AGENT_CHAT_ENABLED and disabled_tools: - logger.info( - "MULTI_AGENT_CHAT_ENABLED is on, but falling back to new_chat because disabled_tools are requested." - ) + use_multi_agent = bool(_app_config.MULTI_AGENT_CHAT_ENABLED) _t0 = time.perf_counter() if use_multi_agent: - agent = await create_multi_agent_chat( + # TODO: Propagate ``disabled_tools`` into registry subagents. Today only the main + # agent honors UI disables; ``task`` delegates still get full specialist tool sets. + # Deliverables (and similar) are user-disableable but implemented on subagents, so + # disabling them in the UI does not fully apply until subagents filter too. + agent = await create_registry_deep_agent( llm=llm, - db_session=session, search_space_id=search_space_id, - user_id=str(user_id), - checkpointer=checkpointer, - thread_id=str(chat_id), - firecrawl_api_key=firecrawl_api_key, + db_session=session, connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, thread_visibility=visibility, filesystem_selection=filesystem_selection, mentioned_document_ids=mentioned_document_ids, - citations_enabled=agent_config.citations_enabled, + disabled_tools=disabled_tools, ) else: agent = await create_surfsense_deep_agent( @@ -2691,6 +2695,15 @@ async def stream_new_chat( chat_id, stream_result.sandbox_files ) + # Release the busy lock here too: ``aafter_agent`` does not fire if the + # graph paused on ``interrupt()`` or the stream bailed out early. + with contextlib.suppress(Exception): + if _release_busy_lock(str(chat_id)): + _perf_log.info( + "[stream_new_chat] released stale busy lock (chat_id=%s)", + chat_id, + ) + # Break circular refs held by the agent graph, tools, and LLM # wrappers so the GC can reclaim them in a single pass. agent = llm = connector_service = None @@ -2717,6 +2730,7 @@ async def stream_resume_chat( thread_visibility: ChatVisibility | None = None, filesystem_selection: FilesystemSelection | None = None, request_id: str | None = None, + disabled_tools: list[str] | None = None, ) -> AsyncGenerator[str, None]: streaming_service = VercelStreamingService() stream_result = StreamResult() @@ -2842,18 +2856,19 @@ async def stream_resume_chat( _t0 = time.perf_counter() if _app_config.MULTI_AGENT_CHAT_ENABLED: - agent = await create_multi_agent_chat( + agent = await create_registry_deep_agent( llm=llm, - db_session=session, search_space_id=search_space_id, - user_id=str(user_id), - checkpointer=checkpointer, - thread_id=str(chat_id), - firecrawl_api_key=firecrawl_api_key, + db_session=session, connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, thread_visibility=visibility, filesystem_selection=filesystem_selection, - citations_enabled=agent_config.citations_enabled, + disabled_tools=disabled_tools, ) else: agent = await create_surfsense_deep_agent( @@ -2868,6 +2883,7 @@ async def stream_resume_chat( firecrawl_api_key=firecrawl_api_key, thread_visibility=visibility, filesystem_selection=filesystem_selection, + disabled_tools=disabled_tools, ) _perf_log.info( "[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0 @@ -2890,6 +2906,9 @@ async def stream_resume_chat( "thread_id": str(chat_id), "request_id": request_id or "unknown", "turn_id": stream_result.turn_id, + # Side-channel consumed by ``SurfSenseCheckpointedSubAgentMiddleware`` + # to forward the resume into a subagent's pending ``interrupt()``. + "surfsense_resume_value": {"decisions": decisions}, }, # See ``stream_new_chat`` above for rationale: effectively # uncapped to mirror the agent default and OpenCode's @@ -3065,6 +3084,15 @@ async def stream_resume_chat( with contextlib.suppress(Exception): await session.close() + # Release the busy lock left held by the originally-interrupted turn, + # and any re-interrupt or early bailout from this resume. + with contextlib.suppress(Exception): + if _release_busy_lock(str(chat_id)): + _perf_log.info( + "[stream_resume] released stale busy lock (chat_id=%s)", + chat_id, + ) + agent = llm = connector_service = None stream_result = None session = None