feat(chat): add multi-agent mode routing scaffold and telemetry.

This commit is contained in:
CREDO23 2026-04-28 15:35:14 +02:00
parent 78f71c7e3a
commit 7b9a218d62
13 changed files with 742 additions and 58 deletions

View file

@ -0,0 +1,211 @@
# Multi-Agent Architecture Plan (Phased)
This document defines a phased migration from the current `single_agent` flow to a multi-agent architecture while keeping rollback simple and immediate.
## Naming
- `single_agent`: current architecture (default at start)
- `shadow_multi_agent_v1`: run multi-agent path in background, return `single_agent` output
- `multi_agent_v1`: multi-agent architecture is the user-facing path
---
## Phase 1 - Parallel Safety Layer
**Goal:** Add safe routing controls with zero behavior change.
### Todo
- [ ] Add mode selector with values: `single_agent`, `shadow_multi_agent_v1`, `multi_agent_v1`
- [ ] Add global kill switch: force all traffic to `single_agent`
- [ ] Add mode resolution priority:
1. kill switch
2. request override
3. system default
- [ ] Keep `single_agent` as default mode
- [ ] Keep frontend stream/output contract unchanged
- [ ] Add telemetry tags:
- `architecture_mode`
- `worker_count`
- `retry_count`
- `latency_ms`
- `token_total`
- [ ] Write short rollback runbook
### Exit Criteria
- [ ] Can switch modes in staging
- [ ] Kill switch verified
- [ ] No frontend contract regressions
---
## Phase 2 - Orchestrator Core and Contracts
**Goal:** Build multi-agent control-plane only (planner/router/merge), with strict schemas.
### Todo
- [ ] Implement orchestrator responsibilities:
- intent detection
- routing
- delegation
- fan-in merge
- [ ] Add budget controls:
- max workers per turn
- max parallel workers
- max turn duration
- [ ] Add loop/stall guard:
- repeated task signature detection
- no-progress threshold
- [ ] Define `WorkerTask` schema:
- `domain`, `goal`, `constraints`, `budget`
- [ ] Define `WorkerResult` schema:
- `status`, `summary`, `evidence[]`, `artifacts[]`, `needs_human`, `error_class`
- [ ] Add schema validation on send/receive boundaries
- [ ] Add controlled fallback on invalid worker results
### Exit Criteria
- [ ] Orchestrator works end-to-end with mock workers
- [ ] Invalid worker payloads are blocked cleanly
---
## Phase 3 - Pilot Workers (Gmail and Calendar)
**Goal:** Validate multi-agent architecture with two real domains only.
### Todo
- [ ] Create Gmail worker
- [ ] domain-scoped prompt
- [ ] domain-only tool loadout
- [ ] local query rewrite
- [ ] normalized `WorkerResult`
- [ ] Create Calendar worker
- [ ] domain-scoped prompt
- [ ] domain-only tool loadout
- [ ] local query rewrite/time normalization
- [ ] normalized `WorkerResult`
- [ ] Enforce no cross-domain tool access
- [ ] Preserve HITL for write actions
- [ ] Add retry policy by `error_class`
- [ ] Add tests for routing, loadout isolation, HITL behavior
### Exit Criteria
- [ ] Gmail and Calendar tasks complete in `multi_agent_v1`
- [ ] No cross-domain tool leakage
- [ ] HITL still enforced for sensitive writes
---
## Phase 4 - Knowledge Base and Evidence Normalization
**Goal:** Isolate KB retrieval and make evidence citation-ready.
### Todo
- [ ] Move KB retrieval behind dedicated worker/stage
- [ ] Reuse current KB retrieval logic, but return compact structured evidence only
- [ ] Define `EvidenceItem` fields:
- `claim`, `source_type`, `source_ref`, `confidence`, `snippet`
- [ ] Add top-k and output-size controls
- [ ] Add quote-first extraction mode for long contexts
- [ ] Add tests for traceability and bounded payloads
### Exit Criteria
- [ ] Orchestrator consumes compact evidence (no raw KB dumps)
- [ ] Citation refs remain valid and traceable
---
## Phase 5 - Verifier and Citation Gate
**Goal:** Prevent unsupported factual claims in final responses.
### Todo
- [ ] Add verifier stage before final synthesis
- [ ] Enforce claim-to-evidence checks
- [ ] Add conflict handling policy:
- consistent evidence -> accept
- conflicting evidence -> label uncertainty or retry
- [ ] Add unsupported-claim policy:
- remove claim or mark uncertain
- [ ] Add verifier telemetry:
- supported claims
- unsupported claims
- conflicts
- [ ] Support strict gate and warning modes
### Exit Criteria
- [ ] Unsupported factual claims are blocked or clearly annotated
- [ ] Citation precision improves on evaluation set
---
## Phase 6 - Shadow Evaluation and Canary
**Goal:** Ship based on data, not intuition.
### Todo
- [ ] Enable `shadow_multi_agent_v1` for selected traffic
- [ ] Compare metrics vs `single_agent`:
- success rate
- citation precision
- tool-selection accuracy
- p95 latency
- tokens/request
- cost per successful task
- [ ] Define rollout gates and auto-stop thresholds
- [ ] Start canary rollout for `multi_agent_v1`
- [ ] Ramp traffic only if quality and reliability gates pass
- [ ] Keep kill switch live for entire rollout
- [ ] Record go/no-go decision with evidence
### Exit Criteria
- [ ] Clear decision based on measured outcomes
- [ ] Rollback tested successfully during canary
---
## Phase 7 - Domain Expansion and Heavy Tool Reassignment
**Goal:** Scale multi-agent architecture safely across more domains.
### Todo
- [ ] Add domains incrementally (`notion`, `slack`, `jira`, ...)
- [ ] For each new domain enforce:
- scoped tool loadout
- local query rewrite
- contract validation
- eval plus canary gate
- [ ] Move heavy tools to specialist workers:
- podcast generation
- artifact/report generation
- video presentation
- [ ] Keep orchestrator toolbelt minimal and control-plane focused
- [ ] Regularly prune prompts and tool descriptions
### Exit Criteria
- [ ] New domains onboard without reliability regressions
- [ ] Orchestrator remains lean and stable
- [ ] Cost per successful task stays controlled
---
## Always-On Checklist
- [ ] Keep `single_agent` path healthy until rollout completion
- [ ] Keep one-click rollback available at all times
- [ ] Update observability dashboards every phase
- [ ] Track failure taxonomy and review weekly
- [ ] Validate prompt/tool changes via eval before broad rollout

