From 885d4acda921ff8b8c0cb10171cf63bcffbc5845 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Mon, 25 May 2026 21:50:03 +0200 Subject: [PATCH] refactor(chat): add streaming/flows/resume_chat/ per-concern leaf modules Three focused modules used by the upcoming resume-chat orchestrator: * runtime_context: build_resume_chat_runtime_context assembles the SurfSenseContextSchema for a resume turn (handles empty mention lists, since resume requests do not carry fresh @-mentions). * assistant_shell: persist_resume_assistant_shell writes a fresh assistant row for the resumed turn so the post-stream finalize has a target. * resume_routing: build_resume_routing collects the pending interrupts across paused subagents and slices the flat list of ResumeDecision[] into the correct (thread, subagent) buckets so LangGraph routes each decision back to the right paused tool call. Add-only; no orchestrator yet (next commit). --- .../flows/resume_chat/assistant_shell.py | 31 +++++++++ .../flows/resume_chat/resume_routing.py | 65 +++++++++++++++++++ .../flows/resume_chat/runtime_context.py | 23 +++++++ 3 files changed, 119 insertions(+) create mode 100644 surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/assistant_shell.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/resume_routing.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/runtime_context.py diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/assistant_shell.py b/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/assistant_shell.py new file mode 100644 index 000000000..2f34387f8 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/assistant_shell.py @@ -0,0 +1,31 @@ +"""Pre-write a fresh assistant row for this resume turn. + +The original (interrupted) ``stream_new_chat`` invocation already persisted +its own assistant row anchored to a different ``turn_id``; resume allocates a +new ``turn_id`` (per-request, see ``orchestrator``) so we need a separate row +keyed on the same ``(thread_id, turn_id, ASSISTANT)`` invariant. + +Idempotent against migration 141's partial unique index — recovers the +existing id on retry. + +Resume does NOT emit ``data-user-message-id``: the user row is from the +original interrupted turn (different ``turn_id``) and is never re-persisted +here. See B5 in the ``sse-based_message_id_handshake`` plan. +""" + +from __future__ import annotations + +from app.tasks.chat.persistence import persist_assistant_shell + + +async def persist_resume_assistant_shell( + *, + chat_id: int, + user_id: str | None, + turn_id: str, +) -> int | None: + return await persist_assistant_shell( + chat_id=chat_id, + user_id=user_id, + turn_id=turn_id, + ) diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/resume_routing.py b/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/resume_routing.py new file mode 100644 index 000000000..300fbc9bd --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/resume_routing.py @@ -0,0 +1,65 @@ +"""Route a flat ``decisions`` list back to the right paused subagent. + +Each pending interrupt is stamped with its originating ``tool_call_id`` (see +``checkpointed_subagent_middleware.propagation``) so the resume slicer can +re-target each ``HumanReview`` decision at the right ``tool_call_id``. + +LangGraph rejects scalar ``Command(resume=...)`` when multiple interrupts are +pending (parallel HITL); the mapped form works for the single-pause case too, +so we always use it. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any + +from app.utils.perf import get_perf_logger + +_perf_log = get_perf_logger() +logger = logging.getLogger(__name__) + + +@dataclass +class ResumeRoutingPayload: + """Resolved per-``tool_call_id`` resume slices + the lg-shaped resume map.""" + + routed_resume_value: dict[str, Any] + lg_resume_map: dict[str, Any] + + +async def build_resume_routing( + agent: Any, + *, + chat_id: int, + decisions: list[dict], +) -> ResumeRoutingPayload: + """Read parent_state, collect pending tool-calls, slice decisions, build map. + + The middleware reads its per-``tool_call_id`` resume slice from the + ``surfsense_resume_value`` configurable; parallel siblings each pop their + own entry so they never race. + """ + from app.agents.multi_agent_chat.middleware.main_agent.checkpointed_subagent_middleware.resume_routing import ( + build_lg_resume_map, + collect_pending_tool_calls, + slice_decisions_by_tool_call, + ) + + parent_state = await agent.aget_state( + {"configurable": {"thread_id": str(chat_id)}} + ) + pending = collect_pending_tool_calls(parent_state) + _perf_log.info( + "[hitl_route] resume_entry chat_id=%s decisions=%d pending_subagents=%d", + chat_id, + len(decisions), + len(pending), + ) + routed_resume_value = slice_decisions_by_tool_call(decisions, pending) + lg_resume_map = build_lg_resume_map(parent_state, routed_resume_value) + return ResumeRoutingPayload( + routed_resume_value=routed_resume_value, + lg_resume_map=lg_resume_map, + ) diff --git a/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/runtime_context.py b/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/runtime_context.py new file mode 100644 index 000000000..59d5d8ca7 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/flows/resume_chat/runtime_context.py @@ -0,0 +1,23 @@ +"""Build the per-invocation ``SurfSenseContextSchema`` for a resume turn. + +Resume doesn't carry new ``mentioned_document_ids`` (those are seeded by the +original turn). We still build the context so future middleware extensions +can rely on ``runtime.context`` always being populated. +""" + +from __future__ import annotations + +from app.agents.new_chat.context import SurfSenseContextSchema + + +def build_resume_chat_runtime_context( + *, + search_space_id: int, + request_id: str | None, + turn_id: str, +) -> SurfSenseContextSchema: + return SurfSenseContextSchema( + search_space_id=search_space_id, + request_id=request_id, + turn_id=turn_id, + )