From 52593d88dbf2dfa5d81587048f4d2bb0ea0d5cad Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 7 May 2026 16:00:15 +0200 Subject: [PATCH] Reorganize streaming orchestration modules into relay and orchestration folders. --- .../app/routes/new_chat_routes.py | 2 +- .../chat/streaming/orchestration/__init__.py | 8 +- .../streaming/orchestration/event_stream.py | 6 +- .../chat/streaming/orchestration/input.py | 23 +++++ .../{ => orchestration}/orchestrator.py | 33 +++++++ .../output.py} | 8 +- .../tasks/chat/streaming/relay/__init__.py | 4 + .../chat/streaming/{ => relay}/event_relay.py | 4 +- .../test_orchestration_event_stream.py | 6 +- .../test_orchestrator_stream_chat.py | 88 +++++++++++++++++++ 10 files changed, 170 insertions(+), 12 deletions(-) create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/input.py rename surfsense_backend/app/tasks/chat/streaming/{ => orchestration}/orchestrator.py (76%) rename surfsense_backend/app/tasks/chat/streaming/{stream_result.py => orchestration/output.py} (82%) rename surfsense_backend/app/tasks/chat/streaming/{ => relay}/event_relay.py (97%) create mode 100644 surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index 7f035daef..e54497f93 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -75,7 +75,7 @@ from app.tasks.chat.stream_new_chat import ( stream_new_chat as legacy_stream_new_chat, stream_resume_chat as legacy_stream_resume_chat, ) -from app.tasks.chat.streaming.orchestrator import ( +from app.tasks.chat.streaming.orchestration.orchestrator import ( stream_chat, stream_regenerate, stream_resume, diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py index 8b586f2be..6f683a410 100644 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py @@ -1,5 +1,11 @@ """Composable orchestration pieces for chat streaming.""" from app.tasks.chat.streaming.orchestration.event_stream import stream_agent_events +from app.tasks.chat.streaming.orchestration.input import StreamExecutionInput +from app.tasks.chat.streaming.orchestration.output import StreamOutput -__all__ = ["stream_agent_events"] +__all__ = [ + "StreamExecutionInput", + "StreamOutput", + "stream_agent_events", +] diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py index 1448cd86a..369883c3a 100644 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py @@ -6,9 +6,9 @@ from collections.abc import AsyncIterator from typing import Any from app.agents.new_chat.feature_flags import get_flags -from app.tasks.chat.streaming.event_relay import EventRelay +from app.tasks.chat.streaming.orchestration.output import StreamOutput +from app.tasks.chat.streaming.relay.event_relay import EventRelay from app.tasks.chat.streaming.relay.state import AgentEventRelayState -from app.tasks.chat.streaming.stream_result import StreamResult async def stream_agent_events( @@ -17,7 +17,7 @@ async def stream_agent_events( config: dict[str, Any], input_data: Any, streaming_service: Any, - result: StreamResult, + result: StreamOutput, step_prefix: str = "thinking", initial_step_id: str | None = None, initial_step_title: str = "", diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/input.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/input.py new file mode 100644 index 000000000..13d43b612 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/input.py @@ -0,0 +1,23 @@ +"""Inputs for orchestrator-owned streaming execution.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class StreamExecutionInput: + """Container for dependencies required by ``stream_agent_events``.""" + + agent: Any + config: dict[str, Any] + input_data: Any + streaming_service: Any + step_prefix: str = "thinking" + initial_step_id: str | None = None + initial_step_title: str = "" + initial_step_items: list[str] | None = None + content_builder: Any | None = None + runtime_context: Any = None + diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestrator.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py similarity index 76% rename from surfsense_backend/app/tasks/chat/streaming/orchestrator.py rename to surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py index e912dd632..ac7abc6f4 100644 --- a/surfsense_backend/app/tasks/chat/streaming/orchestrator.py +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py @@ -14,6 +14,9 @@ from typing import Any, Literal from app.agents.new_chat.filesystem_selection import FilesystemSelection from app.db import ChatVisibility from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat +from app.tasks.chat.streaming.orchestration.event_stream import stream_agent_events +from app.tasks.chat.streaming.orchestration.input import StreamExecutionInput +from app.tasks.chat.streaming.orchestration.output import StreamOutput async def stream_chat( @@ -34,8 +37,38 @@ async def stream_chat( filesystem_selection: FilesystemSelection | None = None, request_id: str | None = None, user_image_data_urls: list[str] | None = None, + orchestration_input: StreamExecutionInput | None = None, ) -> AsyncGenerator[str, None]: """Stream a new chat turn through the current production pipeline.""" + if orchestration_input is not None: + result = StreamOutput( + request_id=request_id, + turn_id=f"{chat_id}:orchestrator", + filesystem_mode=( + filesystem_selection.mode.value if filesystem_selection else "cloud" + ), + client_platform=( + filesystem_selection.client_platform.value + if filesystem_selection + else "web" + ), + ) + async for frame in stream_agent_events( + agent=orchestration_input.agent, + config=orchestration_input.config, + input_data=orchestration_input.input_data, + streaming_service=orchestration_input.streaming_service, + result=result, + step_prefix=orchestration_input.step_prefix, + initial_step_id=orchestration_input.initial_step_id, + initial_step_title=orchestration_input.initial_step_title, + initial_step_items=orchestration_input.initial_step_items, + content_builder=orchestration_input.content_builder, + runtime_context=orchestration_input.runtime_context, + ): + yield frame + return + async for chunk in stream_new_chat( user_query=user_query, search_space_id=search_space_id, diff --git a/surfsense_backend/app/tasks/chat/streaming/stream_result.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/output.py similarity index 82% rename from surfsense_backend/app/tasks/chat/streaming/stream_result.py rename to surfsense_backend/app/tasks/chat/streaming/orchestration/output.py index 8ea3bd295..0c4870ec4 100644 --- a/surfsense_backend/app/tasks/chat/streaming/stream_result.py +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/output.py @@ -1,4 +1,4 @@ -"""Mutable facts collected while streaming one agent turn.""" +"""Output facts collected while streaming one orchestrated agent turn.""" from __future__ import annotations @@ -7,7 +7,7 @@ from typing import Any @dataclass -class StreamResult: +class StreamOutput: accumulated_text: str = "" is_interrupted: bool = False interrupt_value: dict[str, Any] | None = None @@ -26,3 +26,7 @@ class StreamResult: commit_gate_reason: str = "" assistant_message_id: int | None = None content_builder: Any | None = field(default=None, repr=False) + + +# Backwards-compatible alias while imports migrate. +StreamResult = StreamOutput diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py b/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py index c1a5e7175..351e878a8 100644 --- a/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py +++ b/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py @@ -1,3 +1,7 @@ """Relay state: thinking steps, tool bookkeeping, and stream helpers.""" from __future__ import annotations + +from app.tasks.chat.streaming.relay.event_relay import EventRelay, EventRelayConfig + +__all__ = ["EventRelay", "EventRelayConfig"] diff --git a/surfsense_backend/app/tasks/chat/streaming/event_relay.py b/surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py similarity index 97% rename from surfsense_backend/app/tasks/chat/streaming/event_relay.py rename to surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py index f86337ad7..072baac72 100644 --- a/surfsense_backend/app/tasks/chat/streaming/event_relay.py +++ b/surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py @@ -16,11 +16,11 @@ from app.tasks.chat.streaming.handlers.custom_event_dispatch import ( ) from app.tasks.chat.streaming.handlers.tool_end import iter_tool_end_frames from app.tasks.chat.streaming.handlers.tool_start import iter_tool_start_frames +from app.tasks.chat.streaming.orchestration.output import StreamOutput from app.tasks.chat.streaming.relay.state import AgentEventRelayState from app.tasks.chat.streaming.relay.thinking_step_completion import ( complete_active_thinking_step, ) -from app.tasks.chat.streaming.stream_result import StreamResult @dataclass @@ -52,7 +52,7 @@ class EventRelay: events: AsyncIterator[dict[str, Any]], *, state: AgentEventRelayState, - result: StreamResult, + result: StreamOutput, step_prefix: str = "thinking", content_builder: Any | None = None, config: dict[str, Any] | None = None, diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py index e12283a75..e0a1877a8 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py @@ -8,7 +8,7 @@ from typing import Any import pytest from app.tasks.chat.streaming.orchestration import stream_agent_events -from app.tasks.chat.streaming.stream_result import StreamResult +from app.tasks.chat.streaming.orchestration.output import StreamOutput pytestmark = pytest.mark.unit @@ -64,7 +64,7 @@ async def test_stream_agent_events_emits_text_lifecycle_and_updates_result() -> {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content=" world")}}, ] ) - result = StreamResult() + result = StreamOutput() frames = await _collect( stream_agent_events( @@ -90,7 +90,7 @@ async def test_stream_agent_events_emits_text_lifecycle_and_updates_result() -> async def test_stream_agent_events_passes_runtime_context_to_agent() -> None: service = _StreamingService() agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("x")}}]) - result = StreamResult() + result = StreamOutput() _ = await _collect( stream_agent_events( diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py new file mode 100644 index 000000000..cf54fdab0 --- /dev/null +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py @@ -0,0 +1,88 @@ +"""Behavior tests for orchestrator ``stream_chat`` public API.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +import pytest + +from app.tasks.chat.streaming.orchestration import StreamExecutionInput +from app.tasks.chat.streaming.orchestration.orchestrator import stream_chat + +pytestmark = pytest.mark.unit + + +@dataclass +class _Chunk: + content: Any = "" + additional_kwargs: dict[str, Any] = field(default_factory=dict) + tool_call_chunks: list[dict[str, Any]] = field(default_factory=list) + + +class _StreamingService: + def __init__(self) -> None: + self._text_idx = 0 + + def generate_text_id(self) -> str: + self._text_idx += 1 + return f"text-{self._text_idx}" + + def format_text_start(self, text_id: str) -> str: + return f"text_start:{text_id}" + + def format_text_delta(self, text_id: str, text: str) -> str: + return f"text_delta:{text_id}:{text}" + + def format_text_end(self, text_id: str) -> str: + return f"text_end:{text_id}" + + +class _Agent: + def __init__(self, events: list[dict[str, Any]]) -> None: + self.events = list(events) + self.calls: list[tuple[Any, dict[str, Any]]] = [] + + async def astream_events(self, input_data: Any, **kwargs: Any): + self.calls.append((input_data, kwargs)) + for event in self.events: + yield event + + +async def _collect(stream: Any) -> list[str]: + out: list[str] = [] + async for x in stream: + out.append(x) + return out + + +async def test_stream_chat_uses_orchestration_input_path() -> None: + service = _StreamingService() + agent = _Agent( + [ + {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content="hello")}}, + {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content="!")}}, + ] + ) + frames = await _collect( + stream_chat( + user_query="ignored-here", + search_space_id=1, + chat_id=77, + orchestration_input=StreamExecutionInput( + agent=agent, + config={"configurable": {"thread_id": "thread-1"}}, + input_data={"messages": []}, + streaming_service=service, + ), + ) + ) + + assert frames == [ + "text_start:text-1", + "text_delta:text-1:hello", + "text_delta:text-1:!", + "text_end:text-1", + ] + assert agent.calls + assert agent.calls[0][1]["version"] == "v2"