From 86f6b285ce9cedbf529a7d8325f4457f602f997a Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Thu, 30 Apr 2026 18:09:34 +0530 Subject: [PATCH] refactor(chat): introduce new stream handling utilities and restructure event processing for improved performance and maintainability --- .../new-chat/[[...chat_id]]/page.tsx | 205 +----------------- surfsense_web/lib/chat/stream-flush.ts | 19 ++ surfsense_web/lib/chat/stream-pipeline.ts | 191 ++++++++++++++++ 3 files changed, 217 insertions(+), 198 deletions(-) create mode 100644 surfsense_web/lib/chat/stream-flush.ts create mode 100644 surfsense_web/lib/chat/stream-pipeline.ts diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index d1dd14e06..82a12b6b1 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -71,23 +71,21 @@ import { setActivePodcastTaskId, } from "@/lib/chat/podcast-state"; import { - addStepSeparator, addToolCall, - appendReasoning, - appendText, - appendToolInputDelta, buildContentForPersistence, buildContentForUI, type ContentPartsState, - endReasoning, - FrameBatchedUpdater, - readSSEStream, - type SSEEvent, + type FrameBatchedUpdater, type ThinkingStepData, type ToolUIGate, - updateThinkingSteps, updateToolCall, } from "@/lib/chat/streaming-state"; +import { createStreamFlushHelpers } from "@/lib/chat/stream-flush"; +import { + consumeSseEvents, + hasPersistableContent, + processSharedStreamEvent, +} from "@/lib/chat/stream-pipeline"; import { appendMessage, createThread, @@ -134,33 +132,6 @@ const MobileReportPanel = dynamic( { ssr: false } ); -/** - * After a tool produces output, mark any previously-decided interrupt tool - * calls as completed so the ApprovalCard can transition from shimmer to done. 
- */ -function markInterruptsCompleted(contentParts: Array<{ type: string; result?: unknown }>): void { - for (const part of contentParts) { - if ( - part.type === "tool-call" && - typeof part.result === "object" && - part.result !== null && - (part.result as Record).__interrupt__ === true && - (part.result as Record).__decided__ && - !(part.result as Record).__completed__ - ) { - part.result = { ...(part.result as Record), __completed__: true }; - } - } -} - -function toStreamTerminalError( - event: Extract -): Error & { errorCode?: string } { - return Object.assign(new Error(event.errorText || "Server error"), { - errorCode: event.errorCode, - }); -} - async function toHttpResponseError(response: Response): Promise { const statusDefaultCode = response.status === 409 @@ -252,168 +223,6 @@ function tagPreAcceptSendFailure(error: unknown): unknown { }); } -type SharedStreamEventContext = { - contentPartsState: ContentPartsState; - toolsWithUI: ToolUIGate; - currentThinkingSteps: Map; - scheduleFlush: () => void; - forceFlush: () => void; - onTokenUsage?: (data: TokenUsageData) => void; - onToolOutputAvailable?: ( - event: Extract, - context: { - contentPartsState: ContentPartsState; - toolCallIndices: Map; - } - ) => void; -}; - -function createStreamFlushHelpers(flushMessages: () => void): { - batcher: FrameBatchedUpdater; - scheduleFlush: () => void; - forceFlush: () => void; -} { - const batcher = new FrameBatchedUpdater(); - const scheduleFlush = () => batcher.schedule(flushMessages); - // Force-flush helper: ``batcher.flush()`` is a no-op when - // ``dirty=false`` (e.g. a tool starts before any text streamed). - // ``scheduleFlush(); batcher.flush()`` sets the dirty bit first so - // terminal events render promptly without the throttle delay. 
- const forceFlush = () => { - scheduleFlush(); - batcher.flush(); - }; - return { batcher, scheduleFlush, forceFlush }; -} - -function hasPersistableContent(contentParts: ContentPartsState["contentParts"], toolsWithUI: ToolUIGate) { - return contentParts.some( - (part) => - (part.type === "text" && part.text.length > 0) || - (part.type === "reasoning" && part.text.length > 0) || - (part.type === "tool-call" && (toolsWithUI === "all" || toolsWithUI.has(part.toolName))) - ); -} - -function processSharedStreamEvent(parsed: SSEEvent, context: SharedStreamEventContext): boolean { - const { contentPartsState, toolsWithUI, currentThinkingSteps, scheduleFlush, forceFlush } = context; - const { contentParts, toolCallIndices } = contentPartsState; - - switch (parsed.type) { - case "text-delta": - appendText(contentPartsState, parsed.delta); - scheduleFlush(); - return true; - - case "reasoning-delta": - appendReasoning(contentPartsState, parsed.delta); - scheduleFlush(); - return true; - - case "reasoning-end": - endReasoning(contentPartsState); - scheduleFlush(); - return true; - - case "start-step": - addStepSeparator(contentPartsState); - scheduleFlush(); - return true; - - case "finish-step": - return true; - - case "tool-input-start": - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - {}, - false, - parsed.langchainToolCallId - ); - forceFlush(); - return true; - - case "tool-input-delta": - // High-frequency event: deltas can fire dozens of times per call, - // so use throttled scheduleFlush (NOT forceFlush) to coalesce. - appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta); - scheduleFlush(); - return true; - - case "tool-input-available": { - const finalArgsText = JSON.stringify(parsed.input ?? 
{}, null, 2); - if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(contentPartsState, parsed.toolCallId, { - args: parsed.input || {}, - argsText: finalArgsText, - langchainToolCallId: parsed.langchainToolCallId, - }); - } else { - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - parsed.input || {}, - false, - parsed.langchainToolCallId - ); - // addToolCall doesn't accept argsText today; backfill via - // updateToolCall so the new card renders pretty-printed JSON. - updateToolCall(contentPartsState, parsed.toolCallId, { - argsText: finalArgsText, - }); - } - forceFlush(); - return true; - } - - case "tool-output-available": - updateToolCall(contentPartsState, parsed.toolCallId, { - result: parsed.output, - langchainToolCallId: parsed.langchainToolCallId, - }); - markInterruptsCompleted(contentParts); - context.onToolOutputAvailable?.(parsed, { contentPartsState, toolCallIndices }); - forceFlush(); - return true; - - case "data-thinking-step": { - const stepData = parsed.data as ThinkingStepData; - if (stepData?.id) { - currentThinkingSteps.set(stepData.id, stepData); - const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps); - if (didUpdate) { - scheduleFlush(); - } - } - return true; - } - - case "data-token-usage": - context.onTokenUsage?.(parsed.data as TokenUsageData); - return true; - - case "error": - throw toStreamTerminalError(parsed); - - default: - return false; - } -} - -async function consumeSseEvents( - response: Response, - onEvent: (event: SSEEvent) => void | Promise -): Promise { - for await (const parsed of readSSEStream(response)) { - await onEvent(parsed); - } -} - /** * Zod schema for mentioned document info (for type-safe parsing) */ diff --git a/surfsense_web/lib/chat/stream-flush.ts b/surfsense_web/lib/chat/stream-flush.ts new file mode 100644 index 000000000..6d13c9237 --- /dev/null +++ b/surfsense_web/lib/chat/stream-flush.ts @@ -0,0 +1,19 @@ +import { 
FrameBatchedUpdater } from "@/lib/chat/streaming-state";
+
+export function createStreamFlushHelpers(flushMessages: () => void): {
+	batcher: FrameBatchedUpdater;
+	scheduleFlush: () => void;
+	forceFlush: () => void;
+} {
+	const batcher = new FrameBatchedUpdater();
+	const scheduleFlush = () => batcher.schedule(flushMessages);
+	// Force-flush helper: ``batcher.flush()`` is a no-op when
+	// ``dirty=false`` (e.g. a tool starts before any text streamed).
+	// ``scheduleFlush(); batcher.flush()`` sets the dirty bit first so
+	// terminal events render promptly without the throttle delay.
+	const forceFlush = () => {
+		scheduleFlush();
+		batcher.flush();
+	};
+	return { batcher, scheduleFlush, forceFlush };
+}
diff --git a/surfsense_web/lib/chat/stream-pipeline.ts b/surfsense_web/lib/chat/stream-pipeline.ts
new file mode 100644
index 000000000..8957bdea3
--- /dev/null
+++ b/surfsense_web/lib/chat/stream-pipeline.ts
@@ -0,0 +1,191 @@
+import {
+	addStepSeparator,
+	addToolCall,
+	appendReasoning,
+	appendText,
+	appendToolInputDelta,
+	type ContentPartsState,
+	endReasoning,
+	readSSEStream,
+	type SSEEvent,
+	type ThinkingStepData,
+	type ToolUIGate,
+	updateThinkingSteps,
+	updateToolCall,
+} from "@/lib/chat/streaming-state";
+
+export type SharedStreamEventContext = {
+	contentPartsState: ContentPartsState;
+	toolsWithUI: ToolUIGate;
+	currentThinkingSteps: Map<string, ThinkingStepData>;
+	scheduleFlush: () => void;
+	forceFlush: () => void;
+	onTokenUsage?: (data: Extract<SSEEvent, { type: "data-token-usage" }>["data"]) => void;
+	onToolOutputAvailable?: (
+		event: Extract<SSEEvent, { type: "tool-output-available" }>,
+		context: {
+			contentPartsState: ContentPartsState;
+			toolCallIndices: Map<string, number>;
+		}
+	) => void;
+};
+
+/**
+ * After a tool produces output, mark any previously-decided interrupt tool
+ * calls as completed so the ApprovalCard can transition from shimmer to done.
+ */
+export function markInterruptsCompleted(
+	contentParts: Array<{ type: string; result?: unknown }>
+): void {
+	for (const part of contentParts) {
+		if (
+			part.type === "tool-call" &&
+			typeof part.result === "object" &&
+			part.result !== null &&
+			(part.result as Record<string, unknown>).__interrupt__ === true &&
+			(part.result as Record<string, unknown>).__decided__ &&
+			!(part.result as Record<string, unknown>).__completed__
+		) {
+			part.result = { ...(part.result as Record<string, unknown>), __completed__: true };
+		}
+	}
+}
+
+export function hasPersistableContent(
+	contentParts: ContentPartsState["contentParts"],
+	toolsWithUI: ToolUIGate
+) {
+	return contentParts.some(
+		(part) =>
+			(part.type === "text" && part.text.length > 0) ||
+			(part.type === "reasoning" && part.text.length > 0) ||
+			(part.type === "tool-call" && (toolsWithUI === "all" || toolsWithUI.has(part.toolName)))
+	);
+}
+
+function toStreamTerminalError(
+	event: Extract<SSEEvent, { type: "error" }>
+): Error & { errorCode?: string } {
+	return Object.assign(new Error(event.errorText || "Server error"), {
+		errorCode: event.errorCode,
+	});
+}
+
+export function processSharedStreamEvent(parsed: SSEEvent, context: SharedStreamEventContext): boolean {
+	const { contentPartsState, toolsWithUI, currentThinkingSteps, scheduleFlush, forceFlush } = context;
+	const { contentParts, toolCallIndices } = contentPartsState;
+
+	switch (parsed.type) {
+		case "text-delta":
+			appendText(contentPartsState, parsed.delta);
+			scheduleFlush();
+			return true;
+
+		case "reasoning-delta":
+			appendReasoning(contentPartsState, parsed.delta);
+			scheduleFlush();
+			return true;
+
+		case "reasoning-end":
+			endReasoning(contentPartsState);
+			scheduleFlush();
+			return true;
+
+		case "start-step":
+			addStepSeparator(contentPartsState);
+			scheduleFlush();
+			return true;
+
+		case "finish-step":
+			return true;
+
+		case "tool-input-start":
+			addToolCall(
+				contentPartsState,
+				toolsWithUI,
+				parsed.toolCallId,
+				parsed.toolName,
+				{},
+				false,
+				parsed.langchainToolCallId
+			);
+			forceFlush();
+			return true;
+
+		case "tool-input-delta":
+			// High-frequency event: deltas can fire dozens of times per call,
+			// so use throttled scheduleFlush (NOT forceFlush) to coalesce.
+			appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta);
+			scheduleFlush();
+			return true;
+
+		case "tool-input-available": {
+			const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2);
+			if (toolCallIndices.has(parsed.toolCallId)) {
+				updateToolCall(contentPartsState, parsed.toolCallId, {
+					args: parsed.input || {},
+					argsText: finalArgsText,
+					langchainToolCallId: parsed.langchainToolCallId,
+				});
+			} else {
+				addToolCall(
+					contentPartsState,
+					toolsWithUI,
+					parsed.toolCallId,
+					parsed.toolName,
+					parsed.input || {},
+					false,
+					parsed.langchainToolCallId
+				);
+				// addToolCall doesn't accept argsText today; backfill via
+				// updateToolCall so the new card renders pretty-printed JSON.
+				updateToolCall(contentPartsState, parsed.toolCallId, {
+					argsText: finalArgsText,
+				});
+			}
+			forceFlush();
+			return true;
+		}
+
+		case "tool-output-available":
+			updateToolCall(contentPartsState, parsed.toolCallId, {
+				result: parsed.output,
+				langchainToolCallId: parsed.langchainToolCallId,
+			});
+			markInterruptsCompleted(contentParts);
+			context.onToolOutputAvailable?.(parsed, { contentPartsState, toolCallIndices });
+			forceFlush();
+			return true;
+
+		case "data-thinking-step": {
+			const stepData = parsed.data as ThinkingStepData;
+			if (stepData?.id) {
+				currentThinkingSteps.set(stepData.id, stepData);
+				const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps);
+				if (didUpdate) {
+					scheduleFlush();
+				}
+			}
+			return true;
+		}
+
+		case "data-token-usage":
+			context.onTokenUsage?.(parsed.data);
+			return true;
+
+		case "error":
+			throw toStreamTerminalError(parsed);
+
+		default:
+			return false;
+	}
+}
+
+export async function consumeSseEvents(
+	response: Response,
+	onEvent: (event: SSEEvent) => void | Promise<void>
+): Promise<void> {
+	for await (const parsed of readSSEStream(response)) {
+		await onEvent(parsed);
+	}
+}