mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-12 09:12:40 +02:00
chat: fix mixed-decision HITL crash and fold resumed assistant messages into the interrupted bubble.
This commit is contained in:
parent
2e132513be
commit
932bf22a34
6 changed files with 208 additions and 40 deletions
|
|
@ -6,12 +6,19 @@ exposes the side-channel ``stream_resume_chat`` uses to ferry resume payloads.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from langchain.tools import ToolRuntime
|
||||
|
||||
from .constants import DEFAULT_SUBAGENT_RECURSION_LIMIT
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# langgraph stores the parent task's scratchpad under this configurable key;
|
||||
# subagents inherit the chain via ``parent_scratchpad`` fallback.
|
||||
_LANGGRAPH_SCRATCHPAD_KEY = "__pregel_scratchpad"
|
||||
|
||||
|
||||
def subagent_invoke_config(runtime: ToolRuntime) -> dict[str, Any]:
|
||||
"""RunnableConfig for the nested invoke; raises ``recursion_limit`` to the parent's budget."""
|
||||
|
|
@ -42,3 +49,42 @@ def has_surfsense_resume(runtime: ToolRuntime) -> bool:
|
|||
if not isinstance(configurable, dict):
|
||||
return False
|
||||
return "surfsense_resume_value" in configurable
|
||||
|
||||
|
||||
def drain_parent_null_resume(runtime: ToolRuntime) -> None:
|
||||
"""Consume the parent's lingering ``NULL_TASK_ID/RESUME`` write before delegating.
|
||||
|
||||
``stream_resume_chat`` wakes the main agent with
|
||||
``Command(resume={"decisions": [...]})`` so the propagated
|
||||
``_lg_interrupt(...)`` can return. langgraph stores that payload as the
|
||||
parent task's ``null_resume`` pending write, which only gets consumed
|
||||
*after* ``subagent.[a]invoke`` returns (when the post-call propagation
|
||||
re-fires). While the subagent is mid-execution, any *new* ``interrupt()``
|
||||
inside it (e.g. a follow-up tool call after a mixed approve/reject) walks
|
||||
``subagent_scratchpad → parent_scratchpad.get_null_resume`` and picks up
|
||||
the parent's still-live decisions — mismatching against a different number
|
||||
of hanging tool calls and crashing ``HumanInTheLoopMiddleware``.
|
||||
|
||||
Draining the write here closes that cross-graph leak so subagent
|
||||
interrupts pause cleanly and re-propagate as a fresh approval card.
|
||||
"""
|
||||
cfg = runtime.config or {}
|
||||
configurable = cfg.get("configurable") if isinstance(cfg, dict) else None
|
||||
if not isinstance(configurable, dict):
|
||||
return
|
||||
scratchpad = configurable.get(_LANGGRAPH_SCRATCHPAD_KEY)
|
||||
if scratchpad is None:
|
||||
return
|
||||
consume = getattr(scratchpad, "get_null_resume", None)
|
||||
if not callable(consume):
|
||||
return
|
||||
try:
|
||||
consume(True)
|
||||
except Exception:
|
||||
# Defensive: if langgraph's internal scratchpad shape changes we don't
|
||||
# want to break the resume path. Worst case the original ValueError
|
||||
# still surfaces — same behavior as before this fix.
|
||||
logger.debug(
|
||||
"drain_parent_null_resume: scratchpad.get_null_resume raised",
|
||||
exc_info=True,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ from langgraph.types import Command
|
|||
|
||||
from .config import (
|
||||
consume_surfsense_resume,
|
||||
drain_parent_null_resume,
|
||||
has_surfsense_resume,
|
||||
subagent_invoke_config,
|
||||
)
|
||||
|
|
@ -157,6 +158,9 @@ def build_task_tool_with_parent_config(
|
|||
)
|
||||
expected = hitlrequest_action_count(pending_value)
|
||||
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
||||
# Prevent the parent's resume payload from leaking into subagent
|
||||
# interrupts via langgraph's parent_scratchpad fallback.
|
||||
drain_parent_null_resume(runtime)
|
||||
result = subagent.invoke(
|
||||
build_resume_command(resume_value, pending_id),
|
||||
config=sub_config,
|
||||
|
|
@ -221,6 +225,9 @@ def build_task_tool_with_parent_config(
|
|||
)
|
||||
expected = hitlrequest_action_count(pending_value)
|
||||
resume_value = fan_out_decisions_to_match(resume_value, expected)
|
||||
# Prevent the parent's resume payload from leaking into subagent
|
||||
# interrupts via langgraph's parent_scratchpad fallback.
|
||||
drain_parent_null_resume(runtime)
|
||||
result = await subagent.ainvoke(
|
||||
build_resume_command(resume_value, pending_id),
|
||||
config=sub_config,
|
||||
|
|
|
|||
|
|
@ -64,7 +64,10 @@ import { documentsApiService } from "@/lib/apis/documents-api.service";
|
|||
import { getBearerToken } from "@/lib/auth-utils";
|
||||
import { type ChatFlow, classifyChatError } from "@/lib/chat/chat-error-classifier";
|
||||
import { tagPreAcceptSendFailure, toHttpResponseError } from "@/lib/chat/chat-request-errors";
|
||||
import { convertToThreadMessage, filterSupersededAbortedMessages } from "@/lib/chat/message-utils";
|
||||
import {
|
||||
convertToThreadMessage,
|
||||
reconcileInterruptedAssistantMessages,
|
||||
} from "@/lib/chat/message-utils";
|
||||
import {
|
||||
isPodcastGenerating,
|
||||
looksLikePodcastRequest,
|
||||
|
|
@ -395,7 +398,7 @@ export default function NewChatPage() {
|
|||
const memberById = new Map(membersData?.map((m) => [m.user_id, m]) ?? []);
|
||||
const prevById = new Map(prev.map((m) => [m.id, m]));
|
||||
|
||||
return filterSupersededAbortedMessages(syncedMessages).map((msg) => {
|
||||
return reconcileInterruptedAssistantMessages(syncedMessages).map((msg) => {
|
||||
const member = msg.author_id ? (memberById.get(msg.author_id) ?? null) : null;
|
||||
|
||||
// Preserve existing author info if member lookup fails (e.g., cloned chats)
|
||||
|
|
@ -622,9 +625,9 @@ export default function NewChatPage() {
|
|||
setCurrentThread(threadData);
|
||||
|
||||
if (messagesResponse.messages && messagesResponse.messages.length > 0) {
|
||||
const loadedMessages = filterSupersededAbortedMessages(messagesResponse.messages).map(
|
||||
convertToThreadMessage
|
||||
);
|
||||
const loadedMessages = reconcileInterruptedAssistantMessages(
|
||||
messagesResponse.messages
|
||||
).map(convertToThreadMessage);
|
||||
setMessages(loadedMessages);
|
||||
|
||||
for (const msg of messagesResponse.messages) {
|
||||
|
|
|
|||
|
|
@ -81,8 +81,8 @@ interface ToolCallSlim {
|
|||
* During the live-resume window the in-memory message holds BOTH the
|
||||
* OLD interrupt-frame parts AND the freshly-streamed resume parts in
|
||||
* a new ``task`` scope. Without this filter we'd render both until
|
||||
* the next reload (where ``filterSupersededAbortedMessages`` drops
|
||||
* the OLD row upstream).
|
||||
* the next reload (where ``reconcileInterruptedAssistantMessages``
|
||||
* folds the OLD row into the resume row upstream).
|
||||
*
|
||||
* A tool-call is "interrupt-affected" when it either carries
|
||||
* ``__interrupt__`` directly or sits in a span that contains one. An
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import type { ThreadMessageLike } from "@assistant-ui/react";
|
||||
import type { MessageRecord } from "./thread-persistence";
|
||||
|
||||
/** Minimal shape used by ``filterSupersededAbortedMessages``. */
|
||||
/** Minimal shape used by the interrupt/resume reconciler. */
|
||||
interface AbortableMessage {
|
||||
id: number;
|
||||
role: string;
|
||||
|
|
@ -9,14 +9,28 @@ interface AbortableMessage {
|
|||
turn_id?: string | null;
|
||||
}
|
||||
|
||||
function isAssistant(msg: AbortableMessage): boolean {
|
||||
return msg.role.toLowerCase() === "assistant";
|
||||
}
|
||||
|
||||
/** True when the row carries at least one tool-call with ``state: "aborted"``. */
|
||||
function hasAbortedToolCall(msg: AbortableMessage): boolean {
|
||||
if (!isAssistant(msg) || !Array.isArray(msg.content)) return false;
|
||||
for (const part of msg.content) {
|
||||
if (typeof part !== "object" || part === null) continue;
|
||||
if ((part as { type?: string }).type !== "tool-call") continue;
|
||||
if ((part as { state?: unknown }).state === "aborted") return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* True when the row is a frozen interrupt frame: an assistant message
|
||||
* whose tool-calls all carry ``state: "aborted"``. A single non-aborted
|
||||
* tool-call disqualifies (defensive against future mixed states).
|
||||
* True when EVERY tool-call on the row is aborted. The row is then a
|
||||
* frozen interrupt frame with no salvageable activity — safe to drop
|
||||
* outright.
|
||||
*/
|
||||
function isAbortedAssistantMessage(msg: AbortableMessage): boolean {
|
||||
if (msg.role.toLowerCase() !== "assistant") return false;
|
||||
if (!Array.isArray(msg.content)) return false;
|
||||
function isFullyAbortedAssistantMessage(msg: AbortableMessage): boolean {
|
||||
if (!isAssistant(msg) || !Array.isArray(msg.content)) return false;
|
||||
let hasToolCalls = false;
|
||||
for (const part of msg.content) {
|
||||
if (typeof part !== "object" || part === null) continue;
|
||||
|
|
@ -28,42 +42,140 @@ function isAbortedAssistantMessage(msg: AbortableMessage): boolean {
|
|||
}
|
||||
|
||||
/**
|
||||
* Positional supersede check: an aborted assistant row is superseded
|
||||
* iff another assistant row appears later before any user row.
|
||||
*
|
||||
* NOT turn-id-based: ``stream_resume_chat`` allocates a fresh
|
||||
* ``turn_id`` for the resumed row, so interrupt+resume rows never
|
||||
* share a turn_id. Conversational adjacency is the reliable signal —
|
||||
* an assistant→assistant pair without a user row between them is the
|
||||
* unique signature of an interrupt+resume cycle.
|
||||
* Locate the resume row that supersedes ``messages[idx]``. The
|
||||
* ``stream_resume_chat`` flow allocates a fresh ``turn_id`` so we
|
||||
* can't pair on it; conversational adjacency (assistant → assistant
|
||||
* with no user row between) is the unique signature. Skips already-
|
||||
* dropped indices so chained interrupt-resumes still pair cleanly.
|
||||
*/
|
||||
function isSupersededByLaterAssistant<T extends AbortableMessage>(
|
||||
function findResumeSuccessorIdx<T extends AbortableMessage>(
|
||||
messages: readonly T[],
|
||||
idx: number
|
||||
): boolean {
|
||||
idx: number,
|
||||
dropped: ReadonlySet<number>
|
||||
): number | null {
|
||||
for (let i = idx + 1; i < messages.length; i++) {
|
||||
if (dropped.has(i)) continue;
|
||||
const role = messages[i].role.toLowerCase();
|
||||
if (role === "user") return false;
|
||||
if (role === "assistant") return true;
|
||||
if (role === "user") return null;
|
||||
if (role === "assistant") return i;
|
||||
}
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Read ``data.steps`` from either ``data-thinking-steps`` (modern) or ``thinking-steps`` (legacy). */
|
||||
function extractStepsFromPart(part: unknown): unknown[] | null {
|
||||
if (typeof part !== "object" || part === null) return null;
|
||||
const p = part as { type?: unknown; data?: unknown; steps?: unknown };
|
||||
if (p.type === "data-thinking-steps") {
|
||||
const data = p.data as { steps?: unknown } | undefined;
|
||||
return Array.isArray(data?.steps) ? data.steps : [];
|
||||
}
|
||||
if (p.type === "thinking-steps") {
|
||||
return Array.isArray(p.steps) ? (p.steps as unknown[]) : [];
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Split a content array into (combined steps, all other parts in order). */
|
||||
function partitionContent(content: unknown): { steps: unknown[]; others: unknown[] } {
|
||||
if (!Array.isArray(content)) return { steps: [], others: [] };
|
||||
const steps: unknown[] = [];
|
||||
const others: unknown[] = [];
|
||||
for (const part of content) {
|
||||
const partSteps = extractStepsFromPart(part);
|
||||
if (partSteps !== null) {
|
||||
steps.push(...partSteps);
|
||||
continue;
|
||||
}
|
||||
others.push(part);
|
||||
}
|
||||
return { steps, others };
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop frozen interrupt-frame rows once they have a resumed
|
||||
* continuation. Pure (returns a new array). Caller passes messages in
|
||||
* chronological order.
|
||||
* Fold an interrupt-frame row's content into its resume successor so
|
||||
* the user sees one assistant turn instead of two stacked bubbles.
|
||||
* Successor's metadata wins (id, created_at, turn_id, token_usage,
|
||||
* author) — that's the row the per-turn revert button keys to.
|
||||
*
|
||||
* Never-resumed aborts are preserved (user navigated away mid-decision)
|
||||
* so the user still sees what happened.
|
||||
* Order: combined ``data-thinking-steps`` (older steps then newer) at
|
||||
* index 0, followed by older's other parts in order, then newer's. The
|
||||
* older row's aborted ``task`` wrapper is preserved so the rejected
|
||||
* attempt remains visible alongside the successful retry; both spans
|
||||
* survive and ``groupItems`` renders them as sibling task branches in
|
||||
* one timeline.
|
||||
*/
|
||||
export function filterSupersededAbortedMessages<T extends AbortableMessage>(
|
||||
function mergeInterruptedIntoResume<T extends AbortableMessage>(older: T, newer: T): T {
|
||||
const olderParts = partitionContent(older.content);
|
||||
const newerParts = partitionContent(newer.content);
|
||||
|
||||
const mergedSteps = [...olderParts.steps, ...newerParts.steps];
|
||||
const mergedContent: unknown[] = [];
|
||||
if (mergedSteps.length > 0) {
|
||||
mergedContent.push({ type: "data-thinking-steps", data: { steps: mergedSteps } });
|
||||
}
|
||||
mergedContent.push(...olderParts.others, ...newerParts.others);
|
||||
|
||||
return { ...newer, content: mergedContent };
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile interrupt-frame and resume rows so the UI shows one
|
||||
* assistant turn per user turn even when the backend persists them as
|
||||
* separate ``new_chat_messages`` rows.
|
||||
*
|
||||
* Two cases, both keyed on conversational adjacency (assistant →
|
||||
* assistant with no user row between):
|
||||
*
|
||||
* - **Fully aborted older row** (every tool-call ``state: "aborted"``,
|
||||
* no salvageable activity) → drop the older row.
|
||||
* - **Partially aborted older row** (mixed completed + aborted, e.g.
|
||||
* inner subagent tools ran before the interrupt) → fold its content
|
||||
* into the successor. Successor metadata wins.
|
||||
*
|
||||
* Never-resumed aborts (user navigated away mid-decision) survive so
|
||||
* the user still sees what happened.
|
||||
*
|
||||
* Pure: returns a new array with new merged objects when needed.
|
||||
* Caller passes messages in chronological order.
|
||||
*/
|
||||
export function reconcileInterruptedAssistantMessages<T extends AbortableMessage>(
|
||||
messages: readonly T[]
|
||||
): T[] {
|
||||
return messages.filter((msg, idx) => {
|
||||
if (!isAbortedAssistantMessage(msg)) return true;
|
||||
return !isSupersededByLaterAssistant(messages, idx);
|
||||
});
|
||||
const dropped = new Set<number>();
|
||||
const mergeInto = new Map<number, number[]>();
|
||||
|
||||
for (let i = 0; i < messages.length; i++) {
|
||||
if (dropped.has(i)) continue;
|
||||
const msg = messages[i];
|
||||
if (!hasAbortedToolCall(msg)) continue;
|
||||
|
||||
const successorIdx = findResumeSuccessorIdx(messages, i, dropped);
|
||||
if (successorIdx === null) continue;
|
||||
|
||||
dropped.add(i);
|
||||
if (!isFullyAbortedAssistantMessage(msg)) {
|
||||
const list = mergeInto.get(successorIdx) ?? [];
|
||||
list.push(i);
|
||||
mergeInto.set(successorIdx, list);
|
||||
}
|
||||
}
|
||||
|
||||
const result: T[] = [];
|
||||
for (let i = 0; i < messages.length; i++) {
|
||||
if (dropped.has(i)) continue;
|
||||
const olderIdxs = mergeInto.get(i);
|
||||
if (olderIdxs && olderIdxs.length > 0) {
|
||||
let merged = messages[i];
|
||||
for (const olderIdx of olderIdxs) {
|
||||
merged = mergeInterruptedIntoResume(messages[olderIdx], merged);
|
||||
}
|
||||
result.push(merged);
|
||||
continue;
|
||||
}
|
||||
result.push(messages[i]);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -79,8 +79,8 @@ export interface ContentPartsState {
|
|||
* the resume stream's first ``start-step`` fires
|
||||
* ``addStepSeparator`` while rehydrated OLD content already makes
|
||||
* ``hasContent`` true → a divider lands between OLD and NEW
|
||||
* content with no semantic value (OLD content is filtered by
|
||||
* ``buildTimeline`` + ``filterSupersededAbortedMessages``,
|
||||
* content with no semantic value (OLD content is folded by
|
||||
* ``buildTimeline`` + ``reconcileInterruptedAssistantMessages``,
|
||||
* persisted state carries no separator, so the line vanishes on
|
||||
* reload).
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue