refactor(chat): drop alternate streaming entry path; use graph_stream

This commit is contained in:
CREDO23 2026-05-07 19:25:20 +02:00
parent 52895e37e9
commit 7e07092f67
23 changed files with 61 additions and 1278 deletions

View file

@ -0,0 +1,21 @@
"""LangGraph ``astream_events`` → SSE (``stream_output`` + ``StreamingResult``).
Imports are lazy to avoid a circular import with ``relay.event_relay``.
"""
from __future__ import annotations
__all__ = ["StreamingResult", "stream_output"]
def __getattr__(name: str):
if name == "stream_output":
from app.tasks.chat.streaming.graph_stream.event_stream import stream_output
return stream_output
if name == "StreamingResult":
from app.tasks.chat.streaming.graph_stream.result import StreamingResult
return StreamingResult
msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)

View file

@ -0,0 +1,53 @@
"""Run LangGraph event streams through ``EventRelay``."""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from app.agents.new_chat.feature_flags import get_flags
from app.tasks.chat.streaming.graph_stream.result import StreamingResult
from app.tasks.chat.streaming.relay.event_relay import EventRelay
from app.tasks.chat.streaming.relay.state import AgentEventRelayState
async def stream_output(
*,
agent: Any,
config: dict[str, Any],
input_data: Any,
streaming_service: Any,
result: StreamingResult,
step_prefix: str = "thinking",
initial_step_id: str | None = None,
initial_step_title: str = "",
initial_step_items: list[str] | None = None,
content_builder: Any | None = None,
runtime_context: Any = None,
) -> AsyncIterator[str]:
"""Yield SSE frames from agent ``astream_events`` via ``EventRelay``."""
state = AgentEventRelayState.for_invocation(
initial_step_id=initial_step_id,
initial_step_title=initial_step_title,
initial_step_items=initial_step_items,
parity_v2=bool(get_flags().enable_stream_parity_v2),
)
astream_kwargs: dict[str, Any] = {"config": config, "version": "v2"}
if runtime_context is not None:
astream_kwargs["context"] = runtime_context
events = agent.astream_events(input_data, **astream_kwargs)
relay = EventRelay(streaming_service=streaming_service)
async for frame in relay.relay(
events,
state=state,
result=result,
step_prefix=step_prefix,
content_builder=content_builder,
config=config,
):
yield frame
result.accumulated_text = state.accumulated_text
result.agent_called_update_memory = state.called_update_memory

View file

@ -0,0 +1,28 @@
"""Mutable facts collected while relaying one agent stream (``stream_output``)."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class StreamingResult:
accumulated_text: str = ""
is_interrupted: bool = False
interrupt_value: dict[str, Any] | None = None
sandbox_files: list[str] = field(default_factory=list)
agent_called_update_memory: bool = False
request_id: str | None = None
turn_id: str = ""
filesystem_mode: str = "cloud"
client_platform: str = "web"
intent_detected: str = "chat_only"
intent_confidence: float = 0.0
write_attempted: bool = False
write_succeeded: bool = False
verification_succeeded: bool = False
commit_gate_passed: bool = True
commit_gate_reason: str = ""
assistant_message_id: int | None = None
content_builder: Any | None = field(default=None, repr=False)