From 5510c6c3147e80a4de1b6a25e58bc6479032482e Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 6 May 2026 20:08:47 +0200 Subject: [PATCH] Add typed event payload modules for the streaming service. --- .../app/services/streaming/events/__init__.py | 29 +++++ .../services/streaming/events/action_log.py | 24 ++++ .../app/services/streaming/events/data.py | 118 ++++++++++++++++++ .../app/services/streaming/events/error.py | 23 ++++ .../services/streaming/events/interrupt.py | 56 +++++++++ .../services/streaming/events/lifecycle.py | 29 +++++ .../services/streaming/events/reasoning.py | 36 ++++++ .../app/services/streaming/events/source.py | 59 +++++++++ .../streaming/events/subagent_lifecycle.py | 86 +++++++++++++ .../app/services/streaming/events/text.py | 31 +++++ .../app/services/streaming/events/tool.py | 80 ++++++++++++ 11 files changed, 571 insertions(+) create mode 100644 surfsense_backend/app/services/streaming/events/__init__.py create mode 100644 surfsense_backend/app/services/streaming/events/action_log.py create mode 100644 surfsense_backend/app/services/streaming/events/data.py create mode 100644 surfsense_backend/app/services/streaming/events/error.py create mode 100644 surfsense_backend/app/services/streaming/events/interrupt.py create mode 100644 surfsense_backend/app/services/streaming/events/lifecycle.py create mode 100644 surfsense_backend/app/services/streaming/events/reasoning.py create mode 100644 surfsense_backend/app/services/streaming/events/source.py create mode 100644 surfsense_backend/app/services/streaming/events/subagent_lifecycle.py create mode 100644 surfsense_backend/app/services/streaming/events/text.py create mode 100644 surfsense_backend/app/services/streaming/events/tool.py diff --git a/surfsense_backend/app/services/streaming/events/__init__.py b/surfsense_backend/app/services/streaming/events/__init__.py new file mode 100644 index 000000000..91a8ff854 --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/__init__.py 
@@ -0,0 +1,29 @@ +"""SSE event payload formatters, one module per event family.""" + +from __future__ import annotations + +from . import ( + action_log, + data, + error, + interrupt, + lifecycle, + reasoning, + source, + subagent_lifecycle, + text, + tool, +) + +__all__ = [ + "action_log", + "data", + "error", + "interrupt", + "lifecycle", + "reasoning", + "source", + "subagent_lifecycle", + "text", + "tool", +] diff --git a/surfsense_backend/app/services/streaming/events/action_log.py b/surfsense_backend/app/services/streaming/events/action_log.py new file mode 100644 index 000000000..0a8e46f0a --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/action_log.py @@ -0,0 +1,24 @@ +"""Action-log events relayed from ``ActionLogMiddleware`` custom dispatches.""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter +from .data import format_data + + +def format_action_log( + payload: dict[str, Any], + *, + emitter: Emitter | None = None, +) -> str: + return format_data("action-log", payload, emitter=emitter) + + +def format_action_log_updated( + payload: dict[str, Any], + *, + emitter: Emitter | None = None, +) -> str: + return format_data("action-log-updated", payload, emitter=emitter) diff --git a/surfsense_backend/app/services/streaming/events/data.py b/surfsense_backend/app/services/streaming/events/data.py new file mode 100644 index 000000000..f6e190578 --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/data.py @@ -0,0 +1,118 @@ +"""Generic ``data-*`` envelopes and SurfSense-specific data parts. + +Inner ``data`` dict fields use snake_case. Legacy ``threadId`` / +``messageId`` keys are preserved where they cross the AI SDK boundary. 
+""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_data( + data_type: str, + data: Any, + *, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = {"type": f"data-{data_type}", "data": data} + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_terminal_info( + text: str, + *, + message_type: str = "info", + emitter: Emitter | None = None, +) -> str: + return format_data( + "terminal-info", + {"text": text, "type": message_type}, + emitter=emitter, + ) + + +def format_further_questions( + questions: list[str], + *, + emitter: Emitter | None = None, +) -> str: + return format_data("further-questions", {"questions": questions}, emitter=emitter) + + +def format_thinking_step( + *, + step_id: str, + title: str, + status: str = "in_progress", + items: list[str] | None = None, + emitter: Emitter | None = None, +) -> str: + return format_data( + "thinking-step", + { + "id": step_id, + "title": title, + "status": status, + "items": items or [], + }, + emitter=emitter, + ) + + +def format_thread_title_update( + *, + thread_id: int, + title: str, + emitter: Emitter | None = None, +) -> str: + return format_data( + "thread-title-update", + {"threadId": thread_id, "title": title}, + emitter=emitter, + ) + + +def format_turn_info( + *, + chat_turn_id: str, + emitter: Emitter | None = None, +) -> str: + return format_data("turn-info", {"chat_turn_id": chat_turn_id}, emitter=emitter) + + +def format_turn_status( + *, + status: str, + emitter: Emitter | None = None, +) -> str: + return format_data("turn-status", {"status": status}, emitter=emitter) + + +def format_user_message_id( + *, + message_id: str, + turn_id: str, + emitter: Emitter | None = None, +) -> str: + return format_data( + "user-message-id", + {"message_id": message_id, "turn_id": turn_id}, + emitter=emitter, + ) + + +def format_assistant_message_id( + *, + 
message_id: str, + turn_id: str, + emitter: Emitter | None = None, +) -> str: + return format_data( + "assistant-message-id", + {"message_id": message_id, "turn_id": turn_id}, + emitter=emitter, + ) diff --git a/surfsense_backend/app/services/streaming/events/error.py b/surfsense_backend/app/services/streaming/events/error.py new file mode 100644 index 000000000..cd190d1f4 --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/error.py @@ -0,0 +1,23 @@ +"""Single terminal error path the orchestrator must route through.""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_error( + error_text: str, + *, + error_code: str | None = None, + extra: dict[str, Any] | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = {"type": "error", "errorText": error_text} + if error_code: + payload["errorCode"] = error_code + if extra: + payload.update(extra) + return format_sse(attach_emitted_by(payload, emitter)) diff --git a/surfsense_backend/app/services/streaming/events/interrupt.py b/surfsense_backend/app/services/streaming/events/interrupt.py new file mode 100644 index 000000000..0334b10b3 --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/interrupt.py @@ -0,0 +1,56 @@ +"""Interrupt-request events with a single canonical payload shape.""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter +from .data import format_data + + +def normalize_interrupt_payload(interrupt_value: dict[str, Any]) -> dict[str, Any]: + if "action_requests" in interrupt_value and "review_configs" in interrupt_value: + return interrupt_value + + interrupt_type = interrupt_value.get("type", "unknown") + message = interrupt_value.get("message") + action = interrupt_value.get("action", {}) or {} + context = interrupt_value.get("context", {}) or {} + + normalized: dict[str, Any] = { + 
"action_requests": [ + { + "name": action.get("tool", "unknown_tool"), + "args": action.get("params", {}), + } + ], + "review_configs": [ + { + "action_name": action.get("tool", "unknown_tool"), + "allowed_decisions": ["approve", "edit", "reject"], + } + ], + "interrupt_type": interrupt_type, + "context": context, + } + if message: + normalized["message"] = message + return normalized + + +def format_interrupt_request( + interrupt_value: dict[str, Any], + *, + interrupt_id: str | None = None, + pending_interrupt_count: int | None = None, + chat_turn_id: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload = dict(normalize_interrupt_payload(interrupt_value))  # don't mutate input + if interrupt_id is not None: + payload["interrupt_id"] = interrupt_id + if pending_interrupt_count is not None: + payload["pending_interrupt_count"] = pending_interrupt_count + if chat_turn_id is not None: + payload["chat_turn_id"] = chat_turn_id + return format_data("interrupt-request", payload, emitter=emitter) diff --git a/surfsense_backend/app/services/streaming/events/lifecycle.py b/surfsense_backend/app/services/streaming/events/lifecycle.py new file mode 100644 index 000000000..019718b67 --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/lifecycle.py @@ -0,0 +1,29 @@ +"""High-level message and step lifecycle events. + +Wire verbs are fixed by the AI SDK protocol (``start`` / ``finish`` for +the whole message, ``start-step`` / ``finish-step`` for each step). +Python helpers always read ``format_{noun}_{verb}`` so pairs are +visible at the call site. 
+""" + +from __future__ import annotations + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_message_start(message_id: str, *, emitter: Emitter | None = None) -> str: + payload = {"type": "start", "messageId": message_id} + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_message_finish(*, emitter: Emitter | None = None) -> str: + return format_sse(attach_emitted_by({"type": "finish"}, emitter)) + + +def format_step_start(*, emitter: Emitter | None = None) -> str: + return format_sse(attach_emitted_by({"type": "start-step"}, emitter)) + + +def format_step_finish(*, emitter: Emitter | None = None) -> str: + return format_sse(attach_emitted_by({"type": "finish-step"}, emitter)) diff --git a/surfsense_backend/app/services/streaming/events/reasoning.py b/surfsense_backend/app/services/streaming/events/reasoning.py new file mode 100644 index 000000000..5b912d43a --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/reasoning.py @@ -0,0 +1,36 @@ +"""Reasoning-block streaming events.""" + +from __future__ import annotations + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_reasoning_start( + reasoning_id: str, *, emitter: Emitter | None = None +) -> str: + return format_sse( + attach_emitted_by({"type": "reasoning-start", "id": reasoning_id}, emitter) + ) + + +def format_reasoning_delta( + reasoning_id: str, + delta: str, + *, + emitter: Emitter | None = None, +) -> str: + return format_sse( + attach_emitted_by( + {"type": "reasoning-delta", "id": reasoning_id, "delta": delta}, + emitter, + ) + ) + + +def format_reasoning_end( + reasoning_id: str, *, emitter: Emitter | None = None +) -> str: + return format_sse( + attach_emitted_by({"type": "reasoning-end", "id": reasoning_id}, emitter) + ) diff --git a/surfsense_backend/app/services/streaming/events/source.py b/surfsense_backend/app/services/streaming/events/source.py new file mode 
100644 index 000000000..54541e8d2 --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/source.py @@ -0,0 +1,59 @@ +"""Source and file reference events.""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_source_url( + url: str, + *, + source_id: str | None = None, + title: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "source-url", + "sourceId": source_id or url, + "url": url, + } + if title: + payload["title"] = title + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_source_document( + source_id: str, + *, + media_type: str = "file", + title: str | None = None, + description: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "source-document", + "sourceId": source_id, + "mediaType": media_type, + } + if title: + payload["title"] = title + if description: + payload["description"] = description + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_file( + url: str, + media_type: str, + *, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "file", + "url": url, + "mediaType": media_type, + } + return format_sse(attach_emitted_by(payload, emitter)) diff --git a/surfsense_backend/app/services/streaming/events/subagent_lifecycle.py b/surfsense_backend/app/services/streaming/events/subagent_lifecycle.py new file mode 100644 index 000000000..6dd2d4eab --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/subagent_lifecycle.py @@ -0,0 +1,86 @@ +"""Sub-agent lifecycle events the FE pairs into one timeline lane. + +A sub-agent run is a high-level boundary (a whole agent invocation), +so we use the ``start`` / ``finish`` verb pair, matching how the AI SDK +spells message- and step-level lifecycles. 
+""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter +from .data import format_data + + +def format_subagent_start( + *, + subagent_run_id: str, + subagent_type: str, + parent_tool_call_id: str, + chat_turn_id: str | None = None, + description: str | None = None, + started_at: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "subagent_run_id": subagent_run_id, + "subagent_type": subagent_type, + "parent_tool_call_id": parent_tool_call_id, + } + if chat_turn_id is not None: + payload["chat_turn_id"] = chat_turn_id + if description is not None: + payload["description"] = description + if started_at is not None: + payload["started_at"] = started_at + return format_data("subagent-start", payload, emitter=emitter) + + +def format_subagent_finish( + *, + subagent_run_id: str, + subagent_type: str, + parent_tool_call_id: str, + status: str = "completed", + ended_at: str | None = None, + duration_ms: int | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "subagent_run_id": subagent_run_id, + "subagent_type": subagent_type, + "parent_tool_call_id": parent_tool_call_id, + "status": status, + } + if ended_at is not None: + payload["ended_at"] = ended_at + if duration_ms is not None: + payload["duration_ms"] = duration_ms + return format_data("subagent-finish", payload, emitter=emitter) + + +def format_subagent_error( + *, + subagent_run_id: str, + subagent_type: str, + parent_tool_call_id: str, + error_text: str, + error_type: str | None = None, + ended_at: str | None = None, + duration_ms: int | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "subagent_run_id": subagent_run_id, + "subagent_type": subagent_type, + "parent_tool_call_id": parent_tool_call_id, + "error_text": error_text, + } + if error_type is not None: + payload["error_type"] = error_type + if ended_at is not None: + 
payload["ended_at"] = ended_at + if duration_ms is not None: + payload["duration_ms"] = duration_ms + return format_data("subagent-error", payload, emitter=emitter) diff --git a/surfsense_backend/app/services/streaming/events/text.py b/surfsense_backend/app/services/streaming/events/text.py new file mode 100644 index 000000000..3baebdebb --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/text.py @@ -0,0 +1,31 @@ +"""Text-block streaming events.""" + +from __future__ import annotations + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_text_start(text_id: str, *, emitter: Emitter | None = None) -> str: + return format_sse( + attach_emitted_by({"type": "text-start", "id": text_id}, emitter) + ) + + +def format_text_delta( + text_id: str, + delta: str, + *, + emitter: Emitter | None = None, +) -> str: + return format_sse( + attach_emitted_by( + {"type": "text-delta", "id": text_id, "delta": delta}, emitter + ) + ) + + +def format_text_end(text_id: str, *, emitter: Emitter | None = None) -> str: + return format_sse( + attach_emitted_by({"type": "text-end", "id": text_id}, emitter) + ) diff --git a/surfsense_backend/app/services/streaming/events/tool.py b/surfsense_backend/app/services/streaming/events/tool.py new file mode 100644 index 000000000..c85dc061b --- /dev/null +++ b/surfsense_backend/app/services/streaming/events/tool.py @@ -0,0 +1,80 @@ +"""Tool-call streaming events. + +``toolCallId`` and ``langchainToolCallId`` are AI SDK protocol fields +and stay camelCase. Sub-agent provenance rides on the snake_case +top-level ``emitted_by`` envelope added by :func:`attach_emitted_by`. 
+""" + +from __future__ import annotations + +from typing import Any + +from ..emitter import Emitter, attach_emitted_by +from ..envelope import format_sse + + +def format_tool_input_start( + tool_call_id: str, + tool_name: str, + *, + langchain_tool_call_id: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "tool-input-start", + "toolCallId": tool_call_id, + "toolName": tool_name, + } + if langchain_tool_call_id: + payload["langchainToolCallId"] = langchain_tool_call_id + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_tool_input_delta( + tool_call_id: str, + input_text_delta: str, + *, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "tool-input-delta", + "toolCallId": tool_call_id, + "inputTextDelta": input_text_delta, + } + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_tool_input_available( + tool_call_id: str, + tool_name: str, + input_data: dict[str, Any], + *, + langchain_tool_call_id: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "tool-input-available", + "toolCallId": tool_call_id, + "toolName": tool_name, + "input": input_data, + } + if langchain_tool_call_id: + payload["langchainToolCallId"] = langchain_tool_call_id + return format_sse(attach_emitted_by(payload, emitter)) + + +def format_tool_output_available( + tool_call_id: str, + output: Any, + *, + langchain_tool_call_id: str | None = None, + emitter: Emitter | None = None, +) -> str: + payload: dict[str, Any] = { + "type": "tool-output-available", + "toolCallId": tool_call_id, + "output": output, + } + if langchain_tool_call_id: + payload["langchainToolCallId"] = langchain_tool_call_id + return format_sse(attach_emitted_by(payload, emitter))