refactor(chat): stream agent events via stream_output and remove parity v2 flag

This commit is contained in:
CREDO23 2026-05-07 19:40:10 +02:00
parent 7e07092f67
commit 78f4747382
17 changed files with 76 additions and 1676 deletions

View file

@@ -28,7 +28,6 @@ Defaults:
SURFSENSE_ENABLE_PERMISSION=true
SURFSENSE_ENABLE_DOOM_LOOP=true
SURFSENSE_ENABLE_LLM_TOOL_SELECTOR=false # adds a per-turn LLM call
SURFSENSE_ENABLE_STREAM_PARITY_V2=true
Master kill-switch (overrides everything else):
@@ -88,15 +87,6 @@ class AgentFeatureFlags:
enable_action_log: bool = True
enable_revert_route: bool = True
# Streaming parity v2 — opt in to LangChain's structured
# ``AIMessageChunk`` content (typed reasoning blocks, tool-input
# deltas) and propagate the real ``tool_call_id`` to the SSE layer.
# When OFF the ``stream_new_chat`` task falls back to the str-only
# text path and the synthetic ``call_<run_id>`` tool-call id (no
# ``langchainToolCallId`` propagation). Schema migrations 135/136
# ship unconditionally because they're forward-compatible.
enable_stream_parity_v2: bool = True
# Plugins
enable_plugin_loader: bool = False
@@ -169,7 +159,6 @@ class AgentFeatureFlags:
enable_kb_planner_runnable=False,
enable_action_log=False,
enable_revert_route=False,
enable_stream_parity_v2=False,
enable_plugin_loader=False,
enable_otel=False,
enable_agent_cache=False,
@@ -208,10 +197,6 @@ class AgentFeatureFlags:
# Snapshot / revert
enable_action_log=_env_bool("SURFSENSE_ENABLE_ACTION_LOG", True),
enable_revert_route=_env_bool("SURFSENSE_ENABLE_REVERT_ROUTE", True),
# Streaming parity v2
enable_stream_parity_v2=_env_bool(
"SURFSENSE_ENABLE_STREAM_PARITY_V2", True
),
# Plugins
enable_plugin_loader=_env_bool("SURFSENSE_ENABLE_PLUGIN_LOADER", False),
# Observability

View file

@@ -608,15 +608,14 @@ class VercelStreamingService:
Args:
tool_call_id: The unique tool call identifier. May be EITHER the
synthetic ``call_<run_id>`` id derived from LangGraph
``run_id`` (legacy / ``SURFSENSE_ENABLE_STREAM_PARITY_V2``
OFF, or the unmatched-fallback path under parity_v2) OR
the authoritative LangChain ``tool_call.id`` (parity_v2
path: when the provider streams ``tool_call_chunks`` we
register the ``index`` and reuse the lc-id as the card
id so live ``tool-input-delta`` events can be routed
without a downstream join). Either way, the same id is
preserved across ``tool-input-start`` / ``-delta`` /
``-available`` / ``tool-output-available`` for one call.
``run_id`` (unmatched chunk fallback when no ``index`` was
registered) OR the authoritative LangChain ``tool_call.id``
(when the provider streams ``tool_call_chunks`` we register
the ``index`` and reuse the lc-id as the card id so live
``tool-input-delta`` events route without a downstream join).
Either way, the same id is preserved across
``tool-input-start`` / ``-delta`` / ``-available`` /
``tool-output-available`` for one call.
tool_name: The name of the tool being called.
langchain_tool_call_id: Optional authoritative LangChain
``tool_call.id``. When set, surfaces as

View file

