diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index c431ab304..50e2ffa7f 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -49,7 +49,11 @@ import { type TokenUsageData, TokenUsageProvider, } from "@/components/assistant-ui/token-usage-context"; -import { type HitlDecision, PendingInterruptProvider } from "@/features/chat-messages/hitl"; +import { + type HitlDecision, + PendingInterruptProvider, + type PendingInterruptState, +} from "@/features/chat-messages/hitl"; import { TimelineDataUI } from "@/features/chat-messages/timeline"; import { applyActionLogSse, @@ -208,7 +212,10 @@ const MentionedDocumentInfoSchema = z.object({ id: z.number(), title: z.string(), document_type: z.string(), - kind: z.union([z.literal("doc"), z.literal("folder")]).optional().default("doc"), + kind: z + .union([z.literal("doc"), z.literal("folder")]) + .optional() + .default("doc"), }); const MentionedDocumentsPartSchema = z.object({ @@ -269,12 +276,16 @@ export default function NewChatPage() { const [tokenUsageStore] = useState(() => createTokenUsageStore()); const abortControllerRef = useRef(null); const recentCancelRequestedAtRef = useRef(0); - const [pendingInterrupt, setPendingInterrupt] = useState<{ - threadId: number; - assistantMsgId: string; - interruptData: Record; - bundleToolCallIds: string[]; - } | null>(null); + // One entry per paused subagent, in receipt order (which matches the + // backend's ``state.interrupts`` traversal — and therefore the order + // ``slice_decisions_by_tool_call`` consumes on resume). Cleared on submit + // or on a fresh user turn. + const [pendingInterrupts, setPendingInterrupts] = useState([]); + // Per-card staged decisions held until every pending card has submitted, + // at which point we batch them into one ``hitl-decision`` event in the + // same order as ``pendingInterrupts``. Using a ref because partial + // progress should not re-render the page. + const stagedDecisionsByInterruptIdRef = useRef>(new Map()); const toolsWithUI = TOOLS_WITH_UI_ALL; const setMessageDocumentsMap = useSetAtom(messageDocumentsMapAtom); @@ -1029,9 +1040,7 @@ export default function NewChatPage() { mentioned_surfsense_doc_ids: hasSurfsenseDocIds ? mentionedDocumentIds.surfsense_doc_ids : undefined, - mentioned_folder_ids: hasFolderIds - ? mentionedDocumentIds.folder_ids - : undefined, + mentioned_folder_ids: hasFolderIds ? mentionedDocumentIds.folder_ids : undefined, // Full mention metadata (docs + folders, with // ``kind`` discriminator) so the BE can embed a // ``mentioned-documents`` ContentPart on the @@ -1193,12 +1202,24 @@ export default function NewChatPage() { ) ); if (currentThreadId) { - setPendingInterrupt({ - threadId: currentThreadId, - assistantMsgId, - interruptData, - bundleToolCallIds, - }); + // ``tool_call_id`` is stamped on the backend by + // ``checkpointed_subagent_middleware``. Without it we + // can't address the paused subagent on resume — skip + // rather than fabricate a synthetic key. + const interruptId = String(interruptData.tool_call_id ?? ""); + if (interruptId) { + const incoming: PendingInterruptState = { + interruptId, + threadId: currentThreadId, + assistantMsgId, + interruptData, + bundleToolCallIds, + }; + setPendingInterrupts((prev) => { + const without = prev.filter((p) => p.interruptId !== interruptId); + return [...without, incoming]; + }); + } } break; } @@ -1274,7 +1295,7 @@ export default function NewChatPage() { // by ``persist_assistant_shell``. Rename the optimistic // id, migrate ``tokenUsageStore`` so any pending // ``data-token-usage`` payload binds to the new id, - // remap any in-flight ``pendingInterrupt`` reference, + // remap any in-flight ``pendingInterrupts`` entries, // and reassign the closure variable so the in-stream // flush callback (line ~1074) keeps writing to the // renamed message. @@ -1290,10 +1311,12 @@ export default function NewChatPage() { : m ) ); - setPendingInterrupt((prev) => - prev && prev.assistantMsgId === oldAssistantMsgId - ? { ...prev, assistantMsgId: newAssistantMsgId } - : prev + setPendingInterrupts((prev) => + prev.map((p) => + p.assistantMsgId === oldAssistantMsgId + ? { ...p, assistantMsgId: newAssistantMsgId } + : p + ) ); assistantMsgId = newAssistantMsgId; break; @@ -1380,14 +1403,23 @@ export default function NewChatPage() { edited_action?: { name: string; args: Record }; }> ) => { - if (!pendingInterrupt) return; - const { threadId: resumeThreadId } = pendingInterrupt; + if (pendingInterrupts.length === 0) return; + // All cards in this turn share the same threadId/assistantMsgId + // (they're siblings of one parent agent step), so reading from + // the first entry is safe. + const resumeThreadId = pendingInterrupts[0].threadId; // Destructured separately as ``let`` so the SSE // ``data-assistant-message-id`` handler (resume always // allocates a fresh server-side row) can rename it to // the canonical ``msg-{db_id}`` mid-stream. - let assistantMsgId = pendingInterrupt.assistantMsgId; - setPendingInterrupt(null); + let assistantMsgId = pendingInterrupts[0].assistantMsgId; + // Concatenate every card's tool-call ids in pendingInterrupts order; + // this matches the ``decisions`` ordering produced by + // ``handleApprovalSubmit`` and the backend slicer's traversal of + // ``state.interrupts``. + const allBundleToolCallIds = pendingInterrupts.flatMap((p) => p.bundleToolCallIds); + setPendingInterrupts([]); + stagedDecisionsByInterruptIdRef.current.clear(); setIsRunning(true); const token = getBearerToken(); @@ -1464,7 +1496,7 @@ export default function NewChatPage() { // collapse onto ``decisions[0]``. Cards outside the bundle are // untouched. Mirrors the host ``hitl-decision`` handler. const decisionByTcId = new Map(); - const tcIds = pendingInterrupt.bundleToolCallIds; + const tcIds = allBundleToolCallIds; if (decisions.length === tcIds.length) { for (let i = 0; i < tcIds.length; i++) decisionByTcId.set(tcIds[i], decisions[i]); } @@ -1596,12 +1628,22 @@ export default function NewChatPage() { : m ) ); - setPendingInterrupt({ - threadId: resumeThreadId, - assistantMsgId, - interruptData, - bundleToolCallIds, - }); + { + const interruptId = String(interruptData.tool_call_id ?? ""); + if (interruptId) { + const incoming: PendingInterruptState = { + interruptId, + threadId: resumeThreadId, + assistantMsgId, + interruptData, + bundleToolCallIds, + }; + setPendingInterrupts((prev) => { + const without = prev.filter((p) => p.interruptId !== interruptId); + return [...without, incoming]; + }); + } + } break; } @@ -1679,7 +1721,7 @@ export default function NewChatPage() { } }, [ - pendingInterrupt, + pendingInterrupts, messages, searchSpaceId, localFilesystemEnabled, @@ -1700,17 +1742,19 @@ export default function NewChatPage() { edited_action?: { name: string; args: Record }; }>; }; - if (!detail?.decisions || !pendingInterrupt) return; + if (!detail?.decisions || pendingInterrupts.length === 0) return; const incoming = detail.decisions; if (incoming.length === 0) return; - const tcIds = pendingInterrupt.bundleToolCallIds; + // Concatenated tool-call ids across every pending card, in the + // order ``handleApprovalSubmit`` produced ``incoming``. + const tcIds = pendingInterrupts.flatMap((p) => p.bundleToolCallIds); const N = tcIds.length; - // Bundles must submit exactly one decision per action_request. - // Refuse rather than silently broadcast a single decision across - // the bundle (would mis-apply rejects/edits and diverge from - // what handleResume sends to /resume). - if (N > 1 && incoming.length !== N) { + // Refuse rather than silently broadcast or drop. The orchestrator + // only fires ``hitl-decision`` once every pending card has + // submitted, so a count mismatch indicates a contract drift + // (and would later make the backend slicer raise). + if (incoming.length !== N) { toast.error( `Cannot resume: ${incoming.length} decision(s) submitted for ${N} pending actions.` ); @@ -1721,9 +1765,12 @@ export default function NewChatPage() { for (let i = 0; i < tcIds.length; i++) byTcId.set(tcIds[i], incoming[i]); const submittedDecisions = tcIds.map((id) => byTcId.get(id)!); + // All pending cards belong to the same assistant message, so a + // single content-update pass suffices. + const targetAssistantMsgId = pendingInterrupts[0].assistantMsgId; setMessages((prev) => prev.map((m) => { - if (m.id !== pendingInterrupt.assistantMsgId) return m; + if (m.id !== targetAssistantMsgId) return m; const parts = m.content as unknown as Array>; const newContent = parts.map((part) => { const tcId = part.toolCallId as string | undefined; @@ -1760,7 +1807,7 @@ export default function NewChatPage() { }; window.addEventListener("hitl-decision", handler); return () => window.removeEventListener("hitl-decision", handler); - }, [handleResume, pendingInterrupt]); + }, [handleResume, pendingInterrupts]); // Convert message (pass through since already in correct format) const convertMessage = useCallback( @@ -1900,12 +1947,10 @@ export default function NewChatPage() { filesystem_mode: selection.filesystem_mode, client_platform: selection.client_platform, local_filesystem_mounts: selection.local_filesystem_mounts, - mentioned_document_ids: - regenerateDocIds.length > 0 ? regenerateDocIds : undefined, + mentioned_document_ids: regenerateDocIds.length > 0 ? regenerateDocIds : undefined, mentioned_surfsense_doc_ids: regenerateSurfsenseDocIds.length > 0 ? regenerateSurfsenseDocIds : undefined, - mentioned_folder_ids: - regenerateFolderIds.length > 0 ? regenerateFolderIds : undefined, + mentioned_folder_ids: regenerateFolderIds.length > 0 ? regenerateFolderIds : undefined, // Full mention metadata for the regenerate-specific // source list. Only meaningful for edit (the BE only // re-persists a user row when ``user_query`` is set); @@ -2284,11 +2329,32 @@ export default function NewChatPage() { [handleRegenerate, messages, agentActionItems] ); - const handleApprovalSubmit = useCallback((orderedDecisions: HitlDecision[]) => { - window.dispatchEvent( - new CustomEvent("hitl-decision", { detail: { decisions: orderedDecisions } }) - ); - }, []); + const handleApprovalSubmit = useCallback( + (interruptId: string, decisions: HitlDecision[]) => { + // Stage this card's decisions; only fire the resume once every + // pending card in the current turn has submitted, so the + // backend slicer sees a single concatenated decisions list + // whose total matches the parent state's pending action count. + stagedDecisionsByInterruptIdRef.current.set(interruptId, decisions); + if (stagedDecisionsByInterruptIdRef.current.size < pendingInterrupts.length) { + return; + } + const ordered: HitlDecision[] = []; + for (const pi of pendingInterrupts) { + const staged = stagedDecisionsByInterruptIdRef.current.get(pi.interruptId); + if (!staged) { + // Defensive: a missing entry means the staging map and + // the pending list disagreed for one cycle. Bail rather + // than dispatch a count-mismatched batch. + return; + } + ordered.push(...staged); + } + stagedDecisionsByInterruptIdRef.current.clear(); + window.dispatchEvent(new CustomEvent("hitl-decision", { detail: { decisions: ordered } })); + }, + [pendingInterrupts] + ); const handleEditDialogChoice = useCallback( async (choice: EditMessageDialogChoice) => { @@ -2361,7 +2427,7 @@ export default function NewChatPage() {
diff --git a/surfsense_web/features/chat-messages/hitl/approval/pending-interrupt-context.tsx b/surfsense_web/features/chat-messages/hitl/approval/pending-interrupt-context.tsx index 2c193d952..9e4153ff5 100644 --- a/surfsense_web/features/chat-messages/hitl/approval/pending-interrupt-context.tsx +++ b/surfsense_web/features/chat-messages/hitl/approval/pending-interrupt-context.tsx @@ -3,8 +3,10 @@ import { createContext, type ReactNode, useContext } from "react"; import type { HitlDecision } from "../types"; -/** Snapshot of one in-flight HITL interrupt; ``null`` when nothing is pending. */ +/** One in-flight HITL interrupt (one paused subagent). */ export interface PendingInterruptState { + /** Stable id keyed by the parent ``tool_call_id`` stamped on the interrupt. */ + interruptId: string; threadId: number; assistantMsgId: string; interruptData: Record; @@ -12,8 +14,19 @@ export interface PendingInterruptState { } export interface PendingInterruptValue { - pendingInterrupt: PendingInterruptState | null; - onSubmit: (decisions: HitlDecision[]) => void; + /** + * Every paused subagent for the current turn, in the order the SSE stream + * delivered them — which matches ``state.interrupts`` traversal on the + * backend, which is the order ``slice_decisions_by_tool_call`` consumes. + */ + pendingInterrupts: PendingInterruptState[]; + /** + * Stage one card's decisions. The orchestrator (page-level) batches across + * cards and dispatches the resume only once every pending interrupt has + * submitted, so the backend slicer sees a single concatenated decisions + * list whose total matches the parent state's pending action count. + */ + onSubmit: (interruptId: string, decisions: HitlDecision[]) => void; } const PendingInterruptContext = createContext(null); @@ -24,16 +37,16 @@ const PendingInterruptContext = createContext(null * page root. */ export function PendingInterruptProvider({ - pendingInterrupt, + pendingInterrupts, onSubmit, children, }: { - pendingInterrupt: PendingInterruptState | null; - onSubmit: (decisions: HitlDecision[]) => void; + pendingInterrupts: PendingInterruptState[]; + onSubmit: (interruptId: string, decisions: HitlDecision[]) => void; children: ReactNode; }) { return ( - + {children} ); diff --git a/surfsense_web/features/chat-messages/timeline/data-renderer.tsx b/surfsense_web/features/chat-messages/timeline/data-renderer.tsx index 861e35fd2..fb3dda047 100644 --- a/surfsense_web/features/chat-messages/timeline/data-renderer.tsx +++ b/surfsense_web/features/chat-messages/timeline/data-renderer.tsx @@ -11,10 +11,9 @@ const noopSubmit = () => {}; /** * assistant-ui data UI for the ``thinking-steps`` data-part. * - * Re-scopes the global ``PendingInterruptProvider`` per message: the - * approval card only mounts under the assistant message that owns - * the interrupt (otherwise every message in scrollback would render - * its own card). + * Re-scopes the global ``PendingInterruptProvider`` per message: approval + * cards only mount under the assistant message that owns the interrupt + * (otherwise every message in scrollback would render its own cards). */ function TimelineDataRenderer({ data }: { name: string; data: unknown }) { const isThreadRunning = useAuiState(({ thread }) => thread.isRunning); @@ -23,10 +22,10 @@ function TimelineDataRenderer({ data }: { name: string; data: unknown }) { const content = useAuiState(({ message }) => message?.content); const messageId = useAuiState(({ message }) => message?.id); const pendingValue = usePendingInterrupt(); - const pendingForThisMessage = - pendingValue?.pendingInterrupt && pendingValue.pendingInterrupt.assistantMsgId === messageId - ? pendingValue.pendingInterrupt - : null; + const pendingForThisMessage = useMemo( + () => (pendingValue?.pendingInterrupts ?? []).filter((p) => p.assistantMsgId === messageId), + [pendingValue?.pendingInterrupts, messageId] + ); const onSubmit = pendingValue?.onSubmit ?? noopSubmit; const steps = useMemo( @@ -39,11 +38,11 @@ function TimelineDataRenderer({ data }: { name: string; data: unknown }) { [steps, content] ); - if (items.length === 0 && !pendingForThisMessage) return null; + if (items.length === 0 && pendingForThisMessage.length === 0) return null; return (
- +
diff --git a/surfsense_web/features/chat-messages/timeline/timeline.tsx b/surfsense_web/features/chat-messages/timeline/timeline.tsx index f51034733..31c86fb9c 100644 --- a/surfsense_web/features/chat-messages/timeline/timeline.tsx +++ b/surfsense_web/features/chat-messages/timeline/timeline.tsx @@ -32,9 +32,9 @@ export const Timeline: FC<{ isThreadRunning?: boolean; }> = ({ items, isThreadRunning = true }) => { const pendingValue = usePendingInterrupt(); - const pendingInterrupt = pendingValue?.pendingInterrupt ?? null; + const pendingInterrupts = pendingValue?.pendingInterrupts ?? []; const onSubmit = pendingValue?.onSubmit; - const hasPending = pendingInterrupt !== null; + const hasPending = pendingInterrupts.length > 0; // Apply the override here so downstream (grouping, headers, dots) // sees the corrected status without threading a callback. Keeps @@ -135,9 +135,15 @@ export const Timeline: FC<{ /> ); })} - {pendingInterrupt && onSubmit && ( -
- + {hasPending && onSubmit && ( +
+ {pendingInterrupts.map((pi) => ( + onSubmit(pi.interruptId, decisions)} + /> + ))}
)}
diff --git a/surfsense_web/lib/chat/streaming-state.ts b/surfsense_web/lib/chat/streaming-state.ts index 1d057ef94..d9fb2ac99 100644 --- a/surfsense_web/lib/chat/streaming-state.ts +++ b/surfsense_web/lib/chat/streaming-state.ts @@ -565,7 +565,7 @@ export type SSEEvent = * the assistant-side row of the current turn. The frontend * renames its optimistic ``msg-assistant-XXX`` placeholder * id, migrates the local ``tokenUsageStore`` and - * ``pendingInterrupt`` references, and binds the running + * ``pendingInterrupts`` entries, and binds the running * mutable ``assistantMsgId`` closure variable to the * canonical id for the rest of the stream. */