View file

@ -0,0 +1,70 @@
# Multi-Agent Architecture Phase 1 Runbook
## Scope
This runbook covers mode selection and emergency rollback for:
- `single_agent`
- `shadow_multi_agent_v1`
- `multi_agent_v1`
Phase 1 keeps execution behavior on the current single-agent path while mode wiring and telemetry are introduced.
## Resolution Priority
Mode resolution follows this fixed order:
1. Global kill switch (`FORCE_SINGLE_AGENT`)
2. Request override (`architecture_mode` in chat payload)
3. System default (`AGENT_ARCHITECTURE_MODE`)
4. Safe fallback (`single_agent`)
## Configuration
Set environment values in backend runtime:
- `AGENT_ARCHITECTURE_MODE=single_agent` (default)
- `FORCE_SINGLE_AGENT=FALSE` (default)
Changes require backend restart because config is loaded at process startup.
## Mode Switching
### System default switch
1. Set `AGENT_ARCHITECTURE_MODE` to desired value.
2. Keep `FORCE_SINGLE_AGENT=FALSE`.
3. Restart backend.
4. Verify logs include `[architecture_telemetry]` with expected `architecture_mode`.
### Per-request override
Send optional `architecture_mode` in chat request payload:
- `"single_agent"`
- `"shadow_multi_agent_v1"`
- `"multi_agent_v1"`
If `FORCE_SINGLE_AGENT=TRUE`, request override is ignored by design.
## Emergency Rollback
Use the kill switch:
1. Set `FORCE_SINGLE_AGENT=TRUE`.
2. Restart backend.
3. Verify new requests log `architecture_mode=single_agent`.
4. Keep this state until incident is resolved.
## Verification Checklist
- Mode resolves according to the priority order.
- Kill switch overrides all request/default values.
- Streaming response schema remains unchanged.
- Architecture telemetry is emitted with:
- `architecture_mode`
- `orchestrator_used`
- `worker_count`
- `retry_count`
- `latency_ms`
- `token_total`