@@ -85,8 +85,8 @@ class AssistantContentBuilder:
self._current_text_idx: int = -1
self._current_reasoning_idx: int = -1
# ``ui_id``-keyed indexes for tool-call parts. ``ui_id`` is the
# synthetic ``call_<run_id>`` (legacy) or the LangChain
# ``tool_call.id`` (parity_v2) — same key the streaming layer
# synthetic ``call_<run_id>`` (chunk fallback) or the LangChain
# ``tool_call.id`` (indexed chunk path) — same key the streaming layer
# threads through every ``tool-input-*`` / ``tool-output-*`` event.
self._tool_call_idx_by_ui_id: dict[str, int] = {}
# Live argsText accumulator (concatenated ``tool-input-delta`` chunks)
@@ -181,7 +181,7 @@ class AssistantContentBuilder:
"""Register a tool-call card. Args are filled in by later events."""
if not ui_id:
return
# Skip duplicate registration: parity_v2 may emit
# Skip duplicate registration: the stream may emit
# ``tool-input-start`` from both ``on_chat_model_stream``
# (when tool_call_chunks register a name) and ``on_tool_start``
# (the canonical path). The FE de-dupes via ``toolCallIndices``;
@@ -243,7 +243,7 @@ class AssistantContentBuilder:
pretty-printed JSON, sets the full ``args`` dict, and backfills
``langchainToolCallId`` if it wasn't known at ``tool-input-start`` time.
Also creates the card if no prior ``tool-input-start`` registered it
(legacy parity_v2-OFF / late-registration paths).
(late-registration when no prior ``tool-input-start``).
"""
if not ui_id:
return

File diff suppressed because it is too large Load diff

View file

@@ -5,7 +5,6 @@ from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from app.agents.new_chat.feature_flags import get_flags
from app.tasks.chat.streaming.graph_stream.result import StreamingResult
from app.tasks.chat.streaming.relay.event_relay import EventRelay
from app.tasks.chat.streaming.relay.state import AgentEventRelayState
@@ -30,7 +29,6 @@ async def stream_output(
initial_step_id=initial_step_id,
initial_step_title=initial_step_title,
initial_step_items=initial_step_items,
parity_v2=bool(get_flags().enable_stream_parity_v2),
)
astream_kwargs: dict[str, Any] = {"config": config, "version": "v2"}

View file

@@ -33,7 +33,7 @@ def iter_chat_model_stream_frames(
reasoning_delta = parts["reasoning"]
text_delta = parts["text"]
if state.parity_v2 and reasoning_delta:
if reasoning_delta:
if state.current_text_id is not None:
yield streaming_service.format_text_end(state.current_text_id)
if content_builder is not None:
@@ -100,7 +100,7 @@ def iter_chat_model_stream_frames(
if content_builder is not None:
content_builder.on_text_delta(state.current_text_id, text_delta)
if state.parity_v2 and parts["tool_call_chunks"]:
if parts["tool_call_chunks"]:
for tcc in parts["tool_call_chunks"]:
idx = tcc.get("index")

View file

@@ -77,12 +77,11 @@ def iter_tool_start_frames(
yield emit_thinking_step_frame(**frame_kw)
matched_meta: dict[str, str] | None = None
if state.parity_v2:
taken_ui_ids = set(state.ui_tool_call_id_by_run.values())
for meta in state.index_to_meta.values():
if meta["name"] == tool_name and meta["ui_id"] not in taken_ui_ids:
matched_meta = meta
break
taken_ui_ids = set(state.ui_tool_call_id_by_run.values())
for meta in state.index_to_meta.values():
if meta["name"] == tool_name and meta["ui_id"] not in taken_ui_ids:
matched_meta = meta
break
tool_call_id: str
langchain_tool_call_id: str | None = None
@@ -97,13 +96,12 @@ def iter_tool_start_frames(
if run_id
else streaming_service.generate_tool_call_id()
)
if state.parity_v2:
langchain_tool_call_id = match_buffered_langchain_tool_call_id(
state.pending_tool_call_chunks,
tool_name,
run_id,
state.lc_tool_call_id_by_run,
)
langchain_tool_call_id = match_buffered_langchain_tool_call_id(
state.pending_tool_call_chunks,
tool_name,
run_id,
state.lc_tool_call_id_by_run,
)
yield streaming_service.format_tool_input_start(
tool_call_id,
tool_name,

View file

@@ -22,7 +22,6 @@ class AgentEventRelayState:
active_tool_depth: int = 0
called_update_memory: bool = False
current_reasoning_id: str | None = None
parity_v2: bool = False
pending_tool_call_chunks: list[dict[str, Any]] = field(default_factory=list)
lc_tool_call_id_by_run: dict[str, str] = field(default_factory=dict)
file_path_by_run: dict[str, str] = field(default_factory=dict)
@@ -39,7 +38,6 @@ class AgentEventRelayState:
initial_step_id: str | None = None,
initial_step_title: str = "",
initial_step_items: list[str] | None = None,
parity_v2: bool,
) -> AgentEventRelayState:
counter = 1 if initial_step_id else 0
return cls(
@@ -47,7 +45,6 @@ class AgentEventRelayState:
last_active_step_id=initial_step_id,
last_active_step_title=initial_step_title,
last_active_step_items=list(initial_step_items or []),
parity_v2=parity_v2,
)
def next_thinking_step_id(self, step_prefix: str) -> str: