chat-messages: render and batch-submit multiple HITL approval cards

This commit is contained in:
CREDO23 2026-05-13 21:00:01 +02:00
parent 0fd87ccb7f
commit 1bb9f435e5
5 changed files with 160 additions and 76 deletions

View file

@ -49,7 +49,11 @@ import {
type TokenUsageData, type TokenUsageData,
TokenUsageProvider, TokenUsageProvider,
} from "@/components/assistant-ui/token-usage-context"; } from "@/components/assistant-ui/token-usage-context";
import { type HitlDecision, PendingInterruptProvider } from "@/features/chat-messages/hitl"; import {
type HitlDecision,
PendingInterruptProvider,
type PendingInterruptState,
} from "@/features/chat-messages/hitl";
import { TimelineDataUI } from "@/features/chat-messages/timeline"; import { TimelineDataUI } from "@/features/chat-messages/timeline";
import { import {
applyActionLogSse, applyActionLogSse,
@ -208,7 +212,10 @@ const MentionedDocumentInfoSchema = z.object({
id: z.number(), id: z.number(),
title: z.string(), title: z.string(),
document_type: z.string(), document_type: z.string(),
kind: z.union([z.literal("doc"), z.literal("folder")]).optional().default("doc"), kind: z
.union([z.literal("doc"), z.literal("folder")])
.optional()
.default("doc"),
}); });
const MentionedDocumentsPartSchema = z.object({ const MentionedDocumentsPartSchema = z.object({
@ -269,12 +276,16 @@ export default function NewChatPage() {
const [tokenUsageStore] = useState(() => createTokenUsageStore()); const [tokenUsageStore] = useState(() => createTokenUsageStore());
const abortControllerRef = useRef<AbortController | null>(null); const abortControllerRef = useRef<AbortController | null>(null);
const recentCancelRequestedAtRef = useRef(0); const recentCancelRequestedAtRef = useRef(0);
const [pendingInterrupt, setPendingInterrupt] = useState<{ // One entry per paused subagent, in receipt order (which matches the
threadId: number; // backend's ``state.interrupts`` traversal — and therefore the order
assistantMsgId: string; // ``slice_decisions_by_tool_call`` consumes on resume). Cleared on submit
interruptData: Record<string, unknown>; // or on a fresh user turn.
bundleToolCallIds: string[]; const [pendingInterrupts, setPendingInterrupts] = useState<PendingInterruptState[]>([]);
} | null>(null); // Per-card staged decisions held until every pending card has submitted,
// at which point we batch them into one ``hitl-decision`` event in the
// same order as ``pendingInterrupts``. Using a ref because partial
// progress should not re-render the page.
const stagedDecisionsByInterruptIdRef = useRef<Map<string, HitlDecision[]>>(new Map());
const toolsWithUI = TOOLS_WITH_UI_ALL; const toolsWithUI = TOOLS_WITH_UI_ALL;
const setMessageDocumentsMap = useSetAtom(messageDocumentsMapAtom); const setMessageDocumentsMap = useSetAtom(messageDocumentsMapAtom);
@ -1029,9 +1040,7 @@ export default function NewChatPage() {
mentioned_surfsense_doc_ids: hasSurfsenseDocIds mentioned_surfsense_doc_ids: hasSurfsenseDocIds
? mentionedDocumentIds.surfsense_doc_ids ? mentionedDocumentIds.surfsense_doc_ids
: undefined, : undefined,
mentioned_folder_ids: hasFolderIds mentioned_folder_ids: hasFolderIds ? mentionedDocumentIds.folder_ids : undefined,
? mentionedDocumentIds.folder_ids
: undefined,
// Full mention metadata (docs + folders, with // Full mention metadata (docs + folders, with
// ``kind`` discriminator) so the BE can embed a // ``kind`` discriminator) so the BE can embed a
// ``mentioned-documents`` ContentPart on the // ``mentioned-documents`` ContentPart on the
@ -1193,12 +1202,24 @@ export default function NewChatPage() {
) )
); );
if (currentThreadId) { if (currentThreadId) {
setPendingInterrupt({ // ``tool_call_id`` is stamped on the backend by
threadId: currentThreadId, // ``checkpointed_subagent_middleware``. Without it we
assistantMsgId, // can't address the paused subagent on resume — skip
interruptData, // rather than fabricate a synthetic key.
bundleToolCallIds, const interruptId = String(interruptData.tool_call_id ?? "");
}); if (interruptId) {
const incoming: PendingInterruptState = {
interruptId,
threadId: currentThreadId,
assistantMsgId,
interruptData,
bundleToolCallIds,
};
setPendingInterrupts((prev) => {
const without = prev.filter((p) => p.interruptId !== interruptId);
return [...without, incoming];
});
}
} }
break; break;
} }
@ -1274,7 +1295,7 @@ export default function NewChatPage() {
// by ``persist_assistant_shell``. Rename the optimistic // by ``persist_assistant_shell``. Rename the optimistic
// id, migrate ``tokenUsageStore`` so any pending // id, migrate ``tokenUsageStore`` so any pending
// ``data-token-usage`` payload binds to the new id, // ``data-token-usage`` payload binds to the new id,
// remap any in-flight ``pendingInterrupt`` reference, // remap any in-flight ``pendingInterrupts`` entries,
// and reassign the closure variable so the in-stream // and reassign the closure variable so the in-stream
// flush callback (line ~1074) keeps writing to the // flush callback (line ~1074) keeps writing to the
// renamed message. // renamed message.
@ -1290,10 +1311,12 @@ export default function NewChatPage() {
: m : m
) )
); );
setPendingInterrupt((prev) => setPendingInterrupts((prev) =>
prev && prev.assistantMsgId === oldAssistantMsgId prev.map((p) =>
? { ...prev, assistantMsgId: newAssistantMsgId } p.assistantMsgId === oldAssistantMsgId
: prev ? { ...p, assistantMsgId: newAssistantMsgId }
: p
)
); );
assistantMsgId = newAssistantMsgId; assistantMsgId = newAssistantMsgId;
break; break;
@ -1380,14 +1403,23 @@ export default function NewChatPage() {
edited_action?: { name: string; args: Record<string, unknown> }; edited_action?: { name: string; args: Record<string, unknown> };
}> }>
) => { ) => {
if (!pendingInterrupt) return; if (pendingInterrupts.length === 0) return;
const { threadId: resumeThreadId } = pendingInterrupt; // All cards in this turn share the same threadId/assistantMsgId
// (they're siblings of one parent agent step), so reading from
// the first entry is safe.
const resumeThreadId = pendingInterrupts[0].threadId;
// Destructured separately as ``let`` so the SSE // Destructured separately as ``let`` so the SSE
// ``data-assistant-message-id`` handler (resume always // ``data-assistant-message-id`` handler (resume always
// allocates a fresh server-side row) can rename it to // allocates a fresh server-side row) can rename it to
// the canonical ``msg-{db_id}`` mid-stream. // the canonical ``msg-{db_id}`` mid-stream.
let assistantMsgId = pendingInterrupt.assistantMsgId; let assistantMsgId = pendingInterrupts[0].assistantMsgId;
setPendingInterrupt(null); // Concatenate every card's tool-call ids in pendingInterrupts order;
// this matches the ``decisions`` ordering produced by
// ``handleApprovalSubmit`` and the backend slicer's traversal of
// ``state.interrupts``.
const allBundleToolCallIds = pendingInterrupts.flatMap((p) => p.bundleToolCallIds);
setPendingInterrupts([]);
stagedDecisionsByInterruptIdRef.current.clear();
setIsRunning(true); setIsRunning(true);
const token = getBearerToken(); const token = getBearerToken();
@ -1464,7 +1496,7 @@ export default function NewChatPage() {
// collapse onto ``decisions[0]``. Cards outside the bundle are // collapse onto ``decisions[0]``. Cards outside the bundle are
// untouched. Mirrors the host ``hitl-decision`` handler. // untouched. Mirrors the host ``hitl-decision`` handler.
const decisionByTcId = new Map<string, (typeof decisions)[number]>(); const decisionByTcId = new Map<string, (typeof decisions)[number]>();
const tcIds = pendingInterrupt.bundleToolCallIds; const tcIds = allBundleToolCallIds;
if (decisions.length === tcIds.length) { if (decisions.length === tcIds.length) {
for (let i = 0; i < tcIds.length; i++) decisionByTcId.set(tcIds[i], decisions[i]); for (let i = 0; i < tcIds.length; i++) decisionByTcId.set(tcIds[i], decisions[i]);
} }
@ -1596,12 +1628,22 @@ export default function NewChatPage() {
: m : m
) )
); );
setPendingInterrupt({ {
threadId: resumeThreadId, const interruptId = String(interruptData.tool_call_id ?? "");
assistantMsgId, if (interruptId) {
interruptData, const incoming: PendingInterruptState = {
bundleToolCallIds, interruptId,
}); threadId: resumeThreadId,
assistantMsgId,
interruptData,
bundleToolCallIds,
};
setPendingInterrupts((prev) => {
const without = prev.filter((p) => p.interruptId !== interruptId);
return [...without, incoming];
});
}
}
break; break;
} }
@ -1679,7 +1721,7 @@ export default function NewChatPage() {
} }
}, },
[ [
pendingInterrupt, pendingInterrupts,
messages, messages,
searchSpaceId, searchSpaceId,
localFilesystemEnabled, localFilesystemEnabled,
@ -1700,17 +1742,19 @@ export default function NewChatPage() {
edited_action?: { name: string; args: Record<string, unknown> }; edited_action?: { name: string; args: Record<string, unknown> };
}>; }>;
}; };
if (!detail?.decisions || !pendingInterrupt) return; if (!detail?.decisions || pendingInterrupts.length === 0) return;
const incoming = detail.decisions; const incoming = detail.decisions;
if (incoming.length === 0) return; if (incoming.length === 0) return;
const tcIds = pendingInterrupt.bundleToolCallIds; // Concatenated tool-call ids across every pending card, in the
// order ``handleApprovalSubmit`` produced ``incoming``.
const tcIds = pendingInterrupts.flatMap((p) => p.bundleToolCallIds);
const N = tcIds.length; const N = tcIds.length;
// Bundles must submit exactly one decision per action_request. // Refuse rather than silently broadcast or drop. The orchestrator
// Refuse rather than silently broadcast a single decision across // only fires ``hitl-decision`` once every pending card has
// the bundle (would mis-apply rejects/edits and diverge from // submitted, so a count mismatch indicates a contract drift
// what handleResume sends to /resume). // (and would later make the backend slicer raise).
if (N > 1 && incoming.length !== N) { if (incoming.length !== N) {
toast.error( toast.error(
`Cannot resume: ${incoming.length} decision(s) submitted for ${N} pending actions.` `Cannot resume: ${incoming.length} decision(s) submitted for ${N} pending actions.`
); );
@ -1721,9 +1765,12 @@ export default function NewChatPage() {
for (let i = 0; i < tcIds.length; i++) byTcId.set(tcIds[i], incoming[i]); for (let i = 0; i < tcIds.length; i++) byTcId.set(tcIds[i], incoming[i]);
const submittedDecisions = tcIds.map((id) => byTcId.get(id)!); const submittedDecisions = tcIds.map((id) => byTcId.get(id)!);
// All pending cards belong to the same assistant message, so a
// single content-update pass suffices.
const targetAssistantMsgId = pendingInterrupts[0].assistantMsgId;
setMessages((prev) => setMessages((prev) =>
prev.map((m) => { prev.map((m) => {
if (m.id !== pendingInterrupt.assistantMsgId) return m; if (m.id !== targetAssistantMsgId) return m;
const parts = m.content as unknown as Array<Record<string, unknown>>; const parts = m.content as unknown as Array<Record<string, unknown>>;
const newContent = parts.map((part) => { const newContent = parts.map((part) => {
const tcId = part.toolCallId as string | undefined; const tcId = part.toolCallId as string | undefined;
@ -1760,7 +1807,7 @@ export default function NewChatPage() {
}; };
window.addEventListener("hitl-decision", handler); window.addEventListener("hitl-decision", handler);
return () => window.removeEventListener("hitl-decision", handler); return () => window.removeEventListener("hitl-decision", handler);
}, [handleResume, pendingInterrupt]); }, [handleResume, pendingInterrupts]);
// Convert message (pass through since already in correct format) // Convert message (pass through since already in correct format)
const convertMessage = useCallback( const convertMessage = useCallback(
@ -1900,12 +1947,10 @@ export default function NewChatPage() {
filesystem_mode: selection.filesystem_mode, filesystem_mode: selection.filesystem_mode,
client_platform: selection.client_platform, client_platform: selection.client_platform,
local_filesystem_mounts: selection.local_filesystem_mounts, local_filesystem_mounts: selection.local_filesystem_mounts,
mentioned_document_ids: mentioned_document_ids: regenerateDocIds.length > 0 ? regenerateDocIds : undefined,
regenerateDocIds.length > 0 ? regenerateDocIds : undefined,
mentioned_surfsense_doc_ids: mentioned_surfsense_doc_ids:
regenerateSurfsenseDocIds.length > 0 ? regenerateSurfsenseDocIds : undefined, regenerateSurfsenseDocIds.length > 0 ? regenerateSurfsenseDocIds : undefined,
mentioned_folder_ids: mentioned_folder_ids: regenerateFolderIds.length > 0 ? regenerateFolderIds : undefined,
regenerateFolderIds.length > 0 ? regenerateFolderIds : undefined,
// Full mention metadata for the regenerate-specific // Full mention metadata for the regenerate-specific
// source list. Only meaningful for edit (the BE only // source list. Only meaningful for edit (the BE only
// re-persists a user row when ``user_query`` is set); // re-persists a user row when ``user_query`` is set);
@ -2284,11 +2329,32 @@ export default function NewChatPage() {
[handleRegenerate, messages, agentActionItems] [handleRegenerate, messages, agentActionItems]
); );
const handleApprovalSubmit = useCallback((orderedDecisions: HitlDecision[]) => { const handleApprovalSubmit = useCallback(
window.dispatchEvent( (interruptId: string, decisions: HitlDecision[]) => {
new CustomEvent("hitl-decision", { detail: { decisions: orderedDecisions } }) // Stage this card's decisions; only fire the resume once every
); // pending card in the current turn has submitted, so the
}, []); // backend slicer sees a single concatenated decisions list
// whose total matches the parent state's pending action count.
stagedDecisionsByInterruptIdRef.current.set(interruptId, decisions);
if (stagedDecisionsByInterruptIdRef.current.size < pendingInterrupts.length) {
return;
}
const ordered: HitlDecision[] = [];
for (const pi of pendingInterrupts) {
const staged = stagedDecisionsByInterruptIdRef.current.get(pi.interruptId);
if (!staged) {
// Defensive: a missing entry means the staging map and
// the pending list disagreed for one cycle. Bail rather
// than dispatch a count-mismatched batch.
return;
}
ordered.push(...staged);
}
stagedDecisionsByInterruptIdRef.current.clear();
window.dispatchEvent(new CustomEvent("hitl-decision", { detail: { decisions: ordered } }));
},
[pendingInterrupts]
);
const handleEditDialogChoice = useCallback( const handleEditDialogChoice = useCallback(
async (choice: EditMessageDialogChoice) => { async (choice: EditMessageDialogChoice) => {
@ -2361,7 +2427,7 @@ export default function NewChatPage() {
<TimelineDataUI /> <TimelineDataUI />
<StepSeparatorDataUI /> <StepSeparatorDataUI />
<PendingInterruptProvider <PendingInterruptProvider
pendingInterrupt={pendingInterrupt} pendingInterrupts={pendingInterrupts}
onSubmit={handleApprovalSubmit} onSubmit={handleApprovalSubmit}
> >
<div key={searchSpaceId} className="flex h-full overflow-hidden"> <div key={searchSpaceId} className="flex h-full overflow-hidden">

View file

@ -3,8 +3,10 @@
import { createContext, type ReactNode, useContext } from "react"; import { createContext, type ReactNode, useContext } from "react";
import type { HitlDecision } from "../types"; import type { HitlDecision } from "../types";
/** Snapshot of one in-flight HITL interrupt; ``null`` when nothing is pending. */ /** One in-flight HITL interrupt (one paused subagent). */
export interface PendingInterruptState { export interface PendingInterruptState {
/** Stable id keyed by the parent ``tool_call_id`` stamped on the interrupt. */
interruptId: string;
threadId: number; threadId: number;
assistantMsgId: string; assistantMsgId: string;
interruptData: Record<string, unknown>; interruptData: Record<string, unknown>;
@ -12,8 +14,19 @@ export interface PendingInterruptState {
} }
export interface PendingInterruptValue { export interface PendingInterruptValue {
pendingInterrupt: PendingInterruptState | null; /**
onSubmit: (decisions: HitlDecision[]) => void; * Every paused subagent for the current turn, in the order the SSE stream
* delivered them which matches ``state.interrupts`` traversal on the
* backend, which is the order ``slice_decisions_by_tool_call`` consumes.
*/
pendingInterrupts: PendingInterruptState[];
/**
* Stage one card's decisions. The orchestrator (page-level) batches across
* cards and dispatches the resume only once every pending interrupt has
* submitted, so the backend slicer sees a single concatenated decisions
* list whose total matches the parent state's pending action count.
*/
onSubmit: (interruptId: string, decisions: HitlDecision[]) => void;
} }
const PendingInterruptContext = createContext<PendingInterruptValue | null>(null); const PendingInterruptContext = createContext<PendingInterruptValue | null>(null);
@ -24,16 +37,16 @@ const PendingInterruptContext = createContext<PendingInterruptValue | null>(null
* page root. * page root.
*/ */
export function PendingInterruptProvider({ export function PendingInterruptProvider({
pendingInterrupt, pendingInterrupts,
onSubmit, onSubmit,
children, children,
}: { }: {
pendingInterrupt: PendingInterruptState | null; pendingInterrupts: PendingInterruptState[];
onSubmit: (decisions: HitlDecision[]) => void; onSubmit: (interruptId: string, decisions: HitlDecision[]) => void;
children: ReactNode; children: ReactNode;
}) { }) {
return ( return (
<PendingInterruptContext.Provider value={{ pendingInterrupt, onSubmit }}> <PendingInterruptContext.Provider value={{ pendingInterrupts, onSubmit }}>
{children} {children}
</PendingInterruptContext.Provider> </PendingInterruptContext.Provider>
); );

View file

@ -11,10 +11,9 @@ const noopSubmit = () => {};
/** /**
* assistant-ui data UI for the ``thinking-steps`` data-part. * assistant-ui data UI for the ``thinking-steps`` data-part.
* *
* Re-scopes the global ``PendingInterruptProvider`` per message: the * Re-scopes the global ``PendingInterruptProvider`` per message: approval
* approval card only mounts under the assistant message that owns * cards only mount under the assistant message that owns the interrupt
* the interrupt (otherwise every message in scrollback would render * (otherwise every message in scrollback would render its own cards).
* its own card).
*/ */
function TimelineDataRenderer({ data }: { name: string; data: unknown }) { function TimelineDataRenderer({ data }: { name: string; data: unknown }) {
const isThreadRunning = useAuiState(({ thread }) => thread.isRunning); const isThreadRunning = useAuiState(({ thread }) => thread.isRunning);
@ -23,10 +22,10 @@ function TimelineDataRenderer({ data }: { name: string; data: unknown }) {
const content = useAuiState(({ message }) => message?.content); const content = useAuiState(({ message }) => message?.content);
const messageId = useAuiState(({ message }) => message?.id); const messageId = useAuiState(({ message }) => message?.id);
const pendingValue = usePendingInterrupt(); const pendingValue = usePendingInterrupt();
const pendingForThisMessage = const pendingForThisMessage = useMemo(
pendingValue?.pendingInterrupt && pendingValue.pendingInterrupt.assistantMsgId === messageId () => (pendingValue?.pendingInterrupts ?? []).filter((p) => p.assistantMsgId === messageId),
? pendingValue.pendingInterrupt [pendingValue?.pendingInterrupts, messageId]
: null; );
const onSubmit = pendingValue?.onSubmit ?? noopSubmit; const onSubmit = pendingValue?.onSubmit ?? noopSubmit;
const steps = useMemo<ThinkingStepInput[]>( const steps = useMemo<ThinkingStepInput[]>(
@ -39,11 +38,11 @@ function TimelineDataRenderer({ data }: { name: string; data: unknown }) {
[steps, content] [steps, content]
); );
if (items.length === 0 && !pendingForThisMessage) return null; if (items.length === 0 && pendingForThisMessage.length === 0) return null;
return ( return (
<div className="mb-3 -mx-2 leading-normal"> <div className="mb-3 -mx-2 leading-normal">
<PendingInterruptProvider pendingInterrupt={pendingForThisMessage} onSubmit={onSubmit}> <PendingInterruptProvider pendingInterrupts={pendingForThisMessage} onSubmit={onSubmit}>
<Timeline items={items} isThreadRunning={isMessageStreaming} /> <Timeline items={items} isThreadRunning={isMessageStreaming} />
</PendingInterruptProvider> </PendingInterruptProvider>
</div> </div>

View file

@ -32,9 +32,9 @@ export const Timeline: FC<{
isThreadRunning?: boolean; isThreadRunning?: boolean;
}> = ({ items, isThreadRunning = true }) => { }> = ({ items, isThreadRunning = true }) => {
const pendingValue = usePendingInterrupt(); const pendingValue = usePendingInterrupt();
const pendingInterrupt = pendingValue?.pendingInterrupt ?? null; const pendingInterrupts = pendingValue?.pendingInterrupts ?? [];
const onSubmit = pendingValue?.onSubmit; const onSubmit = pendingValue?.onSubmit;
const hasPending = pendingInterrupt !== null; const hasPending = pendingInterrupts.length > 0;
// Apply the override here so downstream (grouping, headers, dots) // Apply the override here so downstream (grouping, headers, dots)
// sees the corrected status without threading a callback. Keeps // sees the corrected status without threading a callback. Keeps
@ -135,9 +135,15 @@ export const Timeline: FC<{
/> />
); );
})} })}
{pendingInterrupt && onSubmit && ( {hasPending && onSubmit && (
<div className="pl-5"> <div className="pl-5 space-y-3">
<HitlApprovalCard pendingInterrupt={pendingInterrupt} onSubmit={onSubmit} /> {pendingInterrupts.map((pi) => (
<HitlApprovalCard
key={pi.interruptId}
pendingInterrupt={pi}
onSubmit={(decisions) => onSubmit(pi.interruptId, decisions)}
/>
))}
</div> </div>
)} )}
</div> </div>

View file

@ -565,7 +565,7 @@ export type SSEEvent =
* the assistant-side row of the current turn. The frontend * the assistant-side row of the current turn. The frontend
* renames its optimistic ``msg-assistant-XXX`` placeholder * renames its optimistic ``msg-assistant-XXX`` placeholder
* id, migrates the local ``tokenUsageStore`` and * id, migrates the local ``tokenUsageStore`` and
* ``pendingInterrupt`` references, and binds the running * ``pendingInterrupts`` entries, and binds the running
* mutable ``assistantMsgId`` closure variable to the * mutable ``assistantMsgId`` closure variable to the
* canonical id for the rest of the stream. * canonical id for the rest of the stream.
*/ */