diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 2db00a03d..448148229 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -12,6 +12,7 @@ import { useParams, useSearchParams } from "next/navigation"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import { z } from "zod"; +import { addToolCall, appendText, buildContentForPersistence, buildContentForUI, type ContentPart, type ContentPartsState, readSSEStream, type ThinkingStepData, updateToolCall } from "@/lib/chat/streaming-state"; import { clearTargetCommentIdAtom, currentThreadAtom, @@ -125,15 +126,6 @@ const TOOLS_WITH_UI = new Set([ // "write_todos", // Disabled for now ]); -/** - * Type for thinking step data from the backend - */ -interface ThinkingStepData { - id: string; - title: string; - status: "pending" | "in_progress" | "completed"; - items: string[]; -} export default function NewChatPage() { const params = useParams(); @@ -540,102 +532,11 @@ export default function NewChatPage() { const assistantMsgId = `msg-assistant-${Date.now()}`; const currentThinkingSteps = new Map(); - // Ordered content parts to preserve inline tool call positions - // Each part is either a text segment or a tool call - type ContentPart = - | { type: "text"; text: string } - | { - type: "tool-call"; - toolCallId: string; - toolName: string; - args: Record; - result?: unknown; - }; - const contentParts: ContentPart[] = []; + const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() }; + const { contentParts, toolCallIndices } = contentPartsState; let wasInterrupted = false; - // Track the current text segment index (for appending text deltas) - let currentTextPartIndex = -1; - // Map to track tool call indices for updating results - const toolCallIndices = new Map(); - - // Helper to get or create the current text part for appending text - const appendText = (delta: string) => { - if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { - // Append to existing text part - (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; - } else { - // Create new text part - contentParts.push({ type: "text", text: delta }); - currentTextPartIndex = contentParts.length - 1; - } - }; - - // Helper to add a tool call (this "breaks" the current text segment) - const addToolCall = (toolCallId: string, toolName: string, args: Record) => { - if (TOOLS_WITH_UI.has(toolName)) { - contentParts.push({ - type: "tool-call", - toolCallId, - toolName, - args, - }); - toolCallIndices.set(toolCallId, contentParts.length - 1); - // Reset text part index so next text creates a new segment - currentTextPartIndex = -1; - } - }; - - // Helper to update a tool call's args or result - const updateToolCall = ( - toolCallId: string, - update: { args?: Record; result?: unknown } - ) => { - const index = toolCallIndices.get(toolCallId); - if (index !== undefined && contentParts[index]?.type === "tool-call") { - const tc = contentParts[index] as ContentPart & { type: "tool-call" }; - if (update.args) tc.args = update.args; - if (update.result !== undefined) tc.result = update.result; - } - }; - - // Helper to build content for UI (without thinking-steps to avoid assistant-ui errors) - const buildContentForUI = (): ThreadMessageLike["content"] => { - // Filter to only include text parts with content and tool-calls with UI - const filtered = contentParts.filter((part) => { - if (part.type === "text") return part.text.length > 0; - if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); - return false; - }); - return filtered.length > 0 - ? (filtered as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - }; - - // Helper to build content for persistence (includes thinking-steps for restoration) - const buildContentForPersistence = (): unknown[] => { - const parts: unknown[] = []; - - // Include thinking steps for persistence - if (currentThinkingSteps.size > 0) { - parts.push({ - type: "thinking-steps", - steps: Array.from(currentThinkingSteps.values()), - }); - } - - // Add content parts (filtered) - for (const part of contentParts) { - if (part.type === "text" && part.text.length > 0) { - parts.push(part); - } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { - parts.push(part); - } - } - - return parts.length > 0 ? parts : [{ type: "text", text: "" }]; - }; // Add placeholder assistant message setMessages((prev) => [ @@ -701,50 +602,23 @@ export default function NewChatPage() { throw new Error(`Backend error: ${response.status}`); } - if (!response.body) { - throw new Error("No response body"); - } - - // Parse SSE stream - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const events = buffer.split(/\r?\n\r?\n/); - buffer = events.pop() || ""; - - for (const event of events) { - const lines = event.split(/\r?\n/); - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; - - try { - const parsed = JSON.parse(data); - + for await (const parsed of readSSEStream(response)) { switch (parsed.type) { case "text-delta": - appendText(parsed.delta); + appendText(contentPartsState, parsed.delta); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-start": // Add tool call inline - this breaks the current text segment - addToolCall(parsed.toolCallId, parsed.toolName, {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {}); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -752,13 +626,13 @@ export default function NewChatPage() { case "tool-input-available": { // Update existing tool call's args, or add if not exists if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(parsed.toolCallId, { args: parsed.input || {} }); + updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {} }); } else { - addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {}); } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -766,7 +640,7 @@ export default function NewChatPage() { case "tool-output-available": { // Update the tool call with its result - updateToolCall(parsed.toolCallId, { result: parsed.output }); + updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output }); // Handle podcast-specific logic if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { // Check if this is a podcast tool by looking at the content part @@ -780,7 +654,7 @@ export default function NewChatPage() { } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -843,20 +717,20 @@ export default function NewChatPage() { } ); if (existingIdx) { - updateToolCall(existingIdx[0], { + updateToolCall(contentPartsState, existingIdx[0], { result: { __interrupt__: true, ...interruptData }, }); } else { const tcId = `interrupt-${action.name}`; - addToolCall(tcId, action.name, action.args); - updateToolCall(tcId, { + addToolCall(contentPartsState, TOOLS_WITH_UI, tcId, action.name, action.args); + updateToolCall(contentPartsState, tcId, { result: { __interrupt__: true, ...interruptData }, }); } } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); if (currentThreadId) { @@ -872,20 +746,11 @@ export default function NewChatPage() { case "error": throw new Error(parsed.errorText || "Server error"); } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; - } - } - } - } - } finally { - reader.releaseLock(); } // Persist assistant message (with thinking steps for restoration on refresh) // Skip persistence for interrupted messages -- handleResume will persist the final version - const finalContent = buildContentForPersistence(); + const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); if (contentParts.length > 0 && !wasInterrupted) { try { const savedMessage = await appendMessage(currentThreadId, { @@ -933,7 +798,7 @@ export default function NewChatPage() { (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) ); if (hasContent && currentThreadId) { - const partialContent = buildContentForPersistence(); + const partialContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); try { const savedMessage = await appendMessage(currentThreadId, { role: "assistant", @@ -1019,18 +884,8 @@ export default function NewChatPage() { (messageThinkingSteps.get(assistantMsgId) ?? []).map((s) => [s.id, s]) ); - type ContentPart = - | { type: "text"; text: string } - | { - type: "tool-call"; - toolCallId: string; - toolName: string; - args: Record; - result?: unknown; - }; - const contentParts: ContentPart[] = []; - let currentTextPartIndex = -1; - const toolCallIndices = new Map(); + const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() }; + const { contentParts, toolCallIndices } = contentPartsState; const existingMsg = messages.find((m) => m.id === assistantMsgId); if (existingMsg && Array.isArray(existingMsg.content)) { @@ -1039,7 +894,7 @@ export default function NewChatPage() { const p = part as Record; if (p.type === "text") { contentParts.push({ type: "text", text: String(p.text ?? "") }); - currentTextPartIndex = contentParts.length - 1; + contentPartsState.currentTextPartIndex = contentParts.length - 1; } else if (p.type === "tool-call") { toolCallIndices.set(String(p.toolCallId), contentParts.length); contentParts.push({ @@ -1049,7 +904,7 @@ export default function NewChatPage() { args: (p.args as Record) ?? {}, result: p.result as unknown, }); - currentTextPartIndex = -1; + contentPartsState.currentTextPartIndex = -1; } } } @@ -1072,68 +927,7 @@ export default function NewChatPage() { } } - const appendText = (delta: string) => { - if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { - (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; - } else { - contentParts.push({ type: "text", text: delta }); - currentTextPartIndex = contentParts.length - 1; - } - }; - const addToolCall = (toolCallId: string, toolName: string, args: Record) => { - if (TOOLS_WITH_UI.has(toolName)) { - contentParts.push({ - type: "tool-call", - toolCallId, - toolName, - args, - }); - toolCallIndices.set(toolCallId, contentParts.length - 1); - currentTextPartIndex = -1; - } - }; - - const updateToolCall = ( - toolCallId: string, - update: { args?: Record; result?: unknown } - ) => { - const index = toolCallIndices.get(toolCallId); - if (index !== undefined && contentParts[index]?.type === "tool-call") { - const tc = contentParts[index] as ContentPart & { type: "tool-call" }; - if (update.args) tc.args = update.args; - if (update.result !== undefined) tc.result = update.result; - } - }; - - const buildContentForUI = (): ThreadMessageLike["content"] => { - const filtered = contentParts.filter((part) => { - if (part.type === "text") return part.text.length > 0; - if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); - return false; - }); - return filtered.length > 0 - ? (filtered as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - }; - - const buildContentForPersistence = (): unknown[] => { - const parts: unknown[] = []; - if (currentThinkingSteps.size > 0) { - parts.push({ - type: "thinking-steps", - steps: Array.from(currentThinkingSteps.values()), - }); - } - for (const part of contentParts) { - if (part.type === "text" && part.text.length > 0) { - parts.push(part); - } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { - parts.push(part); - } - } - return parts.length > 0 ? parts : [{ type: "text", text: "" }]; - }; try { const backendUrl = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000"; @@ -1154,74 +948,48 @@ export default function NewChatPage() { throw new Error(`Backend error: ${response.status}`); } - if (!response.body) { - throw new Error("No response body"); - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const events = buffer.split(/\r?\n\r?\n/); - buffer = events.pop() || ""; - - for (const event of events) { - const lines = event.split(/\r?\n/); - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; - - try { - const parsed = JSON.parse(data); - + for await (const parsed of readSSEStream(response)) { switch (parsed.type) { case "text-delta": - appendText(parsed.delta); + appendText(contentPartsState, parsed.delta); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-start": - addToolCall(parsed.toolCallId, parsed.toolName, {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {}); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-available": if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(parsed.toolCallId, { + updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {}, }); } else { - addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {}); } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-output-available": - updateToolCall(parsed.toolCallId, { + updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output, }); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -1253,7 +1021,7 @@ export default function NewChatPage() { } ); if (existingIdx) { - updateToolCall(existingIdx[0], { + updateToolCall(contentPartsState, existingIdx[0], { result: { __interrupt__: true, ...interruptData, @@ -1261,8 +1029,8 @@ export default function NewChatPage() { }); } else { const tcId = `interrupt-${action.name}`; - addToolCall(tcId, action.name, action.args); - updateToolCall(tcId, { + addToolCall(contentPartsState, TOOLS_WITH_UI, tcId, action.name, action.args); + updateToolCall(contentPartsState, tcId, { result: { __interrupt__: true, ...interruptData, @@ -1272,7 +1040,7 @@ export default function NewChatPage() { } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); setPendingInterrupt({ @@ -1285,19 +1053,10 @@ export default function NewChatPage() { case "error": throw new Error(parsed.errorText || "Server error"); - } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; } - } - } - } - } finally { - reader.releaseLock(); } - const finalContent = buildContentForPersistence(); + const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); if (contentParts.length > 0) { try { const savedMessage = await appendMessage(resumeThreadId, { @@ -1456,77 +1215,10 @@ export default function NewChatPage() { const assistantMsgId = `msg-assistant-${Date.now()}`; const currentThinkingSteps = new Map(); - // Content parts tracking (same as onNew) - type ContentPart = - | { type: "text"; text: string } - | { - type: "tool-call"; - toolCallId: string; - toolName: string; - args: Record; - result?: unknown; - }; - const contentParts: ContentPart[] = []; - let currentTextPartIndex = -1; - const toolCallIndices = new Map(); + const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() }; + const { contentParts, toolCallIndices } = contentPartsState; - const appendText = (delta: string) => { - if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { - (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; - } else { - contentParts.push({ type: "text", text: delta }); - currentTextPartIndex = contentParts.length - 1; - } - }; - const addToolCall = (toolCallId: string, toolName: string, args: Record) => { - if (TOOLS_WITH_UI.has(toolName)) { - contentParts.push({ type: "tool-call", toolCallId, toolName, args }); - toolCallIndices.set(toolCallId, contentParts.length - 1); - currentTextPartIndex = -1; - } - }; - - const updateToolCall = ( - toolCallId: string, - update: { args?: Record; result?: unknown } - ) => { - const index = toolCallIndices.get(toolCallId); - if (index !== undefined && contentParts[index]?.type === "tool-call") { - const tc = contentParts[index] as ContentPart & { type: "tool-call" }; - if (update.args) tc.args = update.args; - if (update.result !== undefined) tc.result = update.result; - } - }; - - const buildContentForUI = (): ThreadMessageLike["content"] => { - const filtered = contentParts.filter((part) => { - if (part.type === "text") return part.text.length > 0; - if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); - return false; - }); - return filtered.length > 0 - ? (filtered as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - }; - - const buildContentForPersistence = (): unknown[] => { - const parts: unknown[] = []; - if (currentThinkingSteps.size > 0) { - parts.push({ - type: "thinking-steps", - steps: Array.from(currentThinkingSteps.values()), - }); - } - for (const part of contentParts) { - if (part.type === "text" && part.text.length > 0) { - parts.push(part); - } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { - parts.push(part); - } - } - return parts.length > 0 ? parts : [{ type: "text", text: "" }]; - }; // Add placeholder messages to UI // Always add back the user message (with new query for edit, or original content for reload) @@ -1570,68 +1262,41 @@ export default function NewChatPage() { throw new Error(`Backend error: ${response.status}`); } - if (!response.body) { - throw new Error("No response body"); - } - - // Parse SSE stream (same logic as onNew) - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const events = buffer.split(/\r?\n\r?\n/); - buffer = events.pop() || ""; - - for (const event of events) { - const lines = event.split(/\r?\n/); - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; - - try { - const parsed = JSON.parse(data); - + for await (const parsed of readSSEStream(response)) { switch (parsed.type) { case "text-delta": - appendText(parsed.delta); + appendText(contentPartsState, parsed.delta); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-start": - addToolCall(parsed.toolCallId, parsed.toolName, {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {}); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-available": if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(parsed.toolCallId, { args: parsed.input || {} }); + updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {} }); } else { - addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {}); } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-output-available": - updateToolCall(parsed.toolCallId, { result: parsed.output }); + updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output }); if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { const idx = toolCallIndices.get(parsed.toolCallId); if (idx !== undefined) { @@ -1643,7 +1308,7 @@ export default function NewChatPage() { } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -1663,20 +1328,11 @@ export default function NewChatPage() { case "error": throw new Error(parsed.errorText || "Server error"); - } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; } - } - } - } - } finally { - reader.releaseLock(); } // Persist messages after streaming completes - const finalContent = buildContentForPersistence(); + const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); if (contentParts.length > 0) { try { // Persist user message (for both edit and reload modes, since backend deleted it) diff --git a/surfsense_web/lib/chat/streaming-state.ts b/surfsense_web/lib/chat/streaming-state.ts new file mode 100644 index 000000000..7165cd9f3 --- /dev/null +++ b/surfsense_web/lib/chat/streaming-state.ts @@ -0,0 +1,184 @@ +import type { ThreadMessageLike } from "@assistant-ui/react"; + +/** + * Extracted from page.tsx lines 131-136. + * Used across onNew, handleResume, and handleRegenerate. + */ +export interface ThinkingStepData { + id: string; + title: string; + status: "pending" | "in_progress" | "completed"; + items: string[]; +} + +/** + * Extracted from page.tsx lines 537-545. + * Duplicated in onNew, handleResume, and handleRegenerate. + */ +export type ContentPart = + | { type: "text"; text: string } + | { + type: "tool-call"; + toolCallId: string; + toolName: string; + args: Record; + result?: unknown; + }; + +/** + * Mutable state shared by the content-part helpers (appendText, addToolCall, etc.). + * All handlers create this same set of variables -- this groups them into one object + * so helpers can read/write them by reference. + */ +export interface ContentPartsState { + contentParts: ContentPart[]; + currentTextPartIndex: number; + toolCallIndices: Map; +} + +/** + * Extracted from page.tsx lines 556-573 (onNew). + * Identical in handleResume (lines 1057-1064) and handleRegenerate (lines 1445-1452). + */ +export function appendText(state: ContentPartsState, delta: string): void { + if (state.currentTextPartIndex >= 0 && state.contentParts[state.currentTextPartIndex]?.type === "text") { + (state.contentParts[state.currentTextPartIndex] as { type: "text"; text: string }).text += delta; + } else { + state.contentParts.push({ type: "text", text: delta }); + state.currentTextPartIndex = state.contentParts.length - 1; + } +} + +/** + * Extracted from page.tsx line 540 (onNew). + * Identical in handleResume (line 1029) and handleRegenerate (line 1407). + */ +export function addToolCall( + state: ContentPartsState, + toolsWithUI: Set, + toolCallId: string, + toolName: string, + args: Record +): void { + if (toolsWithUI.has(toolName)) { + state.contentParts.push({ + type: "tool-call", + toolCallId, + toolName, + args, + }); + state.toolCallIndices.set(toolCallId, state.contentParts.length - 1); + state.currentTextPartIndex = -1; + } +} + +/** + * Extracted from page.tsx line 540 (onNew). + * Identical in handleResume (line 1027) and handleRegenerate (line 1387). + */ +export function updateToolCall( + state: ContentPartsState, + toolCallId: string, + update: { args?: Record; result?: unknown } +): void { + const index = state.toolCallIndices.get(toolCallId); + if (index !== undefined && state.contentParts[index]?.type === "tool-call") { + const tc = state.contentParts[index] as ContentPart & { type: "tool-call" }; + if (update.args) tc.args = update.args; + if (update.result !== undefined) tc.result = update.result; + } +} + +/** + * Extracted from page.tsx line 539 (onNew). + * Identical in handleResume and handleRegenerate. + */ +export function buildContentForUI( + state: ContentPartsState, + toolsWithUI: Set +): ThreadMessageLike["content"] { + const filtered = state.contentParts.filter((part) => { + if (part.type === "text") return part.text.length > 0; + if (part.type === "tool-call") return toolsWithUI.has(part.toolName); + return false; + }); + return filtered.length > 0 + ? (filtered as ThreadMessageLike["content"]) + : [{ type: "text", text: "" }]; +} + +/** + * Extracted from page.tsx line 553 (onNew). + * Identical in handleResume and handleRegenerate. + */ +export function buildContentForPersistence( + state: ContentPartsState, + toolsWithUI: Set, + currentThinkingSteps: Map +): unknown[] { + const parts: unknown[] = []; + + if (currentThinkingSteps.size > 0) { + parts.push({ + type: "thinking-steps", + steps: Array.from(currentThinkingSteps.values()), + }); + } + + for (const part of state.contentParts) { + if (part.type === "text" && part.text.length > 0) { + parts.push(part); + } else if (part.type === "tool-call" && toolsWithUI.has(part.toolName)) { + parts.push(part); + } + } + + return parts.length > 0 ? parts : [{ type: "text", text: "" }]; +} + +/** + * Async generator that reads an SSE stream and yields parsed JSON objects. + * Handles buffering, event splitting, and skips malformed JSON / [DONE] lines. + * + * Extracted from the identical SSE reading boilerplate in onNew, handleResume, + * and handleRegenerate. + */ +// biome-ignore lint/suspicious/noExplicitAny: matches JSON.parse return type +export async function* readSSEStream(response: Response): AsyncGenerator { + if (!response.body) { + throw new Error("No response body"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const events = buffer.split(/\r?\n\r?\n/); + buffer = events.pop() || ""; + + for (const event of events) { + const lines = event.split(/\r?\n/); + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + const data = line.slice(6).trim(); + if (!data || data === "[DONE]") continue; + + try { + yield JSON.parse(data); + } catch (e) { + if (e instanceof SyntaxError) continue; + throw e; + } + } + } + } + } finally { + reader.releaseLock(); + } +}