Add streaming emitter and registry for scoped SSE writes.

This commit is contained in:
CREDO23 2026-05-06 20:08:47 +02:00
parent 5510c6c314
commit fc429d8702
3 changed files with 141 additions and 0 deletions

View file

@ -0,0 +1,29 @@
"""Identity of the agent that emitted a streamed event.
The wire field is ``emitted_by``; the Python identity is :class:`Emitter`.
``EmitterRegistry`` resolves which emitter owns a LangGraph event, with
LangGraph's own namespace metadata as the primary key and a parent_ids
walk as a fallback for cases where context vars don't propagate.
"""
from __future__ import annotations
from .emitter import (
MAIN_EMITTER,
Emitter,
EmitterLevel,
attach_emitted_by,
main_emitter,
subagent_emitter,
)
from .registry import EmitterRegistry
__all__ = [
"MAIN_EMITTER",
"Emitter",
"EmitterLevel",
"EmitterRegistry",
"attach_emitted_by",
"main_emitter",
"subagent_emitter",
]

View file

@ -0,0 +1,61 @@
"""Identity payload describing which agent produced a stream event."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal
EmitterLevel = Literal["main", "subagent"]
@dataclass(frozen=True)
class Emitter:
level: EmitterLevel
subagent_type: str | None = None
subagent_run_id: str | None = None
parent_tool_call_id: str | None = None
extra: dict[str, Any] = field(default_factory=dict)
def to_payload(self) -> dict[str, Any]:
payload: dict[str, Any] = {"level": self.level}
if self.subagent_type is not None:
payload["subagent_type"] = self.subagent_type
if self.subagent_run_id is not None:
payload["subagent_run_id"] = self.subagent_run_id
if self.parent_tool_call_id is not None:
payload["parent_tool_call_id"] = self.parent_tool_call_id
if self.extra:
payload.update(self.extra)
return payload
MAIN_EMITTER = Emitter(level="main")
def main_emitter() -> Emitter:
return MAIN_EMITTER
def subagent_emitter(
*,
subagent_type: str,
subagent_run_id: str,
parent_tool_call_id: str | None = None,
extra: dict[str, Any] | None = None,
) -> Emitter:
return Emitter(
level="subagent",
subagent_type=subagent_type,
subagent_run_id=subagent_run_id,
parent_tool_call_id=parent_tool_call_id,
extra=dict(extra or {}),
)
def attach_emitted_by(
payload: dict[str, Any], emitter: Emitter | None
) -> dict[str, Any]:
if emitter is None:
return payload
payload["emitted_by"] = emitter.to_payload()
return payload

View file

@ -0,0 +1,51 @@
"""Resolve which agent owns a streamed event from its LangGraph run lineage."""
from __future__ import annotations
from collections.abc import Iterable
from .emitter import Emitter, main_emitter
class EmitterRegistry:
def __init__(self) -> None:
self._by_run_id: dict[str, Emitter] = {}
def register(self, run_id: str, emitter: Emitter) -> None:
if not run_id:
return
self._by_run_id[run_id] = emitter
def unregister(self, run_id: str) -> Emitter | None:
if not run_id:
return None
return self._by_run_id.pop(run_id, None)
def get(self, run_id: str | None) -> Emitter | None:
if not run_id:
return None
return self._by_run_id.get(run_id)
def resolve(
self,
*,
run_id: str | None,
parent_ids: Iterable[str] | None,
) -> Emitter:
own = self.get(run_id)
if own is not None:
return own
if parent_ids:
for ancestor in reversed(list(parent_ids)):
emitter = self.get(ancestor)
if emitter is not None:
return emitter
return main_emitter()
def has_active_subagents(self) -> bool:
return any(
emitter.level == "subagent" for emitter in self._by_run_id.values()
)
def clear(self) -> None:
self._by_run_id.clear()