From fc429d87024a6a0a36d520f23dd584bcf7bd8262 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 6 May 2026 20:08:47 +0200 Subject: [PATCH] Add streaming emitter and registry for scoped SSE writes. --- .../services/streaming/emitter/__init__.py | 29 +++++++++ .../app/services/streaming/emitter/emitter.py | 61 +++++++++++++++++++ .../services/streaming/emitter/registry.py | 51 ++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 surfsense_backend/app/services/streaming/emitter/__init__.py create mode 100644 surfsense_backend/app/services/streaming/emitter/emitter.py create mode 100644 surfsense_backend/app/services/streaming/emitter/registry.py diff --git a/surfsense_backend/app/services/streaming/emitter/__init__.py b/surfsense_backend/app/services/streaming/emitter/__init__.py new file mode 100644 index 000000000..7814894f3 --- /dev/null +++ b/surfsense_backend/app/services/streaming/emitter/__init__.py @@ -0,0 +1,29 @@ +"""Identity of the agent that emitted a streamed event. + +The wire field is ``emitted_by``; the Python identity is :class:`Emitter`. +``EmitterRegistry`` resolves which emitter owns a LangGraph event, with +LangGraph's own namespace metadata as the primary key and a parent_ids +walk as a fallback for cases where context vars don't propagate. +""" + +from __future__ import annotations + +from .emitter import ( + MAIN_EMITTER, + Emitter, + EmitterLevel, + attach_emitted_by, + main_emitter, + subagent_emitter, +) +from .registry import EmitterRegistry + +__all__ = [ + "MAIN_EMITTER", + "Emitter", + "EmitterLevel", + "EmitterRegistry", + "attach_emitted_by", + "main_emitter", + "subagent_emitter", +] diff --git a/surfsense_backend/app/services/streaming/emitter/emitter.py b/surfsense_backend/app/services/streaming/emitter/emitter.py new file mode 100644 index 000000000..08f625a69 --- /dev/null +++ b/surfsense_backend/app/services/streaming/emitter/emitter.py @@ -0,0 +1,61 @@ +"""Identity payload describing which agent produced a stream event.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal + +EmitterLevel = Literal["main", "subagent"] + + +@dataclass(frozen=True) +class Emitter: + level: EmitterLevel + subagent_type: str | None = None + subagent_run_id: str | None = None + parent_tool_call_id: str | None = None + extra: dict[str, Any] = field(default_factory=dict) + + def to_payload(self) -> dict[str, Any]: + payload: dict[str, Any] = {"level": self.level} + if self.subagent_type is not None: + payload["subagent_type"] = self.subagent_type + if self.subagent_run_id is not None: + payload["subagent_run_id"] = self.subagent_run_id + if self.parent_tool_call_id is not None: + payload["parent_tool_call_id"] = self.parent_tool_call_id + if self.extra: + payload.update(self.extra) + return payload + + +MAIN_EMITTER = Emitter(level="main") + + +def main_emitter() -> Emitter: + return MAIN_EMITTER + + +def subagent_emitter( + *, + subagent_type: str, + subagent_run_id: str, + parent_tool_call_id: str | None = None, + extra: dict[str, Any] | None = None, +) -> Emitter: + return Emitter( + level="subagent", + subagent_type=subagent_type, + subagent_run_id=subagent_run_id, + parent_tool_call_id=parent_tool_call_id, + extra=dict(extra or {}), + ) + + +def attach_emitted_by( + payload: dict[str, Any], emitter: Emitter | None +) -> dict[str, Any]: + if emitter is None: + return payload + payload["emitted_by"] = emitter.to_payload() + return payload diff --git a/surfsense_backend/app/services/streaming/emitter/registry.py b/surfsense_backend/app/services/streaming/emitter/registry.py new file mode 100644 index 000000000..cd3e10cdd --- /dev/null +++ b/surfsense_backend/app/services/streaming/emitter/registry.py @@ -0,0 +1,51 @@ +"""Resolve which agent owns a streamed event from its LangGraph run lineage.""" + +from __future__ import annotations + +from collections.abc import Iterable + +from .emitter import Emitter, main_emitter + + +class EmitterRegistry: + def __init__(self) -> None: + self._by_run_id: dict[str, Emitter] = {} + + def register(self, run_id: str, emitter: Emitter) -> None: + if not run_id: + return + self._by_run_id[run_id] = emitter + + def unregister(self, run_id: str) -> Emitter | None: + if not run_id: + return None + return self._by_run_id.pop(run_id, None) + + def get(self, run_id: str | None) -> Emitter | None: + if not run_id: + return None + return self._by_run_id.get(run_id) + + def resolve( + self, + *, + run_id: str | None, + parent_ids: Iterable[str] | None, + ) -> Emitter: + own = self.get(run_id) + if own is not None: + return own + if parent_ids: + for ancestor in reversed(list(parent_ids)): + emitter = self.get(ancestor) + if emitter is not None: + return emitter + return main_emitter() + + def has_active_subagents(self) -> bool: + return any( + emitter.level == "subagent" for emitter in self._by_run_id.values() + ) + + def clear(self) -> None: + self._by_run_id.clear()