From 7b9a218d62e1f804a04d2ec33cbcad7d2cad9d76 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 28 Apr 2026 15:35:14 +0200 Subject: [PATCH] feat(chat): add multi-agent mode routing scaffold and telemetry. --- docs/multi-agent-architecture-plan.md | 211 ++++++++++++++++++ docs/multi-agent-phase1-runbook.md | 70 ++++++ .../app/agents/multi_agent_v1/__init__.py | 17 ++ .../app/agents/multi_agent_v1/contracts.py | 36 +++ .../app/agents/multi_agent_v1/entrypoint.py | 24 ++ .../app/agents/new_chat/architecture_mode.py | 40 ++++ .../app/agents/new_chat/telemetry.py | 43 ++++ surfsense_backend/app/config/__init__.py | 2 + .../app/routes/anonymous_chat_routes.py | 55 ++++- .../app/routes/new_chat_routes.py | 104 +++++---- surfsense_backend/app/schemas/new_chat.py | 8 + .../app/tasks/chat/stream_dispatch.py | 47 ++++ .../app/tasks/chat/stream_new_chat.py | 143 ++++++++++-- 13 files changed, 742 insertions(+), 58 deletions(-) create mode 100644 docs/multi-agent-architecture-plan.md create mode 100644 docs/multi-agent-phase1-runbook.md create mode 100644 surfsense_backend/app/agents/multi_agent_v1/__init__.py create mode 100644 surfsense_backend/app/agents/multi_agent_v1/contracts.py create mode 100644 surfsense_backend/app/agents/multi_agent_v1/entrypoint.py create mode 100644 surfsense_backend/app/agents/new_chat/architecture_mode.py create mode 100644 surfsense_backend/app/agents/new_chat/telemetry.py create mode 100644 surfsense_backend/app/tasks/chat/stream_dispatch.py diff --git a/docs/multi-agent-architecture-plan.md b/docs/multi-agent-architecture-plan.md new file mode 100644 index 000000000..45b5caf61 --- /dev/null +++ b/docs/multi-agent-architecture-plan.md @@ -0,0 +1,211 @@ +# Multi-Agent Architecture Plan (Phased) + +This document defines a phased migration from the current `single_agent` flow to a multi-agent architecture while keeping rollback simple and immediate. 
+ +## Naming + +- `single_agent`: current architecture (default at start) +- `shadow_multi_agent_v1`: run multi-agent path in background, return `single_agent` output +- `multi_agent_v1`: multi-agent architecture is the user-facing path + +--- + +## Phase 1 - Parallel Safety Layer + +**Goal:** Add safe routing controls with zero behavior change. + +### Todo + +- [ ] Add mode selector with values: `single_agent`, `shadow_multi_agent_v1`, `multi_agent_v1` +- [ ] Add global kill switch: force all traffic to `single_agent` +- [ ] Add mode resolution priority: + 1. kill switch + 2. request override + 3. system default (falls back to `single_agent` when unset or invalid) +- [ ] Keep `single_agent` as default mode +- [ ] Keep frontend stream/output contract unchanged +- [ ] Add telemetry tags (the Phase 1 runbook additionally lists `orchestrator_used`): + - `architecture_mode` + - `worker_count` + - `retry_count` + - `latency_ms` + - `token_total` +- [ ] Write short rollback runbook + +### Exit Criteria + +- [ ] Can switch modes in staging +- [ ] Kill switch verified +- [ ] No frontend contract regressions + +--- + +## Phase 2 - Orchestrator Core and Contracts + +**Goal:** Build multi-agent control-plane only (planner/router/merge), with strict schemas. 
+ +### Todo + +- [ ] Implement orchestrator responsibilities: + - intent detection + - routing + - delegation + - fan-in merge +- [ ] Add budget controls: + - max workers per turn + - max parallel workers + - max turn duration +- [ ] Add loop/stall guard: + - repeated task signature detection + - no-progress threshold +- [ ] Define `WorkerTask` schema (scaffolded as `SubagentTaskPlan` in `multi_agent_v1/contracts.py`): + - `domain`, `goal`, `constraints`, `budget` +- [ ] Define `WorkerResult` schema (scaffolded as `SubagentResult` in `multi_agent_v1/contracts.py`): + - `status`, `summary`, `evidence[]`, `artifacts[]`, `needs_human`, `error_class` +- [ ] Add schema validation on send/receive boundaries +- [ ] Add controlled fallback on invalid worker results + +### Exit Criteria + +- [ ] Orchestrator works end-to-end with mock workers +- [ ] Invalid worker payloads are blocked cleanly + +--- + +## Phase 3 - Pilot Workers (Gmail and Calendar) + +**Goal:** Validate multi-agent architecture with two real domains only. + +### Todo + +- [ ] Create Gmail worker + - [ ] domain-scoped prompt + - [ ] domain-only tool loadout + - [ ] local query rewrite + - [ ] normalized `WorkerResult` +- [ ] Create Calendar worker + - [ ] domain-scoped prompt + - [ ] domain-only tool loadout + - [ ] local query rewrite/time normalization + - [ ] normalized `WorkerResult` +- [ ] Enforce no cross-domain tool access +- [ ] Preserve HITL for write actions +- [ ] Add retry policy by `error_class` +- [ ] Add tests for routing, loadout isolation, HITL behavior + +### Exit Criteria + +- [ ] Gmail and Calendar tasks complete in `multi_agent_v1` +- [ ] No cross-domain tool leakage +- [ ] HITL still enforced for sensitive writes + +--- + +## Phase 4 - Knowledge Base and Evidence Normalization + +**Goal:** Isolate KB retrieval and make evidence citation-ready. 
+ +### Todo + +- [ ] Move KB retrieval behind dedicated worker/stage +- [ ] Reuse current KB retrieval logic, but return compact structured evidence only +- [ ] Define `EvidenceItem` fields (scaffolded as `GroundingEvidence` in `multi_agent_v1/contracts.py`): + - `claim`, `source_type`, `source_ref`, `confidence`, `snippet` +- [ ] Add top-k and output-size controls +- [ ] Add quote-first extraction mode for long contexts +- [ ] Add tests for traceability and bounded payloads + +### Exit Criteria + +- [ ] Orchestrator consumes compact evidence (no raw KB dumps) +- [ ] Citation refs remain valid and traceable + +--- + +## Phase 5 - Verifier and Citation Gate + +**Goal:** Prevent unsupported factual claims in final responses. + +### Todo + +- [ ] Add verifier stage before final synthesis +- [ ] Enforce claim-to-evidence checks +- [ ] Add conflict handling policy: + - consistent evidence -> accept + - conflicting evidence -> label uncertainty or retry +- [ ] Add unsupported-claim policy: + - remove claim or mark uncertain +- [ ] Add verifier telemetry: + - supported claims + - unsupported claims + - conflicts +- [ ] Support strict gate and warning modes + +### Exit Criteria + +- [ ] Unsupported factual claims are blocked or clearly annotated +- [ ] Citation precision improves on evaluation set + +--- + +## Phase 6 - Shadow Evaluation and Canary + +**Goal:** Ship based on data, not intuition. 
+ +### Todo + +- [ ] Enable `shadow_multi_agent_v1` for selected traffic +- [ ] Compare metrics vs `single_agent`: + - success rate + - citation precision + - tool-selection accuracy + - p95 latency + - tokens/request + - cost per successful task +- [ ] Define rollout gates and auto-stop thresholds +- [ ] Start canary rollout for `multi_agent_v1` +- [ ] Ramp traffic only if quality and reliability gates pass +- [ ] Keep kill switch live for entire rollout +- [ ] Record go/no-go decision with evidence + +### Exit Criteria + +- [ ] Clear decision based on measured outcomes +- [ ] Rollback tested successfully during canary + +--- + +## Phase 7 - Domain Expansion and Heavy Tool Reassignment + +**Goal:** Scale multi-agent architecture safely across more domains. + +### Todo + +- [ ] Add domains incrementally (`notion`, `slack`, `jira`, ...) +- [ ] For each new domain enforce: + - scoped tool loadout + - local query rewrite + - contract validation + - eval plus canary gate +- [ ] Move heavy tools to specialist workers: + - podcast generation + - artifact/report generation + - video presentation +- [ ] Keep orchestrator toolbelt minimal and control-plane focused +- [ ] Regularly prune prompts and tool descriptions + +### Exit Criteria + +- [ ] New domains onboard without reliability regressions +- [ ] Orchestrator remains lean and stable +- [ ] Cost per successful task stays controlled + +--- + +## Always-On Checklist + +- [ ] Keep `single_agent` path healthy until rollout completion +- [ ] Keep one-click rollback available at all times +- [ ] Update observability dashboards every phase +- [ ] Track failure taxonomy and review weekly +- [ ] Validate prompt/tool changes via eval before broad rollout diff --git a/docs/multi-agent-phase1-runbook.md b/docs/multi-agent-phase1-runbook.md new file mode 100644 index 000000000..c3d871e52 --- /dev/null +++ b/docs/multi-agent-phase1-runbook.md @@ -0,0 +1,70 @@ +# Multi-Agent Architecture Phase 1 Runbook + +## Scope + +This runbook 
covers mode selection and emergency rollback for: + +- `single_agent` +- `shadow_multi_agent_v1` +- `multi_agent_v1` + +Phase 1 keeps execution behavior on the current single-agent path while mode wiring and telemetry are introduced. + +## Resolution Priority + +Mode resolution follows this fixed order: + +1. Global kill switch (`FORCE_SINGLE_AGENT`) +2. Request override (`architecture_mode` in chat payload) +3. System default (`AGENT_ARCHITECTURE_MODE`) +4. Safe fallback (`single_agent`) + +## Configuration + +Set environment values in backend runtime: + +- `AGENT_ARCHITECTURE_MODE=single_agent` (default) +- `FORCE_SINGLE_AGENT=FALSE` (default) + +Changes require backend restart because config is loaded at process startup. + +## Mode Switching + +### System default switch + +1. Set `AGENT_ARCHITECTURE_MODE` to the desired value. +2. Keep `FORCE_SINGLE_AGENT=FALSE`. +3. Restart backend. +4. Verify logs include `[architecture_telemetry]` with expected `architecture_mode`. + +### Per-request override + +Send optional `architecture_mode` in chat request payload: + +- `"single_agent"` +- `"shadow_multi_agent_v1"` +- `"multi_agent_v1"` + +If `FORCE_SINGLE_AGENT=TRUE`, request override is ignored by design. + +## Emergency Rollback + +Use the kill switch: + +1. Set `FORCE_SINGLE_AGENT=TRUE` (compared case-insensitively against the literal `TRUE`; values such as `1` or `yes` leave the kill switch off). +2. Restart backend. +3. Verify new `[architecture_telemetry]` log entries report `"architecture_mode": "single_agent"` (the payload is logged as JSON). +4. Keep this state until the incident is resolved. + +## Verification Checklist + +- Mode resolves according to the priority order. +- Kill switch overrides all request/default values. +- Streaming response schema remains unchanged. 
+- Architecture telemetry is emitted with: + - `architecture_mode` + - `orchestrator_used` + - `worker_count` + - `retry_count` + - `latency_ms` + - `token_total` diff --git a/surfsense_backend/app/agents/multi_agent_v1/__init__.py b/surfsense_backend/app/agents/multi_agent_v1/__init__.py new file mode 100644 index 000000000..4fb75203a --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_v1/__init__.py @@ -0,0 +1,17 @@ +"""Multi-agent v1 architecture package.""" + +from app.agents.multi_agent_v1.contracts import ( + GroundingEvidence, + SubagentResult, + SubagentTaskPlan, + WorkerBudget, +) +from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint + +__all__ = [ + "GroundingEvidence", + "MultiAgentEntrypoint", + "SubagentResult", + "SubagentTaskPlan", + "WorkerBudget", +] diff --git a/surfsense_backend/app/agents/multi_agent_v1/contracts.py b/surfsense_backend/app/agents/multi_agent_v1/contracts.py new file mode 100644 index 000000000..c6eef1a06 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_v1/contracts.py @@ -0,0 +1,36 @@ +"""Contracts for multi_agent_v1 orchestrator and subagent communication.""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class WorkerBudget(BaseModel): + max_steps: int = Field(default=1, ge=1) + max_duration_ms: int = Field(default=15_000, ge=100) + + +class SubagentTaskPlan(BaseModel): + domain: str = Field(..., min_length=1) + goal: str = Field(..., min_length=1) + constraints: list[str] = Field(default_factory=list) + budget: WorkerBudget = Field(default_factory=WorkerBudget) + + +class GroundingEvidence(BaseModel): + claim: str = Field(..., min_length=1) + source_type: str = Field(..., min_length=1) + source_ref: str = Field(..., min_length=1) + confidence: float = Field(default=0.0, ge=0.0, le=1.0) + snippet: str = "" + + +class SubagentResult(BaseModel): + status: Literal["success", "partial", "error"] + summary: str = "" + evidence: 
list[GroundingEvidence] = Field(default_factory=list) + artifacts: list[str] = Field(default_factory=list) + needs_human: bool = False + error_class: str | None = None diff --git a/surfsense_backend/app/agents/multi_agent_v1/entrypoint.py b/surfsense_backend/app/agents/multi_agent_v1/entrypoint.py new file mode 100644 index 000000000..417643633 --- /dev/null +++ b/surfsense_backend/app/agents/multi_agent_v1/entrypoint.py @@ -0,0 +1,24 @@ +"""Multi-agent v1 entrypoint scaffold with safe fallback behavior.""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator, Callable +from typing import Any + + +class MultiAgentEntrypoint: + def stream_new_chat( + self, + *, + fallback_streamer: Callable[..., AsyncGenerator[str, None]], + fallback_kwargs: dict[str, Any], + ) -> AsyncGenerator[str, None]: + return fallback_streamer(**fallback_kwargs) + + def stream_resume_chat( + self, + *, + fallback_streamer: Callable[..., AsyncGenerator[str, None]], + fallback_kwargs: dict[str, Any], + ) -> AsyncGenerator[str, None]: + return fallback_streamer(**fallback_kwargs) diff --git a/surfsense_backend/app/agents/new_chat/architecture_mode.py b/surfsense_backend/app/agents/new_chat/architecture_mode.py new file mode 100644 index 000000000..db74bb308 --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/architecture_mode.py @@ -0,0 +1,40 @@ +"""Architecture mode contracts and resolution helpers for chat sessions.""" + +from __future__ import annotations + +from enum import StrEnum + +from app.config import config + + +class ArchitectureMode(StrEnum): + SINGLE_AGENT = "single_agent" + SHADOW_MULTI_AGENT_V1 = "shadow_multi_agent_v1" + MULTI_AGENT_V1 = "multi_agent_v1" + + +def parse_architecture_mode(value: str | None) -> ArchitectureMode | None: + if not value: + return None + normalized = value.strip().lower() + if not normalized: + return None + try: + return ArchitectureMode(normalized) + except ValueError: + return None + + +def 
resolve_architecture_mode(request_override: str | None = None) -> ArchitectureMode: + if config.FORCE_SINGLE_AGENT: + return ArchitectureMode.SINGLE_AGENT + + override_mode = parse_architecture_mode(request_override) + if override_mode is not None: + return override_mode + + default_mode = parse_architecture_mode(config.AGENT_ARCHITECTURE_MODE) + if default_mode is not None: + return default_mode + + return ArchitectureMode.SINGLE_AGENT diff --git a/surfsense_backend/app/agents/new_chat/telemetry.py b/surfsense_backend/app/agents/new_chat/telemetry.py new file mode 100644 index 000000000..6e5ae408d --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/telemetry.py @@ -0,0 +1,43 @@ +"""Architecture telemetry logging for chat execution modes.""" + +from __future__ import annotations + +import json +from typing import Any + +from app.utils.perf import get_perf_logger + +_perf_log = get_perf_logger() + + +def log_architecture_telemetry( + *, + phase: str, + architecture_mode: str, + orchestrator_used: bool, + worker_count: int, + retry_count: int, + latency_ms: float, + token_total: int, + request_id: str | None = None, + turn_id: str | None = None, + status: str = "ok", + source: str = "new_chat", + extra: dict[str, Any] | None = None, +) -> None: + payload: dict[str, Any] = { + "phase": phase, + "source": source, + "status": status, + "architecture_mode": architecture_mode, + "orchestrator_used": orchestrator_used, + "worker_count": worker_count, + "retry_count": retry_count, + "latency_ms": round(latency_ms, 2), + "token_total": token_total, + "request_id": request_id or "unknown", + "turn_id": turn_id or "unknown", + } + if extra: + payload.update(extra) + _perf_log.info("[architecture_telemetry] %s", json.dumps(payload, ensure_ascii=False)) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index bd97d2bb1..eb4464b13 100644 --- a/surfsense_backend/app/config/__init__.py +++ 
b/surfsense_backend/app/config/__init__.py @@ -342,6 +342,8 @@ class Config: ENABLE_DESKTOP_LOCAL_FILESYSTEM = ( os.getenv("ENABLE_DESKTOP_LOCAL_FILESYSTEM", "FALSE").upper() == "TRUE" ) + AGENT_ARCHITECTURE_MODE = os.getenv("AGENT_ARCHITECTURE_MODE", "single_agent") + FORCE_SINGLE_AGENT = os.getenv("FORCE_SINGLE_AGENT", "FALSE").upper() == "TRUE" @classmethod def is_self_hosted(cls) -> bool: diff --git a/surfsense_backend/app/routes/anonymous_chat_routes.py b/surfsense_backend/app/routes/anonymous_chat_routes.py index f9d694e5a..1af7418d9 100644 --- a/surfsense_backend/app/routes/anonymous_chat_routes.py +++ b/surfsense_backend/app/routes/anonymous_chat_routes.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging import secrets +import time import uuid from pathlib import PurePosixPath from typing import Any @@ -12,6 +13,11 @@ from fastapi import APIRouter, HTTPException, Request, Response, UploadFile, sta from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field +from app.agents.new_chat.architecture_mode import ( + ArchitectureMode, + resolve_architecture_mode, +) +from app.agents.new_chat.telemetry import log_architecture_telemetry from app.config import config from app.etl_pipeline.file_classifier import ( DIRECT_CONVERT_EXTENSIONS, @@ -84,6 +90,7 @@ class AnonChatRequest(BaseModel): messages: list[dict[str, Any]] = Field(..., min_length=1) disabled_tools: list[str] | None = None turnstile_token: str | None = None + architecture_mode: ArchitectureMode | None = None class AnonQuotaResponse(BaseModel): @@ -361,6 +368,22 @@ async def stream_anonymous_chat( accumulator = start_turn() streaming_service = VercelStreamingService() + architecture_mode = resolve_architecture_mode(body.architecture_mode) + started_at = time.perf_counter() + turn_id = f"anon:{session_id}:{request_id}" + log_architecture_telemetry( + phase="turn_start", + source="anon_chat", + status="started", + architecture_mode=architecture_mode.value, + 
orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=0.0, + token_total=0, + request_id=request_id, + turn_id=turn_id, + ) try: async with shielded_async_session() as session: @@ -400,7 +423,10 @@ async def stream_anonymous_chat( } langgraph_config = { - "configurable": {"thread_id": anon_thread_id}, + "configurable": { + "thread_id": anon_thread_id, + "architecture_mode": architecture_mode.value, + }, "recursion_limit": 40, } @@ -468,6 +494,19 @@ async def stream_anonymous_chat( "total_tokens": accumulator.grand_total, }, ) + log_architecture_telemetry( + phase="turn_end", + source="anon_chat", + status="completed", + architecture_mode=architecture_mode.value, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - started_at) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=turn_id, + ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -475,6 +514,20 @@ async def stream_anonymous_chat( except Exception as e: logger.exception("Anonymous chat stream error") + log_architecture_telemetry( + phase="turn_end", + source="anon_chat", + status="error", + architecture_mode=architecture_mode.value, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - started_at) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=turn_id, + extra={"error_type": type(e).__name__}, + ) await TokenQuotaService.anon_release(session_key, ip_key, request_id) yield streaming_service.format_error(f"Error during chat: {e!s}") yield streaming_service.format_done() diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index cbc660222..896a4bd31 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -22,6 +22,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from 
sqlalchemy.future import select from sqlalchemy.orm import selectinload +from app.agents.new_chat.architecture_mode import resolve_architecture_mode from app.agents.new_chat.filesystem_selection import ( ClientPlatform, FilesystemMode, @@ -61,7 +62,10 @@ from app.schemas.new_chat import ( TokenUsageSummary, ) from app.services.token_tracking_service import record_token_usage -from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat +from app.tasks.chat.stream_dispatch import ( + dispatch_new_chat_stream, + dispatch_resume_chat_stream, +) from app.users import current_active_user from app.utils.rbac import check_permission from app.utils.user_message_multimodal import ( @@ -1244,23 +1248,28 @@ async def handle_new_chat( image_urls = ( [p.as_data_url() for p in request.user_images] if request.user_images else None ) + architecture_mode = resolve_architecture_mode(request.architecture_mode) return StreamingResponse( - stream_new_chat( - user_query=request.user_query, - search_space_id=request.search_space_id, - chat_id=request.chat_id, - user_id=str(user.id), - llm_config_id=llm_config_id, - mentioned_document_ids=request.mentioned_document_ids, - mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids, - needs_history_bootstrap=thread.needs_history_bootstrap, - thread_visibility=thread.visibility, - current_user_display_name=user.display_name or "A team member", - disabled_tools=request.disabled_tools, - filesystem_selection=filesystem_selection, - request_id=getattr(http_request.state, "request_id", "unknown"), - user_image_data_urls=image_urls, + dispatch_new_chat_stream( + architecture_mode=architecture_mode.value, + stream_kwargs={ + "user_query": request.user_query, + "search_space_id": request.search_space_id, + "chat_id": request.chat_id, + "user_id": str(user.id), + "llm_config_id": llm_config_id, + "mentioned_document_ids": request.mentioned_document_ids, + "mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids, + 
"needs_history_bootstrap": thread.needs_history_bootstrap, + "thread_visibility": thread.visibility, + "current_user_display_name": user.display_name or "A team member", + "disabled_tools": request.disabled_tools, + "filesystem_selection": filesystem_selection, + "request_id": getattr(http_request.state, "request_id", "unknown"), + "user_image_data_urls": image_urls, + "architecture_mode": architecture_mode.value, + }, ), media_type="text/event-stream", headers={ @@ -1458,6 +1467,7 @@ async def regenerate_response( if request.user_images is not None: regenerate_image_urls = [p.as_data_url() for p in request.user_images] + architecture_mode = resolve_architecture_mode(request.architecture_mode) if user_query_to_use is None: raise HTTPException( @@ -1506,23 +1516,28 @@ async def regenerate_response( async def stream_with_cleanup(): streaming_completed = False try: - async for chunk in stream_new_chat( - user_query=str(user_query_to_use), - search_space_id=request.search_space_id, - chat_id=thread_id, - user_id=str(user.id), - llm_config_id=llm_config_id, - mentioned_document_ids=request.mentioned_document_ids, - mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids, - checkpoint_id=target_checkpoint_id, - needs_history_bootstrap=thread.needs_history_bootstrap, - thread_visibility=thread.visibility, - current_user_display_name=user.display_name or "A team member", - disabled_tools=request.disabled_tools, - filesystem_selection=filesystem_selection, - request_id=getattr(http_request.state, "request_id", "unknown"), - user_image_data_urls=regenerate_image_urls or None, - ): + stream = dispatch_new_chat_stream( + architecture_mode=architecture_mode.value, + stream_kwargs={ + "user_query": str(user_query_to_use), + "search_space_id": request.search_space_id, + "chat_id": thread_id, + "user_id": str(user.id), + "llm_config_id": llm_config_id, + "mentioned_document_ids": request.mentioned_document_ids, + "mentioned_surfsense_doc_ids": 
request.mentioned_surfsense_doc_ids, + "checkpoint_id": target_checkpoint_id, + "needs_history_bootstrap": thread.needs_history_bootstrap, + "thread_visibility": thread.visibility, + "current_user_display_name": user.display_name or "A team member", + "disabled_tools": request.disabled_tools, + "filesystem_selection": filesystem_selection, + "request_id": getattr(http_request.state, "request_id", "unknown"), + "user_image_data_urls": regenerate_image_urls or None, + "architecture_mode": architecture_mode.value, + }, + ) + async for chunk in stream: yield chunk streaming_completed = True finally: @@ -1628,6 +1643,7 @@ async def resume_chat( ) decisions = [d.model_dump() for d in request.decisions] + architecture_mode = resolve_architecture_mode(request.architecture_mode) # Release the read-transaction so we don't hold ACCESS SHARE locks # on searchspaces/documents for the entire duration of the stream. @@ -1635,15 +1651,19 @@ async def resume_chat( await session.close() return StreamingResponse( - stream_resume_chat( - chat_id=thread_id, - search_space_id=request.search_space_id, - decisions=decisions, - user_id=str(user.id), - llm_config_id=llm_config_id, - thread_visibility=thread.visibility, - filesystem_selection=filesystem_selection, - request_id=getattr(http_request.state, "request_id", "unknown"), + dispatch_resume_chat_stream( + architecture_mode=architecture_mode.value, + stream_kwargs={ + "chat_id": thread_id, + "search_space_id": request.search_space_id, + "decisions": decisions, + "user_id": str(user.id), + "llm_config_id": llm_config_id, + "thread_visibility": thread.visibility, + "filesystem_selection": filesystem_selection, + "request_id": getattr(http_request.state, "request_id", "unknown"), + "architecture_mode": architecture_mode.value, + }, ), media_type="text/event-stream", headers={ diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index 477fdf2ca..f446eb0b5 100644 --- 
a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -176,6 +176,11 @@ class LocalFilesystemMountPayload(BaseModel): MAX_NEW_CHAT_IMAGE_BYTES = 8 * 1024 * 1024 MAX_NEW_CHAT_IMAGES = 4 +ArchitectureModeLiteral = Literal[ + "single_agent", + "shadow_multi_agent_v1", + "multi_agent_v1", +] class NewChatUserImagePart(BaseModel): @@ -210,6 +215,7 @@ class NewChatRequest(BaseModel): disabled_tools: list[str] | None = ( None # Optional list of tool names the user has disabled from the UI ) + architecture_mode: ArchitectureModeLiteral | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None @@ -250,6 +256,7 @@ class RegenerateRequest(BaseModel): mentioned_document_ids: list[int] | None = None mentioned_surfsense_doc_ids: list[int] | None = None disabled_tools: list[str] | None = None + architecture_mode: ArchitectureModeLiteral | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None @@ -286,6 +293,7 @@ class ResumeDecision(BaseModel): class ResumeRequest(BaseModel): search_space_id: int decisions: list[ResumeDecision] + architecture_mode: ArchitectureModeLiteral | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None diff --git a/surfsense_backend/app/tasks/chat/stream_dispatch.py b/surfsense_backend/app/tasks/chat/stream_dispatch.py new file mode 100644 index 000000000..73d7fc076 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/stream_dispatch.py @@ -0,0 +1,47 @@ +"""Thin architecture dispatch seam for chat streaming entrypoints.""" + +from __future__ import annotations + +from 
collections.abc import AsyncGenerator +from typing import Any + +from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint +from app.agents.new_chat.architecture_mode import ( + ArchitectureMode, + parse_architecture_mode, +) +from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat + + +def _resolve_mode(mode_value: str) -> ArchitectureMode: + return parse_architecture_mode(mode_value) or ArchitectureMode.SINGLE_AGENT + + +def dispatch_new_chat_stream( + *, + architecture_mode: str, + stream_kwargs: dict[str, Any], +) -> AsyncGenerator[str, None]: + mode = _resolve_mode(architecture_mode) + if mode == ArchitectureMode.SINGLE_AGENT: + return stream_new_chat(**stream_kwargs) + entrypoint = MultiAgentEntrypoint() + return entrypoint.stream_new_chat( + fallback_streamer=stream_new_chat, + fallback_kwargs=stream_kwargs, + ) + + +def dispatch_resume_chat_stream( + *, + architecture_mode: str, + stream_kwargs: dict[str, Any], +) -> AsyncGenerator[str, None]: + mode = _resolve_mode(architecture_mode) + if mode == ArchitectureMode.SINGLE_AGENT: + return stream_resume_chat(**stream_kwargs) + entrypoint = MultiAgentEntrypoint() + return entrypoint.stream_resume_chat( + fallback_streamer=stream_resume_chat, + fallback_kwargs=stream_kwargs, + ) diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 396c7574e..5edfcd658 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -42,6 +42,7 @@ from app.agents.new_chat.memory_extraction import ( extract_and_save_memory, extract_and_save_team_memory, ) +from app.agents.new_chat.telemetry import log_architecture_telemetry from app.db import ( ChatVisibility, NewChatMessage, @@ -149,6 +150,7 @@ class StreamResult: agent_called_update_memory: bool = False request_id: str | None = None turn_id: str = "" + architecture_mode: str = "single_agent" filesystem_mode: str = 
"cloud" client_platform: str = "web" intent_detected: str = "chat_only" @@ -182,9 +184,7 @@ def _tool_output_has_error(tool_output: Any) -> bool: if tool_output.get("error"): return True result = tool_output.get("result") - if isinstance(result, str) and result.strip().lower().startswith("error:"): - return True - return False + return isinstance(result, str) and result.strip().lower().startswith("error:") if isinstance(tool_output, str): return tool_output.strip().lower().startswith("error:") return False @@ -231,6 +231,7 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None: "request_id": result.request_id or "unknown", "turn_id": result.turn_id or "unknown", "chat_id": result.turn_id.split(":", 1)[0] if ":" in result.turn_id else "unknown", + "architecture_mode": result.architecture_mode, "filesystem_mode": result.filesystem_mode, "client_platform": result.client_platform, "intent_detected": result.intent_detected, @@ -1308,18 +1309,17 @@ async def _stream_agent_events( result.commit_gate_passed, result.commit_gate_reason = ( _evaluate_file_contract_outcome(result) ) - if not result.commit_gate_passed: - if _contract_enforcement_active(result): - gate_notice = ( - "I could not complete the requested file write because no successful " - "write_file/edit_file operation was confirmed." - ) - gate_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(gate_text_id) - yield streaming_service.format_text_delta(gate_text_id, gate_notice) - yield streaming_service.format_text_end(gate_text_id) - yield streaming_service.format_terminal_info(gate_notice, "error") - accumulated_text = gate_notice + if not result.commit_gate_passed and _contract_enforcement_active(result): + gate_notice = ( + "I could not complete the requested file write because no successful " + "write_file/edit_file operation was confirmed." 
+ ) + gate_text_id = streaming_service.generate_text_id() + yield streaming_service.format_text_start(gate_text_id) + yield streaming_service.format_text_delta(gate_text_id, gate_notice) + yield streaming_service.format_text_end(gate_text_id) + yield streaming_service.format_terminal_info(gate_notice, "error") + accumulated_text = gate_notice else: result.commit_gate_passed = True result.commit_gate_reason = "" @@ -1351,6 +1351,7 @@ async def stream_new_chat( filesystem_selection: FilesystemSelection | None = None, request_id: str | None = None, user_image_data_urls: list[str] | None = None, + architecture_mode: str = "single_agent", ) -> AsyncGenerator[str, None]: """ Stream chat responses from the new SurfSense deep agent. @@ -1384,8 +1385,22 @@ async def stream_new_chat( ) stream_result.request_id = request_id stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}" + stream_result.architecture_mode = architecture_mode stream_result.filesystem_mode = fs_mode stream_result.client_platform = fs_platform + log_architecture_telemetry( + phase="turn_start", + source="new_chat", + status="started", + architecture_mode=architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=0.0, + token_total=0, + request_id=request_id, + turn_id=stream_result.turn_id, + ) _log_file_contract("turn_start", stream_result) _perf_log.info( "[stream_new_chat] filesystem_mode=%s client_platform=%s", @@ -1638,6 +1653,7 @@ async def stream_new_chat( "search_space_id": search_space_id, "request_id": request_id or "unknown", "turn_id": stream_result.turn_id, + "architecture_mode": architecture_mode, } _perf_log.info( @@ -1669,6 +1685,7 @@ async def stream_new_chat( configurable = {"thread_id": str(chat_id)} configurable["request_id"] = request_id or "unknown" configurable["turn_id"] = stream_result.turn_id + configurable["architecture_mode"] = architecture_mode if checkpoint_id: configurable["checkpoint_id"] = checkpoint_id @@ -1884,6 +1901,19 @@ async 
def stream_new_chat( "call_details": accumulator.serialized_calls(), }, ) + log_architecture_telemetry( + phase="turn_end", + source="new_chat", + status="interrupted", + architecture_mode=stream_result.architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - _t_total) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=stream_result.turn_id, + ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -1956,6 +1986,19 @@ async def stream_new_chat( "call_details": accumulator.serialized_calls(), }, ) + log_architecture_telemetry( + phase="turn_end", + source="new_chat", + status="completed", + architecture_mode=stream_result.architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - _t_total) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=stream_result.turn_id, + ) # Fire background memory extraction if the agent didn't handle it. # Shared threads write to team memory; private threads write to user memory. 
@@ -2000,6 +2043,20 @@ async def stream_new_chat( print(f"[stream_new_chat] {error_message}") print(f"[stream_new_chat] Exception type: {type(e).__name__}") print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}") + log_architecture_telemetry( + phase="turn_end", + source="new_chat", + status="error", + architecture_mode=stream_result.architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - _t_total) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=stream_result.turn_id, + extra={"error_type": type(e).__name__}, + ) yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() @@ -2093,6 +2150,7 @@ async def stream_resume_chat( thread_visibility: ChatVisibility | None = None, filesystem_selection: FilesystemSelection | None = None, request_id: str | None = None, + architecture_mode: str = "single_agent", ) -> AsyncGenerator[str, None]: streaming_service = VercelStreamingService() stream_result = StreamResult() @@ -2103,8 +2161,22 @@ async def stream_resume_chat( ) stream_result.request_id = request_id stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}" + stream_result.architecture_mode = architecture_mode stream_result.filesystem_mode = fs_mode stream_result.client_platform = fs_platform + log_architecture_telemetry( + phase="turn_start", + source="resume_chat", + status="started", + architecture_mode=architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=0.0, + token_total=0, + request_id=request_id, + turn_id=stream_result.turn_id, + ) _log_file_contract("turn_start", stream_result) _perf_log.info( "[stream_resume] filesystem_mode=%s client_platform=%s", @@ -2250,6 +2322,7 @@ async def stream_resume_chat( "thread_id": str(chat_id), "request_id": request_id or "unknown", "turn_id": stream_result.turn_id, + "architecture_mode": architecture_mode, }, "recursion_limit": 80, } @@ 
-2300,6 +2373,19 @@ async def stream_resume_chat( "call_details": accumulator.serialized_calls(), }, ) + log_architecture_telemetry( + phase="turn_end", + source="resume_chat", + status="interrupted", + architecture_mode=stream_result.architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - _t_total) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=stream_result.turn_id, + ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -2353,6 +2439,19 @@ async def stream_resume_chat( "call_details": accumulator.serialized_calls(), }, ) + log_architecture_telemetry( + phase="turn_end", + source="resume_chat", + status="completed", + architecture_mode=stream_result.architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - _t_total) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=stream_result.turn_id, + ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -2364,6 +2463,20 @@ async def stream_resume_chat( error_message = f"Error during resume: {e!s}" print(f"[stream_resume_chat] {error_message}") print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}") + log_architecture_telemetry( + phase="turn_end", + source="resume_chat", + status="error", + architecture_mode=stream_result.architecture_mode, + orchestrator_used=False, + worker_count=0, + retry_count=0, + latency_ms=(time.perf_counter() - _t_total) * 1000.0, + token_total=accumulator.grand_total, + request_id=request_id, + turn_id=stream_result.turn_id, + extra={"error_type": type(e).__name__}, + ) yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() yield streaming_service.format_finish()