From 932bf22a34813ac164e58669fa8d082b605b78ca Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Sat, 9 May 2026 22:54:07 +0200 Subject: [PATCH] chat: fix mixed-decision HITL crash and fold resumed assistant messages into the interrupted bubble. --- .../config.py | 46 +++++ .../task_tool.py | 7 + .../new-chat/[[...chat_id]]/page.tsx | 13 +- .../chat-messages/timeline/build-timeline.ts | 4 +- surfsense_web/lib/chat/message-utils.ts | 174 ++++++++++++++---- surfsense_web/lib/chat/streaming-state.ts | 4 +- 6 files changed, 208 insertions(+), 40 deletions(-) diff --git a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/config.py b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/config.py index 16211686c..ac232b92a 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/config.py +++ b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/config.py @@ -6,12 +6,19 @@ exposes the side-channel ``stream_resume_chat`` uses to ferry resume payloads. from __future__ import annotations +import logging from typing import Any from langchain.tools import ToolRuntime from .constants import DEFAULT_SUBAGENT_RECURSION_LIMIT +logger = logging.getLogger(__name__) + +# langgraph stores the parent task's scratchpad under this configurable key; +# subagents inherit the chain via ``parent_scratchpad`` fallback. +_LANGGRAPH_SCRATCHPAD_KEY = "__pregel_scratchpad" + def subagent_invoke_config(runtime: ToolRuntime) -> dict[str, Any]: """RunnableConfig for the nested invoke; raises ``recursion_limit`` to the parent's budget.""" @@ -42,3 +49,42 @@ def has_surfsense_resume(runtime: ToolRuntime) -> bool: if not isinstance(configurable, dict): return False return "surfsense_resume_value" in configurable + + +def drain_parent_null_resume(runtime: ToolRuntime) -> None: + """Consume the parent's lingering ``NULL_TASK_ID/RESUME`` write before delegating. + + ``stream_resume_chat`` wakes the main agent with + ``Command(resume={"decisions": [...]})`` so the propagated + ``_lg_interrupt(...)`` can return. langgraph stores that payload as the + parent task's ``null_resume`` pending write, which only gets consumed + *after* ``subagent.[a]invoke`` returns (when the post-call propagation + re-fires). While the subagent is mid-execution, any *new* ``interrupt()`` + inside it (e.g. a follow-up tool call after a mixed approve/reject) walks + ``subagent_scratchpad → parent_scratchpad.get_null_resume`` and picks up + the parent's still-live decisions — mismatching against a different number + of hanging tool calls and crashing ``HumanInTheLoopMiddleware``. + + Draining the write here closes that cross-graph leak so subagent + interrupts pause cleanly and re-propagate as a fresh approval card. + """ + cfg = runtime.config or {} + configurable = cfg.get("configurable") if isinstance(cfg, dict) else None + if not isinstance(configurable, dict): + return + scratchpad = configurable.get(_LANGGRAPH_SCRATCHPAD_KEY) + if scratchpad is None: + return + consume = getattr(scratchpad, "get_null_resume", None) + if not callable(consume): + return + try: + consume(True) + except Exception: + # Defensive: if langgraph's internal scratchpad shape changes we don't + # want to break the resume path. Worst case the original ValueError + # still surfaces — same behavior as before this fix. + logger.debug( + "drain_parent_null_resume: scratchpad.get_null_resume raised", + exc_info=True, + ) diff --git a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py index 5668f8ddb..7c0dd8624 100644 --- a/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py +++ b/surfsense_backend/app/agents/multi_agent_chat/middleware/main_agent/checkpointed_subagent_middleware/task_tool.py @@ -20,6 +20,7 @@ from langgraph.types import Command from .config import ( consume_surfsense_resume, + drain_parent_null_resume, has_surfsense_resume, subagent_invoke_config, ) @@ -157,6 +158,9 @@ def build_task_tool_with_parent_config( ) expected = hitlrequest_action_count(pending_value) resume_value = fan_out_decisions_to_match(resume_value, expected) + # Prevent the parent's resume payload from leaking into subagent + # interrupts via langgraph's parent_scratchpad fallback. + drain_parent_null_resume(runtime) result = subagent.invoke( build_resume_command(resume_value, pending_id), config=sub_config, @@ -221,6 +225,9 @@ def build_task_tool_with_parent_config( ) expected = hitlrequest_action_count(pending_value) resume_value = fan_out_decisions_to_match(resume_value, expected) + # Prevent the parent's resume payload from leaking into subagent + # interrupts via langgraph's parent_scratchpad fallback. + drain_parent_null_resume(runtime) result = await subagent.ainvoke( build_resume_command(resume_value, pending_id), config=sub_config, diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 76f48bc92..9550eed05 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -64,7 +64,10 @@ import { documentsApiService } from "@/lib/apis/documents-api.service"; import { getBearerToken } from "@/lib/auth-utils"; import { type ChatFlow, classifyChatError } from "@/lib/chat/chat-error-classifier"; import { tagPreAcceptSendFailure, toHttpResponseError } from "@/lib/chat/chat-request-errors"; -import { convertToThreadMessage, filterSupersededAbortedMessages } from "@/lib/chat/message-utils"; +import { + convertToThreadMessage, + reconcileInterruptedAssistantMessages, +} from "@/lib/chat/message-utils"; import { isPodcastGenerating, looksLikePodcastRequest, @@ -395,7 +398,7 @@ export default function NewChatPage() { const memberById = new Map(membersData?.map((m) => [m.user_id, m]) ?? []); const prevById = new Map(prev.map((m) => [m.id, m])); - return filterSupersededAbortedMessages(syncedMessages).map((msg) => { + return reconcileInterruptedAssistantMessages(syncedMessages).map((msg) => { const member = msg.author_id ? (memberById.get(msg.author_id) ?? null) : null; // Preserve existing author info if member lookup fails (e.g., cloned chats) @@ -622,9 +625,9 @@ export default function NewChatPage() { setCurrentThread(threadData); if (messagesResponse.messages && messagesResponse.messages.length > 0) { - const loadedMessages = filterSupersededAbortedMessages(messagesResponse.messages).map( - convertToThreadMessage - ); + const loadedMessages = reconcileInterruptedAssistantMessages( + messagesResponse.messages + ).map(convertToThreadMessage); setMessages(loadedMessages); for (const msg of messagesResponse.messages) { diff --git a/surfsense_web/features/chat-messages/timeline/build-timeline.ts b/surfsense_web/features/chat-messages/timeline/build-timeline.ts index 20ae6d596..d4365f211 100644 --- a/surfsense_web/features/chat-messages/timeline/build-timeline.ts +++ b/surfsense_web/features/chat-messages/timeline/build-timeline.ts @@ -81,8 +81,8 @@ interface ToolCallSlim { * During the live-resume window the in-memory message holds BOTH the * OLD interrupt-frame parts AND the freshly-streamed resume parts in * a new ``task`` scope. Without this filter we'd render both until - * the next reload (where ``filterSupersededAbortedMessages`` drops - * the OLD row upstream). + * the next reload (where ``reconcileInterruptedAssistantMessages`` + * folds the OLD row into the resume row upstream). * * A tool-call is "interrupt-affected" when it either carries * ``__interrupt__`` directly or sits in a span that contains one. An diff --git a/surfsense_web/lib/chat/message-utils.ts b/surfsense_web/lib/chat/message-utils.ts index 3267afd76..935dda539 100644 --- a/surfsense_web/lib/chat/message-utils.ts +++ b/surfsense_web/lib/chat/message-utils.ts @@ -1,7 +1,7 @@ import type { ThreadMessageLike } from "@assistant-ui/react"; import type { MessageRecord } from "./thread-persistence"; -/** Minimal shape used by ``filterSupersededAbortedMessages``. */ +/** Minimal shape used by the interrupt/resume reconciler. */ interface AbortableMessage { id: number; role: string; @@ -9,14 +9,28 @@ interface AbortableMessage { turn_id?: string | null; } +function isAssistant(msg: AbortableMessage): boolean { + return msg.role.toLowerCase() === "assistant"; +} + +/** True when the row carries at least one tool-call with ``state: "aborted"``. */ +function hasAbortedToolCall(msg: AbortableMessage): boolean { + if (!isAssistant(msg) || !Array.isArray(msg.content)) return false; + for (const part of msg.content) { + if (typeof part !== "object" || part === null) continue; + if ((part as { type?: string }).type !== "tool-call") continue; + if ((part as { state?: unknown }).state === "aborted") return true; + } + return false; +} + /** - * True when the row is a frozen interrupt frame: an assistant message - * whose tool-calls all carry ``state: "aborted"``. A single non-aborted - * tool-call disqualifies (defensive against future mixed states). + * True when EVERY tool-call on the row is aborted. The row is then a + * frozen interrupt frame with no salvageable activity — safe to drop + * outright. */ -function isAbortedAssistantMessage(msg: AbortableMessage): boolean { - if (msg.role.toLowerCase() !== "assistant") return false; - if (!Array.isArray(msg.content)) return false; +function isFullyAbortedAssistantMessage(msg: AbortableMessage): boolean { + if (!isAssistant(msg) || !Array.isArray(msg.content)) return false; let hasToolCalls = false; for (const part of msg.content) { if (typeof part !== "object" || part === null) continue; @@ -28,42 +42,140 @@ function isAbortedAssistantMessage(msg: AbortableMessage): boolean { } /** - * Positional supersede check: an aborted assistant row is superseded - * iff another assistant row appears later before any user row. - * - * NOT turn-id-based: ``stream_resume_chat`` allocates a fresh - * ``turn_id`` for the resumed row, so interrupt+resume rows never - * share a turn_id. Conversational adjacency is the reliable signal — - * an assistant→assistant pair without a user row between them is the - * unique signature of an interrupt+resume cycle. + * Locate the resume row that supersedes ``messages[idx]``. The + * ``stream_resume_chat`` flow allocates a fresh ``turn_id`` so we + * can't pair on it; conversational adjacency (assistant → assistant + * with no user row between) is the unique signature. Skips already- + * dropped indices so chained interrupt-resumes still pair cleanly. */ -function isSupersededByLaterAssistant( +function findResumeSuccessorIdx( messages: readonly T[], - idx: number -): boolean { + idx: number, + dropped: ReadonlySet +): number | null { for (let i = idx + 1; i < messages.length; i++) { + if (dropped.has(i)) continue; const role = messages[i].role.toLowerCase(); - if (role === "user") return false; - if (role === "assistant") return true; + if (role === "user") return null; + if (role === "assistant") return i; } - return false; + return null; +} + +/** Read ``data.steps`` from either ``data-thinking-steps`` (modern) or ``thinking-steps`` (legacy). */ +function extractStepsFromPart(part: unknown): unknown[] | null { + if (typeof part !== "object" || part === null) return null; + const p = part as { type?: unknown; data?: unknown; steps?: unknown }; + if (p.type === "data-thinking-steps") { + const data = p.data as { steps?: unknown } | undefined; + return Array.isArray(data?.steps) ? data.steps : []; + } + if (p.type === "thinking-steps") { + return Array.isArray(p.steps) ? (p.steps as unknown[]) : []; + } + return null; +} + +/** Split a content array into (combined steps, all other parts in order). */ +function partitionContent(content: unknown): { steps: unknown[]; others: unknown[] } { + if (!Array.isArray(content)) return { steps: [], others: [] }; + const steps: unknown[] = []; + const others: unknown[] = []; + for (const part of content) { + const partSteps = extractStepsFromPart(part); + if (partSteps !== null) { + steps.push(...partSteps); + continue; + } + others.push(part); + } + return { steps, others }; } /** - * Drop frozen interrupt-frame rows once they have a resumed - * continuation. Pure (returns a new array). Caller passes messages in - * chronological order. + * Fold an interrupt-frame row's content into its resume successor so + * the user sees one assistant turn instead of two stacked bubbles. + * Successor's metadata wins (id, created_at, turn_id, token_usage, + * author) — that's the row the per-turn revert button keys to. * - * Never-resumed aborts are preserved (user navigated away mid-decision) - * so the user still sees what happened. + * Order: combined ``data-thinking-steps`` (older steps then newer) at + * index 0, followed by older's other parts in order, then newer's. The + * older row's aborted ``task`` wrapper is preserved so the rejected + * attempt remains visible alongside the successful retry; both spans + * survive and ``groupItems`` renders them as sibling task branches in + * one timeline. */ -export function filterSupersededAbortedMessages( +function mergeInterruptedIntoResume(older: T, newer: T): T { + const olderParts = partitionContent(older.content); + const newerParts = partitionContent(newer.content); + + const mergedSteps = [...olderParts.steps, ...newerParts.steps]; + const mergedContent: unknown[] = []; + if (mergedSteps.length > 0) { + mergedContent.push({ type: "data-thinking-steps", data: { steps: mergedSteps } }); + } + mergedContent.push(...olderParts.others, ...newerParts.others); + + return { ...newer, content: mergedContent }; +} + +/** + * Reconcile interrupt-frame and resume rows so the UI shows one + * assistant turn per user turn even when the backend persists them as + * separate ``new_chat_messages`` rows. + * + * Two cases, both keyed on conversational adjacency (assistant → + * assistant with no user row between): + * + * - **Fully aborted older row** (every tool-call ``state: "aborted"``, + * no salvageable activity) → drop the older row. + * - **Partially aborted older row** (mixed completed + aborted, e.g. + * inner subagent tools ran before the interrupt) → fold its content + * into the successor. Successor metadata wins. + * + * Never-resumed aborts (user navigated away mid-decision) survive so + * the user still sees what happened. + * + * Pure: returns a new array with new merged objects when needed. + * Caller passes messages in chronological order. + */ +export function reconcileInterruptedAssistantMessages( messages: readonly T[] ): T[] { - return messages.filter((msg, idx) => { - if (!isAbortedAssistantMessage(msg)) return true; - return !isSupersededByLaterAssistant(messages, idx); - }); + const dropped = new Set(); + const mergeInto = new Map(); + + for (let i = 0; i < messages.length; i++) { + if (dropped.has(i)) continue; + const msg = messages[i]; + if (!hasAbortedToolCall(msg)) continue; + + const successorIdx = findResumeSuccessorIdx(messages, i, dropped); + if (successorIdx === null) continue; + + dropped.add(i); + if (!isFullyAbortedAssistantMessage(msg)) { + const list = mergeInto.get(successorIdx) ?? []; + list.push(i); + mergeInto.set(successorIdx, list); + } + } + + const result: T[] = []; + for (let i = 0; i < messages.length; i++) { + if (dropped.has(i)) continue; + const olderIdxs = mergeInto.get(i); + if (olderIdxs && olderIdxs.length > 0) { + let merged = messages[i]; + for (const olderIdx of olderIdxs) { + merged = mergeInterruptedIntoResume(messages[olderIdx], merged); + } + result.push(merged); + continue; + } + result.push(messages[i]); + } + return result; } /** diff --git a/surfsense_web/lib/chat/streaming-state.ts b/surfsense_web/lib/chat/streaming-state.ts index e3bfdcaea..1d057ef94 100644 --- a/surfsense_web/lib/chat/streaming-state.ts +++ b/surfsense_web/lib/chat/streaming-state.ts @@ -79,8 +79,8 @@ export interface ContentPartsState { * the resume stream's first ``start-step`` fires * ``addStepSeparator`` while rehydrated OLD content already makes * ``hasContent`` true → a divider lands between OLD and NEW - * content with no semantic value (OLD content is filtered by - * ``buildTimeline`` + ``filterSupersededAbortedMessages``, + * content with no semantic value (OLD content is folded by + * ``buildTimeline`` + ``reconcileInterruptedAssistantMessages``, * persisted state carries no separator, so the line vanishes on * reload). */