diff --git a/surfsense_backend/app/tasks/chat/streaming/shared/__init__.py b/surfsense_backend/app/tasks/chat/streaming/shared/__init__.py new file mode 100644 index 000000000..6c9f1f6b5 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/shared/__init__.py @@ -0,0 +1,15 @@ +"""Shared building blocks used across every streaming flow.""" + +from __future__ import annotations + +from app.tasks.chat.streaming.shared.stream_result import StreamResult +from app.tasks.chat.streaming.shared.utils import ( + resume_step_prefix, + safe_float, +) + +__all__ = [ + "StreamResult", + "resume_step_prefix", + "safe_float", +] diff --git a/surfsense_backend/app/tasks/chat/streaming/shared/stream_result.py b/surfsense_backend/app/tasks/chat/streaming/shared/stream_result.py new file mode 100644 index 000000000..a940e8a9f --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/shared/stream_result.py @@ -0,0 +1,37 @@ +"""Per-turn streaming state shared between the orchestrator and event loop.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class StreamResult: + accumulated_text: str = "" + is_interrupted: bool = False + sandbox_files: list[str] = field(default_factory=list) + request_id: str | None = None + turn_id: str = "" + filesystem_mode: str = "cloud" + client_platform: str = "web" + intent_detected: str = "chat_only" + intent_confidence: float = 0.0 + write_attempted: bool = False + write_succeeded: bool = False + verification_succeeded: bool = False + commit_gate_passed: bool = True + commit_gate_reason: str = "" + # Pre-allocated assistant ``new_chat_messages.id`` for this turn, captured by + # ``persist_assistant_shell`` right after the user row is persisted. ``None`` + # for the legacy/anonymous code paths that don't opt in to server-side + # ``ContentPart[]`` projection. + assistant_message_id: int | None = None + # In-memory mirror of the FE's assistant-ui ``ContentPartsState``, populated + # by the lifecycle methods called from the streaming event loop at each + # ``streaming_service.format_*`` yield site. Snapshot in the streaming + # ``finally`` to produce the rich JSONB persisted by + # ``finalize_assistant_turn``. ``repr=False`` keeps the log-on-error path + # (``StreamResult`` is logged in some error branches) from dumping a + # potentially-large parts list. + content_builder: Any | None = field(default=None, repr=False) diff --git a/surfsense_backend/app/tasks/chat/streaming/shared/utils.py b/surfsense_backend/app/tasks/chat/streaming/shared/utils.py new file mode 100644 index 000000000..fe6901543 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/shared/utils.py @@ -0,0 +1,27 @@ +"""Small utilities used by streaming orchestrators and phases.""" + +from __future__ import annotations + +from typing import Any + + +def resume_step_prefix(turn_id: str) -> str: + """Per-turn ``step_prefix`` for resume invocations. + + Each ``stream_agent_events`` call constructs a fresh + ``AgentEventRelayState`` with ``thinking_step_counter=0``, so two consecutive + resume turns would otherwise both emit ``thinking-resume-1``, ``-2`` etc. + The frontend rehydrates ``currentThinkingSteps`` from the immediate prior + assistant message at the start of every resume — if the new stream's IDs + collide with the seeded ones, React renders sibling Timeline rows with the + same key. Salting with ``turn_id`` guarantees disjoint IDs across resumes + within one thread. + """ + return f"thinking-resume-{turn_id}" + + +def safe_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default