From f8754a9dab480d2cb112f32c2cf1b4c67b4949ea Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 7 May 2026 15:41:33 +0200 Subject: [PATCH] Rename streaming runtime modules for clearer SRP boundaries. --- .../streaming/{runtime.py => agent_setup.py} | 2 +- .../chat/streaming/orchestration/__init__.py | 5 + .../streaming/orchestration/event_stream.py | 53 +++++++++ ...strator_runtime.py => test_agent_setup.py} | 14 +-- .../test_orchestration_event_stream.py | 107 ++++++++++++++++++ 5 files changed, 173 insertions(+), 8 deletions(-) rename surfsense_backend/app/tasks/chat/streaming/{runtime.py => agent_setup.py} (97%) create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py rename surfsense_backend/tests/unit/tasks/chat/streaming/{test_orchestrator_runtime.py => test_agent_setup.py} (87%) create mode 100644 surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py diff --git a/surfsense_backend/app/tasks/chat/streaming/runtime.py b/surfsense_backend/app/tasks/chat/streaming/agent_setup.py similarity index 97% rename from surfsense_backend/app/tasks/chat/streaming/runtime.py rename to surfsense_backend/app/tasks/chat/streaming/agent_setup.py index b45da2789..f67c6ad65 100644 --- a/surfsense_backend/app/tasks/chat/streaming/runtime.py +++ b/surfsense_backend/app/tasks/chat/streaming/agent_setup.py @@ -1,4 +1,4 @@ -"""Runtime setup helpers for orchestrated chat streaming.""" +"""Agent setup helpers for orchestrated chat streaming.""" from __future__ import annotations diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py new file mode 100644 index 000000000..8b586f2be --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py @@ -0,0 +1,5 @@ +"""Composable orchestration pieces for chat streaming.""" + +from app.tasks.chat.streaming.orchestration.event_stream import stream_agent_events + +__all__ = ["stream_agent_events"] diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py new file mode 100644 index 000000000..1448cd86a --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py @@ -0,0 +1,53 @@ +"""Run LangGraph event streams through the EventRelay.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any + +from app.agents.new_chat.feature_flags import get_flags +from app.tasks.chat.streaming.event_relay import EventRelay +from app.tasks.chat.streaming.relay.state import AgentEventRelayState +from app.tasks.chat.streaming.stream_result import StreamResult + + +async def stream_agent_events( + *, + agent: Any, + config: dict[str, Any], + input_data: Any, + streaming_service: Any, + result: StreamResult, + step_prefix: str = "thinking", + initial_step_id: str | None = None, + initial_step_title: str = "", + initial_step_items: list[str] | None = None, + content_builder: Any | None = None, + runtime_context: Any = None, +) -> AsyncIterator[str]: + """Yield SSE frames from agent ``astream_events`` via ``EventRelay``.""" + state = AgentEventRelayState.for_invocation( + initial_step_id=initial_step_id, + initial_step_title=initial_step_title, + initial_step_items=initial_step_items, + parity_v2=bool(get_flags().enable_stream_parity_v2), + ) + + astream_kwargs: dict[str, Any] = {"config": config, "version": "v2"} + if runtime_context is not None: + astream_kwargs["context"] = runtime_context + + events = agent.astream_events(input_data, **astream_kwargs) + relay = EventRelay(streaming_service=streaming_service) + async for frame in relay.relay( + events, + state=state, + result=result, + step_prefix=step_prefix, + content_builder=content_builder, + config=config, + ): + yield frame + + result.accumulated_text = state.accumulated_text + result.agent_called_update_memory = state.called_update_memory diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_runtime.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_agent_setup.py similarity index 87% rename from surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_runtime.py rename to surfsense_backend/tests/unit/tasks/chat/streaming/test_agent_setup.py index edb05edfa..e1f7dd027 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_runtime.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_agent_setup.py @@ -1,4 +1,4 @@ -"""Behavior tests for streaming runtime helpers.""" +"""Behavior tests for streaming agent setup helpers.""" from __future__ import annotations @@ -8,7 +8,7 @@ from typing import Any import pytest -from app.tasks.chat.streaming import runtime +from app.tasks.chat.streaming import agent_setup pytestmark = pytest.mark.unit @@ -29,7 +29,7 @@ async def test_preflight_llm_calls_litellm_when_model_present( ) llm = types.SimpleNamespace(model="openai/test", api_key="k", api_base="b") - await runtime.preflight_llm(llm, is_provider_rate_limited=lambda _: False) + await agent_setup.preflight_llm(llm, is_provider_rate_limited=lambda _: False) assert calls["model"] == "openai/test" assert calls["max_tokens"] == 1 @@ -52,7 +52,7 @@ async def test_preflight_llm_rethrows_rate_limited(monkeypatch: pytest.MonkeyPat ) with pytest.raises(_RateLimitedError): - await runtime.preflight_llm( + await agent_setup.preflight_llm( types.SimpleNamespace(model="openai/test"), is_provider_rate_limited=lambda exc: isinstance(exc, _RateLimitedError), ) @@ -74,7 +74,7 @@ async def test_preflight_llm_skips_probe_for_auto_model( types.SimpleNamespace(acompletion=_fake_acompletion), ) - await runtime.preflight_llm( + await agent_setup.preflight_llm( types.SimpleNamespace(model="auto"), is_provider_rate_limited=lambda _: False, ) @@ -88,7 +88,7 @@ async def test_build_main_agent_for_thread_forwards_arguments() -> None: seen.update(kwargs) return "agent" - out = await runtime.build_main_agent_for_thread( + out = await agent_setup.build_main_agent_for_thread( _factory, llm="llm", search_space_id=1, @@ -116,5 +116,5 @@ async def test_settle_speculative_agent_build_swallows_exceptions() -> None: import asyncio task = asyncio.create_task(_boom()) - await runtime.settle_speculative_agent_build(task) + await agent_setup.settle_speculative_agent_build(task) assert task.done() diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py new file mode 100644 index 000000000..e12283a75 --- /dev/null +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py @@ -0,0 +1,107 @@ +"""Behavior tests for orchestration event-stream execution.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +import pytest + +from app.tasks.chat.streaming.orchestration import stream_agent_events +from app.tasks.chat.streaming.stream_result import StreamResult + +pytestmark = pytest.mark.unit + + +@dataclass +class _Chunk: + content: Any = "" + additional_kwargs: dict[str, Any] = field(default_factory=dict) + tool_call_chunks: list[dict[str, Any]] = field(default_factory=list) + + +class _StreamingService: + def __init__(self) -> None: + self._text_idx = 0 + + def generate_text_id(self) -> str: + self._text_idx += 1 + return f"text-{self._text_idx}" + + def format_text_start(self, text_id: str) -> str: + return f"text_start:{text_id}" + + def format_text_delta(self, text_id: str, text: str) -> str: + return f"text_delta:{text_id}:{text}" + + def format_text_end(self, text_id: str) -> str: + return f"text_end:{text_id}" + + +class _Agent: + def __init__(self, events: list[dict[str, Any]]) -> None: + self.events = list(events) + self.calls: list[tuple[Any, dict[str, Any]]] = [] + + async def astream_events(self, input_data: Any, **kwargs: Any): + self.calls.append((input_data, kwargs)) + for event in self.events: + yield event + + +async def _collect(stream: Any) -> list[str]: + out: list[str] = [] + async for x in stream: + out.append(x) + return out + + +async def test_stream_agent_events_emits_text_lifecycle_and_updates_result() -> None: + service = _StreamingService() + agent = _Agent( + [ + {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content="Hello")}}, + {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content=" world")}}, + ] + ) + result = StreamResult() + + frames = await _collect( + stream_agent_events( + agent=agent, + config={"configurable": {"thread_id": "t-1"}}, + input_data={"messages": []}, + streaming_service=service, + result=result, + ) + ) + + assert frames == [ + "text_start:text-1", + "text_delta:text-1:Hello", + "text_delta:text-1: world", + "text_end:text-1", + ] + assert result.accumulated_text == "Hello world" + assert result.agent_called_update_memory is False + assert agent.calls[0][1]["version"] == "v2" + + +async def test_stream_agent_events_passes_runtime_context_to_agent() -> None: + service = _StreamingService() + agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("x")}}]) + result = StreamResult() + + _ = await _collect( + stream_agent_events( + agent=agent, + config={"configurable": {"thread_id": "t-2"}}, + input_data={"messages": []}, + streaming_service=service, + result=result, + runtime_context={"mentioned_document_ids": [1, 2]}, + ) + ) + + assert agent.calls + assert agent.calls[0][1]["context"] == {"mentioned_document_ids": [1, 2]}