refactor(chat): add streaming/shared/ package for StreamResult and utils

Foundation layer for the parallel refactor of stream_new_chat.py.
Extracts the StreamResult dataclass (tracks per-turn streaming state)
and a small set of shared utilities (resume_step_prefix, safe_float).

Add-only; no existing code imports from this package yet. Existing
stream_new_chat.py keeps its inline equivalents until cutover.
This commit is contained in:
CREDO23 2026-05-25 21:48:04 +02:00
parent 18c66409a0
commit 4910263c93
3 changed files with 79 additions and 0 deletions

View file

@ -0,0 +1,15 @@
"""Shared building blocks used across every streaming flow."""
from __future__ import annotations
from app.tasks.chat.streaming.shared.stream_result import StreamResult
from app.tasks.chat.streaming.shared.utils import (
resume_step_prefix,
safe_float,
)
__all__ = [
"StreamResult",
"resume_step_prefix",
"safe_float",
]

View file

@ -0,0 +1,37 @@
"""Per-turn streaming state shared between the orchestrator and event loop."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class StreamResult:
accumulated_text: str = ""
is_interrupted: bool = False
sandbox_files: list[str] = field(default_factory=list)
request_id: str | None = None
turn_id: str = ""
filesystem_mode: str = "cloud"
client_platform: str = "web"
intent_detected: str = "chat_only"
intent_confidence: float = 0.0
write_attempted: bool = False
write_succeeded: bool = False
verification_succeeded: bool = False
commit_gate_passed: bool = True
commit_gate_reason: str = ""
# Pre-allocated assistant ``new_chat_messages.id`` for this turn, captured by
# ``persist_assistant_shell`` right after the user row is persisted. ``None``
# for the legacy/anonymous code paths that don't opt in to server-side
# ``ContentPart[]`` projection.
assistant_message_id: int | None = None
# In-memory mirror of the FE's assistant-ui ``ContentPartsState``, populated
# by the lifecycle methods called from the streaming event loop at each
# ``streaming_service.format_*`` yield site. Snapshot in the streaming
# ``finally`` to produce the rich JSONB persisted by
# ``finalize_assistant_turn``. ``repr=False`` keeps the log-on-error path
# (``StreamResult`` is logged in some error branches) from dumping a
# potentially-large parts list.
content_builder: Any | None = field(default=None, repr=False)

View file

@ -0,0 +1,27 @@
"""Small utilities used by streaming orchestrators and phases."""
from __future__ import annotations
from typing import Any
def resume_step_prefix(turn_id: str) -> str:
"""Per-turn ``step_prefix`` for resume invocations.
Each ``stream_agent_events`` call constructs a fresh
``AgentEventRelayState`` with ``thinking_step_counter=0``, so two consecutive
resume turns would otherwise both emit ``thinking-resume-1``, ``-2`` etc.
The frontend rehydrates ``currentThinkingSteps`` from the immediate prior
assistant message at the start of every resume if the new stream's IDs
collide with the seeded ones, React renders sibling Timeline rows with the
same key. Salting with ``turn_id`` guarantees disjoint IDs across resumes
within one thread.
"""
return f"thinking-resume-{turn_id}"
def safe_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default