diff --git a/docs/multi-agent-architecture-plan.md b/docs/multi-agent-architecture-plan.md deleted file mode 100644 index 45b5caf61..000000000 --- a/docs/multi-agent-architecture-plan.md +++ /dev/null @@ -1,211 +0,0 @@ -# Multi-Agent Architecture Plan (Phased) - -This document defines a phased migration from the current `single_agent` flow to a multi-agent architecture while keeping rollback simple and immediate. - -## Naming - -- `single_agent`: current architecture (default at start) -- `shadow_multi_agent_v1`: run multi-agent path in background, return `single_agent` output -- `multi_agent_v1`: multi-agent architecture is the user-facing path - ---- - -## Phase 1 - Parallel Safety Layer - -**Goal:** Add safe routing controls with zero behavior change. - -### Todo - -- [ ] Add mode selector with values: `single_agent`, `shadow_multi_agent_v1`, `multi_agent_v1` -- [ ] Add global kill switch: force all traffic to `single_agent` -- [ ] Add mode resolution priority: - 1. kill switch - 2. request override - 3. system default -- [ ] Keep `single_agent` as default mode -- [ ] Keep frontend stream/output contract unchanged -- [ ] Add telemetry tags: - - `architecture_mode` - - `worker_count` - - `retry_count` - - `latency_ms` - - `token_total` -- [ ] Write short rollback runbook - -### Exit Criteria - -- [ ] Can switch modes in staging -- [ ] Kill switch verified -- [ ] No frontend contract regressions - ---- - -## Phase 2 - Orchestrator Core and Contracts - -**Goal:** Build multi-agent control-plane only (planner/router/merge), with strict schemas. - -### Todo - -- [ ] Implement orchestrator responsibilities: - - intent detection - - routing - - delegation - - fan-in merge -- [ ] Add budget controls: - - max workers per turn - - max parallel workers - - max turn duration -- [ ] Add loop/stall guard: - - repeated task signature detection - - no-progress threshold -- [ ] Define `WorkerTask` schema: - - `domain`, `goal`, `constraints`, `budget` -- [ ] Define `WorkerResult` schema: - - `status`, `summary`, `evidence[]`, `artifacts[]`, `needs_human`, `error_class` -- [ ] Add schema validation on send/receive boundaries -- [ ] Add controlled fallback on invalid worker results - -### Exit Criteria - -- [ ] Orchestrator works end-to-end with mock workers -- [ ] Invalid worker payloads are blocked cleanly - ---- - -## Phase 3 - Pilot Workers (Gmail and Calendar) - -**Goal:** Validate multi-agent architecture with two real domains only. - -### Todo - -- [ ] Create Gmail worker - - [ ] domain-scoped prompt - - [ ] domain-only tool loadout - - [ ] local query rewrite - - [ ] normalized `WorkerResult` -- [ ] Create Calendar worker - - [ ] domain-scoped prompt - - [ ] domain-only tool loadout - - [ ] local query rewrite/time normalization - - [ ] normalized `WorkerResult` -- [ ] Enforce no cross-domain tool access -- [ ] Preserve HITL for write actions -- [ ] Add retry policy by `error_class` -- [ ] Add tests for routing, loadout isolation, HITL behavior - -### Exit Criteria - -- [ ] Gmail and Calendar tasks complete in `multi_agent_v1` -- [ ] No cross-domain tool leakage -- [ ] HITL still enforced for sensitive writes - ---- - -## Phase 4 - Knowledge Base and Evidence Normalization - -**Goal:** Isolate KB retrieval and make evidence citation-ready. 
- -### Todo - -- [ ] Move KB retrieval behind dedicated worker/stage -- [ ] Reuse current KB retrieval logic, but return compact structured evidence only -- [ ] Define `EvidenceItem` fields: - - `claim`, `source_type`, `source_ref`, `confidence`, `snippet` -- [ ] Add top-k and output-size controls -- [ ] Add quote-first extraction mode for long contexts -- [ ] Add tests for traceability and bounded payloads - -### Exit Criteria - -- [ ] Orchestrator consumes compact evidence (no raw KB dumps) -- [ ] Citation refs remain valid and traceable - ---- - -## Phase 5 - Verifier and Citation Gate - -**Goal:** Prevent unsupported factual claims in final responses. - -### Todo - -- [ ] Add verifier stage before final synthesis -- [ ] Enforce claim-to-evidence checks -- [ ] Add conflict handling policy: - - consistent evidence -> accept - - conflicting evidence -> label uncertainty or retry -- [ ] Add unsupported-claim policy: - - remove claim or mark uncertain -- [ ] Add verifier telemetry: - - supported claims - - unsupported claims - - conflicts -- [ ] Support strict gate and warning modes - -### Exit Criteria - -- [ ] Unsupported factual claims are blocked or clearly annotated -- [ ] Citation precision improves on evaluation set - ---- - -## Phase 6 - Shadow Evaluation and Canary - -**Goal:** Ship based on data, not intuition. - -### Todo - -- [ ] Enable `shadow_multi_agent_v1` for selected traffic -- [ ] Compare metrics vs `single_agent`: - - success rate - - citation precision - - tool-selection accuracy - - p95 latency - - tokens/request - - cost per successful task -- [ ] Define rollout gates and auto-stop thresholds -- [ ] Start canary rollout for `multi_agent_v1` -- [ ] Ramp traffic only if quality and reliability gates pass -- [ ] Keep kill switch live for entire rollout -- [ ] Record go/no-go decision with evidence - -### Exit Criteria - -- [ ] Clear decision based on measured outcomes -- [ ] Rollback tested successfully during canary - ---- - -## Phase 7 - Domain Expansion and Heavy Tool Reassignment - -**Goal:** Scale multi-agent architecture safely across more domains. - -### Todo - -- [ ] Add domains incrementally (`notion`, `slack`, `jira`, ...) 
-- [ ] For each new domain enforce: - - scoped tool loadout - - local query rewrite - - contract validation - - eval plus canary gate -- [ ] Move heavy tools to specialist workers: - - podcast generation - - artifact/report generation - - video presentation -- [ ] Keep orchestrator toolbelt minimal and control-plane focused -- [ ] Regularly prune prompts and tool descriptions - -### Exit Criteria - -- [ ] New domains onboard without reliability regressions -- [ ] Orchestrator remains lean and stable -- [ ] Cost per successful task stays controlled - ---- - -## Always-On Checklist - -- [ ] Keep `single_agent` path healthy until rollout completion -- [ ] Keep one-click rollback available at all times -- [ ] Update observability dashboards every phase -- [ ] Track failure taxonomy and review weekly -- [ ] Validate prompt/tool changes via eval before broad rollout diff --git a/docs/multi-agent-phase1-runbook.md b/docs/multi-agent-phase1-runbook.md deleted file mode 100644 index c3d871e52..000000000 --- a/docs/multi-agent-phase1-runbook.md +++ /dev/null @@ -1,70 +0,0 @@ -# Multi-Agent Architecture Phase 1 Runbook - -## Scope - -This runbook covers mode selection and emergency rollback for: - -- `single_agent` -- `shadow_multi_agent_v1` -- `multi_agent_v1` - -Phase 1 keeps execution behavior on the current single-agent path while mode wiring and telemetry are introduced. - -## Resolution Priority - -Mode resolution follows this fixed order: - -1. Global kill switch (`FORCE_SINGLE_AGENT`) -2. Request override (`architecture_mode` in chat payload) -3. System default (`AGENT_ARCHITECTURE_MODE`) -4. Safe fallback (`single_agent`) - -## Configuration - -Set environment values in backend runtime: - -- `AGENT_ARCHITECTURE_MODE=single_agent` (default) -- `FORCE_SINGLE_AGENT=FALSE` (default) - -Changes require backend restart because config is loaded at process startup. - -## Mode Switching - -### System default switch - -1. Set `AGENT_ARCHITECTURE_MODE` to desired value. -2. Keep `FORCE_SINGLE_AGENT=FALSE`. -3. Restart backend. -4. Verify logs include `[architecture_telemetry]` with expected `architecture_mode`. - -### Per-request override - -Send optional `architecture_mode` in chat request payload: - -- `"single_agent"` -- `"shadow_multi_agent_v1"` -- `"multi_agent_v1"` - -If `FORCE_SINGLE_AGENT=TRUE`, request override is ignored by design. - -## Emergency Rollback - -Use the kill switch: - -1. Set `FORCE_SINGLE_AGENT=TRUE`. -2. Restart backend. -3. Verify new requests log `architecture_mode=single_agent`. -4. Keep this state until incident is resolved. - -## Verification Checklist - -- Mode resolves according to the priority order. -- Kill switch overrides all request/default values. -- Streaming response schema remains unchanged. 
-- Architecture telemetry is emitted with: - - `architecture_mode` - - `orchestrator_used` - - `worker_count` - - `retry_count` - - `latency_ms` - - `token_total` diff --git a/surfsense_backend/app/agents/multi_agent_v1/__init__.py b/surfsense_backend/app/agents/multi_agent_v1/__init__.py deleted file mode 100644 index 4fb75203a..000000000 --- a/surfsense_backend/app/agents/multi_agent_v1/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -"""Multi-agent v1 architecture package.""" - -from app.agents.multi_agent_v1.contracts import ( - GroundingEvidence, - SubagentResult, - SubagentTaskPlan, - WorkerBudget, -) -from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint - -__all__ = [ - "GroundingEvidence", - "MultiAgentEntrypoint", - "SubagentResult", - "SubagentTaskPlan", - "WorkerBudget", -] diff --git a/surfsense_backend/app/agents/multi_agent_v1/contracts.py b/surfsense_backend/app/agents/multi_agent_v1/contracts.py deleted file mode 100644 index c6eef1a06..000000000 --- a/surfsense_backend/app/agents/multi_agent_v1/contracts.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Contracts for multi_agent_v1 orchestrator and subagent communication.""" - -from __future__ import annotations - -from typing import Literal - -from pydantic import BaseModel, Field - - -class WorkerBudget(BaseModel): - max_steps: int = Field(default=1, ge=1) - max_duration_ms: int = Field(default=15_000, ge=100) - - -class SubagentTaskPlan(BaseModel): - domain: str = Field(..., min_length=1) - goal: str = Field(..., min_length=1) - constraints: list[str] = Field(default_factory=list) - budget: WorkerBudget = Field(default_factory=WorkerBudget) - - -class GroundingEvidence(BaseModel): - claim: str = Field(..., min_length=1) - source_type: str = Field(..., min_length=1) - source_ref: str = Field(..., min_length=1) - confidence: float = Field(default=0.0, ge=0.0, le=1.0) - snippet: str = "" - - -class SubagentResult(BaseModel): - status: Literal["success", "partial", "error"] - summary: str = "" - evidence: list[GroundingEvidence] = Field(default_factory=list) - artifacts: list[str] = Field(default_factory=list) - needs_human: bool = False - error_class: str | None = None diff --git a/surfsense_backend/app/agents/multi_agent_v1/entrypoint.py b/surfsense_backend/app/agents/multi_agent_v1/entrypoint.py deleted file mode 100644 index 417643633..000000000 --- a/surfsense_backend/app/agents/multi_agent_v1/entrypoint.py +++ /dev/null @@ -1,24 +0,0 @@ -"""Multi-agent v1 entrypoint scaffold with safe fallback behavior.""" - -from __future__ import annotations - -from collections.abc import AsyncGenerator, Callable -from typing import Any - - -class MultiAgentEntrypoint: - def stream_new_chat( - self, - *, - fallback_streamer: Callable[..., AsyncGenerator[str, None]], - fallback_kwargs: dict[str, Any], - ) -> AsyncGenerator[str, None]: - return fallback_streamer(**fallback_kwargs) - - def stream_resume_chat( - self, - *, - fallback_streamer: Callable[..., AsyncGenerator[str, None]], - fallback_kwargs: dict[str, Any], - ) -> AsyncGenerator[str, None]: - return fallback_streamer(**fallback_kwargs) diff --git a/surfsense_backend/app/agents/new_chat/architecture_mode.py b/surfsense_backend/app/agents/new_chat/architecture_mode.py deleted file mode 100644 index db74bb308..000000000 --- a/surfsense_backend/app/agents/new_chat/architecture_mode.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Architecture mode contracts and resolution helpers for chat sessions.""" - -from __future__ import annotations - -from enum import StrEnum - -from app.config 
import config - - -class ArchitectureMode(StrEnum): - SINGLE_AGENT = "single_agent" - SHADOW_MULTI_AGENT_V1 = "shadow_multi_agent_v1" - MULTI_AGENT_V1 = "multi_agent_v1" - - -def parse_architecture_mode(value: str | None) -> ArchitectureMode | None: - if not value: - return None - normalized = value.strip().lower() - if not normalized: - return None - try: - return ArchitectureMode(normalized) - except ValueError: - return None - - -def resolve_architecture_mode(request_override: str | None = None) -> ArchitectureMode: - if config.FORCE_SINGLE_AGENT: - return ArchitectureMode.SINGLE_AGENT - - override_mode = parse_architecture_mode(request_override) - if override_mode is not None: - return override_mode - - default_mode = parse_architecture_mode(config.AGENT_ARCHITECTURE_MODE) - if default_mode is not None: - return default_mode - - return ArchitectureMode.SINGLE_AGENT diff --git a/surfsense_backend/app/agents/new_chat/telemetry.py b/surfsense_backend/app/agents/new_chat/telemetry.py deleted file mode 100644 index 6e5ae408d..000000000 --- a/surfsense_backend/app/agents/new_chat/telemetry.py +++ /dev/null @@ -1,43 +0,0 @@ -"""Architecture telemetry logging for chat execution modes.""" - -from __future__ import annotations - -import json -from typing import Any - -from app.utils.perf import get_perf_logger - -_perf_log = get_perf_logger() - - -def log_architecture_telemetry( - *, - phase: str, - architecture_mode: str, - orchestrator_used: bool, - worker_count: int, - retry_count: int, - latency_ms: float, - token_total: int, - request_id: str | None = None, - turn_id: str | None = None, - status: str = "ok", - source: str = "new_chat", - extra: dict[str, Any] | None = None, -) -> None: - payload: dict[str, Any] = { - "phase": phase, - "source": source, - "status": status, - "architecture_mode": architecture_mode, - "orchestrator_used": orchestrator_used, - "worker_count": worker_count, - "retry_count": retry_count, - "latency_ms": round(latency_ms, 2), - "token_total": token_total, - "request_id": request_id or "unknown", - "turn_id": turn_id or "unknown", - } - if extra: - payload.update(extra) - _perf_log.info("[architecture_telemetry] %s", json.dumps(payload, ensure_ascii=False)) diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index eb4464b13..bd97d2bb1 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -342,8 +342,6 @@ class Config: ENABLE_DESKTOP_LOCAL_FILESYSTEM = ( os.getenv("ENABLE_DESKTOP_LOCAL_FILESYSTEM", "FALSE").upper() == "TRUE" ) - AGENT_ARCHITECTURE_MODE = os.getenv("AGENT_ARCHITECTURE_MODE", "single_agent") - FORCE_SINGLE_AGENT = os.getenv("FORCE_SINGLE_AGENT", "FALSE").upper() == "TRUE" @classmethod def is_self_hosted(cls) -> bool: diff --git a/surfsense_backend/app/routes/anonymous_chat_routes.py b/surfsense_backend/app/routes/anonymous_chat_routes.py index 1af7418d9..f9d694e5a 100644 --- a/surfsense_backend/app/routes/anonymous_chat_routes.py +++ b/surfsense_backend/app/routes/anonymous_chat_routes.py @@ -4,7 +4,6 @@ from __future__ import annotations import logging import secrets -import time import uuid from pathlib import PurePosixPath from typing import Any @@ -13,11 +12,6 @@ from fastapi import APIRouter, HTTPException, Request, Response, UploadFile, sta from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field -from app.agents.new_chat.architecture_mode import ( - ArchitectureMode, - resolve_architecture_mode, -) -from 
app.agents.new_chat.telemetry import log_architecture_telemetry from app.config import config from app.etl_pipeline.file_classifier import ( DIRECT_CONVERT_EXTENSIONS, @@ -90,7 +84,6 @@ class AnonChatRequest(BaseModel): messages: list[dict[str, Any]] = Field(..., min_length=1) disabled_tools: list[str] | None = None turnstile_token: str | None = None - architecture_mode: ArchitectureMode | None = None class AnonQuotaResponse(BaseModel): @@ -368,22 +361,6 @@ async def stream_anonymous_chat( accumulator = start_turn() streaming_service = VercelStreamingService() - architecture_mode = resolve_architecture_mode(body.architecture_mode) - started_at = time.perf_counter() - turn_id = f"anon:{session_id}:{request_id}" - log_architecture_telemetry( - phase="turn_start", - source="anon_chat", - status="started", - architecture_mode=architecture_mode.value, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=0.0, - token_total=0, - request_id=request_id, - turn_id=turn_id, - ) try: async with shielded_async_session() as session: @@ -423,10 +400,7 @@ async def stream_anonymous_chat( } langgraph_config = { - "configurable": { - "thread_id": anon_thread_id, - "architecture_mode": architecture_mode.value, - }, + "configurable": {"thread_id": anon_thread_id}, "recursion_limit": 40, } @@ -494,19 +468,6 @@ async def stream_anonymous_chat( "total_tokens": accumulator.grand_total, }, ) - log_architecture_telemetry( - phase="turn_end", - source="anon_chat", - status="completed", - architecture_mode=architecture_mode.value, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - started_at) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=turn_id, - ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -514,20 +475,6 @@ async def stream_anonymous_chat( except Exception as e: logger.exception("Anonymous chat stream error") - log_architecture_telemetry( - phase="turn_end", - source="anon_chat", - status="error", - architecture_mode=architecture_mode.value, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - started_at) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=turn_id, - extra={"error_type": type(e).__name__}, - ) await TokenQuotaService.anon_release(session_key, ip_key, request_id) yield streaming_service.format_error(f"Error during chat: {e!s}") yield streaming_service.format_done() diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 896a4bd31..cbc660222 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -22,7 +22,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlalchemy.orm import selectinload -from app.agents.new_chat.architecture_mode import resolve_architecture_mode from app.agents.new_chat.filesystem_selection import ( ClientPlatform, FilesystemMode, @@ -62,10 +61,7 @@ from app.schemas.new_chat import ( TokenUsageSummary, ) from app.services.token_tracking_service import record_token_usage -from app.tasks.chat.stream_dispatch import ( - dispatch_new_chat_stream, - dispatch_resume_chat_stream, -) +from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat from app.users import current_active_user from app.utils.rbac import check_permission from app.utils.user_message_multimodal import ( @@ 
-1248,28 +1244,23 @@ async def handle_new_chat( image_urls = ( [p.as_data_url() for p in request.user_images] if request.user_images else None ) - architecture_mode = resolve_architecture_mode(request.architecture_mode) return StreamingResponse( - dispatch_new_chat_stream( - architecture_mode=architecture_mode.value, - stream_kwargs={ - "user_query": request.user_query, - "search_space_id": request.search_space_id, - "chat_id": request.chat_id, - "user_id": str(user.id), - "llm_config_id": llm_config_id, - "mentioned_document_ids": request.mentioned_document_ids, - "mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids, - "needs_history_bootstrap": thread.needs_history_bootstrap, - "thread_visibility": thread.visibility, - "current_user_display_name": user.display_name or "A team member", - "disabled_tools": request.disabled_tools, - "filesystem_selection": filesystem_selection, - "request_id": getattr(http_request.state, "request_id", "unknown"), - "user_image_data_urls": image_urls, - "architecture_mode": architecture_mode.value, - }, + stream_new_chat( + user_query=request.user_query, + search_space_id=request.search_space_id, + chat_id=request.chat_id, + user_id=str(user.id), + llm_config_id=llm_config_id, + mentioned_document_ids=request.mentioned_document_ids, + mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids, + needs_history_bootstrap=thread.needs_history_bootstrap, + thread_visibility=thread.visibility, + current_user_display_name=user.display_name or "A team member", + disabled_tools=request.disabled_tools, + filesystem_selection=filesystem_selection, + request_id=getattr(http_request.state, "request_id", "unknown"), + user_image_data_urls=image_urls, ), media_type="text/event-stream", headers={ @@ -1467,7 +1458,6 @@ async def regenerate_response( if request.user_images is not None: regenerate_image_urls = [p.as_data_url() for p in request.user_images] - architecture_mode = resolve_architecture_mode(request.architecture_mode) if user_query_to_use is None: raise HTTPException( @@ -1516,28 +1506,23 @@ async def regenerate_response( async def stream_with_cleanup(): streaming_completed = False try: - stream = dispatch_new_chat_stream( - architecture_mode=architecture_mode.value, - stream_kwargs={ - "user_query": str(user_query_to_use), - "search_space_id": request.search_space_id, - "chat_id": thread_id, - "user_id": str(user.id), - "llm_config_id": llm_config_id, - "mentioned_document_ids": request.mentioned_document_ids, - "mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids, - "checkpoint_id": target_checkpoint_id, - "needs_history_bootstrap": thread.needs_history_bootstrap, - "thread_visibility": thread.visibility, - "current_user_display_name": user.display_name or "A team member", - "disabled_tools": request.disabled_tools, - "filesystem_selection": filesystem_selection, - "request_id": getattr(http_request.state, "request_id", "unknown"), - "user_image_data_urls": regenerate_image_urls or None, - "architecture_mode": architecture_mode.value, - }, - ) - async for chunk in stream: + async for chunk in stream_new_chat( + user_query=str(user_query_to_use), + search_space_id=request.search_space_id, + chat_id=thread_id, + user_id=str(user.id), + llm_config_id=llm_config_id, + mentioned_document_ids=request.mentioned_document_ids, + mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids, + checkpoint_id=target_checkpoint_id, + needs_history_bootstrap=thread.needs_history_bootstrap, + thread_visibility=thread.visibility, + 
current_user_display_name=user.display_name or "A team member", + disabled_tools=request.disabled_tools, + filesystem_selection=filesystem_selection, + request_id=getattr(http_request.state, "request_id", "unknown"), + user_image_data_urls=regenerate_image_urls or None, + ): yield chunk streaming_completed = True finally: @@ -1643,7 +1628,6 @@ async def resume_chat( ) decisions = [d.model_dump() for d in request.decisions] - architecture_mode = resolve_architecture_mode(request.architecture_mode) # Release the read-transaction so we don't hold ACCESS SHARE locks # on searchspaces/documents for the entire duration of the stream. @@ -1651,19 +1635,15 @@ async def resume_chat( await session.close() return StreamingResponse( - dispatch_resume_chat_stream( - architecture_mode=architecture_mode.value, - stream_kwargs={ - "chat_id": thread_id, - "search_space_id": request.search_space_id, - "decisions": decisions, - "user_id": str(user.id), - "llm_config_id": llm_config_id, - "thread_visibility": thread.visibility, - "filesystem_selection": filesystem_selection, - "request_id": getattr(http_request.state, "request_id", "unknown"), - "architecture_mode": architecture_mode.value, - }, + stream_resume_chat( + chat_id=thread_id, + search_space_id=request.search_space_id, + decisions=decisions, + user_id=str(user.id), + llm_config_id=llm_config_id, + thread_visibility=thread.visibility, + filesystem_selection=filesystem_selection, + request_id=getattr(http_request.state, "request_id", "unknown"), ), media_type="text/event-stream", headers={ diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index f446eb0b5..477fdf2ca 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -176,11 +176,6 @@ class LocalFilesystemMountPayload(BaseModel): MAX_NEW_CHAT_IMAGE_BYTES = 8 * 1024 * 1024 MAX_NEW_CHAT_IMAGES = 4 -ArchitectureModeLiteral = Literal[ - "single_agent", - "shadow_multi_agent_v1", - "multi_agent_v1", -] class NewChatUserImagePart(BaseModel): @@ -215,7 +210,6 @@ class NewChatRequest(BaseModel): disabled_tools: list[str] | None = ( None # Optional list of tool names the user has disabled from the UI ) - architecture_mode: ArchitectureModeLiteral | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None @@ -256,7 +250,6 @@ class RegenerateRequest(BaseModel): mentioned_document_ids: list[int] | None = None mentioned_surfsense_doc_ids: list[int] | None = None disabled_tools: list[str] | None = None - architecture_mode: ArchitectureModeLiteral | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None @@ -293,7 +286,6 @@ class ResumeDecision(BaseModel): class ResumeRequest(BaseModel): search_space_id: int decisions: list[ResumeDecision] - architecture_mode: ArchitectureModeLiteral | None = None filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud" client_platform: Literal["web", "desktop"] = "web" local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None diff --git a/surfsense_backend/app/tasks/chat/stream_dispatch.py b/surfsense_backend/app/tasks/chat/stream_dispatch.py deleted file mode 100644 index 73d7fc076..000000000 --- a/surfsense_backend/app/tasks/chat/stream_dispatch.py +++ /dev/null 
@@ -1,47 +0,0 @@ -"""Thin architecture dispatch seam for chat streaming entrypoints.""" - -from __future__ import annotations - -from collections.abc import AsyncGenerator -from typing import Any - -from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint -from app.agents.new_chat.architecture_mode import ( - ArchitectureMode, - parse_architecture_mode, -) -from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat - - -def _resolve_mode(mode_value: str) -> ArchitectureMode: - return parse_architecture_mode(mode_value) or ArchitectureMode.SINGLE_AGENT - - -def dispatch_new_chat_stream( - *, - architecture_mode: str, - stream_kwargs: dict[str, Any], -) -> AsyncGenerator[str, None]: - mode = _resolve_mode(architecture_mode) - if mode == ArchitectureMode.SINGLE_AGENT: - return stream_new_chat(**stream_kwargs) - entrypoint = MultiAgentEntrypoint() - return entrypoint.stream_new_chat( - fallback_streamer=stream_new_chat, - fallback_kwargs=stream_kwargs, - ) - - -def dispatch_resume_chat_stream( - *, - architecture_mode: str, - stream_kwargs: dict[str, Any], -) -> AsyncGenerator[str, None]: - mode = _resolve_mode(architecture_mode) - if mode == ArchitectureMode.SINGLE_AGENT: - return stream_resume_chat(**stream_kwargs) - entrypoint = MultiAgentEntrypoint() - return entrypoint.stream_resume_chat( - fallback_streamer=stream_resume_chat, - fallback_kwargs=stream_kwargs, - ) diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 5edfcd658..396c7574e 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -42,7 +42,6 @@ from app.agents.new_chat.memory_extraction import ( extract_and_save_memory, extract_and_save_team_memory, ) -from app.agents.new_chat.telemetry import log_architecture_telemetry from app.db import ( ChatVisibility, NewChatMessage, @@ -150,7 +149,6 @@ class StreamResult: agent_called_update_memory: bool = False request_id: str | None = None turn_id: str = "" - architecture_mode: str = "single_agent" filesystem_mode: str = "cloud" client_platform: str = "web" intent_detected: str = "chat_only" @@ -184,7 +182,9 @@ def _tool_output_has_error(tool_output: Any) -> bool: if tool_output.get("error"): return True result = tool_output.get("result") - return isinstance(result, str) and result.strip().lower().startswith("error:") + if isinstance(result, str) and result.strip().lower().startswith("error:"): + return True + return False if isinstance(tool_output, str): return tool_output.strip().lower().startswith("error:") return False @@ -231,7 +231,6 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None: "request_id": result.request_id or "unknown", "turn_id": result.turn_id or "unknown", "chat_id": result.turn_id.split(":", 1)[0] if ":" in result.turn_id else "unknown", - "architecture_mode": result.architecture_mode, "filesystem_mode": result.filesystem_mode, "client_platform": result.client_platform, "intent_detected": result.intent_detected, @@ -1309,17 +1308,18 @@ async def _stream_agent_events( result.commit_gate_passed, result.commit_gate_reason = ( _evaluate_file_contract_outcome(result) ) - if not result.commit_gate_passed and _contract_enforcement_active(result): - gate_notice = ( - "I could not complete the requested file write because no successful " - "write_file/edit_file operation was confirmed." 
- ) - gate_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(gate_text_id) - yield streaming_service.format_text_delta(gate_text_id, gate_notice) - yield streaming_service.format_text_end(gate_text_id) - yield streaming_service.format_terminal_info(gate_notice, "error") - accumulated_text = gate_notice + if not result.commit_gate_passed: + if _contract_enforcement_active(result): + gate_notice = ( + "I could not complete the requested file write because no successful " + "write_file/edit_file operation was confirmed." + ) + gate_text_id = streaming_service.generate_text_id() + yield streaming_service.format_text_start(gate_text_id) + yield streaming_service.format_text_delta(gate_text_id, gate_notice) + yield streaming_service.format_text_end(gate_text_id) + yield streaming_service.format_terminal_info(gate_notice, "error") + accumulated_text = gate_notice else: result.commit_gate_passed = True result.commit_gate_reason = "" @@ -1351,7 +1351,6 @@ async def stream_new_chat( filesystem_selection: FilesystemSelection | None = None, request_id: str | None = None, user_image_data_urls: list[str] | None = None, - architecture_mode: str = "single_agent", ) -> AsyncGenerator[str, None]: """ Stream chat responses from the new SurfSense deep agent. @@ -1385,22 +1384,8 @@ async def stream_new_chat( ) stream_result.request_id = request_id stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}" - stream_result.architecture_mode = architecture_mode stream_result.filesystem_mode = fs_mode stream_result.client_platform = fs_platform - log_architecture_telemetry( - phase="turn_start", - source="new_chat", - status="started", - architecture_mode=architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=0.0, - token_total=0, - request_id=request_id, - turn_id=stream_result.turn_id, - ) _log_file_contract("turn_start", stream_result) _perf_log.info( "[stream_new_chat] filesystem_mode=%s client_platform=%s", @@ -1653,7 +1638,6 @@ async def stream_new_chat( "search_space_id": search_space_id, "request_id": request_id or "unknown", "turn_id": stream_result.turn_id, - "architecture_mode": architecture_mode, } _perf_log.info( @@ -1685,7 +1669,6 @@ async def stream_new_chat( configurable = {"thread_id": str(chat_id)} configurable["request_id"] = request_id or "unknown" configurable["turn_id"] = stream_result.turn_id - configurable["architecture_mode"] = architecture_mode if checkpoint_id: configurable["checkpoint_id"] = checkpoint_id @@ -1901,19 +1884,6 @@ async def stream_new_chat( "call_details": accumulator.serialized_calls(), }, ) - log_architecture_telemetry( - phase="turn_end", - source="new_chat", - status="interrupted", - architecture_mode=stream_result.architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - _t_total) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=stream_result.turn_id, - ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -1986,19 +1956,6 @@ async def stream_new_chat( "call_details": accumulator.serialized_calls(), }, ) - log_architecture_telemetry( - phase="turn_end", - source="new_chat", - status="completed", - architecture_mode=stream_result.architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - _t_total) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=stream_result.turn_id, 
- ) # Fire background memory extraction if the agent didn't handle it. # Shared threads write to team memory; private threads write to user memory. @@ -2043,20 +2000,6 @@ async def stream_new_chat( print(f"[stream_new_chat] {error_message}") print(f"[stream_new_chat] Exception type: {type(e).__name__}") print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}") - log_architecture_telemetry( - phase="turn_end", - source="new_chat", - status="error", - architecture_mode=stream_result.architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - _t_total) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=stream_result.turn_id, - extra={"error_type": type(e).__name__}, - ) yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() @@ -2150,7 +2093,6 @@ async def stream_resume_chat( thread_visibility: ChatVisibility | None = None, filesystem_selection: FilesystemSelection | None = None, request_id: str | None = None, - architecture_mode: str = "single_agent", ) -> AsyncGenerator[str, None]: streaming_service = VercelStreamingService() stream_result = StreamResult() @@ -2161,22 +2103,8 @@ async def stream_resume_chat( ) stream_result.request_id = request_id stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}" - stream_result.architecture_mode = architecture_mode stream_result.filesystem_mode = fs_mode stream_result.client_platform = fs_platform - log_architecture_telemetry( - phase="turn_start", - source="resume_chat", - status="started", - architecture_mode=architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=0.0, - token_total=0, - request_id=request_id, - turn_id=stream_result.turn_id, - ) _log_file_contract("turn_start", stream_result) _perf_log.info( "[stream_resume] filesystem_mode=%s client_platform=%s", @@ -2322,7 +2250,6 @@ async def stream_resume_chat( "thread_id": str(chat_id), "request_id": request_id or "unknown", "turn_id": stream_result.turn_id, - "architecture_mode": architecture_mode, }, "recursion_limit": 80, } @@ -2373,19 +2300,6 @@ async def stream_resume_chat( "call_details": accumulator.serialized_calls(), }, ) - log_architecture_telemetry( - phase="turn_end", - source="resume_chat", - status="interrupted", - architecture_mode=stream_result.architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - _t_total) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=stream_result.turn_id, - ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -2439,19 +2353,6 @@ async def stream_resume_chat( "call_details": accumulator.serialized_calls(), }, ) - log_architecture_telemetry( - phase="turn_end", - source="resume_chat", - status="completed", - architecture_mode=stream_result.architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - _t_total) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=stream_result.turn_id, - ) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -2463,20 +2364,6 @@ async def stream_resume_chat( error_message = f"Error during resume: {e!s}" print(f"[stream_resume_chat] {error_message}") print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}") - log_architecture_telemetry( - phase="turn_end", - source="resume_chat", - 
status="error", - architecture_mode=stream_result.architecture_mode, - orchestrator_used=False, - worker_count=0, - retry_count=0, - latency_ms=(time.perf_counter() - _t_total) * 1000.0, - token_total=accumulator.grand_total, - request_id=request_id, - turn_id=stream_result.turn_id, - extra={"error_type": type(e).__name__}, - ) yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() yield streaming_service.format_finish()