From 6465ea181a25a8c6d003572ea4707aa9e1dcf3cc Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Thu, 30 Apr 2026 18:09:18 +0530 Subject: [PATCH] refactor(chat): streamline NewChatPage component by removing unused functions and integrating new stream handling utilities for improved performance --- .../new-chat/[[...chat_id]]/page.tsx | 625 +++++++----------- 1 file changed, 255 insertions(+), 370 deletions(-) diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index fe625f169..d1dd14e06 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -252,6 +252,168 @@ function tagPreAcceptSendFailure(error: unknown): unknown { }); } +type SharedStreamEventContext = { + contentPartsState: ContentPartsState; + toolsWithUI: ToolUIGate; + currentThinkingSteps: Map; + scheduleFlush: () => void; + forceFlush: () => void; + onTokenUsage?: (data: TokenUsageData) => void; + onToolOutputAvailable?: ( + event: Extract, + context: { + contentPartsState: ContentPartsState; + toolCallIndices: Map; + } + ) => void; +}; + +function createStreamFlushHelpers(flushMessages: () => void): { + batcher: FrameBatchedUpdater; + scheduleFlush: () => void; + forceFlush: () => void; +} { + const batcher = new FrameBatchedUpdater(); + const scheduleFlush = () => batcher.schedule(flushMessages); + // Force-flush helper: ``batcher.flush()`` is a no-op when + // ``dirty=false`` (e.g. a tool starts before any text streamed). + // ``scheduleFlush(); batcher.flush()`` sets the dirty bit first so + // terminal events render promptly without the throttle delay. + const forceFlush = () => { + scheduleFlush(); + batcher.flush(); + }; + return { batcher, scheduleFlush, forceFlush }; +} + +function hasPersistableContent(contentParts: ContentPartsState["contentParts"], toolsWithUI: ToolUIGate) { + return contentParts.some( + (part) => + (part.type === "text" && part.text.length > 0) || + (part.type === "reasoning" && part.text.length > 0) || + (part.type === "tool-call" && (toolsWithUI === "all" || toolsWithUI.has(part.toolName))) + ); +} + +function processSharedStreamEvent(parsed: SSEEvent, context: SharedStreamEventContext): boolean { + const { contentPartsState, toolsWithUI, currentThinkingSteps, scheduleFlush, forceFlush } = context; + const { contentParts, toolCallIndices } = contentPartsState; + + switch (parsed.type) { + case "text-delta": + appendText(contentPartsState, parsed.delta); + scheduleFlush(); + return true; + + case "reasoning-delta": + appendReasoning(contentPartsState, parsed.delta); + scheduleFlush(); + return true; + + case "reasoning-end": + endReasoning(contentPartsState); + scheduleFlush(); + return true; + + case "start-step": + addStepSeparator(contentPartsState); + scheduleFlush(); + return true; + + case "finish-step": + return true; + + case "tool-input-start": + addToolCall( + contentPartsState, + toolsWithUI, + parsed.toolCallId, + parsed.toolName, + {}, + false, + parsed.langchainToolCallId + ); + forceFlush(); + return true; + + case "tool-input-delta": + // High-frequency event: deltas can fire dozens of times per call, + // so use throttled scheduleFlush (NOT forceFlush) to coalesce. + appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta); + scheduleFlush(); + return true; + + case "tool-input-available": { + const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2); + if (toolCallIndices.has(parsed.toolCallId)) { + updateToolCall(contentPartsState, parsed.toolCallId, { + args: parsed.input || {}, + argsText: finalArgsText, + langchainToolCallId: parsed.langchainToolCallId, + }); + } else { + addToolCall( + contentPartsState, + toolsWithUI, + parsed.toolCallId, + parsed.toolName, + parsed.input || {}, + false, + parsed.langchainToolCallId + ); + // addToolCall doesn't accept argsText today; backfill via + // updateToolCall so the new card renders pretty-printed JSON. + updateToolCall(contentPartsState, parsed.toolCallId, { + argsText: finalArgsText, + }); + } + forceFlush(); + return true; + } + + case "tool-output-available": + updateToolCall(contentPartsState, parsed.toolCallId, { + result: parsed.output, + langchainToolCallId: parsed.langchainToolCallId, + }); + markInterruptsCompleted(contentParts); + context.onToolOutputAvailable?.(parsed, { contentPartsState, toolCallIndices }); + forceFlush(); + return true; + + case "data-thinking-step": { + const stepData = parsed.data as ThinkingStepData; + if (stepData?.id) { + currentThinkingSteps.set(stepData.id, stepData); + const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps); + if (didUpdate) { + scheduleFlush(); + } + } + return true; + } + + case "data-token-usage": + context.onTokenUsage?.(parsed.data as TokenUsageData); + return true; + + case "error": + throw toStreamTerminalError(parsed); + + default: + return false; + } +} + +async function consumeSseEvents( + response: Response, + onEvent: (event: SSEEvent) => void | Promise +): Promise { + for await (const parsed of readSSEStream(response)) { + await onEvent(parsed); + } +} + /** * Zod schema for mentioned document info (for type-safe parsing) */ @@ -456,7 +618,7 @@ export default function NewChatPage() { threadId: number | null; assistantMsgId: string; content: unknown; - tokenUsage?: Record; + tokenUsage?: TokenUsageData; turnId?: string | null; logContext: string; onRemapped?: (newMsgId: string) => void; @@ -1055,8 +1217,6 @@ export default function NewChatPage() { // Prepare assistant message const assistantMsgId = `msg-assistant-${Date.now()}`; const currentThinkingSteps = new Map(); - const batcher = new FrameBatchedUpdater(); - const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, @@ -1065,11 +1225,12 @@ export default function NewChatPage() { }; const { contentParts, toolCallIndices } = contentPartsState; let wasInterrupted = false; - let tokenUsageData: Record | null = null; + let tokenUsageData: TokenUsageData | null = null; let newAccepted = false; let userPersisted = false; // Captured from ``data-turn-info`` at stream start. let streamedChatTurnId: string | null = null; + let streamBatcher: FrameBatchedUpdater | null = null; try { const backendUrl = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000"; @@ -1152,123 +1313,37 @@ export default function NewChatPage() { ) ); }; - const scheduleFlush = () => batcher.schedule(flushMessages); - // Force-flush helper: ``batcher.flush()`` is a no-op when - // ``dirty=false`` (e.g. a tool starts before any text - // streamed). ``scheduleFlush(); batcher.flush()`` sets - // the dirty bit FIRST so terminal events render - // promptly without the 50ms throttle delay. - const forceFlush = () => { - scheduleFlush(); - batcher.flush(); - }; + const { batcher, scheduleFlush, forceFlush } = createStreamFlushHelpers(flushMessages); + streamBatcher = batcher; - for await (const parsed of readSSEStream(response)) { - switch (parsed.type) { - case "text-delta": - appendText(contentPartsState, parsed.delta); - scheduleFlush(); - break; - - case "reasoning-delta": - appendReasoning(contentPartsState, parsed.delta); - scheduleFlush(); - break; - - case "reasoning-end": - endReasoning(contentPartsState); - scheduleFlush(); - break; - - case "start-step": - addStepSeparator(contentPartsState); - scheduleFlush(); - break; - - case "finish-step": - break; - - case "tool-input-start": - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - {}, - false, - parsed.langchainToolCallId - ); - forceFlush(); - break; - - case "tool-input-delta": - // High-frequency event: deltas can fire dozens - // of times per call, so use throttled - // scheduleFlush (NOT forceFlush) to coalesce. - appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta); - scheduleFlush(); - break; - - case "tool-input-available": { - const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2); - if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(contentPartsState, parsed.toolCallId, { - args: parsed.input || {}, - argsText: finalArgsText, - langchainToolCallId: parsed.langchainToolCallId, - }); - } else { - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - parsed.input || {}, - false, - parsed.langchainToolCallId - ); - // addToolCall doesn't accept argsText today; - // backfill via updateToolCall so the new card - // renders pretty-printed JSON. - updateToolCall(contentPartsState, parsed.toolCallId, { - argsText: finalArgsText, - }); - } - forceFlush(); - break; - } - - case "tool-output-available": { - updateToolCall(contentPartsState, parsed.toolCallId, { - result: parsed.output, - langchainToolCallId: parsed.langchainToolCallId, - }); - markInterruptsCompleted(contentParts); - if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { - const idx = toolCallIndices.get(parsed.toolCallId); - if (idx !== undefined) { - const part = contentParts[idx]; - if (part?.type === "tool-call" && part.toolName === "generate_podcast") { - setActivePodcastTaskId(String(parsed.output.podcast_id)); + await consumeSseEvents(response, async (parsed) => { + if ( + processSharedStreamEvent(parsed, { + contentPartsState, + toolsWithUI, + currentThinkingSteps, + scheduleFlush, + forceFlush, + onTokenUsage: (data) => { + tokenUsageData = data; + tokenUsageStore.set(assistantMsgId, data); + }, + onToolOutputAvailable: (event, sharedCtx) => { + if (event.output?.status === "pending" && event.output?.podcast_id) { + const idx = sharedCtx.toolCallIndices.get(event.toolCallId); + if (idx !== undefined) { + const part = sharedCtx.contentPartsState.contentParts[idx]; + if (part?.type === "tool-call" && part.toolName === "generate_podcast") { + setActivePodcastTaskId(String(event.output.podcast_id)); + } } } - } - forceFlush(); - break; - } - - case "data-thinking-step": { - const stepData = parsed.data as ThinkingStepData; - if (stepData?.id) { - currentThinkingSteps.set(stepData.id, stepData); - const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps); - if (didUpdate) { - scheduleFlush(); - } - } - break; - } - + }, + }) + ) { + return; + } + switch (parsed.type) { case "data-thread-title-update": { const titleData = parsed.data as { threadId: number; title: string }; if (titleData?.title && titleData?.threadId === currentThreadId) { @@ -1374,16 +1449,8 @@ export default function NewChatPage() { } break; } - - case "data-token-usage": - tokenUsageData = parsed.data; - tokenUsageStore.set(assistantMsgId, parsed.data as TokenUsageData); - break; - - case "error": - throw toStreamTerminalError(parsed); } - } + }); batcher.flush(); @@ -1425,7 +1492,7 @@ export default function NewChatPage() { trackChatResponseReceived(searchSpaceId, currentThreadId); } } catch (error) { - batcher.dispose(); + streamBatcher?.dispose(); await handleStreamTerminalError({ error, flow: "new", @@ -1448,13 +1515,7 @@ export default function NewChatPage() { } } - const hasContent = contentParts.some( - (part) => - (part.type === "text" && part.text.length > 0) || - (part.type === "reasoning" && part.text.length > 0) || - (part.type === "tool-call" && - (toolsWithUI === "all" || toolsWithUI.has(part.toolName))) - ); + const hasContent = hasPersistableContent(contentParts, toolsWithUI); if (hasContent && currentThreadId) { const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI); await persistAssistantTurn({ @@ -1543,7 +1604,6 @@ export default function NewChatPage() { abortControllerRef.current = controller; const currentThinkingSteps = new Map(); - const batcher = new FrameBatchedUpdater(); const contentPartsState: ContentPartsState = { contentParts: [], @@ -1552,10 +1612,11 @@ export default function NewChatPage() { toolCallIndices: new Map(), }; const { contentParts, toolCallIndices } = contentPartsState; - let tokenUsageData: Record | null = null; + let tokenUsageData: TokenUsageData | null = null; let resumeAccepted = false; // Captured from ``data-turn-info`` at stream start. let streamedChatTurnId: string | null = null; + let streamBatcher: FrameBatchedUpdater | null = null; const existingMsg = messages.find((m) => m.id === assistantMsgId); if (existingMsg && Array.isArray(existingMsg.content)) { @@ -1664,102 +1725,26 @@ export default function NewChatPage() { ) ); }; - const scheduleFlush = () => batcher.schedule(flushMessages); - const forceFlush = () => { - scheduleFlush(); - batcher.flush(); - }; + const { batcher, scheduleFlush, forceFlush } = createStreamFlushHelpers(flushMessages); + streamBatcher = batcher; - for await (const parsed of readSSEStream(response)) { + await consumeSseEvents(response, async (parsed) => { + if ( + processSharedStreamEvent(parsed, { + contentPartsState, + toolsWithUI, + currentThinkingSteps, + scheduleFlush, + forceFlush, + onTokenUsage: (data) => { + tokenUsageData = data; + tokenUsageStore.set(assistantMsgId, data); + }, + }) + ) { + return; + } switch (parsed.type) { - case "text-delta": - appendText(contentPartsState, parsed.delta); - scheduleFlush(); - break; - - case "reasoning-delta": - appendReasoning(contentPartsState, parsed.delta); - scheduleFlush(); - break; - - case "reasoning-end": - endReasoning(contentPartsState); - scheduleFlush(); - break; - - case "start-step": - addStepSeparator(contentPartsState); - scheduleFlush(); - break; - - case "finish-step": - break; - - case "tool-input-start": - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - {}, - false, - parsed.langchainToolCallId - ); - forceFlush(); - break; - - case "tool-input-delta": - appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta); - scheduleFlush(); - break; - - case "tool-input-available": { - const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2); - if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(contentPartsState, parsed.toolCallId, { - args: parsed.input || {}, - argsText: finalArgsText, - langchainToolCallId: parsed.langchainToolCallId, - }); - } else { - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - parsed.input || {}, - false, - parsed.langchainToolCallId - ); - updateToolCall(contentPartsState, parsed.toolCallId, { - argsText: finalArgsText, - }); - } - forceFlush(); - break; - } - - case "tool-output-available": - updateToolCall(contentPartsState, parsed.toolCallId, { - result: parsed.output, - langchainToolCallId: parsed.langchainToolCallId, - }); - markInterruptsCompleted(contentParts); - forceFlush(); - break; - - case "data-thinking-step": { - const stepData = parsed.data as ThinkingStepData; - if (stepData?.id) { - currentThinkingSteps.set(stepData.id, stepData); - const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps); - if (didUpdate) { - scheduleFlush(); - } - } - break; - } - case "data-interrupt-request": { const interruptData = parsed.data as Record; const actionRequests = (interruptData.action_requests ?? []) as Array<{ @@ -1830,16 +1815,8 @@ export default function NewChatPage() { } break; } - - case "data-token-usage": - tokenUsageData = parsed.data; - tokenUsageStore.set(assistantMsgId, parsed.data as TokenUsageData); - break; - - case "error": - throw toStreamTerminalError(parsed); } - } + }); batcher.flush(); @@ -1855,7 +1832,7 @@ export default function NewChatPage() { }); } } catch (error) { - batcher.dispose(); + streamBatcher?.dispose(); await handleStreamTerminalError({ error, flow: "resume", @@ -1864,13 +1841,7 @@ export default function NewChatPage() { accepted: resumeAccepted, onAbort: async () => { if (!resumeAccepted) return; - const hasContent = contentParts.some( - (part) => - (part.type === "text" && part.text.length > 0) || - (part.type === "reasoning" && part.text.length > 0) || - (part.type === "tool-call" && - (toolsWithUI === "all" || toolsWithUI.has(part.toolName))) - ); + const hasContent = hasPersistableContent(contentParts, toolsWithUI); if (!hasContent) return; const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI); await persistAssistantTurn({ @@ -1891,6 +1862,7 @@ export default function NewChatPage() { pendingInterrupt, messages, searchSpaceId, + queryClient, tokenUsageStore, handleStreamTerminalError, persistAssistantTurn, @@ -2045,15 +2017,15 @@ export default function NewChatPage() { currentReasoningPartIndex: -1, toolCallIndices: new Map(), }; - const { contentParts, toolCallIndices } = contentPartsState; - const batcher = new FrameBatchedUpdater(); - let tokenUsageData: Record | null = null; + const { contentParts } = contentPartsState; + let tokenUsageData: TokenUsageData | null = null; let regenerateAccepted = false; let userPersisted = false; // Captured from ``data-turn-info`` at stream start; stamped // onto persisted messages so future edits can locate the // right LangGraph checkpoint. let streamedChatTurnId: string | null = null; + let streamBatcher: FrameBatchedUpdater | null = null; // Add placeholder messages to UI // Always add back the user message (with new query for edit, or original content for reload) @@ -2155,111 +2127,37 @@ export default function NewChatPage() { ) ); }; - const scheduleFlush = () => batcher.schedule(flushMessages); - const forceFlush = () => { - scheduleFlush(); - batcher.flush(); - }; + const { batcher, scheduleFlush, forceFlush } = createStreamFlushHelpers(flushMessages); + streamBatcher = batcher; - for await (const parsed of readSSEStream(response)) { - switch (parsed.type) { - case "text-delta": - appendText(contentPartsState, parsed.delta); - scheduleFlush(); - break; - - case "reasoning-delta": - appendReasoning(contentPartsState, parsed.delta); - scheduleFlush(); - break; - - case "reasoning-end": - endReasoning(contentPartsState); - scheduleFlush(); - break; - - case "start-step": - addStepSeparator(contentPartsState); - scheduleFlush(); - break; - - case "finish-step": - break; - - case "tool-input-start": - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - {}, - false, - parsed.langchainToolCallId - ); - forceFlush(); - break; - - case "tool-input-delta": - appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta); - scheduleFlush(); - break; - - case "tool-input-available": { - const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2); - if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(contentPartsState, parsed.toolCallId, { - args: parsed.input || {}, - argsText: finalArgsText, - langchainToolCallId: parsed.langchainToolCallId, - }); - } else { - addToolCall( - contentPartsState, - toolsWithUI, - parsed.toolCallId, - parsed.toolName, - parsed.input || {}, - false, - parsed.langchainToolCallId - ); - updateToolCall(contentPartsState, parsed.toolCallId, { - argsText: finalArgsText, - }); - } - forceFlush(); - break; - } - - case "tool-output-available": - updateToolCall(contentPartsState, parsed.toolCallId, { - result: parsed.output, - langchainToolCallId: parsed.langchainToolCallId, - }); - markInterruptsCompleted(contentParts); - if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { - const idx = toolCallIndices.get(parsed.toolCallId); - if (idx !== undefined) { - const part = contentParts[idx]; - if (part?.type === "tool-call" && part.toolName === "generate_podcast") { - setActivePodcastTaskId(String(parsed.output.podcast_id)); + await consumeSseEvents(response, async (parsed) => { + if ( + processSharedStreamEvent(parsed, { + contentPartsState, + toolsWithUI, + currentThinkingSteps, + scheduleFlush, + forceFlush, + onTokenUsage: (data) => { + tokenUsageData = data; + tokenUsageStore.set(assistantMsgId, data); + }, + onToolOutputAvailable: (event, sharedCtx) => { + if (event.output?.status === "pending" && event.output?.podcast_id) { + const idx = sharedCtx.toolCallIndices.get(event.toolCallId); + if (idx !== undefined) { + const part = sharedCtx.contentPartsState.contentParts[idx]; + if (part?.type === "tool-call" && part.toolName === "generate_podcast") { + setActivePodcastTaskId(String(event.output.podcast_id)); + } } } - } - forceFlush(); - break; - - case "data-thinking-step": { - const stepData = parsed.data as ThinkingStepData; - if (stepData?.id) { - currentThinkingSteps.set(stepData.id, stepData); - const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps); - if (didUpdate) { - scheduleFlush(); - } - } - break; - } - + }, + }) + ) { + return; + } + switch (parsed.type) { case "data-action-log": { if (threadId !== null) { applyActionLogSse(queryClient, threadId, searchSpaceId, parsed.data); @@ -2326,16 +2224,8 @@ export default function NewChatPage() { } break; } - - case "data-token-usage": - tokenUsageData = parsed.data; - tokenUsageStore.set(assistantMsgId, parsed.data as TokenUsageData); - break; - - case "error": - throw toStreamTerminalError(parsed); } - } + }); batcher.flush(); @@ -2364,7 +2254,7 @@ export default function NewChatPage() { trackChatResponseReceived(searchSpaceId, threadId); } } catch (error) { - batcher.dispose(); + streamBatcher?.dispose(); await handleStreamTerminalError({ error, flow: "regenerate", @@ -2384,13 +2274,7 @@ export default function NewChatPage() { }); userPersisted = Boolean(persistedUserMsgId); } - const hasContent = contentParts.some( - (part) => - (part.type === "text" && part.text.length > 0) || - (part.type === "reasoning" && part.text.length > 0) || - (part.type === "tool-call" && - (toolsWithUI === "all" || toolsWithUI.has(part.toolName))) - ); + const hasContent = hasPersistableContent(contentParts, toolsWithUI); if (!hasContent) return; const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI); await persistAssistantTurn({ @@ -2428,6 +2312,7 @@ export default function NewChatPage() { disabledTools, messageDocumentsMap, setMessageDocumentsMap, + queryClient, tokenUsageStore, handleStreamTerminalError, persistAssistantTurn,