View file

@ -0,0 +1,17 @@
"""Multi-agent v1 architecture package."""
from app.agents.multi_agent_v1.contracts import (
GroundingEvidence,
SubagentResult,
SubagentTaskPlan,
WorkerBudget,
)
from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint
__all__ = [
"GroundingEvidence",
"MultiAgentEntrypoint",
"SubagentResult",
"SubagentTaskPlan",
"WorkerBudget",
]

View file

@ -0,0 +1,36 @@
"""Contracts for multi_agent_v1 orchestrator and subagent communication."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
class WorkerBudget(BaseModel):
max_steps: int = Field(default=1, ge=1)
max_duration_ms: int = Field(default=15_000, ge=100)
class SubagentTaskPlan(BaseModel):
domain: str = Field(..., min_length=1)
goal: str = Field(..., min_length=1)
constraints: list[str] = Field(default_factory=list)
budget: WorkerBudget = Field(default_factory=WorkerBudget)
class GroundingEvidence(BaseModel):
claim: str = Field(..., min_length=1)
source_type: str = Field(..., min_length=1)
source_ref: str = Field(..., min_length=1)
confidence: float = Field(default=0.0, ge=0.0, le=1.0)
snippet: str = ""
class SubagentResult(BaseModel):
status: Literal["success", "partial", "error"]
summary: str = ""
evidence: list[GroundingEvidence] = Field(default_factory=list)
artifacts: list[str] = Field(default_factory=list)
needs_human: bool = False
error_class: str | None = None

View file

@ -0,0 +1,24 @@
"""Multi-agent v1 entrypoint scaffold with safe fallback behavior."""
from __future__ import annotations
from collections.abc import AsyncGenerator, Callable
from typing import Any
class MultiAgentEntrypoint:
def stream_new_chat(
self,
*,
fallback_streamer: Callable[..., AsyncGenerator[str, None]],
fallback_kwargs: dict[str, Any],
) -> AsyncGenerator[str, None]:
return fallback_streamer(**fallback_kwargs)
def stream_resume_chat(
self,
*,
fallback_streamer: Callable[..., AsyncGenerator[str, None]],
fallback_kwargs: dict[str, Any],
) -> AsyncGenerator[str, None]:
return fallback_streamer(**fallback_kwargs)

View file

@ -0,0 +1,40 @@
"""Architecture mode contracts and resolution helpers for chat sessions."""
from __future__ import annotations
from enum import StrEnum
from app.config import config
class ArchitectureMode(StrEnum):
SINGLE_AGENT = "single_agent"
SHADOW_MULTI_AGENT_V1 = "shadow_multi_agent_v1"
MULTI_AGENT_V1 = "multi_agent_v1"
def parse_architecture_mode(value: str | None) -> ArchitectureMode | None:
if not value:
return None
normalized = value.strip().lower()
if not normalized:
return None
try:
return ArchitectureMode(normalized)
except ValueError:
return None
def resolve_architecture_mode(request_override: str | None = None) -> ArchitectureMode:
if config.FORCE_SINGLE_AGENT:
return ArchitectureMode.SINGLE_AGENT
override_mode = parse_architecture_mode(request_override)
if override_mode is not None:
return override_mode
default_mode = parse_architecture_mode(config.AGENT_ARCHITECTURE_MODE)
if default_mode is not None:
return default_mode
return ArchitectureMode.SINGLE_AGENT

View file

@ -0,0 +1,43 @@
"""Architecture telemetry logging for chat execution modes."""
from __future__ import annotations
import json
from typing import Any
from app.utils.perf import get_perf_logger
_perf_log = get_perf_logger()
def log_architecture_telemetry(
*,
phase: str,
architecture_mode: str,
orchestrator_used: bool,
worker_count: int,
retry_count: int,
latency_ms: float,
token_total: int,
request_id: str | None = None,
turn_id: str | None = None,
status: str = "ok",
source: str = "new_chat",
extra: dict[str, Any] | None = None,
) -> None:
payload: dict[str, Any] = {
"phase": phase,
"source": source,
"status": status,
"architecture_mode": architecture_mode,
"orchestrator_used": orchestrator_used,
"worker_count": worker_count,
"retry_count": retry_count,
"latency_ms": round(latency_ms, 2),
"token_total": token_total,
"request_id": request_id or "unknown",
"turn_id": turn_id or "unknown",
}
if extra:
payload.update(extra)
_perf_log.info("[architecture_telemetry] %s", json.dumps(payload, ensure_ascii=False))

View file

@ -342,6 +342,8 @@ class Config:
ENABLE_DESKTOP_LOCAL_FILESYSTEM = (
os.getenv("ENABLE_DESKTOP_LOCAL_FILESYSTEM", "FALSE").upper() == "TRUE"
)
AGENT_ARCHITECTURE_MODE = os.getenv("AGENT_ARCHITECTURE_MODE", "single_agent")
FORCE_SINGLE_AGENT = os.getenv("FORCE_SINGLE_AGENT", "FALSE").upper() == "TRUE"
@classmethod
def is_self_hosted(cls) -> bool:

View file

@ -4,6 +4,7 @@ from __future__ import annotations
import logging
import secrets
import time
import uuid
from pathlib import PurePosixPath
from typing import Any
@ -12,6 +13,11 @@ from fastapi import APIRouter, HTTPException, Request, Response, UploadFile, sta
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from app.agents.new_chat.architecture_mode import (
ArchitectureMode,
resolve_architecture_mode,
)
from app.agents.new_chat.telemetry import log_architecture_telemetry
from app.config import config
from app.etl_pipeline.file_classifier import (
DIRECT_CONVERT_EXTENSIONS,
@ -84,6 +90,7 @@ class AnonChatRequest(BaseModel):
messages: list[dict[str, Any]] = Field(..., min_length=1)
disabled_tools: list[str] | None = None
turnstile_token: str | None = None
architecture_mode: ArchitectureMode | None = None
class AnonQuotaResponse(BaseModel):
@ -361,6 +368,22 @@ async def stream_anonymous_chat(
accumulator = start_turn()
streaming_service = VercelStreamingService()
architecture_mode = resolve_architecture_mode(body.architecture_mode)
started_at = time.perf_counter()
turn_id = f"anon:{session_id}:{request_id}"
log_architecture_telemetry(
phase="turn_start",
source="anon_chat",
status="started",
architecture_mode=architecture_mode.value,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=0.0,
token_total=0,
request_id=request_id,
turn_id=turn_id,
)
try:
async with shielded_async_session() as session:
@ -400,7 +423,10 @@ async def stream_anonymous_chat(
}
langgraph_config = {
"configurable": {"thread_id": anon_thread_id},
"configurable": {
"thread_id": anon_thread_id,
"architecture_mode": architecture_mode.value,
},
"recursion_limit": 40,
}
@ -468,6 +494,19 @@ async def stream_anonymous_chat(
"total_tokens": accumulator.grand_total,
},
)
log_architecture_telemetry(
phase="turn_end",
source="anon_chat",
status="completed",
architecture_mode=architecture_mode.value,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - started_at) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=turn_id,
)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()
@ -475,6 +514,20 @@ async def stream_anonymous_chat(
except Exception as e:
logger.exception("Anonymous chat stream error")
log_architecture_telemetry(
phase="turn_end",
source="anon_chat",
status="error",
architecture_mode=architecture_mode.value,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - started_at) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=turn_id,
extra={"error_type": type(e).__name__},
)
await TokenQuotaService.anon_release(session_key, ip_key, request_id)
yield streaming_service.format_error(f"Error during chat: {e!s}")
yield streaming_service.format_done()

View file

@ -22,6 +22,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from app.agents.new_chat.architecture_mode import resolve_architecture_mode
from app.agents.new_chat.filesystem_selection import (
ClientPlatform,
FilesystemMode,
@ -61,7 +62,10 @@ from app.schemas.new_chat import (
TokenUsageSummary,
)
from app.services.token_tracking_service import record_token_usage
from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat
from app.tasks.chat.stream_dispatch import (
dispatch_new_chat_stream,
dispatch_resume_chat_stream,
)
from app.users import current_active_user
from app.utils.rbac import check_permission
from app.utils.user_message_multimodal import (
@ -1244,23 +1248,28 @@ async def handle_new_chat(
image_urls = (
[p.as_data_url() for p in request.user_images] if request.user_images else None
)
architecture_mode = resolve_architecture_mode(request.architecture_mode)
return StreamingResponse(
stream_new_chat(
user_query=request.user_query,
search_space_id=request.search_space_id,
chat_id=request.chat_id,
user_id=str(user.id),
llm_config_id=llm_config_id,
mentioned_document_ids=request.mentioned_document_ids,
mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids,
needs_history_bootstrap=thread.needs_history_bootstrap,
thread_visibility=thread.visibility,
current_user_display_name=user.display_name or "A team member",
disabled_tools=request.disabled_tools,
filesystem_selection=filesystem_selection,
request_id=getattr(http_request.state, "request_id", "unknown"),
user_image_data_urls=image_urls,
dispatch_new_chat_stream(
architecture_mode=architecture_mode.value,
stream_kwargs={
"user_query": request.user_query,
"search_space_id": request.search_space_id,
"chat_id": request.chat_id,
"user_id": str(user.id),
"llm_config_id": llm_config_id,
"mentioned_document_ids": request.mentioned_document_ids,
"mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids,
"needs_history_bootstrap": thread.needs_history_bootstrap,
"thread_visibility": thread.visibility,
"current_user_display_name": user.display_name or "A team member",
"disabled_tools": request.disabled_tools,
"filesystem_selection": filesystem_selection,
"request_id": getattr(http_request.state, "request_id", "unknown"),
"user_image_data_urls": image_urls,
"architecture_mode": architecture_mode.value,
},
),
media_type="text/event-stream",
headers={
@ -1458,6 +1467,7 @@ async def regenerate_response(
if request.user_images is not None:
regenerate_image_urls = [p.as_data_url() for p in request.user_images]
architecture_mode = resolve_architecture_mode(request.architecture_mode)
if user_query_to_use is None:
raise HTTPException(
@ -1506,23 +1516,28 @@ async def regenerate_response(
async def stream_with_cleanup():
streaming_completed = False
try:
async for chunk in stream_new_chat(
user_query=str(user_query_to_use),
search_space_id=request.search_space_id,
chat_id=thread_id,
user_id=str(user.id),
llm_config_id=llm_config_id,
mentioned_document_ids=request.mentioned_document_ids,
mentioned_surfsense_doc_ids=request.mentioned_surfsense_doc_ids,
checkpoint_id=target_checkpoint_id,
needs_history_bootstrap=thread.needs_history_bootstrap,
thread_visibility=thread.visibility,
current_user_display_name=user.display_name or "A team member",
disabled_tools=request.disabled_tools,
filesystem_selection=filesystem_selection,
request_id=getattr(http_request.state, "request_id", "unknown"),
user_image_data_urls=regenerate_image_urls or None,
):
stream = dispatch_new_chat_stream(
architecture_mode=architecture_mode.value,
stream_kwargs={
"user_query": str(user_query_to_use),
"search_space_id": request.search_space_id,
"chat_id": thread_id,
"user_id": str(user.id),
"llm_config_id": llm_config_id,
"mentioned_document_ids": request.mentioned_document_ids,
"mentioned_surfsense_doc_ids": request.mentioned_surfsense_doc_ids,
"checkpoint_id": target_checkpoint_id,
"needs_history_bootstrap": thread.needs_history_bootstrap,
"thread_visibility": thread.visibility,
"current_user_display_name": user.display_name or "A team member",
"disabled_tools": request.disabled_tools,
"filesystem_selection": filesystem_selection,
"request_id": getattr(http_request.state, "request_id", "unknown"),
"user_image_data_urls": regenerate_image_urls or None,
"architecture_mode": architecture_mode.value,
},
)
async for chunk in stream:
yield chunk
streaming_completed = True
finally:
@ -1628,6 +1643,7 @@ async def resume_chat(
)
decisions = [d.model_dump() for d in request.decisions]
architecture_mode = resolve_architecture_mode(request.architecture_mode)
# Release the read-transaction so we don't hold ACCESS SHARE locks
# on searchspaces/documents for the entire duration of the stream.
@ -1635,15 +1651,19 @@ async def resume_chat(
await session.close()
return StreamingResponse(
stream_resume_chat(
chat_id=thread_id,
search_space_id=request.search_space_id,
decisions=decisions,
user_id=str(user.id),
llm_config_id=llm_config_id,
thread_visibility=thread.visibility,
filesystem_selection=filesystem_selection,
request_id=getattr(http_request.state, "request_id", "unknown"),
dispatch_resume_chat_stream(
architecture_mode=architecture_mode.value,
stream_kwargs={
"chat_id": thread_id,
"search_space_id": request.search_space_id,
"decisions": decisions,
"user_id": str(user.id),
"llm_config_id": llm_config_id,
"thread_visibility": thread.visibility,
"filesystem_selection": filesystem_selection,
"request_id": getattr(http_request.state, "request_id", "unknown"),
"architecture_mode": architecture_mode.value,
},
),
media_type="text/event-stream",
headers={

View file

@ -176,6 +176,11 @@ class LocalFilesystemMountPayload(BaseModel):
MAX_NEW_CHAT_IMAGE_BYTES = 8 * 1024 * 1024
MAX_NEW_CHAT_IMAGES = 4
ArchitectureModeLiteral = Literal[
"single_agent",
"shadow_multi_agent_v1",
"multi_agent_v1",
]
class NewChatUserImagePart(BaseModel):
@ -210,6 +215,7 @@ class NewChatRequest(BaseModel):
disabled_tools: list[str] | None = (
None # Optional list of tool names the user has disabled from the UI
)
architecture_mode: ArchitectureModeLiteral | None = None
filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
client_platform: Literal["web", "desktop"] = "web"
local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None
@ -250,6 +256,7 @@ class RegenerateRequest(BaseModel):
mentioned_document_ids: list[int] | None = None
mentioned_surfsense_doc_ids: list[int] | None = None
disabled_tools: list[str] | None = None
architecture_mode: ArchitectureModeLiteral | None = None
filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
client_platform: Literal["web", "desktop"] = "web"
local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None
@ -286,6 +293,7 @@ class ResumeDecision(BaseModel):
class ResumeRequest(BaseModel):
search_space_id: int
decisions: list[ResumeDecision]
architecture_mode: ArchitectureModeLiteral | None = None
filesystem_mode: Literal["cloud", "desktop_local_folder"] = "cloud"
client_platform: Literal["web", "desktop"] = "web"
local_filesystem_mounts: list[LocalFilesystemMountPayload] | None = None

View file

@ -0,0 +1,47 @@
"""Thin architecture dispatch seam for chat streaming entrypoints."""
from __future__ import annotations
from collections.abc import AsyncGenerator
from typing import Any
from app.agents.multi_agent_v1.entrypoint import MultiAgentEntrypoint
from app.agents.new_chat.architecture_mode import (
ArchitectureMode,
parse_architecture_mode,
)
from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat
def _resolve_mode(mode_value: str) -> ArchitectureMode:
return parse_architecture_mode(mode_value) or ArchitectureMode.SINGLE_AGENT
def dispatch_new_chat_stream(
*,
architecture_mode: str,
stream_kwargs: dict[str, Any],
) -> AsyncGenerator[str, None]:
mode = _resolve_mode(architecture_mode)
if mode == ArchitectureMode.SINGLE_AGENT:
return stream_new_chat(**stream_kwargs)
entrypoint = MultiAgentEntrypoint()
return entrypoint.stream_new_chat(
fallback_streamer=stream_new_chat,
fallback_kwargs=stream_kwargs,
)
def dispatch_resume_chat_stream(
*,
architecture_mode: str,
stream_kwargs: dict[str, Any],
) -> AsyncGenerator[str, None]:
mode = _resolve_mode(architecture_mode)
if mode == ArchitectureMode.SINGLE_AGENT:
return stream_resume_chat(**stream_kwargs)
entrypoint = MultiAgentEntrypoint()
return entrypoint.stream_resume_chat(
fallback_streamer=stream_resume_chat,
fallback_kwargs=stream_kwargs,
)

View file

@ -42,6 +42,7 @@ from app.agents.new_chat.memory_extraction import (
extract_and_save_memory,
extract_and_save_team_memory,
)
from app.agents.new_chat.telemetry import log_architecture_telemetry
from app.db import (
ChatVisibility,
NewChatMessage,
@ -149,6 +150,7 @@ class StreamResult:
agent_called_update_memory: bool = False
request_id: str | None = None
turn_id: str = ""
architecture_mode: str = "single_agent"
filesystem_mode: str = "cloud"
client_platform: str = "web"
intent_detected: str = "chat_only"
@ -182,9 +184,7 @@ def _tool_output_has_error(tool_output: Any) -> bool:
if tool_output.get("error"):
return True
result = tool_output.get("result")
if isinstance(result, str) and result.strip().lower().startswith("error:"):
return True
return False
return isinstance(result, str) and result.strip().lower().startswith("error:")
if isinstance(tool_output, str):
return tool_output.strip().lower().startswith("error:")
return False
@ -231,6 +231,7 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None:
"request_id": result.request_id or "unknown",
"turn_id": result.turn_id or "unknown",
"chat_id": result.turn_id.split(":", 1)[0] if ":" in result.turn_id else "unknown",
"architecture_mode": result.architecture_mode,
"filesystem_mode": result.filesystem_mode,
"client_platform": result.client_platform,
"intent_detected": result.intent_detected,
@ -1308,18 +1309,17 @@ async def _stream_agent_events(
result.commit_gate_passed, result.commit_gate_reason = (
_evaluate_file_contract_outcome(result)
)
if not result.commit_gate_passed:
if _contract_enforcement_active(result):
gate_notice = (
"I could not complete the requested file write because no successful "
"write_file/edit_file operation was confirmed."
)
gate_text_id = streaming_service.generate_text_id()
yield streaming_service.format_text_start(gate_text_id)
yield streaming_service.format_text_delta(gate_text_id, gate_notice)
yield streaming_service.format_text_end(gate_text_id)
yield streaming_service.format_terminal_info(gate_notice, "error")
accumulated_text = gate_notice
if not result.commit_gate_passed and _contract_enforcement_active(result):
gate_notice = (
"I could not complete the requested file write because no successful "
"write_file/edit_file operation was confirmed."
)
gate_text_id = streaming_service.generate_text_id()
yield streaming_service.format_text_start(gate_text_id)
yield streaming_service.format_text_delta(gate_text_id, gate_notice)
yield streaming_service.format_text_end(gate_text_id)
yield streaming_service.format_terminal_info(gate_notice, "error")
accumulated_text = gate_notice
else:
result.commit_gate_passed = True
result.commit_gate_reason = ""
@ -1351,6 +1351,7 @@ async def stream_new_chat(
filesystem_selection: FilesystemSelection | None = None,
request_id: str | None = None,
user_image_data_urls: list[str] | None = None,
architecture_mode: str = "single_agent",
) -> AsyncGenerator[str, None]:
"""
Stream chat responses from the new SurfSense deep agent.
@ -1384,8 +1385,22 @@ async def stream_new_chat(
)
stream_result.request_id = request_id
stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}"
stream_result.architecture_mode = architecture_mode
stream_result.filesystem_mode = fs_mode
stream_result.client_platform = fs_platform
log_architecture_telemetry(
phase="turn_start",
source="new_chat",
status="started",
architecture_mode=architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=0.0,
token_total=0,
request_id=request_id,
turn_id=stream_result.turn_id,
)
_log_file_contract("turn_start", stream_result)
_perf_log.info(
"[stream_new_chat] filesystem_mode=%s client_platform=%s",
@ -1638,6 +1653,7 @@ async def stream_new_chat(
"search_space_id": search_space_id,
"request_id": request_id or "unknown",
"turn_id": stream_result.turn_id,
"architecture_mode": architecture_mode,
}
_perf_log.info(
@ -1669,6 +1685,7 @@ async def stream_new_chat(
configurable = {"thread_id": str(chat_id)}
configurable["request_id"] = request_id or "unknown"
configurable["turn_id"] = stream_result.turn_id
configurable["architecture_mode"] = architecture_mode
if checkpoint_id:
configurable["checkpoint_id"] = checkpoint_id
@ -1884,6 +1901,19 @@ async def stream_new_chat(
"call_details": accumulator.serialized_calls(),
},
)
log_architecture_telemetry(
phase="turn_end",
source="new_chat",
status="interrupted",
architecture_mode=stream_result.architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - _t_total) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=stream_result.turn_id,
)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()
@ -1956,6 +1986,19 @@ async def stream_new_chat(
"call_details": accumulator.serialized_calls(),
},
)
log_architecture_telemetry(
phase="turn_end",
source="new_chat",
status="completed",
architecture_mode=stream_result.architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - _t_total) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=stream_result.turn_id,
)
# Fire background memory extraction if the agent didn't handle it.
# Shared threads write to team memory; private threads write to user memory.
@ -2000,6 +2043,20 @@ async def stream_new_chat(
print(f"[stream_new_chat] {error_message}")
print(f"[stream_new_chat] Exception type: {type(e).__name__}")
print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}")
log_architecture_telemetry(
phase="turn_end",
source="new_chat",
status="error",
architecture_mode=stream_result.architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - _t_total) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=stream_result.turn_id,
extra={"error_type": type(e).__name__},
)
yield streaming_service.format_error(error_message)
yield streaming_service.format_finish_step()
@ -2093,6 +2150,7 @@ async def stream_resume_chat(
thread_visibility: ChatVisibility | None = None,
filesystem_selection: FilesystemSelection | None = None,
request_id: str | None = None,
architecture_mode: str = "single_agent",
) -> AsyncGenerator[str, None]:
streaming_service = VercelStreamingService()
stream_result = StreamResult()
@ -2103,8 +2161,22 @@ async def stream_resume_chat(
)
stream_result.request_id = request_id
stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}"
stream_result.architecture_mode = architecture_mode
stream_result.filesystem_mode = fs_mode
stream_result.client_platform = fs_platform
log_architecture_telemetry(
phase="turn_start",
source="resume_chat",
status="started",
architecture_mode=architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=0.0,
token_total=0,
request_id=request_id,
turn_id=stream_result.turn_id,
)
_log_file_contract("turn_start", stream_result)
_perf_log.info(
"[stream_resume] filesystem_mode=%s client_platform=%s",
@ -2250,6 +2322,7 @@ async def stream_resume_chat(
"thread_id": str(chat_id),
"request_id": request_id or "unknown",
"turn_id": stream_result.turn_id,
"architecture_mode": architecture_mode,
},
"recursion_limit": 80,
}
@ -2300,6 +2373,19 @@ async def stream_resume_chat(
"call_details": accumulator.serialized_calls(),
},
)
log_architecture_telemetry(
phase="turn_end",
source="resume_chat",
status="interrupted",
architecture_mode=stream_result.architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - _t_total) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=stream_result.turn_id,
)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()
@ -2353,6 +2439,19 @@ async def stream_resume_chat(
"call_details": accumulator.serialized_calls(),
},
)
log_architecture_telemetry(
phase="turn_end",
source="resume_chat",
status="completed",
architecture_mode=stream_result.architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - _t_total) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=stream_result.turn_id,
)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()
@ -2364,6 +2463,20 @@ async def stream_resume_chat(
error_message = f"Error during resume: {e!s}"
print(f"[stream_resume_chat] {error_message}")
print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}")
log_architecture_telemetry(
phase="turn_end",
source="resume_chat",
status="error",
architecture_mode=stream_result.architecture_mode,
orchestrator_used=False,
worker_count=0,
retry_count=0,
latency_ms=(time.perf_counter() - _t_total) * 1000.0,
token_total=accumulator.grand_total,
request_id=request_id,
turn_id=stream_result.turn_id,
extra={"error_type": type(e).__name__},
)
yield streaming_service.format_error(error_message)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()