From 78f4747382cead46c0c72040002562fe56bc35e4 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 7 May 2026 19:40:10 +0200 Subject: [PATCH] refactor(chat): stream agent events via stream_output and remove parity v2 flag --- docker/.env.example | 1 - surfsense_backend/.env.example | 8 - .../app/agents/new_chat/feature_flags.py | 15 - .../app/services/new_streaming_service.py | 17 +- .../app/tasks/chat/content_builder.py | 8 +- .../app/tasks/chat/stream_new_chat.py | 1524 +---------------- .../streaming/graph_stream/event_stream.py | 2 - .../streaming/handlers/chat_model_stream.py | 4 +- .../chat/streaming/handlers/tool_start.py | 24 +- .../app/tasks/chat/streaming/relay/state.py | 3 - .../agents/new_chat/test_feature_flags.py | 3 - .../chat/streaming/test_stage_2_parity.py | 4 +- .../unit/tasks/chat/test_content_builder.py | 4 +- .../tasks/chat/test_tool_input_streaming.py | 112 +- .../assistant-ui/reasoning-message-part.tsx | 4 +- .../components/assistant-ui/tool-fallback.tsx | 14 +- surfsense_web/lib/chat/streaming-state.ts | 5 +- 17 files changed, 76 insertions(+), 1676 deletions(-) diff --git a/docker/.env.example b/docker/.env.example index fd56bdccc..aba15f13f 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -324,7 +324,6 @@ SURFSENSE_ENABLE_ACTION_LOG=true SURFSENSE_ENABLE_REVERT_ROUTE=true SURFSENSE_ENABLE_PERMISSION=true SURFSENSE_ENABLE_DOOM_LOOP=true -SURFSENSE_ENABLE_STREAM_PARITY_V2=true # Periodic connector sync interval (default: 5m) # SCHEDULE_CHECKER_INTERVAL=5m diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index ba89059c8..3d442973c 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -315,14 +315,6 @@ LANGSMITH_PROJECT=surfsense # SURFSENSE_ENABLE_ACTION_LOG=false # SURFSENSE_ENABLE_REVERT_ROUTE=false # Backend-only; flip when UI ships -# Streaming parity v2 — opt in to LangChain's structured AIMessageChunk -# content (typed reasoning blocks, tool-input deltas) and propagate the -# real tool_call_id to the SSE layer. When OFF, the stream falls back to -# the str-only text path and synthetic "call_" tool-call ids. -# Schema migrations 135/136 ship unconditionally because they are -# forward-compatible. -# SURFSENSE_ENABLE_STREAM_PARITY_V2=false - # Plugins # SURFSENSE_ENABLE_PLUGIN_LOADER=false # Comma-separated allowlist of plugin entry-point names diff --git a/surfsense_backend/app/agents/new_chat/feature_flags.py b/surfsense_backend/app/agents/new_chat/feature_flags.py index b3dc0fa82..3cea051ef 100644 --- a/surfsense_backend/app/agents/new_chat/feature_flags.py +++ b/surfsense_backend/app/agents/new_chat/feature_flags.py @@ -28,7 +28,6 @@ Defaults: SURFSENSE_ENABLE_PERMISSION=true SURFSENSE_ENABLE_DOOM_LOOP=true SURFSENSE_ENABLE_LLM_TOOL_SELECTOR=false # adds a per-turn LLM call - SURFSENSE_ENABLE_STREAM_PARITY_V2=true Master kill-switch (overrides everything else): @@ -88,15 +87,6 @@ class AgentFeatureFlags: enable_action_log: bool = True enable_revert_route: bool = True - # Streaming parity v2 — opt in to LangChain's structured - # ``AIMessageChunk`` content (typed reasoning blocks, tool-input - # deltas) and propagate the real ``tool_call_id`` to the SSE layer. - # When OFF the ``stream_new_chat`` task falls back to the str-only - # text path and the synthetic ``call_`` tool-call id (no - # ``langchainToolCallId`` propagation). Schema migrations 135/136 - # ship unconditionally because they're forward-compatible. - enable_stream_parity_v2: bool = True - # Plugins enable_plugin_loader: bool = False @@ -169,7 +159,6 @@ class AgentFeatureFlags: enable_kb_planner_runnable=False, enable_action_log=False, enable_revert_route=False, - enable_stream_parity_v2=False, enable_plugin_loader=False, enable_otel=False, enable_agent_cache=False, @@ -208,10 +197,6 @@ class AgentFeatureFlags: # Snapshot / revert enable_action_log=_env_bool("SURFSENSE_ENABLE_ACTION_LOG", True), enable_revert_route=_env_bool("SURFSENSE_ENABLE_REVERT_ROUTE", True), - # Streaming parity v2 - enable_stream_parity_v2=_env_bool( - "SURFSENSE_ENABLE_STREAM_PARITY_V2", True - ), # Plugins enable_plugin_loader=_env_bool("SURFSENSE_ENABLE_PLUGIN_LOADER", False), # Observability diff --git a/surfsense_backend/app/services/new_streaming_service.py b/surfsense_backend/app/services/new_streaming_service.py index 55129668c..cec0c8a5e 100644 --- a/surfsense_backend/app/services/new_streaming_service.py +++ b/surfsense_backend/app/services/new_streaming_service.py @@ -608,15 +608,14 @@ class VercelStreamingService: Args: tool_call_id: The unique tool call identifier. May be EITHER the synthetic ``call_`` id derived from LangGraph - ``run_id`` (legacy / ``SURFSENSE_ENABLE_STREAM_PARITY_V2`` - OFF, or the unmatched-fallback path under parity_v2) OR - the authoritative LangChain ``tool_call.id`` (parity_v2 - path: when the provider streams ``tool_call_chunks`` we - register the ``index`` and reuse the lc-id as the card - id so live ``tool-input-delta`` events can be routed - without a downstream join). Either way, the same id is - preserved across ``tool-input-start`` / ``-delta`` / - ``-available`` / ``tool-output-available`` for one call. + ``run_id`` (unmatched chunk fallback when no ``index`` was + registered) OR the authoritative LangChain ``tool_call.id`` + (when the provider streams ``tool_call_chunks`` we register + the ``index`` and reuse the lc-id as the card id so live + ``tool-input-delta`` events route without a downstream join). + Either way, the same id is preserved across + ``tool-input-start`` / ``-delta`` / ``-available`` / + ``tool-output-available`` for one call. tool_name: The name of the tool being called. langchain_tool_call_id: Optional authoritative LangChain ``tool_call.id``. When set, surfaces as diff --git a/surfsense_backend/app/tasks/chat/content_builder.py b/surfsense_backend/app/tasks/chat/content_builder.py index 041cab286..32b49e6b5 100644 --- a/surfsense_backend/app/tasks/chat/content_builder.py +++ b/surfsense_backend/app/tasks/chat/content_builder.py @@ -85,8 +85,8 @@ class AssistantContentBuilder: self._current_text_idx: int = -1 self._current_reasoning_idx: int = -1 # ``ui_id``-keyed indexes for tool-call parts. ``ui_id`` is the - # synthetic ``call_`` (legacy) or the LangChain - # ``tool_call.id`` (parity_v2) — same key the streaming layer + # synthetic ``call_`` (chunk fallback) or the LangChain + # ``tool_call.id`` (indexed chunk path) — same key the streaming layer # threads through every ``tool-input-*`` / ``tool-output-*`` event. self._tool_call_idx_by_ui_id: dict[str, int] = {} # Live argsText accumulator (concatenated ``tool-input-delta`` chunks) @@ -181,7 +181,7 @@ class AssistantContentBuilder: """Register a tool-call card. Args are filled in by later events.""" if not ui_id: return - # Skip duplicate registration: parity_v2 may emit + # Skip duplicate registration: the stream may emit # ``tool-input-start`` from both ``on_chat_model_stream`` # (when tool_call_chunks register a name) and ``on_tool_start`` # (the canonical path). The FE de-dupes via ``toolCallIndices``; @@ -243,7 +243,7 @@ class AssistantContentBuilder: pretty-printed JSON, sets the full ``args`` dict, and backfills ``langchainToolCallId`` if it wasn't known at ``tool-input-start`` time. Also creates the card if no prior ``tool-input-start`` registered it - (legacy parity_v2-OFF / late-registration paths). + (late-registration when no prior ``tool-input-start``). """ if not ui_id: return diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 1a2f38077..8e135179a 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -9,13 +9,11 @@ Supports loading LLM configurations from: - NewLLMConfig database table (positive IDs for user-created configs with prompt settings) """ -import ast import asyncio import contextlib import gc import json import logging -import re import time from collections.abc import AsyncGenerator from dataclasses import dataclass, field @@ -33,7 +31,6 @@ from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent from app.agents.new_chat.checkpointer import get_checkpointer from app.agents.new_chat.context import SurfSenseContextSchema from app.agents.new_chat.errors import BusyError -from app.agents.new_chat.feature_flags import get_flags from app.agents.new_chat.filesystem_selection import FilesystemMode, FilesystemSelection from app.agents.new_chat.llm_config import ( AgentConfig, @@ -77,6 +74,7 @@ from app.services.chat_session_state_service import ( ) from app.services.connector_service import ConnectorService from app.services.new_streaming_service import VercelStreamingService +from app.tasks.chat.streaming.graph_stream.event_stream import stream_output from app.utils.content_utils import bootstrap_history_from_db from app.utils.perf import get_perf_logger, log_system_snapshot, trim_native_heap from app.utils.user_message_multimodal import build_human_message_content @@ -729,9 +727,9 @@ def _legacy_match_lc_id( ) -> str | None: """Best-effort match a buffered ``tool_call_chunk`` to a tool name. - Pure extract of the legacy in-line match used at ``on_tool_start`` for - parity_v2-OFF and unmatched (chunk path didn't register an index for - this call) tools. Pops the next id-bearing chunk whose ``name`` + Pure extract of the in-line match used at ``on_tool_start`` when the + chunk path didn't register an index for this call. Pops the next + id-bearing chunk whose ``name`` matches ``tool_name`` (or any id-bearing chunk as a fallback) and returns its id. Mutates ``pending_tool_call_chunks`` and ``lc_tool_call_id_by_run`` in place. @@ -803,1505 +801,22 @@ async def _stream_agent_events( Yields: SSE-formatted strings for each event. """ - accumulated_text = "" - current_text_id: str | None = None - thinking_step_counter = 1 if initial_step_id else 0 - tool_step_ids: dict[str, str] = {} - completed_step_ids: set[str] = set() - last_active_step_id: str | None = initial_step_id - last_active_step_title: str = initial_step_title - last_active_step_items: list[str] = initial_step_items or [] - just_finished_tool: bool = False - active_tool_depth: int = 0 # Track nesting: >0 means we're inside a tool - called_update_memory: bool = False + async for sse in stream_output( + agent=agent, + config=config, + input_data=input_data, + streaming_service=streaming_service, + result=result, + step_prefix=step_prefix, + initial_step_id=initial_step_id, + initial_step_title=initial_step_title, + initial_step_items=initial_step_items, + content_builder=content_builder, + runtime_context=runtime_context, + ): + yield sse - # Reasoning-block streaming. We open a reasoning block on the - # first reasoning delta of a step, append deltas as they arrive, and - # close it when text starts (the model has switched to writing its - # answer) or ``on_chat_model_end`` fires for the model node. Reuses - # the same Vercel format-helpers as text-start/delta/end. - current_reasoning_id: str | None = None - - # Streaming-parity v2 feature flag. When OFF we keep the legacy - # shape: str-only content, no reasoning blocks, no - # ``langchainToolCallId`` propagation. The schema migrations - # (135 / 136) ship unconditionally because they're forward-compatible. - parity_v2 = bool(get_flags().enable_stream_parity_v2) - - # Best-effort attach of LangChain ``tool_call_id`` to the synthetic - # ``call_`` card id we already emit. We accumulate - # ``tool_call_chunks`` from ``on_chat_model_stream``, key them by - # name, and pop the next unconsumed entry at ``on_tool_start``. The - # authoritative id is later filled in at ``on_tool_end`` from - # ``ToolMessage.tool_call_id``. Under parity_v2 we ALSO short-circuit - # this list for chunks that already registered into ``index_to_meta`` - # below — so this list is reserved for the parity_v2-OFF / unmatched - # fallback path only and never re-pops a chunk we already streamed. - pending_tool_call_chunks: list[dict[str, Any]] = [] - lc_tool_call_id_by_run: dict[str, str] = {} - file_path_by_run: dict[str, str] = {} - - # parity_v2 only: live tool-call argument streaming. ``index_to_meta`` - # is keyed by the chunk's ``index`` field — LangChain - # ``ToolCallChunk``s for the same call share an index but only the - # first chunk carries id+name (subsequent ones are id=None, - # name=None, args=""). We register an index when both id and - # name are observed on a chunk (per ToolCallChunk semantics they - # arrive together on the first chunk), then route every later chunk - # at that index to the same ``ui_id`` as a ``tool-input-delta``. - # ``ui_tool_call_id_by_run`` maps LangGraph ``run_id`` to the - # ``ui_id`` used for that call's ``tool-input-start`` so the matching - # ``tool-output-available`` (emitted from ``on_tool_end``) lands on - # the same card. - index_to_meta: dict[int, dict[str, str]] = {} - ui_tool_call_id_by_run: dict[str, str] = {} - - # Per-tool-end mutable cache for the LangChain tool_call_id resolved - # at ``on_tool_end``. ``_emit_tool_output`` reads this so every - # ``format_tool_output_available`` call automatically carries the - # authoritative id without duplicating the kwarg at every call site. - current_lc_tool_call_id: dict[str, str | None] = {"value": None} - - def _emit_tool_output(call_id: str, output: Any) -> str: - # Drive the builder before formatting the SSE so the in-memory - # ContentPart[] mirror sees the result attached to the same - # card the FE will render. Builder method is a no-op when - # ``content_builder`` is None (anonymous / legacy paths). - if content_builder is not None: - content_builder.on_tool_output_available( - call_id, output, current_lc_tool_call_id["value"] - ) - return streaming_service.format_tool_output_available( - call_id, - output, - langchain_tool_call_id=current_lc_tool_call_id["value"], - ) - - def _emit_thinking_step( - *, - step_id: str, - title: str, - status: str = "in_progress", - items: list[str] | None = None, - ) -> str: - """Format a thinking-step SSE event and notify the builder. - - Single helper used at every ``format_thinking_step`` yield site - in this generator. Drives ``AssistantContentBuilder.on_thinking_step`` - first so the FE-mirror state lands the update before the SSE - carrying the same data leaves the wire — order matches the FE - pipeline (``processSharedStreamEvent`` updates state, then - flushes). Builder call is a no-op when ``content_builder`` is - None (anonymous / legacy paths). - """ - if content_builder is not None: - content_builder.on_thinking_step(step_id, title, status, items) - return streaming_service.format_thinking_step( - step_id=step_id, - title=title, - status=status, - items=items, - ) - - def next_thinking_step_id() -> str: - nonlocal thinking_step_counter - thinking_step_counter += 1 - return f"{step_prefix}-{thinking_step_counter}" - - def complete_current_step() -> str | None: - nonlocal last_active_step_id - if last_active_step_id and last_active_step_id not in completed_step_ids: - completed_step_ids.add(last_active_step_id) - event = _emit_thinking_step( - step_id=last_active_step_id, - title=last_active_step_title, - status="completed", - items=last_active_step_items if last_active_step_items else None, - ) - last_active_step_id = None - return event - return None - - # Per-invocation runtime context (Phase 1.5). When supplied, - # ``KnowledgePriorityMiddleware`` reads ``mentioned_document_ids`` - # from ``runtime.context`` instead of its constructor closure — the - # prerequisite that lets the compiled-agent cache (Phase 1) reuse a - # single graph across turns. Astream_events_kwargs stays empty when - # callers leave ``runtime_context`` as ``None`` to preserve the - # legacy code path bit-for-bit. - astream_kwargs: dict[str, Any] = {"config": config, "version": "v2"} - if runtime_context is not None: - astream_kwargs["context"] = runtime_context - - async for event in agent.astream_events(input_data, **astream_kwargs): - event_type = event.get("event", "") - - if event_type == "on_chat_model_stream": - if active_tool_depth > 0: - continue # Suppress inner-tool LLM tokens from leaking into chat - if "surfsense:internal" in event.get("tags", []): - continue # Suppress middleware-internal LLM tokens (e.g. KB search classification) - chunk = event.get("data", {}).get("chunk") - if not chunk: - continue - parts = _extract_chunk_parts(chunk) - - reasoning_delta = parts["reasoning"] - text_delta = parts["text"] - - # Reasoning streaming. Open a reasoning block on first - # delta; append every subsequent delta until text begins. - # When text starts we close the reasoning block first so the - # frontend sees the natural hand-off. Gated behind the - # parity-v2 flag so legacy deployments keep today's shape. - if parity_v2 and reasoning_delta: - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - if content_builder is not None: - content_builder.on_text_end(current_text_id) - current_text_id = None - if current_reasoning_id is None: - completion_event = complete_current_step() - if completion_event: - yield completion_event - if just_finished_tool: - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - just_finished_tool = False - current_reasoning_id = streaming_service.generate_reasoning_id() - yield streaming_service.format_reasoning_start(current_reasoning_id) - if content_builder is not None: - content_builder.on_reasoning_start(current_reasoning_id) - yield streaming_service.format_reasoning_delta( - current_reasoning_id, reasoning_delta - ) - if content_builder is not None: - content_builder.on_reasoning_delta( - current_reasoning_id, reasoning_delta - ) - - if text_delta: - if current_reasoning_id is not None: - yield streaming_service.format_reasoning_end(current_reasoning_id) - if content_builder is not None: - content_builder.on_reasoning_end(current_reasoning_id) - current_reasoning_id = None - if current_text_id is None: - completion_event = complete_current_step() - if completion_event: - yield completion_event - if just_finished_tool: - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - just_finished_tool = False - current_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(current_text_id) - if content_builder is not None: - content_builder.on_text_start(current_text_id) - yield streaming_service.format_text_delta(current_text_id, text_delta) - accumulated_text += text_delta - if content_builder is not None: - content_builder.on_text_delta(current_text_id, text_delta) - - # Live tool-call argument streaming. Runs AFTER text/reasoning - # processing so chunks containing both stay in their natural - # wire order (text → text-end → tool-input-start). Active - # text/reasoning are closed inside the registration branch - # before ``tool-input-start`` so the frontend sees a clean - # part boundary even when providers interleave. - if parity_v2 and parts["tool_call_chunks"]: - for tcc in parts["tool_call_chunks"]: - idx = tcc.get("index") - - # Register this index when we first see id+name - # TOGETHER. Per LangChain ToolCallChunk semantics the - # first chunk for a tool call carries both fields - # together; later chunks have id=None, name=None and - # only ``args``. Requiring BOTH keeps wire - # ``tool-input-start`` always carrying a real - # toolName (assistant-ui's typed tool-part dispatch - # keys off it). - if idx is not None and idx not in index_to_meta: - lc_id = tcc.get("id") - name = tcc.get("name") - if lc_id and name: - ui_id = lc_id - - # Close active text/reasoning so wire - # ordering stays clean even on providers - # that interleave text and tool-call chunks - # within the same stream window. - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - if content_builder is not None: - content_builder.on_text_end(current_text_id) - current_text_id = None - if current_reasoning_id is not None: - yield streaming_service.format_reasoning_end( - current_reasoning_id - ) - if content_builder is not None: - content_builder.on_reasoning_end( - current_reasoning_id - ) - current_reasoning_id = None - - index_to_meta[idx] = { - "ui_id": ui_id, - "lc_id": lc_id, - "name": name, - } - yield streaming_service.format_tool_input_start( - ui_id, - name, - langchain_tool_call_id=lc_id, - ) - if content_builder is not None: - content_builder.on_tool_input_start(ui_id, name, lc_id) - - # Emit args delta for any chunk at a registered - # index (including idless continuations). Once an - # index is owned by ``index_to_meta`` we DO NOT - # append to ``pending_tool_call_chunks`` — that list - # is reserved for the parity_v2-OFF / unmatched - # fallback path so it never re-pops chunks already - # consumed here (skip-append). - meta = index_to_meta.get(idx) if idx is not None else None - if meta: - args_chunk = tcc.get("args") or "" - if args_chunk: - yield streaming_service.format_tool_input_delta( - meta["ui_id"], args_chunk - ) - if content_builder is not None: - content_builder.on_tool_input_delta( - meta["ui_id"], args_chunk - ) - else: - pending_tool_call_chunks.append(tcc) - - elif event_type == "on_tool_start": - active_tool_depth += 1 - tool_name = event.get("name", "unknown_tool") - run_id = event.get("run_id", "") - tool_input = event.get("data", {}).get("input", {}) - if tool_name in ("write_file", "edit_file"): - result.write_attempted = True - if isinstance(tool_input, dict): - file_path = tool_input.get("file_path") - if isinstance(file_path, str) and file_path.strip() and run_id: - file_path_by_run[run_id] = file_path.strip() - - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - if content_builder is not None: - content_builder.on_text_end(current_text_id) - current_text_id = None - - if last_active_step_title != "Synthesizing response": - completion_event = complete_current_step() - if completion_event: - yield completion_event - - just_finished_tool = False - tool_step_id = next_thinking_step_id() - tool_step_ids[run_id] = tool_step_id - last_active_step_id = tool_step_id - - if tool_name == "ls": - ls_path = ( - tool_input.get("path", "/") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Listing files" - last_active_step_items = [ls_path] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Listing files", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "read_file": - fp = ( - tool_input.get("file_path", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_fp = fp if len(fp) <= 80 else "…" + fp[-77:] - last_active_step_title = "Reading file" - last_active_step_items = [display_fp] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Reading file", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "write_file": - fp = ( - tool_input.get("file_path", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_fp = fp if len(fp) <= 80 else "…" + fp[-77:] - last_active_step_title = "Writing file" - last_active_step_items = [display_fp] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Writing file", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "edit_file": - fp = ( - tool_input.get("file_path", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_fp = fp if len(fp) <= 80 else "…" + fp[-77:] - last_active_step_title = "Editing file" - last_active_step_items = [display_fp] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Editing file", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "glob": - pat = ( - tool_input.get("pattern", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - base_path = ( - tool_input.get("path", "/") if isinstance(tool_input, dict) else "/" - ) - last_active_step_title = "Searching files" - last_active_step_items = [f"{pat} in {base_path}"] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Searching files", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "grep": - pat = ( - tool_input.get("pattern", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - grep_path = ( - tool_input.get("path", "") if isinstance(tool_input, dict) else "" - ) - display_pat = pat[:60] + ("…" if len(pat) > 60 else "") - last_active_step_title = "Searching content" - last_active_step_items = [ - f'"{display_pat}"' + (f" in {grep_path}" if grep_path else "") - ] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Searching content", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "rm": - rm_path = ( - tool_input.get("path", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_path = rm_path if len(rm_path) <= 80 else "…" + rm_path[-77:] - last_active_step_title = "Deleting file" - last_active_step_items = [display_path] if display_path else [] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Deleting file", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "rmdir": - rmdir_path = ( - tool_input.get("path", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_path = ( - rmdir_path if len(rmdir_path) <= 80 else "…" + rmdir_path[-77:] - ) - last_active_step_title = "Deleting folder" - last_active_step_items = [display_path] if display_path else [] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Deleting folder", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "mkdir": - mkdir_path = ( - tool_input.get("path", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_path = ( - mkdir_path if len(mkdir_path) <= 80 else "…" + mkdir_path[-77:] - ) - last_active_step_title = "Creating folder" - last_active_step_items = [display_path] if display_path else [] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Creating folder", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "move_file": - src = ( - tool_input.get("source_path", "") - if isinstance(tool_input, dict) - else "" - ) - dst = ( - tool_input.get("destination_path", "") - if isinstance(tool_input, dict) - else "" - ) - display_src = src if len(src) <= 60 else "…" + src[-57:] - display_dst = dst if len(dst) <= 60 else "…" + dst[-57:] - last_active_step_title = "Moving file" - last_active_step_items = ( - [f"{display_src} → {display_dst}"] if src or dst else [] - ) - yield _emit_thinking_step( - step_id=tool_step_id, - title="Moving file", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "write_todos": - todos = ( - tool_input.get("todos", []) if isinstance(tool_input, dict) else [] - ) - todo_count = len(todos) if isinstance(todos, list) else 0 - last_active_step_title = "Planning tasks" - last_active_step_items = ( - [f"{todo_count} task{'s' if todo_count != 1 else ''}"] - if todo_count - else [] - ) - yield _emit_thinking_step( - step_id=tool_step_id, - title="Planning tasks", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "save_document": - doc_title = ( - tool_input.get("title", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_title = doc_title[:60] + ("…" if len(doc_title) > 60 else "") - last_active_step_title = "Saving document" - last_active_step_items = [display_title] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Saving document", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "generate_image": - prompt = ( - tool_input.get("prompt", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Generating image" - last_active_step_items = [ - f"Prompt: {prompt[:80]}{'...' if len(prompt) > 80 else ''}" - ] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Generating image", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "scrape_webpage": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Scraping webpage" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Scraping webpage", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "generate_podcast": - podcast_title = ( - tool_input.get("podcast_title", "SurfSense Podcast") - if isinstance(tool_input, dict) - else "SurfSense Podcast" - ) - content_len = len( - tool_input.get("source_content", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Generating podcast" - last_active_step_items = [ - f"Title: {podcast_title}", - f"Content: {content_len:,} characters", - "Preparing audio generation...", - ] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Generating podcast", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "generate_report": - report_topic = ( - tool_input.get("topic", "Report") - if isinstance(tool_input, dict) - else "Report" - ) - is_revision = bool( - isinstance(tool_input, dict) and tool_input.get("parent_report_id") - ) - step_title = "Revising report" if is_revision else "Generating report" - last_active_step_title = step_title - last_active_step_items = [ - f"Topic: {report_topic}", - "Analyzing source content...", - ] - yield _emit_thinking_step( - step_id=tool_step_id, - title=step_title, - status="in_progress", - items=last_active_step_items, - ) - elif tool_name in ("execute", "execute_code"): - cmd = ( - tool_input.get("command", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - display_cmd = cmd[:80] + ("…" if len(cmd) > 80 else "") - last_active_step_title = "Running command" - last_active_step_items = [f"$ {display_cmd}"] - yield _emit_thinking_step( - step_id=tool_step_id, - title="Running command", - status="in_progress", - items=last_active_step_items, - ) - else: - # Fallback for tools without a curated thinking-step title - # (typically connector tools, MCP-registered tools, or - # newly added tools that haven't been wired up here yet). - # Render the snake_cased name as a sentence-cased phrase - # so non-technical users see e.g. "Send gmail email" - # rather than the raw identifier "send_gmail_email". - last_active_step_title = ( - tool_name.replace("_", " ").strip().capitalize() or tool_name - ) - last_active_step_items = [] - yield _emit_thinking_step( - step_id=tool_step_id, - title=last_active_step_title, - status="in_progress", - ) - - # Resolve the card identity. If the chunk-emission loop - # already registered an ``index`` for this tool call (parity_v2 - # path), reuse the same ui_id so the card sees: - # tool-input-start → deltas… → tool-input-available → - # tool-output-available all keyed by lc_id. Otherwise fall - # back to the synthetic ``call_`` id and the legacy - # best-effort match against ``pending_tool_call_chunks``. - matched_meta: dict[str, str] | None = None - if parity_v2: - # FIFO over indices 0,1,2…; first unassigned same-name - # match wins. Handles parallel same-name calls (e.g. two - # write_file calls) deterministically as long as the - # model interleaves on_tool_start in the same order it - # streamed the args. - taken_ui_ids = set(ui_tool_call_id_by_run.values()) - for meta in index_to_meta.values(): - if meta["name"] == tool_name and meta["ui_id"] not in taken_ui_ids: - matched_meta = meta - break - - tool_call_id: str - langchain_tool_call_id: str | None = None - if matched_meta is not None: - tool_call_id = matched_meta["ui_id"] - langchain_tool_call_id = matched_meta["lc_id"] - # ``tool-input-start`` already fired during chunk - # emission — skip the duplicate. No pruning is needed - # because the chunk-emission loop intentionally never - # appends registered-index chunks to - # ``pending_tool_call_chunks`` (skip-append). - if run_id: - lc_tool_call_id_by_run[run_id] = matched_meta["lc_id"] - else: - tool_call_id = ( - f"call_{run_id[:32]}" - if run_id - else streaming_service.generate_tool_call_id() - ) - # Legacy fallback: parity_v2 OFF, or parity_v2 ON but the - # provider didn't stream tool_call_chunks for this call - # (no index registered). Run the existing best-effort - # match BEFORE emitting start so we still attach an - # authoritative ``langchainToolCallId`` when possible. - if parity_v2: - langchain_tool_call_id = _legacy_match_lc_id( - pending_tool_call_chunks, - tool_name, - run_id, - lc_tool_call_id_by_run, - ) - yield streaming_service.format_tool_input_start( - tool_call_id, - tool_name, - langchain_tool_call_id=langchain_tool_call_id, - ) - if content_builder is not None: - content_builder.on_tool_input_start( - tool_call_id, tool_name, langchain_tool_call_id - ) - - if run_id: - ui_tool_call_id_by_run[run_id] = tool_call_id - - # Sanitize tool_input: strip runtime-injected non-serializable - # values (e.g. LangChain ToolRuntime) before sending over SSE. - if isinstance(tool_input, dict): - _safe_input: dict[str, Any] = {} - for _k, _v in tool_input.items(): - try: - json.dumps(_v) - _safe_input[_k] = _v - except (TypeError, ValueError, OverflowError): - pass - else: - _safe_input = {"input": tool_input} - yield streaming_service.format_tool_input_available( - tool_call_id, - tool_name, - _safe_input, - langchain_tool_call_id=langchain_tool_call_id, - ) - if content_builder is not None: - content_builder.on_tool_input_available( - tool_call_id, - tool_name, - _safe_input, - langchain_tool_call_id, - ) - - elif event_type == "on_tool_end": - active_tool_depth = max(0, active_tool_depth - 1) - run_id = event.get("run_id", "") - tool_name = event.get("name", "unknown_tool") - raw_output = event.get("data", {}).get("output", "") - staged_file_path = file_path_by_run.pop(run_id, None) if run_id else None - - if tool_name == "update_memory": - called_update_memory = True - - if hasattr(raw_output, "content"): - content = raw_output.content - if isinstance(content, str): - try: - tool_output = json.loads(content) - except (json.JSONDecodeError, TypeError): - tool_output = {"result": content} - elif isinstance(content, dict): - tool_output = content - else: - tool_output = {"result": str(content)} - elif isinstance(raw_output, dict): - tool_output = raw_output - else: - tool_output = {"result": str(raw_output) if raw_output else "completed"} - - if tool_name in ("write_file", "edit_file"): - if _tool_output_has_error(tool_output): - # Keep successful evidence if a previous write/edit in this turn succeeded. - pass - else: - result.write_succeeded = True - result.verification_succeeded = True - - # Look up the SAME card id used at on_tool_start (either the - # parity_v2 lc-id-derived ui_id or the legacy synthetic - # ``call_``) so the output event always lands on the - # same card as start/delta/available. Fallback preserves the - # legacy synthetic shape for parity_v2-OFF / unknown-run paths. - tool_call_id = ui_tool_call_id_by_run.get( - run_id, - f"call_{run_id[:32]}" if run_id else "call_unknown", - ) - original_step_id = tool_step_ids.get( - run_id, f"{step_prefix}-unknown-{run_id[:8]}" - ) - completed_step_ids.add(original_step_id) - - # Authoritative LangChain tool_call_id from the returned - # ``ToolMessage``. Falls back to whatever we matched - # at ``on_tool_start`` time (kept in ``lc_tool_call_id_by_run``) - # if the output isn't a ToolMessage. The value is stored in - # ``current_lc_tool_call_id`` so ``_emit_tool_output`` - # picks it up for every output emit below. - # - # Emitted in BOTH parity_v2 and legacy modes: the chat tool - # card needs the LangChain id to match against the - # ``data-action-log`` SSE event (keyed by ``lc_tool_call_id``) - # so the inline Revert button can light up. Reading - # ``raw_output.tool_call_id`` is a cheap, non-mutating attribute - # access that is safe regardless of feature-flag state. - current_lc_tool_call_id["value"] = None - authoritative = getattr(raw_output, "tool_call_id", None) - if isinstance(authoritative, str) and authoritative: - current_lc_tool_call_id["value"] = authoritative - if run_id: - lc_tool_call_id_by_run[run_id] = authoritative - elif run_id and run_id in lc_tool_call_id_by_run: - current_lc_tool_call_id["value"] = lc_tool_call_id_by_run[run_id] - - if tool_name == "read_file": - yield _emit_thinking_step( - step_id=original_step_id, - title="Reading file", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "write_file": - yield _emit_thinking_step( - step_id=original_step_id, - title="Writing file", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "edit_file": - yield _emit_thinking_step( - step_id=original_step_id, - title="Editing file", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "glob": - yield _emit_thinking_step( - step_id=original_step_id, - title="Searching files", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "grep": - yield _emit_thinking_step( - step_id=original_step_id, - title="Searching content", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "rm": - yield _emit_thinking_step( - step_id=original_step_id, - title="Deleting file", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "rmdir": - yield _emit_thinking_step( - step_id=original_step_id, - title="Deleting folder", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "mkdir": - yield _emit_thinking_step( - step_id=original_step_id, - title="Creating folder", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "move_file": - yield _emit_thinking_step( - step_id=original_step_id, - title="Moving file", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "write_todos": - yield _emit_thinking_step( - step_id=original_step_id, - title="Planning tasks", - status="completed", - items=last_active_step_items, - ) - elif tool_name == "save_document": - result_str = ( - tool_output.get("result", "") - if isinstance(tool_output, dict) - else str(tool_output) - ) - is_error = "Error" in result_str - completed_items = [ - *last_active_step_items, - result_str[:80] if is_error else "Saved to knowledge base", - ] - yield _emit_thinking_step( - step_id=original_step_id, - title="Saving document", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_image": - if isinstance(tool_output, dict) and not tool_output.get("error"): - completed_items = [ - *last_active_step_items, - "Image generated successfully", - ] - else: - error_msg = ( - tool_output.get("error", "Generation failed") - if isinstance(tool_output, dict) - else "Generation failed" - ) - completed_items = [*last_active_step_items, f"Error: {error_msg}"] - yield _emit_thinking_step( - step_id=original_step_id, - title="Generating image", - status="completed", - items=completed_items, - ) - elif tool_name == "scrape_webpage": - if isinstance(tool_output, dict): - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", - f"Extracted: {word_count:,} words", - ] - else: - completed_items = [*last_active_step_items, "Content extracted"] - yield _emit_thinking_step( - step_id=original_step_id, - title="Scraping webpage", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_podcast": - podcast_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - podcast_title = ( - tool_output.get("title", "Podcast") - if isinstance(tool_output, dict) - else "Podcast" - ) - if podcast_status in ("pending", "generating", "processing"): - completed_items = [ - f"Title: {podcast_title}", - "Podcast generation started", - "Processing in background...", - ] - elif podcast_status == "already_generating": - completed_items = [ - f"Title: {podcast_title}", - "Podcast already in progress", - "Please wait for it to complete", - ] - elif podcast_status in ("failed", "error"): - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Title: {podcast_title}", - f"Error: {error_msg[:50]}", - ] - elif podcast_status in ("ready", "success"): - completed_items = [ - f"Title: {podcast_title}", - "Podcast ready", - ] - else: - completed_items = last_active_step_items - yield _emit_thinking_step( - step_id=original_step_id, - title="Generating podcast", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_video_presentation": - vp_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - vp_title = ( - tool_output.get("title", "Presentation") - if isinstance(tool_output, dict) - else "Presentation" - ) - if vp_status in ("pending", "generating"): - completed_items = [ - f"Title: {vp_title}", - "Presentation generation started", - "Processing in background...", - ] - elif vp_status == "failed": - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Title: {vp_title}", - f"Error: {error_msg[:50]}", - ] - else: - completed_items = last_active_step_items - yield _emit_thinking_step( - step_id=original_step_id, - title="Generating video presentation", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_report": - report_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - report_title = ( - tool_output.get("title", "Report") - if isinstance(tool_output, dict) - else "Report" - ) - word_count = ( - tool_output.get("word_count", 0) - if isinstance(tool_output, dict) - else 0 - ) - is_revision = ( - tool_output.get("is_revision", False) - if isinstance(tool_output, dict) - else False - ) - step_title = "Revising report" if is_revision else "Generating report" - - if report_status == "ready": - completed_items = [ - f"Topic: {report_title}", - f"{word_count:,} words", - "Report ready", - ] - elif report_status == "failed": - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Topic: {report_title}", - f"Error: {error_msg[:50]}", - ] - else: - completed_items = last_active_step_items - - yield _emit_thinking_step( - step_id=original_step_id, - title=step_title, - status="completed", - items=completed_items, - ) - elif tool_name in ("execute", "execute_code"): - raw_text = ( - tool_output.get("result", "") - if isinstance(tool_output, dict) - else str(tool_output) - ) - m = re.match(r"^Exit code:\s*(\d+)", raw_text) - exit_code_val = int(m.group(1)) if m else None - if exit_code_val is not None and exit_code_val == 0: - completed_items = [ - *last_active_step_items, - "Completed successfully", - ] - elif exit_code_val is not None: - completed_items = [ - *last_active_step_items, - f"Exit code: {exit_code_val}", - ] - else: - completed_items = [*last_active_step_items, "Finished"] - yield _emit_thinking_step( - step_id=original_step_id, - title="Running command", - status="completed", - items=completed_items, - ) - elif tool_name == "ls": - if isinstance(tool_output, dict): - ls_output = tool_output.get("result", "") - elif isinstance(tool_output, str): - ls_output = tool_output - else: - ls_output = str(tool_output) if tool_output else "" - file_names: list[str] = [] - if ls_output: - paths: list[str] = [] - try: - parsed = ast.literal_eval(ls_output) - if isinstance(parsed, list): - paths = [str(p) for p in parsed] - except (ValueError, SyntaxError): - paths = [ - line.strip() - for line in ls_output.strip().split("\n") - if line.strip() - ] - for p in paths: - name = p.rstrip("/").split("/")[-1] - if name and len(name) <= 40: - file_names.append(name) - elif name: - file_names.append(name[:37] + "...") - if file_names: - if len(file_names) <= 5: - completed_items = [f"[{name}]" for name in file_names] - else: - completed_items = [f"[{name}]" for name in file_names[:4]] - completed_items.append(f"(+{len(file_names) - 4} more)") - else: - completed_items = ["No files found"] - yield _emit_thinking_step( - step_id=original_step_id, - title="Listing files", - status="completed", - items=completed_items, - ) - else: - # Fallback completion title — see the matching in-progress - # branch above for the wording rationale. - fallback_title = ( - tool_name.replace("_", " ").strip().capitalize() or tool_name - ) - yield _emit_thinking_step( - step_id=original_step_id, - title=fallback_title, - status="completed", - items=last_active_step_items, - ) - - just_finished_tool = True - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - - if tool_name == "generate_podcast": - yield _emit_tool_output( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if isinstance(tool_output, dict) and tool_output.get("status") in ( - "pending", - "generating", - "processing", - ): - yield streaming_service.format_terminal_info( - f"Podcast queued: {tool_output.get('title', 'Podcast')}", - "success", - ) - elif isinstance(tool_output, dict) and tool_output.get("status") in ( - "ready", - "success", - ): - yield streaming_service.format_terminal_info( - f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", - "success", - ) - elif isinstance(tool_output, dict) and tool_output.get("status") in ( - "failed", - "error", - ): - error_msg = tool_output.get("error", "Unknown error") - yield streaming_service.format_terminal_info( - f"Podcast generation failed: {error_msg}", - "error", - ) - elif tool_name == "generate_video_presentation": - yield _emit_tool_output( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "pending" - ): - yield streaming_service.format_terminal_info( - f"Video presentation queued: {tool_output.get('title', 'Presentation')}", - "success", - ) - elif ( - isinstance(tool_output, dict) - and tool_output.get("status") == "failed" - ): - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Presentation generation failed: {error_msg}", - "error", - ) - elif tool_name == "generate_image": - yield _emit_tool_output( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if isinstance(tool_output, dict): - if tool_output.get("error"): - yield streaming_service.format_terminal_info( - f"Image generation failed: {tool_output['error'][:60]}", - "error", - ) - else: - yield streaming_service.format_terminal_info( - "Image generated successfully", - "success", - ) - elif tool_name == "scrape_webpage": - if isinstance(tool_output, dict): - display_output = { - k: v for k, v in tool_output.items() if k != "content" - } - if "content" in tool_output: - content = tool_output.get("content", "") - display_output["content_preview"] = ( - content[:500] + "..." if len(content) > 500 else content - ) - yield _emit_tool_output( - tool_call_id, - display_output, - ) - else: - yield _emit_tool_output( - tool_call_id, - {"result": tool_output}, - ) - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - yield streaming_service.format_terminal_info( - f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to scrape") - if isinstance(tool_output, dict) - else "Failed to scrape" - ) - yield streaming_service.format_terminal_info( - f"Scrape failed: {error_msg}", - "error", - ) - elif tool_name in ("write_file", "edit_file"): - resolved_path = _extract_resolved_file_path( - tool_name=tool_name, - tool_output=tool_output, - tool_input={"file_path": staged_file_path} - if staged_file_path - else None, - ) - result_text = _tool_output_to_text(tool_output) - if _tool_output_has_error(tool_output): - yield _emit_tool_output( - tool_call_id, - { - "status": "error", - "error": result_text, - "path": resolved_path, - }, - ) - else: - yield _emit_tool_output( - tool_call_id, - { - "status": "completed", - "path": resolved_path, - "result": result_text, - }, - ) - elif tool_name == "generate_report": - # Stream the full report result so frontend can render the ReportCard - yield _emit_tool_output( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send appropriate terminal message based on status - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "ready" - ): - word_count = tool_output.get("word_count", 0) - yield streaming_service.format_terminal_info( - f"Report generated: {tool_output.get('title', 'Report')} ({word_count:,} words)", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Report generation failed: {error_msg}", - "error", - ) - elif tool_name == "generate_resume": - yield _emit_tool_output( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "ready" - ): - yield streaming_service.format_terminal_info( - f"Resume generated: {tool_output.get('title', 'Resume')}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Resume generation failed: {error_msg}", - "error", - ) - elif tool_name in ( - "create_notion_page", - "update_notion_page", - "delete_notion_page", - "create_linear_issue", - "update_linear_issue", - "delete_linear_issue", - "create_google_drive_file", - "delete_google_drive_file", - "create_onedrive_file", - "delete_onedrive_file", - "create_dropbox_file", - "delete_dropbox_file", - "create_gmail_draft", - "update_gmail_draft", - "send_gmail_email", - "trash_gmail_email", - "create_calendar_event", - "update_calendar_event", - "delete_calendar_event", - "create_jira_issue", - "update_jira_issue", - "delete_jira_issue", - "create_confluence_page", - "update_confluence_page", - "delete_confluence_page", - ): - yield _emit_tool_output( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - elif tool_name in ("execute", "execute_code"): - raw_text = ( - tool_output.get("result", "") - if isinstance(tool_output, dict) - else str(tool_output) - ) - exit_code: int | None = None - output_text = raw_text - m = re.match(r"^Exit code:\s*(\d+)", raw_text) - if m: - exit_code = int(m.group(1)) - om = re.search(r"\nOutput:\n([\s\S]*)", raw_text) - output_text = om.group(1) if om else "" - thread_id_str = config.get("configurable", {}).get("thread_id", "") - - for sf_match in re.finditer( - r"^SANDBOX_FILE:\s*(.+)$", output_text, re.MULTILINE - ): - fpath = sf_match.group(1).strip() - if fpath and fpath not in result.sandbox_files: - result.sandbox_files.append(fpath) - - yield _emit_tool_output( - tool_call_id, - { - "exit_code": exit_code, - "output": output_text, - "thread_id": thread_id_str, - }, - ) - elif tool_name == "web_search": - xml = ( - tool_output.get("result", str(tool_output)) - if isinstance(tool_output, dict) - else str(tool_output) - ) - citations: dict[str, dict[str, str]] = {} - for m in re.finditer( - r"<!\[CDATA\[(.*?)\]\]>\s*", - xml, - ): - title, url = m.group(1).strip(), m.group(2).strip() - if url.startswith("http") and url not in citations: - citations[url] = {"title": title} - for m in re.finditer( - r"", - xml, - ): - chunk_url, content = m.group(1).strip(), m.group(2).strip() - if ( - chunk_url.startswith("http") - and chunk_url in citations - and content - ): - citations[chunk_url]["snippet"] = ( - content[:200] + "…" if len(content) > 200 else content - ) - yield _emit_tool_output( - tool_call_id, - {"status": "completed", "citations": citations}, - ) - else: - yield _emit_tool_output( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - f"Tool {tool_name} completed", "success" - ) - - elif event_type == "on_custom_event" and event.get("name") == "report_progress": - # Live progress updates from inside the generate_report tool - data = event.get("data", {}) - message = data.get("message", "") - if message and last_active_step_id: - phase = data.get("phase", "") - # Always keep the "Topic: ..." line - topic_items = [ - item for item in last_active_step_items if item.startswith("Topic:") - ] - - if phase in ("revising_section", "adding_section"): - # During section-level ops: keep plan summary + show current op - plan_items = [ - item - for item in last_active_step_items - if item.startswith("Topic:") - or item.startswith("Modifying ") - or item.startswith("Adding ") - or item.startswith("Removing ") - ] - # Only keep plan_items that don't end with "..." (not progress lines) - plan_items = [ - item for item in plan_items if not item.endswith("...") - ] - last_active_step_items = [*plan_items, message] - else: - # Phase transitions: replace everything after topic - last_active_step_items = [*topic_items, message] - - yield _emit_thinking_step( - step_id=last_active_step_id, - title=last_active_step_title, - status="in_progress", - items=last_active_step_items, - ) - - elif ( - event_type == "on_custom_event" and event.get("name") == "document_created" - ): - data = event.get("data", {}) - if data.get("id"): - yield streaming_service.format_data( - "documents-updated", - { - "action": "created", - "document": data, - }, - ) - - elif event_type == "on_custom_event" and event.get("name") == "action_log": - # Surface a freshly committed AgentActionLog row so the chat - # tool card can render its Revert button immediately. - data = event.get("data", {}) - if data.get("id") is not None: - yield streaming_service.format_data("action-log", data) - - elif ( - event_type == "on_custom_event" - and event.get("name") == "action_log_updated" - ): - # Reversibility flipped in kb_persistence after the SAVEPOINT - # for a destructive op (rm/rmdir/move/edit/write) committed. - # Frontend uses this to flip the card's Revert - # button on without re-fetching the actions list. - data = event.get("data", {}) - if data.get("id") is not None: - yield streaming_service.format_data("action-log-updated", data) - - elif event_type in ("on_chain_end", "on_agent_end"): - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - if content_builder is not None: - content_builder.on_text_end(current_text_id) - current_text_id = None - - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - if content_builder is not None: - content_builder.on_text_end(current_text_id) - - completion_event = complete_current_step() - if completion_event: - yield completion_event + accumulated_text = result.accumulated_text state = await agent.aget_state(config) state_values = getattr(state, "values", {}) or {} @@ -2397,7 +912,6 @@ async def _stream_agent_events( result.commit_gate_reason = "" result.accumulated_text = accumulated_text - result.agent_called_update_memory = called_update_memory _log_file_contract("turn_outcome", result) interrupt_value = _first_interrupt_value(state) diff --git a/surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py b/surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py index 9142dd914..9a309f9d7 100644 --- a/surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py +++ b/surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py @@ -5,7 +5,6 @@ from __future__ import annotations from collections.abc import AsyncIterator from typing import Any -from app.agents.new_chat.feature_flags import get_flags from app.tasks.chat.streaming.graph_stream.result import StreamingResult from app.tasks.chat.streaming.relay.event_relay import EventRelay from app.tasks.chat.streaming.relay.state import AgentEventRelayState @@ -30,7 +29,6 @@ async def stream_output( initial_step_id=initial_step_id, initial_step_title=initial_step_title, initial_step_items=initial_step_items, - parity_v2=bool(get_flags().enable_stream_parity_v2), ) astream_kwargs: dict[str, Any] = {"config": config, "version": "v2"} diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/chat_model_stream.py b/surfsense_backend/app/tasks/chat/streaming/handlers/chat_model_stream.py index 861342b32..ef86dae56 100644 --- a/surfsense_backend/app/tasks/chat/streaming/handlers/chat_model_stream.py +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/chat_model_stream.py @@ -33,7 +33,7 @@ def iter_chat_model_stream_frames( reasoning_delta = parts["reasoning"] text_delta = parts["text"] - if state.parity_v2 and reasoning_delta: + if reasoning_delta: if state.current_text_id is not None: yield streaming_service.format_text_end(state.current_text_id) if content_builder is not None: @@ -100,7 +100,7 @@ def iter_chat_model_stream_frames( if content_builder is not None: content_builder.on_text_delta(state.current_text_id, text_delta) - if state.parity_v2 and parts["tool_call_chunks"]: + if parts["tool_call_chunks"]: for tcc in parts["tool_call_chunks"]: idx = tcc.get("index") diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tool_start.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tool_start.py index c316cc74a..e7d2d7f78 100644 --- a/surfsense_backend/app/tasks/chat/streaming/handlers/tool_start.py +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tool_start.py @@ -77,12 +77,11 @@ def iter_tool_start_frames( yield emit_thinking_step_frame(**frame_kw) matched_meta: dict[str, str] | None = None - if state.parity_v2: - taken_ui_ids = set(state.ui_tool_call_id_by_run.values()) - for meta in state.index_to_meta.values(): - if meta["name"] == tool_name and meta["ui_id"] not in taken_ui_ids: - matched_meta = meta - break + taken_ui_ids = set(state.ui_tool_call_id_by_run.values()) + for meta in state.index_to_meta.values(): + if meta["name"] == tool_name and meta["ui_id"] not in taken_ui_ids: + matched_meta = meta + break tool_call_id: str langchain_tool_call_id: str | None = None @@ -97,13 +96,12 @@ def iter_tool_start_frames( if run_id else streaming_service.generate_tool_call_id() ) - if state.parity_v2: - langchain_tool_call_id = match_buffered_langchain_tool_call_id( - state.pending_tool_call_chunks, - tool_name, - run_id, - state.lc_tool_call_id_by_run, - ) + langchain_tool_call_id = match_buffered_langchain_tool_call_id( + state.pending_tool_call_chunks, + tool_name, + run_id, + state.lc_tool_call_id_by_run, + ) yield streaming_service.format_tool_input_start( tool_call_id, tool_name, diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/state.py b/surfsense_backend/app/tasks/chat/streaming/relay/state.py index e8e35d0b2..7bd996606 100644 --- a/surfsense_backend/app/tasks/chat/streaming/relay/state.py +++ b/surfsense_backend/app/tasks/chat/streaming/relay/state.py @@ -22,7 +22,6 @@ class AgentEventRelayState: active_tool_depth: int = 0 called_update_memory: bool = False current_reasoning_id: str | None = None - parity_v2: bool = False pending_tool_call_chunks: list[dict[str, Any]] = field(default_factory=list) lc_tool_call_id_by_run: dict[str, str] = field(default_factory=dict) file_path_by_run: dict[str, str] = field(default_factory=dict) @@ -39,7 +38,6 @@ class AgentEventRelayState: initial_step_id: str | None = None, initial_step_title: str = "", initial_step_items: list[str] | None = None, - parity_v2: bool, ) -> AgentEventRelayState: counter = 1 if initial_step_id else 0 return cls( @@ -47,7 +45,6 @@ class AgentEventRelayState: last_active_step_id=initial_step_id, last_active_step_title=initial_step_title, last_active_step_items=list(initial_step_items or []), - parity_v2=parity_v2, ) def next_thinking_step_id(self, step_prefix: str) -> str: diff --git a/surfsense_backend/tests/unit/agents/new_chat/test_feature_flags.py b/surfsense_backend/tests/unit/agents/new_chat/test_feature_flags.py index 6800be2af..099aea882 100644 --- a/surfsense_backend/tests/unit/agents/new_chat/test_feature_flags.py +++ b/surfsense_backend/tests/unit/agents/new_chat/test_feature_flags.py @@ -31,7 +31,6 @@ def _clear_all(monkeypatch: pytest.MonkeyPatch) -> None: "SURFSENSE_ENABLE_KB_PLANNER_RUNNABLE", "SURFSENSE_ENABLE_ACTION_LOG", "SURFSENSE_ENABLE_REVERT_ROUTE", - "SURFSENSE_ENABLE_STREAM_PARITY_V2", "SURFSENSE_ENABLE_PLUGIN_LOADER", "SURFSENSE_ENABLE_OTEL", "SURFSENSE_ENABLE_AGENT_CACHE", @@ -61,7 +60,6 @@ def test_defaults_match_shipped_agent_stack(monkeypatch: pytest.MonkeyPatch) -> assert flags.enable_kb_planner_runnable is True assert flags.enable_action_log is True assert flags.enable_revert_route is True - assert flags.enable_stream_parity_v2 is True assert flags.enable_plugin_loader is False assert flags.enable_otel is False # Phase 2: agent cache is now default-on (the prerequisite tool @@ -127,7 +125,6 @@ def test_each_flag_can_be_set_independently(monkeypatch: pytest.MonkeyPatch) -> "enable_kb_planner_runnable": "SURFSENSE_ENABLE_KB_PLANNER_RUNNABLE", "enable_action_log": "SURFSENSE_ENABLE_ACTION_LOG", "enable_revert_route": "SURFSENSE_ENABLE_REVERT_ROUTE", - "enable_stream_parity_v2": "SURFSENSE_ENABLE_STREAM_PARITY_V2", "enable_plugin_loader": "SURFSENSE_ENABLE_PLUGIN_LOADER", "enable_otel": "SURFSENSE_ENABLE_OTEL", } diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py index 892bb7a6a..9ae7defec 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_2_parity.py @@ -137,7 +137,7 @@ def test_complete_active_thinking_step_mirrors_closure_semantics() -> None: def test_agent_event_relay_state_factory_matches_counter_rule() -> None: - s0 = AgentEventRelayState.for_invocation(parity_v2=False) + s0 = AgentEventRelayState.for_invocation() assert s0.thinking_step_counter == 0 assert s0.last_active_step_id is None @@ -145,11 +145,9 @@ def test_agent_event_relay_state_factory_matches_counter_rule() -> None: initial_step_id="thinking-resume-1", initial_step_title="Inherited", initial_step_items=["Topic: X"], - parity_v2=True, ) assert s1.thinking_step_counter == 1 assert s1.last_active_step_id == "thinking-resume-1" - assert s1.parity_v2 is True assert s1.next_thinking_step_id("thinking") == "thinking-2" diff --git a/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py b/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py index c317eba20..4b1fadd9c 100644 --- a/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py +++ b/surfsense_backend/tests/unit/tasks/chat/test_content_builder.py @@ -161,7 +161,7 @@ class TestToolHeavyTurn: _assert_jsonb_safe(snap) def test_tool_input_available_without_prior_start_creates_card(self): - # Legacy / parity_v2-OFF path: tool-input-available may be + # Late-registration: tool-input-available may be # emitted without a prior tool-input-start (no streamed # tool_call_chunks). The card should still be created. b = AssistantContentBuilder() @@ -187,7 +187,7 @@ class TestToolHeavyTurn: assert part["result"] == {"matches": 3} def test_tool_input_start_idempotent_for_same_ui_id(self): - # parity_v2: tool-input-start can fire from BOTH the chunk + # tool-input-start can fire from BOTH the chunk # registration path AND the canonical ``on_tool_start`` path. # The second call must not create a duplicate part. b = AssistantContentBuilder() diff --git a/surfsense_backend/tests/unit/tasks/chat/test_tool_input_streaming.py b/surfsense_backend/tests/unit/tasks/chat/test_tool_input_streaming.py index 60750396c..ada32d168 100644 --- a/surfsense_backend/tests/unit/tasks/chat/test_tool_input_streaming.py +++ b/surfsense_backend/tests/unit/tasks/chat/test_tool_input_streaming.py @@ -1,16 +1,13 @@ """Unit tests for live tool-call argument streaming. -Pins the wire format that ``_stream_agent_events`` emits when -``SURFSENSE_ENABLE_STREAM_PARITY_V2=true``: ``tool-input-start`` → -``tool-input-delta``... → ``tool-input-available`` → ``tool-output-available`` -all keyed by the same LangChain ``tool_call.id``. +Pins the wire format that ``_stream_agent_events`` emits: +``tool-input-start`` → ``tool-input-delta``... → ``tool-input-available`` → +``tool-output-available``, keyed consistently with LangChain ``tool_call.id`` +when the model streams indexed chunks. Identity is tracked in ``index_to_meta`` (per-chunk ``index``) and -``ui_tool_call_id_by_run`` (LangGraph ``run_id``); both are private to -``_stream_agent_events`` so we exercise them via the public wire output. - -These tests also lock in the legacy / parity_v2-OFF behaviour so the -synthetic ``call_`` shape stays stable for older clients. +``ui_tool_call_id_by_run`` (LangGraph ``run_id``); both are internal to the +streaming layer so we assert on the public SSE payloads. """ from __future__ import annotations @@ -22,8 +19,6 @@ from typing import Any import pytest -import app.tasks.chat.stream_new_chat as stream_module -from app.agents.new_chat.feature_flags import AgentFeatureFlags from app.services.new_streaming_service import VercelStreamingService from app.tasks.chat.stream_new_chat import ( StreamResult, @@ -164,24 +159,6 @@ def _tool_end( } -@pytest.fixture -def parity_v2_on(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr( - stream_module, - "get_flags", - lambda: AgentFeatureFlags(enable_stream_parity_v2=True), - ) - - -@pytest.fixture -def parity_v2_off(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr( - stream_module, - "get_flags", - lambda: AgentFeatureFlags(enable_stream_parity_v2=False), - ) - - async def _drain( events: list[dict[str, Any]], state: _FakeAgentState | None = None ) -> list[dict[str, Any]]: @@ -253,12 +230,12 @@ class TestLegacyMatch: # --------------------------------------------------------------------------- -# parity_v2 wire format tests. +# Tool input streaming wire format # --------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_idless_chunk_merging_by_index(parity_v2_on: None) -> None: +async def test_idless_chunk_merging_by_index() -> None: """First chunk carries id+name; later idless chunks at the same ``index`` merge into the SAME ``tool-input-start`` ui id and emit one ``tool-input-delta`` per chunk.""" @@ -302,9 +279,7 @@ async def test_idless_chunk_merging_by_index(parity_v2_on: None) -> None: @pytest.mark.asyncio -async def test_two_interleaved_tool_calls_route_by_index( - parity_v2_on: None, -) -> None: +async def test_two_interleaved_tool_calls_route_by_index() -> None: """Two same-name calls with distinct indices keep their deltas routed to the right card.""" events = [ @@ -344,7 +319,7 @@ async def test_two_interleaved_tool_calls_route_by_index( @pytest.mark.asyncio -async def test_identity_stable_across_lifecycle(parity_v2_on: None) -> None: +async def test_identity_stable_across_lifecycle() -> None: """Whatever id ``tool-input-start`` chose must be the SAME id used on ``tool-input-available`` AND ``tool-output-available``.""" events = [ @@ -367,7 +342,7 @@ async def test_identity_stable_across_lifecycle(parity_v2_on: None) -> None: @pytest.mark.asyncio -async def test_no_duplicate_tool_input_start(parity_v2_on: None) -> None: +async def test_no_duplicate_tool_input_start() -> None: """When the chunk-emission loop already fired ``tool-input-start`` for this run, ``on_tool_start`` MUST NOT emit a second one.""" events = [ @@ -386,9 +361,7 @@ async def test_no_duplicate_tool_input_start(parity_v2_on: None) -> None: @pytest.mark.asyncio -async def test_active_text_closes_before_early_tool_input_start( - parity_v2_on: None, -) -> None: +async def test_active_text_closes_before_early_tool_input_start() -> None: """Streaming a text-delta then a tool-call chunk in subsequent chunks: the wire MUST contain ``text-end`` before the FIRST ``tool-input-start`` (clean part boundary on the frontend).""" @@ -409,9 +382,7 @@ async def test_active_text_closes_before_early_tool_input_start( @pytest.mark.asyncio -async def test_mixed_text_and_tool_chunk_preserve_order( - parity_v2_on: None, -) -> None: +async def test_mixed_text_and_tool_chunk_preserve_order() -> None: """One AIMessageChunk that carries BOTH ``text`` content AND ``tool_call_chunks`` should emit the text delta FIRST, then close text, then ``tool-input-start``+``tool-input-delta``.""" @@ -441,45 +412,7 @@ async def test_mixed_text_and_tool_chunk_preserve_order( @pytest.mark.asyncio -async def test_parity_v2_off_preserves_legacy_shape( - parity_v2_off: None, -) -> None: - """When the flag is OFF, no deltas are emitted and the ``toolCallId`` - is ``call_`` (NOT the lc id).""" - events = [ - _model_stream( - tool_call_chunks=[ - {"id": "lc-1", "name": "ls", "args": '{"path":"/"}', "index": 0} - ] - ), - _tool_start(name="ls", run_id="run-A", input_payload={"path": "/"}), - _tool_end(name="ls", run_id="run-A", tool_call_id="lc-1"), - ] - payloads = await _drain(events) - - assert _of_type(payloads, "tool-input-delta") == [] - starts = _of_type(payloads, "tool-input-start") - assert len(starts) == 1 - assert starts[0]["toolCallId"].startswith("call_run-A") - # No ``langchainToolCallId`` propagation on ``tool-input-start`` in - # legacy mode (the start event fires before the ToolMessage is - # available, so we can't extract the authoritative LangChain id yet). - assert "langchainToolCallId" not in starts[0] - output = _of_type(payloads, "tool-output-available") - assert output[0]["toolCallId"].startswith("call_run-A") - # ``tool-output-available`` MUST carry ``langchainToolCallId`` even - # in legacy mode: the chat tool card uses it to backfill the - # LangChain id and join against the ``data-action-log`` SSE event - # (keyed by ``lc_tool_call_id``) so the inline Revert button can - # light up. Sourced from the returned ``ToolMessage.tool_call_id``, - # which is populated regardless of feature-flag state. - assert output[0]["langchainToolCallId"] == "lc-1" - - -@pytest.mark.asyncio -async def test_skip_append_prevents_stale_id_reuse( - parity_v2_on: None, -) -> None: +async def test_skip_append_prevents_stale_id_reuse() -> None: """Two same-name tools: the SECOND tool's ``langchainToolCallId`` must NOT come from the first tool's chunk (``pending_tool_call_chunks`` must stay empty for indexed-registered chunks).""" @@ -506,9 +439,7 @@ async def test_skip_append_prevents_stale_id_reuse( @pytest.mark.asyncio -async def test_registration_waits_for_both_id_and_name( - parity_v2_on: None, -) -> None: +async def test_registration_waits_for_both_id_and_name() -> None: """An id-only chunk (no name yet) must NOT emit ``tool-input-start``.""" events = [ _model_stream( @@ -520,12 +451,9 @@ async def test_registration_waits_for_both_id_and_name( @pytest.mark.asyncio -async def test_unmatched_fallback_still_attaches_lc_id( - parity_v2_on: None, -) -> None: - """parity_v2 ON, but the provider didn't include an ``index``: the - legacy fallback path must still emit ``tool-input-start`` with the - matching ``langchainToolCallId``.""" +async def test_unmatched_fallback_still_attaches_lc_id() -> None: + """When the provider omits chunk ``index``, buffered chunks still get a + ``tool-input-start`` with the matching ``langchainToolCallId``.""" events = [ # No index on the chunk → not registered into index_to_meta; # falls through to ``pending_tool_call_chunks`` so the legacy @@ -542,9 +470,7 @@ async def test_unmatched_fallback_still_attaches_lc_id( @pytest.mark.asyncio -async def test_interrupt_request_uses_task_that_contains_interrupt( - parity_v2_on: None, -) -> None: +async def test_interrupt_request_uses_task_that_contains_interrupt() -> None: interrupt_payload = { "type": "calendar_event_create", "action": { diff --git a/surfsense_web/components/assistant-ui/reasoning-message-part.tsx b/surfsense_web/components/assistant-ui/reasoning-message-part.tsx index 70636eab8..6e7aaf048 100644 --- a/surfsense_web/components/assistant-ui/reasoning-message-part.tsx +++ b/surfsense_web/components/assistant-ui/reasoning-message-part.tsx @@ -7,8 +7,8 @@ import { TextShimmerLoader } from "@/components/prompt-kit/loader"; import { cn } from "@/lib/utils"; /** - * Renders the structured `reasoning` part emitted by the backend's - * stream-parity v2 path (A1). + * Renders the structured `reasoning` part emitted by the backend stream + * (typed reasoning deltas from the chat model). * * Behaviour mirrors the existing `ThinkingStepsDisplay`: * - collapsed by default; diff --git a/surfsense_web/components/assistant-ui/tool-fallback.tsx b/surfsense_web/components/assistant-ui/tool-fallback.tsx index 06082c9c7..ba58f4158 100644 --- a/surfsense_web/components/assistant-ui/tool-fallback.tsx +++ b/surfsense_web/components/assistant-ui/tool-fallback.tsx @@ -48,13 +48,11 @@ import { cn } from "@/lib/utils"; * stream, post-stream reversibility flip, and explicit revert clicks. * * Match key (in priority order): - * 1. ``a.tool_call_id === toolCallId`` — direct hit in parity_v2 when - * the model streamed ``tool_call_chunks`` so the card's synthetic - * id IS the LangChain id. - * 2. ``a.tool_call_id === langchainToolCallId`` — legacy mode (or - * parity_v2 with provider-side chunk emission) where the card's - * synthetic id is ``call_`` and the LangChain id is - * backfilled onto the part by ``tool-output-available``. + * 1. ``a.tool_call_id === toolCallId`` — direct hit when the model + * streamed ``tool_call_chunks`` so the card id matches the LangChain id. + * 2. ``a.tool_call_id === langchainToolCallId`` — synthetic card id is + * ``call_`` and the LangChain id is backfilled by + * ``tool-output-available``. * 3. ``(chat_turn_id, tool_name, position-within-turn)`` — fallback * for cards whose synthetic id is ``call_`` AND whose * ``langchainToolCallId`` never got backfilled (provider emitted @@ -116,7 +114,7 @@ function ToolCardRevertButton({ const action = useMemo(() => { // Tier 1 + 2: O(1) Map-backed direct id match. Covers - // ~all parity_v2 streams and any legacy stream that backfilled + // Indexed chunk streams and any stream that backfilled // ``langchainToolCallId`` via ``tool-output-available``. const direct = findByToolCallId(toolCallId) ?? findByToolCallId(langchainToolCallId); if (direct) return direct; diff --git a/surfsense_web/lib/chat/streaming-state.ts b/surfsense_web/lib/chat/streaming-state.ts index 27047ecfe..809e214d1 100644 --- a/surfsense_web/lib/chat/streaming-state.ts +++ b/surfsense_web/lib/chat/streaming-state.ts @@ -421,9 +421,8 @@ export type SSEEvent = /** * Live tool-call argument delta. Concatenated into * ``argsText`` on the matching ``tool-call`` content part - * by ``appendToolInputDelta``. parity_v2 only — the legacy - * code path emits ``tool-input-available`` without prior - * deltas. + * by ``appendToolInputDelta``. Some providers emit + * ``tool-input-available`` without prior deltas. */ type: "tool-input-delta"; toolCallId: string;