Merge remote-tracking branch 'upstream/dev' into feature/multi-agent

CREDO23 2026-05-01 00:05:20 +02:00
commit 5d3b8b9ca9
83 changed files with 10514 additions and 638 deletions


@ -31,6 +31,7 @@ from sqlalchemy.orm import selectinload
from app.agents.multi_agent_chat.integration import create_multi_agent_chat
from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent
from app.agents.new_chat.checkpointer import get_checkpointer
from app.agents.new_chat.feature_flags import get_flags
from app.agents.new_chat.filesystem_selection import FilesystemMode, FilesystemSelection
from app.agents.new_chat.llm_config import (
AgentConfig,
@ -72,6 +73,91 @@ _perf_log = get_perf_logger()
logger = logging.getLogger(__name__)
def _extract_chunk_parts(chunk: Any) -> dict[str, Any]:
"""Decompose an ``AIMessageChunk`` into typed text/reasoning/tool-call parts.
Returns a dict with three keys:
* ``text`` — concatenated string content (empty string if the chunk
contributes none).
* ``reasoning`` — concatenated reasoning content (empty string if the
chunk contributes none).
* ``tool_call_chunks`` — flat list of LangChain ``tool_call_chunk``
dicts surfaced from either the typed-block list or the
``tool_call_chunks`` attribute.
Background
----------
``AIMessageChunk.content`` can be:
* a ``str`` (most providers), or
* a ``list`` of typed blocks ``{type: 'text' | 'reasoning' |
'tool_call_chunk' | 'tool_use' | ..., text/content/...}`` for
Anthropic, Bedrock, and several reasoning configurations.
Reasoning may also live under
``chunk.additional_kwargs['reasoning_content']`` (some providers
surface it that way instead of as a typed block). Tool-call chunks
may live under ``chunk.tool_call_chunks`` even when ``content`` is a
plain string.
Earlier versions only handled the ``isinstance(content, str)`` branch
and silently dropped reasoning blocks + tool-call chunks emitted by
LangChain ``AIMessageChunk``s.
"""
out: dict[str, Any] = {"text": "", "reasoning": "", "tool_call_chunks": []}
if chunk is None:
return out
content = getattr(chunk, "content", None)
if isinstance(content, str):
if content:
out["text"] = content
elif isinstance(content, list):
text_parts: list[str] = []
reasoning_parts: list[str] = []
for block in content:
if not isinstance(block, dict):
continue
block_type = block.get("type")
if block_type == "text":
value = block.get("text") or block.get("content") or ""
if isinstance(value, str) and value:
text_parts.append(value)
elif block_type == "reasoning":
value = (
block.get("reasoning")
or block.get("text")
or block.get("content")
or ""
)
if isinstance(value, str) and value:
reasoning_parts.append(value)
elif block_type in ("tool_call_chunk", "tool_use"):
out["tool_call_chunks"].append(block)
if text_parts:
out["text"] = "".join(text_parts)
if reasoning_parts:
out["reasoning"] = "".join(reasoning_parts)
additional = getattr(chunk, "additional_kwargs", None) or {}
if isinstance(additional, dict):
extra_reasoning = additional.get("reasoning_content")
if isinstance(extra_reasoning, str) and extra_reasoning:
existing = out["reasoning"]
out["reasoning"] = (
(existing + extra_reasoning) if existing else extra_reasoning
)
extra_tool_chunks = getattr(chunk, "tool_call_chunks", None)
if isinstance(extra_tool_chunks, list):
for tcc in extra_tool_chunks:
if isinstance(tcc, dict):
out["tool_call_chunks"].append(tcc)
return out
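# Illustrative sketch (not part of this change): feeding an Anthropic-style
# typed-block chunk through ``_extract_chunk_parts``. ``SimpleNamespace``
# stands in for a LangChain ``AIMessageChunk``; all values are hypothetical.
from types import SimpleNamespace

_demo_chunk = SimpleNamespace(
    content=[
        {"type": "reasoning", "reasoning": "Check the file first. "},
        {"type": "text", "text": "Reading config."},
        {"type": "tool_call_chunk", "index": 0, "id": "toolu_01", "name": "read_file", "args": ""},
    ],
    additional_kwargs={"reasoning_content": "Then summarise."},
    tool_call_chunks=None,  # attribute path unused here; the content list carries the chunk
)
_demo_parts = _extract_chunk_parts(_demo_chunk)
assert _demo_parts["text"] == "Reading config."
assert _demo_parts["reasoning"] == "Check the file first. Then summarise."
assert _demo_parts["tool_call_chunks"][0]["name"] == "read_file"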
def format_mentioned_surfsense_docs_as_context(
documents: list[SurfsenseDocsDocument],
) -> str:
@ -254,6 +340,42 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None:
)
def _legacy_match_lc_id(
pending_tool_call_chunks: list[dict[str, Any]],
tool_name: str,
run_id: str,
lc_tool_call_id_by_run: dict[str, str],
) -> str | None:
"""Best-effort match a buffered ``tool_call_chunk`` to a tool name.
Pure extract of the legacy in-line match used at ``on_tool_start`` for
parity_v2-OFF and unmatched (chunk path didn't register an index for
this call) tools. Pops the next id-bearing chunk whose ``name``
matches ``tool_name`` (or any id-bearing chunk as a fallback) and
returns its id. Mutates ``pending_tool_call_chunks`` and
``lc_tool_call_id_by_run`` in place.
"""
matched_idx: int | None = None
for idx, tcc in enumerate(pending_tool_call_chunks):
if tcc.get("name") == tool_name and tcc.get("id"):
matched_idx = idx
break
if matched_idx is None:
for idx, tcc in enumerate(pending_tool_call_chunks):
if tcc.get("id"):
matched_idx = idx
break
if matched_idx is None:
return None
matched = pending_tool_call_chunks.pop(matched_idx)
candidate = matched.get("id")
if isinstance(candidate, str) and candidate:
if run_id:
lc_tool_call_id_by_run[run_id] = candidate
return candidate
return None
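# Illustrative sketch (not part of this change): the FIFO pop behaviour of
# ``_legacy_match_lc_id`` with hypothetical ids. The first call pops the
# name match, the second falls back to any id-bearing chunk, and a drained
# buffer yields None.
_demo_pending = [
    {"name": "write_file", "id": "toolu_a", "args": "{}"},
    {"name": "read_file", "id": "toolu_b", "args": "{}"},
]
_demo_by_run: dict[str, str] = {}
assert _legacy_match_lc_id(_demo_pending, "read_file", "run-1", _demo_by_run) == "toolu_b"
assert _legacy_match_lc_id(_demo_pending, "grep", "run-2", _demo_by_run) == "toolu_a"
assert _legacy_match_lc_id(_demo_pending, "grep", "run-3", _demo_by_run) is None
assert _demo_by_run == {"run-1": "toolu_b", "run-2": "toolu_a"}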
async def _stream_agent_events(
agent: Any,
config: dict[str, Any],
@ -268,6 +390,7 @@ async def _stream_agent_events(
fallback_commit_search_space_id: int | None = None,
fallback_commit_created_by_id: str | None = None,
fallback_commit_filesystem_mode: FilesystemMode = FilesystemMode.CLOUD,
fallback_commit_thread_id: int | None = None,
) -> AsyncGenerator[str, None]:
"""Shared async generator that streams and formats astream_events from the agent.
@ -300,6 +423,59 @@ async def _stream_agent_events(
active_tool_depth: int = 0 # Track nesting: >0 means we're inside a tool
called_update_memory: bool = False
# Reasoning-block streaming. We open a reasoning block on the
# first reasoning delta of a step, append deltas as they arrive, and
# close it when text starts (the model has switched to writing its
# answer) or ``on_chat_model_end`` fires for the model node. Reuses
# the same Vercel format-helpers as text-start/delta/end.
current_reasoning_id: str | None = None
# Streaming-parity v2 feature flag. When OFF we keep the legacy
# shape: str-only content, no reasoning blocks, no
# ``langchainToolCallId`` propagation. The schema migrations
# (135 / 136) ship unconditionally because they're forward-compatible.
parity_v2 = bool(get_flags().enable_stream_parity_v2)
# Best-effort attach of LangChain ``tool_call_id`` to the synthetic
# ``call_<run_id>`` card id we already emit. We accumulate
# ``tool_call_chunks`` from ``on_chat_model_stream``, key them by
# name, and pop the next unconsumed entry at ``on_tool_start``. The
# authoritative id is later filled in at ``on_tool_end`` from
# ``ToolMessage.tool_call_id``. Under parity_v2 we ALSO short-circuit
# this list for chunks that already registered into ``index_to_meta``
# below — so this list is reserved for the parity_v2-OFF / unmatched
# fallback path only and never re-pops a chunk we already streamed.
pending_tool_call_chunks: list[dict[str, Any]] = []
lc_tool_call_id_by_run: dict[str, str] = {}
# parity_v2 only: live tool-call argument streaming. ``index_to_meta``
# is keyed by the chunk's ``index`` field — LangChain
# ``ToolCallChunk``s for the same call share an index but only the
# first chunk carries id+name (subsequent ones are id=None,
# name=None, args="<delta>"). We register an index when both id and
# name are observed on a chunk (per ToolCallChunk semantics they
# arrive together on the first chunk), then route every later chunk
# at that index to the same ``ui_id`` as a ``tool-input-delta``.
# ``ui_tool_call_id_by_run`` maps LangGraph ``run_id`` to the
# ``ui_id`` used for that call's ``tool-input-start`` so the matching
# ``tool-output-available`` (emitted from ``on_tool_end``) lands on
# the same card.
index_to_meta: dict[int, dict[str, str]] = {}
ui_tool_call_id_by_run: dict[str, str] = {}
# Per-tool-end mutable cache for the LangChain tool_call_id resolved
# at ``on_tool_end``. ``_emit_tool_output`` reads this so every
# ``format_tool_output_available`` call automatically carries the
# authoritative id without duplicating the kwarg at every call site.
current_lc_tool_call_id: dict[str, str | None] = {"value": None}
def _emit_tool_output(call_id: str, output: Any) -> str:
return streaming_service.format_tool_output_available(
call_id,
output,
langchain_tool_call_id=current_lc_tool_call_id["value"],
)
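# Illustrative sketch (not part of this change): the single-key dict above
# is a closure-writable cell. ``_emit_tool_output`` reads it at call time,
# and ``on_tool_end`` can rebind the value by mutating the dict in place,
# with no ``nonlocal`` required.
_demo_cell: dict[str, str | None] = {"value": None}

def _demo_read() -> str | None:
    return _demo_cell["value"]  # always the current value, not a snapshot

_demo_cell["value"] = "toolu_live"
assert _demo_read() == "toolu_live"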
def next_thinking_step_id() -> str:
nonlocal thinking_step_counter
thinking_step_counter += 1
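# Illustrative sketch (not part of this change): the reasoning/text
# hand-off described above, replayed as a standalone state machine.
# Event names mirror the format-helper names, not exact wire frames.
def _demo_handoff(deltas: list[tuple[str, str]]) -> list[str]:
    events: list[str] = []
    reasoning_open = text_open = False
    for kind, _ in deltas:
        if kind == "reasoning":
            if text_open:
                events.append("text-end")
                text_open = False
            if not reasoning_open:
                events.append("reasoning-start")
                reasoning_open = True
            events.append("reasoning-delta")
        else:
            if reasoning_open:
                events.append("reasoning-end")
                reasoning_open = False
            if not text_open:
                events.append("text-start")
                text_open = True
            events.append("text-delta")
    return events

assert _demo_handoff([("reasoning", "a"), ("reasoning", "b"), ("text", "c")]) == [
    "reasoning-start", "reasoning-delta", "reasoning-delta",
    "reasoning-end", "text-start", "text-delta",
]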
@ -328,22 +504,119 @@ async def _stream_agent_events(
if "surfsense:internal" in event.get("tags", []):
continue # Suppress middleware-internal LLM tokens (e.g. KB search classification)
chunk = event.get("data", {}).get("chunk")
if chunk and hasattr(chunk, "content"):
content = chunk.content
if content and isinstance(content, str):
if current_text_id is None:
completion_event = complete_current_step()
if completion_event:
yield completion_event
if just_finished_tool:
last_active_step_id = None
last_active_step_title = ""
last_active_step_items = []
just_finished_tool = False
current_text_id = streaming_service.generate_text_id()
yield streaming_service.format_text_start(current_text_id)
yield streaming_service.format_text_delta(current_text_id, content)
accumulated_text += content
if not chunk:
continue
parts = _extract_chunk_parts(chunk)
reasoning_delta = parts["reasoning"]
text_delta = parts["text"]
# Reasoning streaming. Open a reasoning block on first
# delta; append every subsequent delta until text begins.
# When text starts we close the reasoning block first so the
# frontend sees the natural hand-off. Gated behind the
# parity-v2 flag so legacy deployments keep today's shape.
if parity_v2 and reasoning_delta:
if current_text_id is not None:
yield streaming_service.format_text_end(current_text_id)
current_text_id = None
if current_reasoning_id is None:
completion_event = complete_current_step()
if completion_event:
yield completion_event
if just_finished_tool:
last_active_step_id = None
last_active_step_title = ""
last_active_step_items = []
just_finished_tool = False
current_reasoning_id = streaming_service.generate_reasoning_id()
yield streaming_service.format_reasoning_start(current_reasoning_id)
yield streaming_service.format_reasoning_delta(
current_reasoning_id, reasoning_delta
)
if text_delta:
if current_reasoning_id is not None:
yield streaming_service.format_reasoning_end(current_reasoning_id)
current_reasoning_id = None
if current_text_id is None:
completion_event = complete_current_step()
if completion_event:
yield completion_event
if just_finished_tool:
last_active_step_id = None
last_active_step_title = ""
last_active_step_items = []
just_finished_tool = False
current_text_id = streaming_service.generate_text_id()
yield streaming_service.format_text_start(current_text_id)
yield streaming_service.format_text_delta(current_text_id, text_delta)
accumulated_text += text_delta
# Live tool-call argument streaming. Runs AFTER text/reasoning
# processing so chunks containing both stay in their natural
# wire order (text → text-end → tool-input-start). Active
# text/reasoning are closed inside the registration branch
# before ``tool-input-start`` so the frontend sees a clean
# part boundary even when providers interleave.
if parity_v2 and parts["tool_call_chunks"]:
for tcc in parts["tool_call_chunks"]:
idx = tcc.get("index")
# Register this index when we first see id+name
# TOGETHER. Per LangChain ToolCallChunk semantics the
# first chunk for a tool call carries both fields
# together; later chunks have id=None, name=None and
# only ``args``. Requiring BOTH keeps wire
# ``tool-input-start`` always carrying a real
# toolName (assistant-ui's typed tool-part dispatch
# keys off it).
if idx is not None and idx not in index_to_meta:
lc_id = tcc.get("id")
name = tcc.get("name")
if lc_id and name:
ui_id = lc_id
# Close active text/reasoning so wire
# ordering stays clean even on providers
# that interleave text and tool-call chunks
# within the same stream window.
if current_text_id is not None:
yield streaming_service.format_text_end(current_text_id)
current_text_id = None
if current_reasoning_id is not None:
yield streaming_service.format_reasoning_end(
current_reasoning_id
)
current_reasoning_id = None
index_to_meta[idx] = {
"ui_id": ui_id,
"lc_id": lc_id,
"name": name,
}
yield streaming_service.format_tool_input_start(
ui_id,
name,
langchain_tool_call_id=lc_id,
)
# Emit args delta for any chunk at a registered
# index (including idless continuations). Once an
# index is owned by ``index_to_meta`` we DO NOT
# append to ``pending_tool_call_chunks`` — that list
# is reserved for the parity_v2-OFF / unmatched
# fallback path so it never re-pops chunks already
# consumed here (skip-append).
meta = index_to_meta.get(idx) if idx is not None else None
if meta:
args_chunk = tcc.get("args") or ""
if args_chunk:
yield streaming_service.format_tool_input_delta(
meta["ui_id"], args_chunk
)
else:
pending_tool_call_chunks.append(tcc)
elif event_type == "on_tool_start":
active_tool_depth += 1
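# Illustrative sketch (not part of this change): the ToolCallChunk shape
# the registration logic above relies on. The first chunk at an index
# carries id+name, continuations carry only args, and grouping by
# ``index`` reassembles the call. All values are hypothetical.
_demo_stream = [
    {"index": 0, "id": "toolu_x", "name": "write_file", "args": ""},
    {"index": 0, "id": None, "name": None, "args": '{"path": "a.md",'},
    {"index": 0, "id": None, "name": None, "args": ' "content": "hi"}'},
]
_demo_reg: dict[int, dict[str, str]] = {}
_demo_args: dict[int, str] = {}
for _tcc in _demo_stream:
    _idx = _tcc["index"]
    if _idx not in _demo_reg and _tcc["id"] and _tcc["name"]:
        _demo_reg[_idx] = {"ui_id": _tcc["id"], "name": _tcc["name"]}  # tool-input-start
    if _idx in _demo_reg and _tcc["args"]:
        _demo_args[_idx] = _demo_args.get(_idx, "") + _tcc["args"]  # tool-input-delta
assert _demo_args[0] == '{"path": "a.md", "content": "hi"}'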
@ -463,6 +736,95 @@ async def _stream_agent_events(
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "rm":
rm_path = (
tool_input.get("path", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_path = rm_path if len(rm_path) <= 80 else "…" + rm_path[-77:]
last_active_step_title = "Deleting file"
last_active_step_items = [display_path] if display_path else []
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Deleting file",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "rmdir":
rmdir_path = (
tool_input.get("path", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_path = (
rmdir_path if len(rmdir_path) <= 80 else "…" + rmdir_path[-77:]
)
last_active_step_title = "Deleting folder"
last_active_step_items = [display_path] if display_path else []
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Deleting folder",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "mkdir":
mkdir_path = (
tool_input.get("path", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
display_path = (
mkdir_path if len(mkdir_path) <= 80 else "…" + mkdir_path[-77:]
)
last_active_step_title = "Creating folder"
last_active_step_items = [display_path] if display_path else []
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Creating folder",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "move_file":
src = (
tool_input.get("source_path", "")
if isinstance(tool_input, dict)
else ""
)
dst = (
tool_input.get("destination_path", "")
if isinstance(tool_input, dict)
else ""
)
display_src = src if len(src) <= 60 else "…" + src[-57:]
display_dst = dst if len(dst) <= 60 else "…" + dst[-57:]
last_active_step_title = "Moving file"
last_active_step_items = (
[f"{display_src}{display_dst}"] if src or dst else []
)
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Moving file",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "write_todos":
todos = (
tool_input.get("todos", []) if isinstance(tool_input, dict) else []
)
todo_count = len(todos) if isinstance(todos, list) else 0
last_active_step_title = "Planning tasks"
last_active_step_items = (
[f"{todo_count} task{'s' if todo_count != 1 else ''}"]
if todo_count
else []
)
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Planning tasks",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "save_document":
doc_title = (
tool_input.get("title", "")
@ -570,7 +932,15 @@ async def _stream_agent_events(
items=last_active_step_items,
)
else:
last_active_step_title = f"Using {tool_name.replace('_', ' ')}"
# Fallback for tools without a curated thinking-step title
# (typically connector tools, MCP-registered tools, or
# newly added tools that haven't been wired up here yet).
# Render the snake_cased name as a sentence-cased phrase
# so non-technical users see e.g. "Send gmail email"
# rather than the raw identifier "send_gmail_email".
last_active_step_title = (
tool_name.replace("_", " ").strip().capitalize() or tool_name
)
last_active_step_items = []
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
@ -578,12 +948,65 @@ async def _stream_agent_events(
status="in_progress",
)
tool_call_id = (
f"call_{run_id[:32]}"
if run_id
else streaming_service.generate_tool_call_id()
)
yield streaming_service.format_tool_input_start(tool_call_id, tool_name)
# Resolve the card identity. If the chunk-emission loop
# already registered an ``index`` for this tool call (parity_v2
# path), reuse the same ui_id so the card sees:
# tool-input-start → deltas… → tool-input-available →
# tool-output-available all keyed by lc_id. Otherwise fall
# back to the synthetic ``call_<run_id>`` id and the legacy
# best-effort match against ``pending_tool_call_chunks``.
matched_meta: dict[str, str] | None = None
if parity_v2:
# FIFO over indices 0,1,2…; first unassigned same-name
# match wins. Handles parallel same-name calls (e.g. two
# write_file calls) deterministically as long as the
# model interleaves on_tool_start in the same order it
# streamed the args.
taken_ui_ids = set(ui_tool_call_id_by_run.values())
for meta in index_to_meta.values():
if meta["name"] == tool_name and meta["ui_id"] not in taken_ui_ids:
matched_meta = meta
break
tool_call_id: str
langchain_tool_call_id: str | None = None
if matched_meta is not None:
tool_call_id = matched_meta["ui_id"]
langchain_tool_call_id = matched_meta["lc_id"]
# ``tool-input-start`` already fired during chunk
# emission — skip the duplicate. No pruning is needed
# because the chunk-emission loop intentionally never
# appends registered-index chunks to
# ``pending_tool_call_chunks`` (skip-append).
if run_id:
lc_tool_call_id_by_run[run_id] = matched_meta["lc_id"]
else:
tool_call_id = (
f"call_{run_id[:32]}"
if run_id
else streaming_service.generate_tool_call_id()
)
# Legacy fallback: parity_v2 OFF, or parity_v2 ON but the
# provider didn't stream tool_call_chunks for this call
# (no index registered). Run the existing best-effort
# match BEFORE emitting start so we still attach an
# authoritative ``langchainToolCallId`` when possible.
if parity_v2:
langchain_tool_call_id = _legacy_match_lc_id(
pending_tool_call_chunks,
tool_name,
run_id,
lc_tool_call_id_by_run,
)
yield streaming_service.format_tool_input_start(
tool_call_id,
tool_name,
langchain_tool_call_id=langchain_tool_call_id,
)
if run_id:
ui_tool_call_id_by_run[run_id] = tool_call_id
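# Illustrative sketch (not part of this change): the FIFO resolution above
# applied to two parallel same-name calls, with hypothetical ids. Dict
# insertion order follows stream order, so each run claims the next
# unclaimed card.
_demo_meta2 = {
    0: {"ui_id": "toolu_1", "lc_id": "toolu_1", "name": "write_file"},
    1: {"ui_id": "toolu_2", "lc_id": "toolu_2", "name": "write_file"},
}
_demo_claimed: dict[str, str] = {}

def _demo_resolve(name: str, run: str) -> str:
    taken = set(_demo_claimed.values())
    for meta in _demo_meta2.values():
        if meta["name"] == name and meta["ui_id"] not in taken:
            _demo_claimed[run] = meta["ui_id"]
            return meta["ui_id"]
    return f"call_{run[:32]}"  # legacy synthetic fallback

assert _demo_resolve("write_file", "run-a") == "toolu_1"
assert _demo_resolve("write_file", "run-b") == "toolu_2"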
# Sanitize tool_input: strip runtime-injected non-serializable
# values (e.g. LangChain ToolRuntime) before sending over SSE.
if isinstance(tool_input, dict):
@ -600,6 +1023,7 @@ async def _stream_agent_events(
tool_call_id,
tool_name,
_safe_input,
langchain_tool_call_id=langchain_tool_call_id,
)
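# Illustrative sketch (not part of this change): one way to build a
# JSON-safe input dict. The real ``_safe_input`` construction is elided
# from this hunk; this hypothetical stand-in keeps only values that
# survive JSON encoding and drops injected runtime objects.
import json

def _demo_safe_input(raw: dict[str, Any]) -> dict[str, Any]:
    safe: dict[str, Any] = {}
    for key, value in raw.items():
        try:
            json.dumps(value)
        except TypeError:
            continue  # e.g. a LangChain ToolRuntime instance
        safe[key] = value
    return safe

assert _demo_safe_input({"path": "a.md", "runtime": object()}) == {"path": "a.md"}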
elif event_type == "on_tool_end":
@ -635,12 +1059,42 @@ async def _stream_agent_events(
result.write_succeeded = True
result.verification_succeeded = True
tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown"
# Look up the SAME card id used at on_tool_start (either the
# parity_v2 lc-id-derived ui_id or the legacy synthetic
# ``call_<run_id>``) so the output event always lands on the
# same card as start/delta/available. Fallback preserves the
# legacy synthetic shape for parity_v2-OFF / unknown-run paths.
tool_call_id = ui_tool_call_id_by_run.get(
run_id,
f"call_{run_id[:32]}" if run_id else "call_unknown",
)
original_step_id = tool_step_ids.get(
run_id, f"{step_prefix}-unknown-{run_id[:8]}"
)
completed_step_ids.add(original_step_id)
# Authoritative LangChain tool_call_id from the returned
# ``ToolMessage``. Falls back to whatever we matched
# at ``on_tool_start`` time (kept in ``lc_tool_call_id_by_run``)
# if the output isn't a ToolMessage. The value is stored in
# ``current_lc_tool_call_id`` so ``_emit_tool_output``
# picks it up for every output emit below.
#
# Emitted in BOTH parity_v2 and legacy modes: the chat tool
# card needs the LangChain id to match against the
# ``data-action-log`` SSE event (keyed by ``lc_tool_call_id``)
# so the inline Revert button can light up. Reading
# ``raw_output.tool_call_id`` is a cheap, non-mutating attribute
# access that is safe regardless of feature-flag state.
current_lc_tool_call_id["value"] = None
authoritative = getattr(raw_output, "tool_call_id", None)
if isinstance(authoritative, str) and authoritative:
current_lc_tool_call_id["value"] = authoritative
if run_id:
lc_tool_call_id_by_run[run_id] = authoritative
elif run_id and run_id in lc_tool_call_id_by_run:
current_lc_tool_call_id["value"] = lc_tool_call_id_by_run[run_id]
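# Illustrative sketch (not part of this change): the authoritative-id
# read above, using a real ``ToolMessage`` from langchain_core. A raw
# output without ``tool_call_id`` falls through to the per-run cache.
from langchain_core.messages import ToolMessage

_demo_current: dict[str, str | None] = {"value": None}
_demo_cache = {"run-9": "toolu_cached"}
_demo_raw = ToolMessage(content="ok", tool_call_id="toolu_real")
_demo_auth = getattr(_demo_raw, "tool_call_id", None)
if isinstance(_demo_auth, str) and _demo_auth:
    _demo_current["value"] = _demo_auth  # ToolMessage wins
elif "run-9" in _demo_cache:
    _demo_current["value"] = _demo_cache["run-9"]  # on_tool_start match as fallback
assert _demo_current["value"] == "toolu_real"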
if tool_name == "read_file":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
@ -676,6 +1130,41 @@ async def _stream_agent_events(
status="completed",
items=last_active_step_items,
)
elif tool_name == "rm":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Deleting file",
status="completed",
items=last_active_step_items,
)
elif tool_name == "rmdir":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Deleting folder",
status="completed",
items=last_active_step_items,
)
elif tool_name == "mkdir":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Creating folder",
status="completed",
items=last_active_step_items,
)
elif tool_name == "move_file":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Moving file",
status="completed",
items=last_active_step_items,
)
elif tool_name == "write_todos":
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Planning tasks",
status="completed",
items=last_active_step_items,
)
elif tool_name == "save_document":
result_str = (
tool_output.get("result", "")
@ -927,9 +1416,14 @@ async def _stream_agent_events(
items=completed_items,
)
else:
# Fallback completion title — see the matching in-progress
# branch above for the wording rationale.
fallback_title = (
tool_name.replace("_", " ").strip().capitalize() or tool_name
)
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title=f"Using {tool_name.replace('_', ' ')}",
title=fallback_title,
status="completed",
items=last_active_step_items,
)
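# Illustrative sketch (not part of this change): the fallback title chain,
# checked against the example from the in-progress branch's comment.
def _demo_fallback_title(tool_name: str) -> str:
    return tool_name.replace("_", " ").strip().capitalize() or tool_name

assert _demo_fallback_title("send_gmail_email") == "Send gmail email"
assert _demo_fallback_title("___") == "___"  # degenerate name falls back to the raw identifier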
@ -940,7 +1434,7 @@ async def _stream_agent_events(
last_active_step_items = []
if tool_name == "generate_podcast":
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
tool_output
if isinstance(tool_output, dict)
@ -965,7 +1459,7 @@ async def _stream_agent_events(
"error",
)
elif tool_name == "generate_video_presentation":
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
tool_output
if isinstance(tool_output, dict)
@ -993,7 +1487,7 @@ async def _stream_agent_events(
"error",
)
elif tool_name == "generate_image":
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
tool_output
if isinstance(tool_output, dict)
@ -1020,12 +1514,12 @@ async def _stream_agent_events(
display_output["content_preview"] = (
content[:500] + "..." if len(content) > 500 else content
)
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
display_output,
)
else:
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
{"result": tool_output},
)
@ -1053,7 +1547,7 @@ async def _stream_agent_events(
)
result_text = _tool_output_to_text(tool_output)
if _tool_output_has_error(tool_output):
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
{
"status": "error",
@ -1062,7 +1556,7 @@ async def _stream_agent_events(
},
)
else:
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
{
"status": "completed",
@ -1072,7 +1566,7 @@ async def _stream_agent_events(
)
elif tool_name == "generate_report":
# Stream the full report result so frontend can render the ReportCard
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
tool_output
if isinstance(tool_output, dict)
@ -1099,7 +1593,7 @@ async def _stream_agent_events(
"error",
)
elif tool_name == "generate_resume":
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
tool_output
if isinstance(tool_output, dict)
@ -1150,7 +1644,7 @@ async def _stream_agent_events(
"update_confluence_page",
"delete_confluence_page",
):
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
tool_output
if isinstance(tool_output, dict)
@ -1178,7 +1672,7 @@ async def _stream_agent_events(
if fpath and fpath not in result.sandbox_files:
result.sandbox_files.append(fpath)
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
{
"exit_code": exit_code,
@ -1213,12 +1707,12 @@ async def _stream_agent_events(
citations[chunk_url]["snippet"] = (
content[:200] + "…" if len(content) > 200 else content
)
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
{"status": "completed", "citations": citations},
)
else:
yield streaming_service.format_tool_output_available(
yield _emit_tool_output(
tool_call_id,
{"status": "completed", "result_length": len(str(tool_output))},
)
@ -1276,6 +1770,25 @@ async def _stream_agent_events(
},
)
elif event_type == "on_custom_event" and event.get("name") == "action_log":
# Surface a freshly committed AgentActionLog row so the chat
# tool card can render its Revert button immediately.
data = event.get("data", {})
if data.get("id") is not None:
yield streaming_service.format_data("action-log", data)
elif (
event_type == "on_custom_event"
and event.get("name") == "action_log_updated"
):
# Reversibility flipped in kb_persistence after the SAVEPOINT
# for a destructive op (rm/rmdir/move/edit/write) committed.
# Frontend uses this to flip the card's Revert
# button on without re-fetching the actions list.
data = event.get("data", {})
if data.get("id") is not None:
yield streaming_service.format_data("action-log-updated", data)
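# Illustrative sketch (not part of this change): the producer side of
# these custom events, assuming ``adispatch_custom_event`` from
# langchain_core. Field names are hypothetical; the real dispatch site
# lives in kb_persistence and is not shown in this hunk.
from langchain_core.callbacks import adispatch_custom_event

async def _demo_emit_action_log() -> None:
    # Runs inside a graph consumed via astream_events, so the events
    # surface in the loop above as on_custom_event.
    await adispatch_custom_event(
        "action_log", {"id": 42, "action": "rm", "reversible": False}
    )
    # After the SAVEPOINT for the destructive op commits:
    await adispatch_custom_event("action_log_updated", {"id": 42, "reversible": True})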
elif event_type in ("on_chain_end", "on_agent_end"):
if current_text_id is not None:
yield streaming_service.format_text_end(current_text_id)
@ -1293,11 +1806,12 @@ async def _stream_agent_events(
# Safety net: if astream_events was cancelled before
# KnowledgeBasePersistenceMiddleware.aafter_agent ran, any staged work
# (dirty_paths / staged_dirs / pending_moves) will still be in the
# checkpointed state. Run the SAME shared commit helper here so the
# turn's writes don't get lost on client disconnect, then push the
# delta back into the graph using `as_node=...` so reducers fire as if
# the after_agent hook produced it.
# (dirty_paths / staged_dirs / pending_moves / pending_deletes /
# pending_dir_deletes) will still be in the checkpointed state. Run
# the SAME shared commit helper here so the turn's writes don't get
# lost on client disconnect, then push the delta back into the graph
# using `as_node=...` so reducers fire as if the after_agent hook
# produced it.
if (
fallback_commit_filesystem_mode == FilesystemMode.CLOUD
and fallback_commit_search_space_id is not None
@ -1305,6 +1819,8 @@ async def _stream_agent_events(
(state_values.get("dirty_paths") or [])
or (state_values.get("staged_dirs") or [])
or (state_values.get("pending_moves") or [])
or (state_values.get("pending_deletes") or [])
or (state_values.get("pending_dir_deletes") or [])
)
):
try:
@ -1313,6 +1829,7 @@ async def _stream_agent_events(
search_space_id=fallback_commit_search_space_id,
created_by_id=fallback_commit_created_by_id,
filesystem_mode=fallback_commit_filesystem_mode,
thread_id=fallback_commit_thread_id,
dispatch_events=False,
)
if delta:
@ -1753,13 +2270,33 @@ async def stream_new_chat(
config = {
"configurable": configurable,
"recursion_limit": 80, # Increase from default 25 to allow more tool iterations
# Effectively uncapped, matching the agent-level
# ``with_config`` default in ``chat_deepagent.create_agent``
# and the unbounded ``while(true)`` loop used by OpenCode's
# ``session/processor.ts``. Real circuit-breakers live in
# middleware: ``DoomLoopMiddleware`` (sliding-window tool
# signature check), plus ``enable_tool_call_limit`` /
# ``enable_model_call_limit`` when those flags are set. The
# original LangGraph default of 25 (and our previous 80
# bump) hit users on legitimate multi-tool plans.
"recursion_limit": 10_000,
}
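# Illustrative sketch (not part of this change): ``recursion_limit`` is a
# standard LangGraph config key, read per run. Everything below is a
# placeholder graph just to show where the key lands.
from langgraph.graph import END, START, MessagesState, StateGraph

def _demo_node(state: MessagesState) -> dict:
    return {"messages": []}

_demo_builder = StateGraph(MessagesState)
_demo_builder.add_node("step", _demo_node)
_demo_builder.add_edge(START, "step")
_demo_builder.add_edge("step", END)
_demo_graph = _demo_builder.compile()
_demo_graph.invoke({"messages": []}, config={"recursion_limit": 10_000})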
# Start the message stream
yield streaming_service.format_message_start()
yield streaming_service.format_start_step()
# Surface the per-turn correlation id at the very start of the
# stream so the frontend can stamp it onto the in-flight
# assistant message and replay it via ``appendMessage``
# for durable storage. Tool/action-log events DO carry it later,
# but pure-text turns never produce action-log events; this
# event guarantees the frontend learns the turn id regardless.
yield streaming_service.format_data(
"turn-info",
{"chat_turn_id": stream_result.turn_id},
)
# Initial thinking step - analyzing the request
if mentioned_surfsense_docs:
initial_title = "Analyzing referenced content"
@ -1910,6 +2447,7 @@ async def stream_new_chat(
if filesystem_selection
else FilesystemMode.CLOUD
),
fallback_commit_thread_id=chat_id,
):
if not _first_event_logged:
_perf_log.info(
@ -2353,11 +2891,22 @@ async def stream_resume_chat(
"request_id": request_id or "unknown",
"turn_id": stream_result.turn_id,
},
"recursion_limit": 80,
# See ``stream_new_chat`` above for rationale: effectively
# uncapped to mirror the agent default and OpenCode's
# session loop. Doom-loop / call-limit middleware enforce
# the real ceiling.
"recursion_limit": 10_000,
}
yield streaming_service.format_message_start()
yield streaming_service.format_start_step()
# Same rationale as ``stream_new_chat``: emit the turn id so
# resumed streams can be persisted with their correlation id
# intact.
yield streaming_service.format_data(
"turn-info",
{"chat_turn_id": stream_result.turn_id},
)
_t_stream_start = time.perf_counter()
_first_event_logged = False
@ -2375,6 +2924,7 @@ async def stream_resume_chat(
if filesystem_selection
else FilesystemMode.CLOUD
),
fallback_commit_thread_id=chat_id,
):
if not _first_event_logged:
_perf_log.info(