Rename streaming runtime modules for clearer SRP boundaries.

This commit is contained in:
CREDO23 2026-05-07 15:41:33 +02:00
parent 4e664652a8
commit f8754a9dab
5 changed files with 173 additions and 8 deletions

View file

@ -0,0 +1,5 @@
"""Composable orchestration pieces for chat streaming."""
from app.tasks.chat.streaming.orchestration.event_stream import stream_agent_events
__all__ = ["stream_agent_events"]

View file

@ -0,0 +1,53 @@
"""Run LangGraph event streams through the EventRelay."""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from app.agents.new_chat.feature_flags import get_flags
from app.tasks.chat.streaming.event_relay import EventRelay
from app.tasks.chat.streaming.relay.state import AgentEventRelayState
from app.tasks.chat.streaming.stream_result import StreamResult
async def stream_agent_events(
*,
agent: Any,
config: dict[str, Any],
input_data: Any,
streaming_service: Any,
result: StreamResult,
step_prefix: str = "thinking",
initial_step_id: str | None = None,
initial_step_title: str = "",
initial_step_items: list[str] | None = None,
content_builder: Any | None = None,
runtime_context: Any = None,
) -> AsyncIterator[str]:
"""Yield SSE frames from agent ``astream_events`` via ``EventRelay``."""
state = AgentEventRelayState.for_invocation(
initial_step_id=initial_step_id,
initial_step_title=initial_step_title,
initial_step_items=initial_step_items,
parity_v2=bool(get_flags().enable_stream_parity_v2),
)
astream_kwargs: dict[str, Any] = {"config": config, "version": "v2"}
if runtime_context is not None:
astream_kwargs["context"] = runtime_context
events = agent.astream_events(input_data, **astream_kwargs)
relay = EventRelay(streaming_service=streaming_service)
async for frame in relay.relay(
events,
state=state,
result=result,
step_prefix=step_prefix,
content_builder=content_builder,
config=config,
):
yield frame
result.accumulated_text = state.accumulated_text
result.agent_called_update_memory = state.called_update_memory