Revert "feat(chat): add multi-agent mode routing scaffold and telemetry."

This reverts commit 7b9a218d62.
CREDO23 2026-04-29 16:21:57 +02:00
parent 2eed81d059
commit dcae196eea
13 changed files with 58 additions and 742 deletions


@@ -1,211 +0,0 @@
# Multi-Agent Architecture Plan (Phased)
This document defines a phased migration from the current `single_agent` flow to a multi-agent architecture while keeping rollback simple and immediate.
## Naming
- `single_agent`: current architecture (default at start)
- `shadow_multi_agent_v1`: run multi-agent path in background, return `single_agent` output
- `multi_agent_v1`: multi-agent architecture is the user-facing path
---
## Phase 1 - Parallel Safety Layer
**Goal:** Add safe routing controls with zero behavior change.
### Todo
- [ ] Add mode selector with values: `single_agent`, `shadow_multi_agent_v1`, `multi_agent_v1`
- [ ] Add global kill switch: force all traffic to `single_agent`
- [ ] Add mode resolution priority:
1. kill switch
2. request override
3. system default
- [ ] Keep `single_agent` as default mode
- [ ] Keep frontend stream/output contract unchanged
- [ ] Add telemetry tags:
- `architecture_mode`
- `worker_count`
- `retry_count`
- `latency_ms`
- `token_total`
- [ ] Write short rollback runbook
### Exit Criteria
- [ ] Can switch modes in staging
- [ ] Kill switch verified
- [ ] No frontend contract regressions
---
## Phase 2 - Orchestrator Core and Contracts
**Goal:** Build only the multi-agent control plane (planner/router/merge), with strict schemas.
### Todo
- [ ] Implement orchestrator responsibilities:
- intent detection
- routing
- delegation
- fan-in merge
- [ ] Add budget controls:
- max workers per turn
- max parallel workers
- max turn duration
- [ ] Add loop/stall guard (see the sketch after this list):
- repeated task signature detection
- no-progress threshold
- [ ] Define `WorkerTask` schema:
- `domain`, `goal`, `constraints`, `budget`
- [ ] Define `WorkerResult` schema:
- `status`, `summary`, `evidence[]`, `artifacts[]`, `needs_human`, `error_class`
- [ ] Add schema validation on send/receive boundaries
- [ ] Add controlled fallback on invalid worker results
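
A minimal sketch of the loop/stall guard item above, assuming the task signature is a hash of the `(domain, goal)` pair; the `LoopGuard` name and thresholds are illustrative, not part of the codebase:

```python
import hashlib


class LoopGuard:
    """Stop when the same task keeps repeating or no progress is made."""

    def __init__(self, max_repeats: int = 2, no_progress_limit: int = 3) -> None:
        self.max_repeats = max_repeats
        self.no_progress_limit = no_progress_limit
        self.signature_counts: dict[str, int] = {}
        self.turns_without_progress = 0

    @staticmethod
    def signature(domain: str, goal: str) -> str:
        return hashlib.sha256(f"{domain}:{goal}".encode()).hexdigest()

    def should_stop(self, domain: str, goal: str, made_progress: bool) -> bool:
        sig = self.signature(domain, goal)
        self.signature_counts[sig] = self.signature_counts.get(sig, 0) + 1
        self.turns_without_progress = (
            0 if made_progress else self.turns_without_progress + 1
        )
        # Trip on either a repeated task signature or a no-progress streak.
        return (
            self.signature_counts[sig] > self.max_repeats
            or self.turns_without_progress >= self.no_progress_limit
        )
```
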
### Exit Criteria
- [ ] Orchestrator works end-to-end with mock workers
- [ ] Invalid worker payloads are blocked cleanly
---
## Phase 3 - Pilot Workers (Gmail and Calendar)
**Goal:** Validate the multi-agent architecture with just two real domains.
### Todo
- [ ] Create Gmail worker
- [ ] domain-scoped prompt
- [ ] domain-only tool loadout
- [ ] local query rewrite
- [ ] normalized `WorkerResult`
- [ ] Create Calendar worker
- [ ] domain-scoped prompt
- [ ] domain-only tool loadout
- [ ] local query rewrite/time normalization
- [ ] normalized `WorkerResult`
- [ ] Enforce no cross-domain tool access
- [ ] Preserve HITL for write actions
- [ ] Add retry policy by `error_class` (sketched after this list)
- [ ] Add tests for routing, loadout isolation, HITL behavior
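
A sketch of the `error_class` retry policy above; the class names and retry limits here are assumptions for illustration:

```python
# Map error_class -> max retry attempts; unknown classes default to no retry.
RETRY_POLICY: dict[str, int] = {
    "rate_limit": 2,     # transient: retry with backoff
    "timeout": 1,        # retry once
    "auth": 0,           # needs reconnection or HITL, never retried blindly
    "invalid_input": 0,  # retrying the same payload cannot succeed
}


def should_retry(error_class: str | None, attempt: int) -> bool:
    if error_class is None:
        return False
    return attempt < RETRY_POLICY.get(error_class, 0)
```
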
### Exit Criteria
- [ ] Gmail and Calendar tasks complete in `multi_agent_v1`
- [ ] No cross-domain tool leakage
- [ ] HITL still enforced for sensitive writes
---
## Phase 4 - Knowledge Base and Evidence Normalization
**Goal:** Isolate KB retrieval and make evidence citation-ready.
### Todo
- [ ] Move KB retrieval behind dedicated worker/stage
- [ ] Reuse current KB retrieval logic, but return compact structured evidence only
- [ ] Define `EvidenceItem` fields:
- `claim`, `source_type`, `source_ref`, `confidence`, `snippet`
- [ ] Add top-k and output-size controls (see the sketch after this list)
- [ ] Add quote-first extraction mode for long contexts
- [ ] Add tests for traceability and bounded payloads
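
A sketch of the top-k and output-size controls above, assuming evidence is ranked by confidence; the limits are illustrative defaults, not tuned values:

```python
from dataclasses import dataclass


@dataclass
class EvidenceItem:
    claim: str
    source_type: str
    source_ref: str
    confidence: float
    snippet: str


def bound_evidence(
    items: list[EvidenceItem], top_k: int = 5, max_snippet_chars: int = 400
) -> list[EvidenceItem]:
    """Keep only the k most confident items and cap snippet size."""
    ranked = sorted(items, key=lambda e: e.confidence, reverse=True)[:top_k]
    for item in ranked:
        item.snippet = item.snippet[:max_snippet_chars]
    return ranked
```
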
### Exit Criteria
- [ ] Orchestrator consumes compact evidence (no raw KB dumps)
- [ ] Citation refs remain valid and traceable
---
## Phase 5 - Verifier and Citation Gate
**Goal:** Prevent unsupported factual claims in final responses.
### Todo
- [ ] Add verifier stage before final synthesis
- [ ] Enforce claim-to-evidence checks (sketched after this list)
- [ ] Add conflict handling policy:
- consistent evidence -> accept
- conflicting evidence -> label uncertainty or retry
- [ ] Add unsupported-claim policy:
- remove claim or mark uncertain
- [ ] Add verifier telemetry:
- supported claims
- unsupported claims
- conflicts
- [ ] Support strict gate and warning modes
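
A sketch of the claim-to-evidence check in strict versus warning mode; the matching rule (best supporting confidence above a floor) is an assumption for illustration:

```python
def gate_claims(
    claim_support: dict[str, float],  # claim -> best supporting evidence confidence
    strict: bool = True,
    min_confidence: float = 0.6,
) -> tuple[list[str], list[str]]:
    """Return (claims to keep, claims flagged as unsupported)."""
    supported: list[str] = []
    unsupported: list[str] = []
    for claim, confidence in claim_support.items():
        (supported if confidence >= min_confidence else unsupported).append(claim)
    if strict:
        # Strict gate: unsupported claims are removed from the final answer.
        return supported, unsupported
    # Warning mode: everything passes, but unsupported claims get annotated.
    return supported + unsupported, unsupported
```
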
### Exit Criteria
- [ ] Unsupported factual claims are blocked or clearly annotated
- [ ] Citation precision improves on evaluation set
---
## Phase 6 - Shadow Evaluation and Canary
**Goal:** Ship based on data, not intuition.
### Todo
- [ ] Enable `shadow_multi_agent_v1` for selected traffic
- [ ] Compare metrics vs `single_agent`:
- success rate
- citation precision
- tool-selection accuracy
- p95 latency
- tokens/request
- cost per successful task
- [ ] Define rollout gates and auto-stop thresholds (see the sketch after this list)
- [ ] Start canary rollout for `multi_agent_v1`
- [ ] Ramp traffic only if quality and reliability gates pass
- [ ] Keep kill switch live for entire rollout
- [ ] Record go/no-go decision with evidence
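
A sketch of a rollout gate with auto-stop, assuming both arms report the metrics listed above; the regression thresholds are illustrative:

```python
def canary_passes(baseline: dict[str, float], canary: dict[str, float]) -> bool:
    """Auto-stop check: every gate must hold or the canary is rolled back."""
    return (
        canary["success_rate"] >= baseline["success_rate"] - 0.01
        and canary["citation_precision"] >= baseline["citation_precision"]
        and canary["p95_latency_ms"] <= baseline["p95_latency_ms"] * 1.15
        and canary["cost_per_success"] <= baseline["cost_per_success"] * 1.10
    )
```
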
### Exit Criteria
- [ ] Clear decision based on measured outcomes
- [ ] Rollback tested successfully during canary
---
## Phase 7 - Domain Expansion and Heavy Tool Reassignment
**Goal:** Scale multi-agent architecture safely across more domains.
### Todo
- [ ] Add domains incrementally (`notion`, `slack`, `jira`, ...)
- [ ] For each new domain enforce (loadout scoping is sketched after this list):
- scoped tool loadout
- local query rewrite
- contract validation
- eval plus canary gate
- [ ] Move heavy tools to specialist workers:
- podcast generation
- artifact/report generation
- video presentation
- [ ] Keep orchestrator toolbelt minimal and control-plane focused
- [ ] Regularly prune prompts and tool descriptions
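
A sketch of per-domain tool loadout scoping; the domain and tool names are placeholders:

```python
DOMAIN_TOOL_LOADOUTS: dict[str, frozenset[str]] = {
    "gmail": frozenset({"gmail_search", "gmail_create_draft"}),
    "calendar": frozenset({"calendar_list_events", "calendar_create_event"}),
}


def tools_for_domain(domain: str) -> frozenset[str]:
    """Workers only ever see their own domain's tools; anything else fails fast."""
    try:
        return DOMAIN_TOOL_LOADOUTS[domain]
    except KeyError:
        raise ValueError(f"no tool loadout registered for domain {domain!r}") from None
```
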
### Exit Criteria
- [ ] New domains onboard without reliability regressions
- [ ] Orchestrator remains lean and stable
- [ ] Cost per successful task stays controlled
---
## Always-On Checklist
- [ ] Keep `single_agent` path healthy until rollout completion
- [ ] Keep one-click rollback available at all times
- [ ] Update observability dashboards every phase
- [ ] Track failure taxonomy and review weekly
- [ ] Validate prompt/tool changes via eval before broad rollout


@@ -1,70 +0,0 @@
# Multi-Agent Architecture Phase 1 Runbook
## Scope
This runbook covers mode selection and emergency rollback for:
- `single_agent`
- `shadow_multi_agent_v1`
- `multi_agent_v1`
Phase 1 keeps execution behavior on the current single-agent path while mode wiring and telemetry are introduced.
## Resolution Priority
Mode resolution follows this fixed order:
1. Global kill switch (`FORCE_SINGLE_AGENT`)
2. Request override (`architecture_mode` in chat payload)
3. System default (`AGENT_ARCHITECTURE_MODE`)
4. Safe fallback (`single_agent`)
## Configuration
Set environment values in backend runtime:
- `AGENT_ARCHITECTURE_MODE=single_agent` (default)
- `FORCE_SINGLE_AGENT=FALSE` (default)
Changes require a backend restart because configuration is loaded at process startup.
## Mode Switching
### System default switch
1. Set `AGENT_ARCHITECTURE_MODE` to desired value.
2. Keep `FORCE_SINGLE_AGENT=FALSE`.
3. Restart backend.
4. Verify logs include `[architecture_telemetry]` with expected `architecture_mode`.
### Per-request override
Send optional `architecture_mode` in chat request payload:
- `"single_agent"`
- `"shadow_multi_agent_v1"`
- `"multi_agent_v1"`
If `FORCE_SINGLE_AGENT=TRUE`, request override is ignored by design.
## Emergency Rollback
Use the kill switch:
1. Set `FORCE_SINGLE_AGENT=TRUE`.
2. Restart backend.
3. Verify new requests log `architecture_mode=single_agent`.
4. Keep this state until incident is resolved.
## Verification Checklist
- Mode resolves according to the priority order.
- Kill switch overrides all request/default values.
- Streaming response schema remains unchanged.
- Architecture telemetry is emitted with:
- `architecture_mode`
- `orchestrator_used`
- `worker_count`
- `retry_count`
- `latency_ms`
- `token_total`
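
For reference, a telemetry entry emitted by `log_architecture_telemetry` (its source appears later in this commit) looks roughly like this; the values are illustrative:

```
[architecture_telemetry] {"phase": "turn_start", "source": "new_chat", "status": "started", "architecture_mode": "single_agent", "orchestrator_used": false, "worker_count": 0, "retry_count": 0, "latency_ms": 0.0, "token_total": 0, "request_id": "req-123", "turn_id": "42:1714400000000"}
```
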


@@ -1,17 +0,0 @@
"""Multi-agent v1 architecture package."""

from app.agents.multi_agent_v1.contracts import (
    GroundingEvidence,
    SubagentResult,
    SubagentTaskPlan,
    WorkerBudget,
)
from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint

__all__ = [
    "GroundingEvidence",
    "MultiAgentEntrypoint",
    "SubagentResult",
    "SubagentTaskPlan",
    "WorkerBudget",
]


@@ -1,36 +0,0 @@
"""Contracts for multi_agent_v1 orchestrator and subagent communication."""

from __future__ import annotations

from typing import Literal

from pydantic import BaseModel, Field


class WorkerBudget(BaseModel):
    max_steps: int = Field(default=1, ge=1)
    max_duration_ms: int = Field(default=15_000, ge=100)


class SubagentTaskPlan(BaseModel):
    domain: str = Field(..., min_length=1)
    goal: str = Field(..., min_length=1)
    constraints: list[str] = Field(default_factory=list)
    budget: WorkerBudget = Field(default_factory=WorkerBudget)


class GroundingEvidence(BaseModel):
    claim: str = Field(..., min_length=1)
    source_type: str = Field(..., min_length=1)
    source_ref: str = Field(..., min_length=1)
    confidence: float = Field(default=0.0, ge=0.0, le=1.0)
    snippet: str = ""


class SubagentResult(BaseModel):
    status: Literal["success", "partial", "error"]
    summary: str = ""
    evidence: list[GroundingEvidence] = Field(default_factory=list)
    artifacts: list[str] = Field(default_factory=list)
    needs_human: bool = False
    error_class: str | None = None
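

# A minimal boundary-validation sketch using the contracts above, assuming
# Pydantic v2's model_validate; invalid worker payloads are rejected and
# mapped to a controlled error result instead of propagating.
from pydantic import ValidationError


def parse_worker_result(raw: dict) -> SubagentResult:
    try:
        return SubagentResult.model_validate(raw)
    except ValidationError:
        # Controlled fallback on invalid worker results.
        return SubagentResult(status="error", error_class="contract_violation")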


@@ -1,24 +0,0 @@
"""Multi-agent v1 entrypoint scaffold with safe fallback behavior."""

from __future__ import annotations

from collections.abc import AsyncGenerator, Callable
from typing import Any


class MultiAgentEntrypoint:
    def stream_new_chat(
        self,
        *,
        fallback_streamer: Callable[..., AsyncGenerator[str, None]],
        fallback_kwargs: dict[str, Any],
    ) -> AsyncGenerator[str, None]:
        return fallback_streamer(**fallback_kwargs)

    def stream_resume_chat(
        self,
        *,
        fallback_streamer: Callable[..., AsyncGenerator[str, None]],
        fallback_kwargs: dict[str, Any],
    ) -> AsyncGenerator[str, None]:
        return fallback_streamer(**fallback_kwargs)


@@ -1,40 +0,0 @@
"""Architecture mode contracts and resolution helpers for chat sessions."""

from __future__ import annotations

from enum import StrEnum

from app.config import config


class ArchitectureMode(StrEnum):
    SINGLE_AGENT = "single_agent"
    SHADOW_MULTI_AGENT_V1 = "shadow_multi_agent_v1"
    MULTI_AGENT_V1 = "multi_agent_v1"


def parse_architecture_mode(value: str | None) -> ArchitectureMode | None:
    if not value:
        return None
    normalized = value.strip().lower()
    if not normalized:
        return None
    try:
        return ArchitectureMode(normalized)
    except ValueError:
        return None


def resolve_architecture_mode(request_override: str | None = None) -> ArchitectureMode:
    if config.FORCE_SINGLE_AGENT:
        return ArchitectureMode.SINGLE_AGENT
    override_mode = parse_architecture_mode(request_override)
    if override_mode is not None:
        return override_mode
    default_mode = parse_architecture_mode(config.AGENT_ARCHITECTURE_MODE)
    if default_mode is not None:
        return default_mode
    return ArchitectureMode.SINGLE_AGENT
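

# A sketch of how the resolution priority can be exercised in tests, assuming
# pytest's monkeypatch fixture; the test names and placement are illustrative.
def test_kill_switch_wins(monkeypatch):
    monkeypatch.setattr(config, "FORCE_SINGLE_AGENT", True)
    monkeypatch.setattr(config, "AGENT_ARCHITECTURE_MODE", "multi_agent_v1")
    assert resolve_architecture_mode("multi_agent_v1") is ArchitectureMode.SINGLE_AGENT


def test_override_beats_default(monkeypatch):
    monkeypatch.setattr(config, "FORCE_SINGLE_AGENT", False)
    monkeypatch.setattr(config, "AGENT_ARCHITECTURE_MODE", "single_agent")
    assert resolve_architecture_mode("multi_agent_v1") is ArchitectureMode.MULTI_AGENT_V1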


@@ -1,43 +0,0 @@
"""Architecture telemetry logging for chat execution modes."""

from __future__ import annotations

import json
from typing import Any

from app.utils.perf import get_perf_logger

_perf_log = get_perf_logger()


def log_architecture_telemetry(
    *,
    phase: str,
    architecture_mode: str,
    orchestrator_used: bool,
    worker_count: int,
    retry_count: int,
    latency_ms: float,
    token_total: int,
    request_id: str | None = None,
    turn_id: str | None = None,
    status: str = "ok",
    source: str = "new_chat",
    extra: dict[str, Any] | None = None,
) -> None:
    payload: dict[str, Any] = {
        "phase": phase,
        "source": source,
        "status": status,
        "architecture_mode": architecture_mode,
        "orchestrator_used": orchestrator_used,
        "worker_count": worker_count,
        "retry_count": retry_count,
        "latency_ms": round(latency_ms, 2),
        "token_total": token_total,
        "request_id": request_id or "unknown",
        "turn_id": turn_id or "unknown",
    }
    if extra:
        payload.update(extra)
    _perf_log.info("[architecture_telemetry] %s", json.dumps(payload, ensure_ascii=False))
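

# Example call, mirroring how the chat endpoints in this commit used the
# helper; the values are illustrative (kept commented out so the module
# does nothing at import time).
#
# log_architecture_telemetry(
#     phase="turn_end",
#     source="new_chat",
#     status="completed",
#     architecture_mode="single_agent",
#     orchestrator_used=False,
#     worker_count=0,
#     retry_count=0,
#     latency_ms=1234.5,
#     token_total=2048,
#     request_id="req-123",
#     turn_id="42:1714400000000",
# )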


@@ -342,8 +342,6 @@ class Config:
     ENABLE_DESKTOP_LOCAL_FILESYSTEM = (
         os.getenv("ENABLE_DESKTOP_LOCAL_FILESYSTEM", "FALSE").upper() == "TRUE"
     )
-    AGENT_ARCHITECTURE_MODE = os.getenv("AGENT_ARCHITECTURE_MODE", "single_agent")
-    FORCE_SINGLE_AGENT = os.getenv("FORCE_SINGLE_AGENT", "FALSE").upper() == "TRUE"

     @classmethod
     def is_self_hosted(cls) -> bool:


@@ -4,7 +4,6 @@ from __future__ import annotations

 import logging
 import secrets
-import time
 import uuid
 from pathlib import PurePosixPath
 from typing import Any
@@ -13,11 +12,6 @@ from fastapi import APIRouter, HTTPException, Request, Response, UploadFile, sta
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel, Field

-from app.agents.new_chat.architecture_mode import (
-    ArchitectureMode,
-    resolve_architecture_mode,
-)
-from app.agents.new_chat.telemetry import log_architecture_telemetry
 from app.config import config
 from app.etl_pipeline.file_classifier import (
     DIRECT_CONVERT_EXTENSIONS,
@@ -90,7 +84,6 @@ class AnonChatRequest(BaseModel):
     messages: list[dict[str, Any]] = Field(..., min_length=1)
     disabled_tools: list[str] | None = None
     turnstile_token: str | None = None
-    architecture_mode: ArchitectureMode | None = None


 class AnonQuotaResponse(BaseModel):
@@ -368,22 +361,6 @@ async def stream_anonymous_chat(
     accumulator = start_turn()
     streaming_service = VercelStreamingService()

-    architecture_mode = resolve_architecture_mode(body.architecture_mode)
-    started_at = time.perf_counter()
-    turn_id = f"anon:{session_id}:{request_id}"
-    log_architecture_telemetry(
-        phase="turn_start",
-        source="anon_chat",
-        status="started",
-        architecture_mode=architecture_mode.value,
-        orchestrator_used=False,
-        worker_count=0,
-        retry_count=0,
-        latency_ms=0.0,
-        token_total=0,
-        request_id=request_id,
-        turn_id=turn_id,
-    )
-
     try:
         async with shielded_async_session() as session:
@@ -423,10 +400,7 @@
             }

             langgraph_config = {
-                "configurable": {
-                    "thread_id": anon_thread_id,
-                    "architecture_mode": architecture_mode.value,
-                },
+                "configurable": {"thread_id": anon_thread_id},
                 "recursion_limit": 40,
             }
@@ -494,19 +468,6 @@
                     "total_tokens": accumulator.grand_total,
                 },
             )
-            log_architecture_telemetry(
-                phase="turn_end",
-                source="anon_chat",
-                status="completed",
-                architecture_mode=architecture_mode.value,
-                orchestrator_used=False,
-                worker_count=0,
-                retry_count=0,
-                latency_ms=(time.perf_counter() - started_at) * 1000.0,
-                token_total=accumulator.grand_total,
-                request_id=request_id,
-                turn_id=turn_id,
-            )

             yield streaming_service.format_finish_step()
             yield streaming_service.format_finish()
@@ -514,20 +475,6 @@
     except Exception as e:
         logger.exception("Anonymous chat stream error")
-        log_architecture_telemetry(
-            phase="turn_end",
-            source="anon_chat",
-            status="error",
-            architecture_mode=architecture_mode.value,
-            orchestrator_used=False,
-            worker_count=0,
-            retry_count=0,
-            latency_ms=(time.perf_counter() - started_at) * 1000.0,
-            token_total=accumulator.grand_total,
-            request_id=request_id,
-            turn_id=turn_id,
-            extra={"error_type": type(e).__name__},
-        )
         await TokenQuotaService.anon_release(session_key, ip_key, request_id)
         yield streaming_service.format_error(f"Error during chat: {e!s}")
         yield streaming_service.format_done()


@@ -22,7 +22,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.future import select
 from sqlalchemy.orm import selectinload

-from app.agents.new_chat.architecture_mode import resolve_architecture_mode
 from app.agents.new_chat.filesystem_selection import (
     ClientPlatform,
     FilesystemMode,
@@ -62,10 +61,7 @@ from app.schemas.new_chat import (
     TokenUsageSummary,
 )
 from app.services.token_tracking_service import record_token_usage
-from app.tasks.chat.stream_dispatch import (
-    dispatch_new_chat_stream,
-    dispatch_resume_chat_stream,
-)
+from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat
 from app.users import current_active_user
 from app.utils.rbac import check_permission
 from app.utils.user_message_multimodal import (
@@ -1248,28 +1244,23 @@ async def handle_new_chat(
     image_urls = (
         [p.as_data_url() for p in request.user_images] if request.user_images else None
     )
-    architecture_mode = resolve_architecture_mode(request.architecture_mode)

     return StreamingResponse(
-        dispatch_new_chat_stream(
-            architecture_mode=architecture_mode.value,
-            stream_kwargs={
-                "user_query": request.user_query,
-                "search_space_id": request.search_space_id,
-                "chat_id": request.chat_id,
-                "user_id": str(user.id),
-                "llm_config_id": llm_config_id,
-                "mentioned_document_ids": request.mentioned_document_ids,
-                "mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids,
-                "needs_history_bootstrap": thread.needs_history_bootstrap,
-                "thread_visibility": thread.visibility,
-                "current_user_display_name": user.display_name or "A team member",
-                "disabled_tools": request.disabled_tools,
-                "filesystem_selection": filesystem_selection,
-                "request_id": getattr(http_request.state, "request_id", "unknown"),
-                "user_image_data_urls": image_urls,
-                "architecture_mode": architecture_mode.value,
-            },
+        stream_new_chat(
+            user_query=request.user_query,
+            search_space_id=request.search_space_id,
+            chat_id=request.chat_id,
+            user_id=str(user.id),
+            llm_config_id=llm_config_id,
+            mentioned_document_ids=request.mentioned_document_ids,
+            mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids,
+            needs_history_bootstrap=thread.needs_history_bootstrap,
+            thread_visibility=thread.visibility,
+            current_user_display_name=user.display_name or "A team member",
+            disabled_tools=request.disabled_tools,
+            filesystem_selection=filesystem_selection,
+            request_id=getattr(http_request.state, "request_id", "unknown"),
+            user_image_data_urls=image_urls,
         ),
         media_type="text/event-stream",
         headers={
@@ -1467,7 +1458,6 @@ async def regenerate_response(
     if request.user_images is not None:
         regenerate_image_urls = [p.as_data_url() for p in request.user_images]

-    architecture_mode = resolve_architecture_mode(request.architecture_mode)

     if user_query_to_use is None:
         raise HTTPException(
@@ -1516,28 +1506,23 @@
     async def stream_with_cleanup():
         streaming_completed = False
         try:
-            stream = dispatch_new_chat_stream(
-                architecture_mode=architecture_mode.value,
-                stream_kwargs={
-                    "user_query": str(user_query_to_use),
-                    "search_space_id": request.search_space_id,
-                    "chat_id": thread_id,
-                    "user_id": str(user.id),
-                    "llm_config_id": llm_config_id,
-                    "mentioned_document_ids": request.mentioned_document_ids,
-                    "mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids,
-                    "checkpoint_id": target_checkpoint_id,
-                    "needs_history_bootstrap": thread.needs_history_bootstrap,
-                    "thread_visibility": thread.visibility,
-                    "current_user_display_name": user.display_name or "A team member",
-                    "disabled_tools": request.disabled_tools,
-                    "filesystem_selection": filesystem_selection,
-                    "request_id": getattr(http_request.state, "request_id", "unknown"),
-                    "user_image_data_urls": regenerate_image_urls or None,
-                    "architecture_mode": architecture_mode.value,
-                },
-            )
-            async for chunk in stream:
+            async for chunk in stream_new_chat(
+                user_query=str(user_query_to_use),
+                search_space_id=request.search_space_id,
+                chat_id=thread_id,
+                user_id=str(user.id),
+                llm_config_id=llm_config_id,
+                mentioned_document_ids=request.mentioned_document_ids,
+                mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids,
+                checkpoint_id=target_checkpoint_id,
+                needs_history_bootstrap=thread.needs_history_bootstrap,
+                thread_visibility=thread.visibility,
+                current_user_display_name=user.display_name or "A team member",
+                disabled_tools=request.disabled_tools,
+                filesystem_selection=filesystem_selection,
+                request_id=getattr(http_request.state, "request_id", "unknown"),
+                user_image_data_urls=regenerate_image_urls or None,
+            ):
                 yield chunk
             streaming_completed = True
         finally:
@@ -1643,7 +1628,6 @@ async def resume_chat(
     )
     decisions = [d.model_dump() for d in request.decisions]
-    architecture_mode = resolve_architecture_mode(request.architecture_mode)

     # Release the read-transaction so we don't hold ACCESS SHARE locks
     # on searchspaces/documents for the entire duration of the stream.
@@ -1651,19 +1635,15 @@
     await session.close()

     return StreamingResponse(
-        dispatch_resume_chat_stream(
-            architecture_mode=architecture_mode.value,
-            stream_kwargs={
-                "chat_id": thread_id,
-                "search_space_id": request.search_space_id,
-                "decisions": decisions,
-                "user_id": str(user.id),
-                "llm_config_id": llm_config_id,
-                "thread_visibility": thread.visibility,
-                "filesystem_selection": filesystem_selection,
-                "request_id": getattr(http_request.state, "request_id", "unknown"),
-                "architecture_mode": architecture_mode.value,
-            },
+        stream_resume_chat(
+            chat_id=thread_id,
+            search_space_id=request.search_space_id,
+            decisions=decisions,
+            user_id=str(user.id),
+            llm_config_id=llm_config_id,
+            thread_visibility=thread.visibility,
+            filesystem_selection=filesystem_selection,
+            request_id=getattr(http_request.state, "request_id", "unknown"),
         ),
         media_type="text/event-stream",
         headers={


@@ -176,11 +176,6 @@ class LocalFilesystemMountPayload(BaseModel):
 MAX_NEW_CHAT_IMAGE_BYTES = 8 * 1024 * 1024
 MAX_NEW_CHAT_IMAGES = 4

-ArchitectureModeLiteral = Literal[
-    "single_agent",
-    "shadow_multi_agent_v1",
-    "multi_agent_v1",
-]


 class NewChatUserImagePart(BaseModel):
@@ -215,7 +210,6 @@ class NewChatRequest(BaseModel):
     disabled_tools: list[str] | None = (
         None  # Optional list of tool names the user has disabled from the UI
     )
-    architecture_mode: ArchitectureModeLiteral | None = None
     filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
     client_platform: Literal["web", "desktop"] = "web"
     local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None
@@ -256,7 +250,6 @@ class RegenerateRequest(BaseModel):
     mentioned_document_ids: list[int] | None = None
     mentioned_surfsense_doc_ids: list[int] | None = None
     disabled_tools: list[str] | None = None
-    architecture_mode: ArchitectureModeLiteral | None = None
     filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
     client_platform: Literal["web", "desktop"] = "web"
     local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None
@@ -293,7 +286,6 @@ class ResumeDecision(BaseModel):
 class ResumeRequest(BaseModel):
     search_space_id: int
     decisions: list[ResumeDecision]
-    architecture_mode: ArchitectureModeLiteral | None = None
     filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
     client_platform: Literal["web", "desktop"] = "web"
     local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None


@@ -1,47 +0,0 @@
"""Thin architecture dispatch seam for chat streaming entrypoints."""

from __future__ import annotations

from collections.abc import AsyncGenerator
from typing import Any

from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint
from app.agents.new_chat.architecture_mode import (
    ArchitectureMode,
    parse_architecture_mode,
)
from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat


def _resolve_mode(mode_value: str) -> ArchitectureMode:
    return parse_architecture_mode(mode_value) or ArchitectureMode.SINGLE_AGENT


def dispatch_new_chat_stream(
    *,
    architecture_mode: str,
    stream_kwargs: dict[str, Any],
) -> AsyncGenerator[str, None]:
    mode = _resolve_mode(architecture_mode)
    if mode == ArchitectureMode.SINGLE_AGENT:
        return stream_new_chat(**stream_kwargs)
    entrypoint = MultiAgentEntrypoint()
    return entrypoint.stream_new_chat(
        fallback_streamer=stream_new_chat,
        fallback_kwargs=stream_kwargs,
    )


def dispatch_resume_chat_stream(
    *,
    architecture_mode: str,
    stream_kwargs: dict[str, Any],
) -> AsyncGenerator[str, None]:
    mode = _resolve_mode(architecture_mode)
    if mode == ArchitectureMode.SINGLE_AGENT:
        return stream_resume_chat(**stream_kwargs)
    entrypoint = MultiAgentEntrypoint()
    return entrypoint.stream_resume_chat(
        fallback_streamer=stream_resume_chat,
        fallback_kwargs=stream_kwargs,
    )


@@ -42,7 +42,6 @@ from app.agents.new_chat.memory_extraction import (
     extract_and_save_memory,
     extract_and_save_team_memory,
 )
-from app.agents.new_chat.telemetry import log_architecture_telemetry
 from app.db import (
     ChatVisibility,
     NewChatMessage,
@@ -150,7 +149,6 @@ class StreamResult:
     agent_called_update_memory: bool = False
     request_id: str | None = None
     turn_id: str = ""
-    architecture_mode: str = "single_agent"
     filesystem_mode: str = "cloud"
     client_platform: str = "web"
     intent_detected: str = "chat_only"
@@ -184,7 +182,9 @@ def _tool_output_has_error(tool_output: Any) -> bool:
         if tool_output.get("error"):
             return True
         result = tool_output.get("result")
-        return isinstance(result, str) and result.strip().lower().startswith("error:")
+        if isinstance(result, str) and result.strip().lower().startswith("error:"):
+            return True
+        return False
     if isinstance(tool_output, str):
         return tool_output.strip().lower().startswith("error:")
     return False
@@ -231,7 +231,6 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None:
         "request_id": result.request_id or "unknown",
         "turn_id": result.turn_id or "unknown",
         "chat_id": result.turn_id.split(":", 1)[0] if ":" in result.turn_id else "unknown",
-        "architecture_mode": result.architecture_mode,
         "filesystem_mode": result.filesystem_mode,
         "client_platform": result.client_platform,
         "intent_detected": result.intent_detected,
@@ -1309,17 +1308,18 @@ async def _stream_agent_events(
         result.commit_gate_passed, result.commit_gate_reason = (
             _evaluate_file_contract_outcome(result)
         )
-        if not result.commit_gate_passed and _contract_enforcement_active(result):
-            gate_notice = (
-                "I could not complete the requested file write because no successful "
-                "write_file/edit_file operation was confirmed."
-            )
-            gate_text_id = streaming_service.generate_text_id()
-            yield streaming_service.format_text_start(gate_text_id)
-            yield streaming_service.format_text_delta(gate_text_id, gate_notice)
-            yield streaming_service.format_text_end(gate_text_id)
-            yield streaming_service.format_terminal_info(gate_notice, "error")
-            accumulated_text = gate_notice
+        if not result.commit_gate_passed:
+            if _contract_enforcement_active(result):
+                gate_notice = (
+                    "I could not complete the requested file write because no successful "
+                    "write_file/edit_file operation was confirmed."
+                )
+                gate_text_id = streaming_service.generate_text_id()
+                yield streaming_service.format_text_start(gate_text_id)
+                yield streaming_service.format_text_delta(gate_text_id, gate_notice)
+                yield streaming_service.format_text_end(gate_text_id)
+                yield streaming_service.format_terminal_info(gate_notice, "error")
+                accumulated_text = gate_notice
         else:
             result.commit_gate_passed = True
             result.commit_gate_reason = ""
@@ -1351,7 +1351,6 @@ async def stream_new_chat(
     filesystem_selection: FilesystemSelection | None = None,
     request_id: str | None = None,
     user_image_data_urls: list[str] | None = None,
-    architecture_mode: str = "single_agent",
 ) -> AsyncGenerator[str, None]:
     """
     Stream chat responses from the new SurfSense deep agent.
@@ -1385,22 +1384,8 @@
     )
     stream_result.request_id = request_id
     stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}"
-    stream_result.architecture_mode = architecture_mode
     stream_result.filesystem_mode = fs_mode
     stream_result.client_platform = fs_platform
-    log_architecture_telemetry(
-        phase="turn_start",
-        source="new_chat",
-        status="started",
-        architecture_mode=architecture_mode,
-        orchestrator_used=False,
-        worker_count=0,
-        retry_count=0,
-        latency_ms=0.0,
-        token_total=0,
-        request_id=request_id,
-        turn_id=stream_result.turn_id,
-    )
     _log_file_contract("turn_start", stream_result)
     _perf_log.info(
         "[stream_new_chat] filesystem_mode=%s client_platform=%s",
@@ -1653,7 +1638,6 @@
         "search_space_id": search_space_id,
         "request_id": request_id or "unknown",
         "turn_id": stream_result.turn_id,
-        "architecture_mode": architecture_mode,
     }

     _perf_log.info(
@@ -1685,7 +1669,6 @@
     configurable = {"thread_id": str(chat_id)}
     configurable["request_id"] = request_id or "unknown"
     configurable["turn_id"] = stream_result.turn_id
-    configurable["architecture_mode"] = architecture_mode

     if checkpoint_id:
         configurable["checkpoint_id"] = checkpoint_id
@@ -1901,19 +1884,6 @@
                     "call_details": accumulator.serialized_calls(),
                 },
             )
-            log_architecture_telemetry(
-                phase="turn_end",
-                source="new_chat",
-                status="interrupted",
-                architecture_mode=stream_result.architecture_mode,
-                orchestrator_used=False,
-                worker_count=0,
-                retry_count=0,
-                latency_ms=(time.perf_counter() - _t_total) * 1000.0,
-                token_total=accumulator.grand_total,
-                request_id=request_id,
-                turn_id=stream_result.turn_id,
-            )

             yield streaming_service.format_finish_step()
             yield streaming_service.format_finish()
@@ -1986,19 +1956,6 @@
                 "call_details": accumulator.serialized_calls(),
             },
         )
-        log_architecture_telemetry(
-            phase="turn_end",
-            source="new_chat",
-            status="completed",
-            architecture_mode=stream_result.architecture_mode,
-            orchestrator_used=False,
-            worker_count=0,
-            retry_count=0,
-            latency_ms=(time.perf_counter() - _t_total) * 1000.0,
-            token_total=accumulator.grand_total,
-            request_id=request_id,
-            turn_id=stream_result.turn_id,
-        )

         # Fire background memory extraction if the agent didn't handle it.
         # Shared threads write to team memory; private threads write to user memory.
@@ -2043,20 +2000,6 @@
         print(f"[stream_new_chat] {error_message}")
         print(f"[stream_new_chat] Exception type: {type(e).__name__}")
         print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}")
-        log_architecture_telemetry(
-            phase="turn_end",
-            source="new_chat",
-            status="error",
-            architecture_mode=stream_result.architecture_mode,
-            orchestrator_used=False,
-            worker_count=0,
-            retry_count=0,
-            latency_ms=(time.perf_counter() - _t_total) * 1000.0,
-            token_total=accumulator.grand_total,
-            request_id=request_id,
-            turn_id=stream_result.turn_id,
-            extra={"error_type": type(e).__name__},
-        )

         yield streaming_service.format_error(error_message)
         yield streaming_service.format_finish_step()
@@ -2150,7 +2093,6 @@ async def stream_resume_chat(
     thread_visibility: ChatVisibility | None = None,
     filesystem_selection: FilesystemSelection | None = None,
     request_id: str | None = None,
-    architecture_mode: str = "single_agent",
 ) -> AsyncGenerator[str, None]:
     streaming_service = VercelStreamingService()
     stream_result = StreamResult()
@@ -2161,22 +2103,8 @@
     )
     stream_result.request_id = request_id
     stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}"
-    stream_result.architecture_mode = architecture_mode
     stream_result.filesystem_mode = fs_mode
     stream_result.client_platform = fs_platform
-    log_architecture_telemetry(
-        phase="turn_start",
-        source="resume_chat",
-        status="started",
-        architecture_mode=architecture_mode,
-        orchestrator_used=False,
-        worker_count=0,
-        retry_count=0,
-        latency_ms=0.0,
-        token_total=0,
-        request_id=request_id,
-        turn_id=stream_result.turn_id,
-    )
     _log_file_contract("turn_start", stream_result)
     _perf_log.info(
         "[stream_resume] filesystem_mode=%s client_platform=%s",
@@ -2322,7 +2250,6 @@
             "thread_id": str(chat_id),
             "request_id": request_id or "unknown",
             "turn_id": stream_result.turn_id,
-            "architecture_mode": architecture_mode,
         },
         "recursion_limit": 80,
     }
@@ -2373,19 +2300,6 @@
                     "call_details": accumulator.serialized_calls(),
                 },
             )
-            log_architecture_telemetry(
-                phase="turn_end",
-                source="resume_chat",
-                status="interrupted",
-                architecture_mode=stream_result.architecture_mode,
-                orchestrator_used=False,
-                worker_count=0,
-                retry_count=0,
-                latency_ms=(time.perf_counter() - _t_total) * 1000.0,
-                token_total=accumulator.grand_total,
-                request_id=request_id,
-                turn_id=stream_result.turn_id,
-            )

             yield streaming_service.format_finish_step()
             yield streaming_service.format_finish()
@@ -2439,19 +2353,6 @@
                 "call_details": accumulator.serialized_calls(),
             },
         )
-        log_architecture_telemetry(
-            phase="turn_end",
-            source="resume_chat",
-            status="completed",
-            architecture_mode=stream_result.architecture_mode,
-            orchestrator_used=False,
-            worker_count=0,
-            retry_count=0,
-            latency_ms=(time.perf_counter() - _t_total) * 1000.0,
-            token_total=accumulator.grand_total,
-            request_id=request_id,
-            turn_id=stream_result.turn_id,
-        )

         yield streaming_service.format_finish_step()
         yield streaming_service.format_finish()
@@ -2463,20 +2364,6 @@
         error_message = f"Error during resume: {e!s}"
         print(f"[stream_resume_chat] {error_message}")
         print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}")
-        log_architecture_telemetry(
-            phase="turn_end",
-            source="resume_chat",
-            status="error",
-            architecture_mode=stream_result.architecture_mode,
-            orchestrator_used=False,
-            worker_count=0,
-            retry_count=0,
-            latency_ms=(time.perf_counter() - _t_total) * 1000.0,
-            token_total=accumulator.grand_total,
-            request_id=request_id,
-            turn_id=stream_result.turn_id,
-            extra={"error_type": type(e).__name__},
-        )
         yield streaming_service.format_error(error_message)
         yield streaming_service.format_finish_step()
         yield streaming_service.format_finish()