From 7581a7c9c3247bc977ae556daddd25cc185ef2eb Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 6 May 2026 20:08:48 +0200 Subject: [PATCH] Add chat streaming relay state and thinking-step SSE helpers. --- .../tasks/chat/streaming/relay/__init__.py | 3 + .../app/tasks/chat/streaming/relay/state.py | 55 +++++++++++++++++++ .../relay/thinking_step_completion.py | 31 +++++++++++ .../chat/streaming/relay/thinking_step_sse.py | 24 ++++++++ 4 files changed, 113 insertions(+) create mode 100644 surfsense_backend/app/tasks/chat/streaming/relay/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/relay/state.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_completion.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_sse.py diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py b/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py new file mode 100644 index 000000000..c1a5e7175 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py @@ -0,0 +1,3 @@ +"""Relay state: thinking steps, tool bookkeeping, and stream helpers.""" + +from __future__ import annotations diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/state.py b/surfsense_backend/app/tasks/chat/streaming/relay/state.py new file mode 100644 index 000000000..e8e35d0b2 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/relay/state.py @@ -0,0 +1,55 @@ +"""Mutable counters and maps for one agent stream.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class AgentEventRelayState: + """Tracks text, thinking steps, tool depth, and pending tool-call metadata.""" + + accumulated_text: str = "" + current_text_id: str | None = None + thinking_step_counter: int = 0 + tool_step_ids: dict[str, str] = field(default_factory=dict) + completed_step_ids: set[str] = field(default_factory=set) + last_active_step_id: str | None = None + last_active_step_title: str = "" + last_active_step_items: list[str] = field(default_factory=list) + just_finished_tool: bool = False + active_tool_depth: int = 0 + called_update_memory: bool = False + current_reasoning_id: str | None = None + parity_v2: bool = False + pending_tool_call_chunks: list[dict[str, Any]] = field(default_factory=list) + lc_tool_call_id_by_run: dict[str, str] = field(default_factory=dict) + file_path_by_run: dict[str, str] = field(default_factory=dict) + index_to_meta: dict[int, dict[str, str]] = field(default_factory=dict) + ui_tool_call_id_by_run: dict[str, str] = field(default_factory=dict) + current_lc_tool_call_id: dict[str, str | None] = field( + default_factory=lambda: {"value": None} + ) + + @classmethod + def for_invocation( + cls, + *, + initial_step_id: str | None = None, + initial_step_title: str = "", + initial_step_items: list[str] | None = None, + parity_v2: bool, + ) -> AgentEventRelayState: + counter = 1 if initial_step_id else 0 + return cls( + thinking_step_counter=counter, + last_active_step_id=initial_step_id, + last_active_step_title=initial_step_title, + last_active_step_items=list(initial_step_items or []), + parity_v2=parity_v2, + ) + + def next_thinking_step_id(self, step_prefix: str) -> str: + self.thinking_step_counter += 1 + return f"{step_prefix}-{self.thinking_step_counter}" diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_completion.py b/surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_completion.py new file mode 100644 index 000000000..a0be71281 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_completion.py @@ -0,0 +1,31 @@ +"""Close the in-progress thinking step with a completed status frame.""" + +from __future__ import annotations + +from typing import Any + +from .thinking_step_sse import emit_thinking_step_frame + + +def complete_active_thinking_step( + *, + streaming_service: Any, + content_builder: Any | None, + last_active_step_id: str | None, + last_active_step_title: str, + last_active_step_items: list[str], + completed_step_ids: set[str], +) -> tuple[str | None, str | None]: + """Emit a completed thinking-step frame once; return (frame or None, next active step id).""" + if last_active_step_id and last_active_step_id not in completed_step_ids: + completed_step_ids.add(last_active_step_id) + event = emit_thinking_step_frame( + streaming_service=streaming_service, + content_builder=content_builder, + step_id=last_active_step_id, + title=last_active_step_title, + status="completed", + items=last_active_step_items if last_active_step_items else None, + ) + return event, None + return None, last_active_step_id diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_sse.py b/surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_sse.py new file mode 100644 index 000000000..9e8c08dd5 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/relay/thinking_step_sse.py @@ -0,0 +1,24 @@ +"""Thinking-step SSE plus optional content-builder updates.""" + +from __future__ import annotations + +from typing import Any + + +def emit_thinking_step_frame( + *, + streaming_service: Any, + content_builder: Any | None, + step_id: str, + title: str, + status: str = "in_progress", + items: list[str] | None = None, +) -> str: + if content_builder is not None: + content_builder.on_thinking_step(step_id, title, status, items) + return streaming_service.format_thinking_step( + step_id=step_id, + title=title, + status=status, + items=items, + )