Add chat streaming relay state and thinking-step SSE helpers.

This commit is contained in:
CREDO23 2026-05-06 20:08:48 +02:00
parent c25b78c304
commit 7581a7c9c3
4 changed files with 113 additions and 0 deletions

View file

@ -0,0 +1,3 @@
"""Relay state: thinking steps, tool bookkeeping, and stream helpers."""
from __future__ import annotations

View file

@ -0,0 +1,55 @@
"""Mutable counters and maps for one agent stream."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class AgentEventRelayState:
"""Tracks text, thinking steps, tool depth, and pending tool-call metadata."""
accumulated_text: str = ""
current_text_id: str | None = None
thinking_step_counter: int = 0
tool_step_ids: dict[str, str] = field(default_factory=dict)
completed_step_ids: set[str] = field(default_factory=set)
last_active_step_id: str | None = None
last_active_step_title: str = ""
last_active_step_items: list[str] = field(default_factory=list)
just_finished_tool: bool = False
active_tool_depth: int = 0
called_update_memory: bool = False
current_reasoning_id: str | None = None
parity_v2: bool = False
pending_tool_call_chunks: list[dict[str, Any]] = field(default_factory=list)
lc_tool_call_id_by_run: dict[str, str] = field(default_factory=dict)
file_path_by_run: dict[str, str] = field(default_factory=dict)
index_to_meta: dict[int, dict[str, str]] = field(default_factory=dict)
ui_tool_call_id_by_run: dict[str, str] = field(default_factory=dict)
current_lc_tool_call_id: dict[str, str | None] = field(
default_factory=lambda: {"value": None}
)
@classmethod
def for_invocation(
cls,
*,
initial_step_id: str | None = None,
initial_step_title: str = "",
initial_step_items: list[str] | None = None,
parity_v2: bool,
) -> AgentEventRelayState:
counter = 1 if initial_step_id else 0
return cls(
thinking_step_counter=counter,
last_active_step_id=initial_step_id,
last_active_step_title=initial_step_title,
last_active_step_items=list(initial_step_items or []),
parity_v2=parity_v2,
)
def next_thinking_step_id(self, step_prefix: str) -> str:
self.thinking_step_counter += 1
return f"{step_prefix}-{self.thinking_step_counter}"

View file

@ -0,0 +1,31 @@
"""Close the in-progress thinking step with a completed status frame."""
from __future__ import annotations
from typing import Any
from .thinking_step_sse import emit_thinking_step_frame
def complete_active_thinking_step(
*,
streaming_service: Any,
content_builder: Any | None,
last_active_step_id: str | None,
last_active_step_title: str,
last_active_step_items: list[str],
completed_step_ids: set[str],
) -> tuple[str | None, str | None]:
"""Emit a completed thinking-step frame once; return (frame or None, next active step id)."""
if last_active_step_id and last_active_step_id not in completed_step_ids:
completed_step_ids.add(last_active_step_id)
event = emit_thinking_step_frame(
streaming_service=streaming_service,
content_builder=content_builder,
step_id=last_active_step_id,
title=last_active_step_title,
status="completed",
items=last_active_step_items if last_active_step_items else None,
)
return event, None
return None, last_active_step_id

View file

@ -0,0 +1,24 @@
"""Thinking-step SSE plus optional content-builder updates."""
from __future__ import annotations
from typing import Any
def emit_thinking_step_frame(
*,
streaming_service: Any,
content_builder: Any | None,
step_id: str,
title: str,
status: str = "in_progress",
items: list[str] | None = None,
) -> str:
if content_builder is not None:
content_builder.on_thinking_step(step_id, title, status, items)
return streaming_service.format_thinking_step(
step_id=step_id,
title=title,
status=status,
items=items,
)