refactor(chat): add streaming/flows/resume_chat/ per-concern leaf modules

Three focused modules used by the upcoming resume-chat orchestrator:

* runtime_context: build_resume_chat_runtime_context assembles the
  SurfSenseContextSchema for a resume turn (no mention lists are
  passed, since resume requests do not carry fresh @-mentions).
* assistant_shell: persist_resume_assistant_shell writes a fresh
  assistant row for the resumed turn so the post-stream finalize
  has a target.
* resume_routing: build_resume_routing collects the pending
  interrupts across paused subagents and slices the flat list of
  ResumeDecision[] into the correct (thread, subagent) buckets so
  LangGraph routes each decision back to the right paused tool call.

Add-only; no orchestrator yet (next commit).
This commit is contained in:
CREDO23 2026-05-25 21:50:03 +02:00
parent b2a0888588
commit 885d4acda9
3 changed files with 119 additions and 0 deletions

View file

@ -0,0 +1,31 @@
"""Pre-write a fresh assistant row for this resume turn.
The original (interrupted) ``stream_new_chat`` invocation already persisted
its own assistant row anchored to a different ``turn_id``; resume allocates a
new ``turn_id`` (per-request, see ``orchestrator``) so we need a separate row
keyed on the same ``(thread_id, turn_id, ASSISTANT)`` invariant.
Idempotent against migration 141's partial unique index — recovers the
existing id on retry.
Resume does NOT emit ``data-user-message-id``: the user row is from the
original interrupted turn (different ``turn_id``) and is never re-persisted
here. See B5 in the ``sse-based_message_id_handshake`` plan.
"""
from __future__ import annotations
from app.tasks.chat.persistence import persist_assistant_shell
async def persist_resume_assistant_shell(
    *,
    chat_id: int,
    user_id: str | None,
    turn_id: str,
) -> int | None:
    """Persist the assistant shell row for a resumed turn.

    Forwards the three keyword arguments unchanged to
    ``persist_assistant_shell`` and returns its result as-is.

    Args:
        chat_id: Chat (thread) the resumed turn belongs to.
        user_id: Owner of the turn; may be ``None``.
        turn_id: The ``turn_id`` allocated for this resume request.

    Returns:
        Whatever ``persist_assistant_shell`` returns (annotated as
        ``int | None``) — presumably the assistant row id, or ``None``
        when no row id is available; confirm against the helper.
    """
    assistant_row_id = await persist_assistant_shell(
        chat_id=chat_id,
        user_id=user_id,
        turn_id=turn_id,
    )
    return assistant_row_id

View file

@ -0,0 +1,65 @@
"""Route a flat ``decisions`` list back to the right paused subagent.
Each pending interrupt is stamped with its originating ``tool_call_id`` (see
``checkpointed_subagent_middleware.propagation``) so the resume slicer can
re-target each ``HumanReview`` decision at the right ``tool_call_id``.
LangGraph rejects scalar ``Command(resume=...)`` when multiple interrupts are
pending (parallel HITL); the mapped form works for the single-pause case too,
so we always use it.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from app.utils.perf import get_perf_logger
_perf_log = get_perf_logger()
logger = logging.getLogger(__name__)
@dataclass
class ResumeRoutingPayload:
    """Bundle of the two resume structures produced by ``build_resume_routing``.

    Holds the per-``tool_call_id`` resume slices together with the
    LangGraph-shaped resume map derived from them.
    """

    # Decisions sliced per pending tool call (keyed by ``tool_call_id``).
    routed_resume_value: dict[str, Any]

    # Resume map in the shape handed to LangGraph; the exact key scheme is
    # defined by ``build_lg_resume_map`` and is not visible here.
    lg_resume_map: dict[str, Any]
async def build_resume_routing(
    agent: Any,
    *,
    chat_id: int,
    decisions: list[dict],
) -> ResumeRoutingPayload:
    """Resolve where each resume decision goes and build the LangGraph map.

    Steps, in order: fetch the parent graph state for this chat, collect the
    pending tool calls from it, slice the flat ``decisions`` list across
    those pending calls, then derive the LangGraph resume map from the state
    and the sliced decisions.

    The middleware reads its per-``tool_call_id`` resume slice from the
    ``surfsense_resume_value`` configurable; parallel siblings each pop
    their own entry so they never race.

    Args:
        agent: Object exposing an awaitable ``aget_state(config)``.
        chat_id: Chat id; its string form is used as the ``thread_id``.
        decisions: Flat list of decision dicts to distribute.

    Returns:
        A ``ResumeRoutingPayload`` carrying both the sliced decisions and
        the LangGraph resume map.
    """
    # Imported at call time rather than module import time, as in the
    # original (presumably to avoid an import cycle or heavy import — confirm).
    from app.agents.multi_agent_chat.middleware.main_agent.checkpointed_subagent_middleware.resume_routing import (
        build_lg_resume_map,
        collect_pending_tool_calls,
        slice_decisions_by_tool_call,
    )

    state_config = {"configurable": {"thread_id": str(chat_id)}}
    snapshot = await agent.aget_state(state_config)

    pending_calls = collect_pending_tool_calls(snapshot)
    _perf_log.info(
        "[hitl_route] resume_entry chat_id=%s decisions=%d pending_subagents=%d",
        chat_id,
        len(decisions),
        len(pending_calls),
    )

    sliced = slice_decisions_by_tool_call(decisions, pending_calls)
    resume_map = build_lg_resume_map(snapshot, sliced)
    return ResumeRoutingPayload(
        routed_resume_value=sliced,
        lg_resume_map=resume_map,
    )

View file

@ -0,0 +1,23 @@
"""Build the per-invocation ``SurfSenseContextSchema`` for a resume turn.
Resume doesn't carry new ``mentioned_document_ids`` (those are seeded by the
original turn). We still build the context so future middleware extensions
can rely on ``runtime.context`` always being populated.
"""
from __future__ import annotations
from app.agents.new_chat.context import SurfSenseContextSchema
def build_resume_chat_runtime_context(
    *,
    search_space_id: int,
    request_id: str | None,
    turn_id: str,
) -> SurfSenseContextSchema:
    """Create the runtime context object for a resume invocation.

    Only ``search_space_id``, ``request_id`` and ``turn_id`` are supplied;
    no other schema fields are set here, so they take whatever defaults
    ``SurfSenseContextSchema`` defines.

    Args:
        search_space_id: Search space the resumed chat runs in.
        request_id: Identifier of the current request; may be ``None``.
        turn_id: The ``turn_id`` for this resume turn.

    Returns:
        A new ``SurfSenseContextSchema`` populated with the three values.
    """
    resume_context = SurfSenseContextSchema(
        search_space_id=search_space_id,
        request_id=request_id,
        turn_id=turn_id,
    )
    return resume_context