extract shared streaming helpers from page.tsx into streaming-state.ts

Move duplicated types (ThinkingStepData, ContentPart), content-part helpers
(appendText, addToolCall, updateToolCall, buildContentForUI,
buildContentForPersistence), and SSE read loop (readSSEStream) into a
shared module. Removes ~395 lines of tripled code from page.tsx.
This commit is contained in:
CREDO23 2026-02-11 13:50:46 +02:00
parent 8f81c9859d
commit 5d1c386105
2 changed files with 235 additions and 395 deletions

View file

@ -12,6 +12,7 @@ import { useParams, useSearchParams } from "next/navigation";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
import { z } from "zod";
import { addToolCall, appendText, buildContentForPersistence, buildContentForUI, type ContentPart, type ContentPartsState, readSSEStream, type ThinkingStepData, updateToolCall } from "@/lib/chat/streaming-state";
import {
clearTargetCommentIdAtom,
currentThreadAtom,
@ -125,15 +126,6 @@ const TOOLS_WITH_UI = new Set([
// "write_todos", // Disabled for now
]);
/**
* Type for thinking step data from the backend
*/
interface ThinkingStepData {
id: string;
title: string;
status: "pending" | "in_progress" | "completed";
items: string[];
}
export default function NewChatPage() {
const params = useParams();
@ -540,102 +532,11 @@ export default function NewChatPage() {
const assistantMsgId = `msg-assistant-${Date.now()}`;
const currentThinkingSteps = new Map<string, ThinkingStepData>();
// Ordered content parts to preserve inline tool call positions
// Each part is either a text segment or a tool call
type ContentPart =
| { type: "text"; text: string }
| {
type: "tool-call";
toolCallId: string;
toolName: string;
args: Record<string, unknown>;
result?: unknown;
};
const contentParts: ContentPart[] = [];
const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() };
const { contentParts, toolCallIndices } = contentPartsState;
let wasInterrupted = false;
// Track the current text segment index (for appending text deltas)
let currentTextPartIndex = -1;
// Map to track tool call indices for updating results
const toolCallIndices = new Map<string, number>();
// Helper to get or create the current text part for appending text
const appendText = (delta: string) => {
if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") {
// Append to existing text part
(contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta;
} else {
// Create new text part
contentParts.push({ type: "text", text: delta });
currentTextPartIndex = contentParts.length - 1;
}
};
// Helper to add a tool call (this "breaks" the current text segment)
const addToolCall = (toolCallId: string, toolName: string, args: Record<string, unknown>) => {
if (TOOLS_WITH_UI.has(toolName)) {
contentParts.push({
type: "tool-call",
toolCallId,
toolName,
args,
});
toolCallIndices.set(toolCallId, contentParts.length - 1);
// Reset text part index so next text creates a new segment
currentTextPartIndex = -1;
}
};
// Helper to update a tool call's args or result
const updateToolCall = (
toolCallId: string,
update: { args?: Record<string, unknown>; result?: unknown }
) => {
const index = toolCallIndices.get(toolCallId);
if (index !== undefined && contentParts[index]?.type === "tool-call") {
const tc = contentParts[index] as ContentPart & { type: "tool-call" };
if (update.args) tc.args = update.args;
if (update.result !== undefined) tc.result = update.result;
}
};
// Helper to build content for UI (without thinking-steps to avoid assistant-ui errors)
const buildContentForUI = (): ThreadMessageLike["content"] => {
// Filter to only include text parts with content and tool-calls with UI
const filtered = contentParts.filter((part) => {
if (part.type === "text") return part.text.length > 0;
if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName);
return false;
});
return filtered.length > 0
? (filtered as ThreadMessageLike["content"])
: [{ type: "text", text: "" }];
};
// Helper to build content for persistence (includes thinking-steps for restoration)
const buildContentForPersistence = (): unknown[] => {
const parts: unknown[] = [];
// Include thinking steps for persistence
if (currentThinkingSteps.size > 0) {
parts.push({
type: "thinking-steps",
steps: Array.from(currentThinkingSteps.values()),
});
}
// Add content parts (filtered)
for (const part of contentParts) {
if (part.type === "text" && part.text.length > 0) {
parts.push(part);
} else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) {
parts.push(part);
}
}
return parts.length > 0 ? parts : [{ type: "text", text: "" }];
};
// Add placeholder assistant message
setMessages((prev) => [
@ -701,50 +602,23 @@ export default function NewChatPage() {
throw new Error(`Backend error: ${response.status}`);
}
if (!response.body) {
throw new Error("No response body");
}
// Parse SSE stream
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const events = buffer.split(/\r?\n\r?\n/);
buffer = events.pop() || "";
for (const event of events) {
const lines = event.split(/\r?\n/);
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (!data || data === "[DONE]") continue;
try {
const parsed = JSON.parse(data);
for await (const parsed of readSSEStream(response)) {
switch (parsed.type) {
case "text-delta":
appendText(parsed.delta);
appendText(contentPartsState, parsed.delta);
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-input-start":
// Add tool call inline - this breaks the current text segment
addToolCall(parsed.toolCallId, parsed.toolName, {});
addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {});
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
@ -752,13 +626,13 @@ export default function NewChatPage() {
case "tool-input-available": {
// Update existing tool call's args, or add if not exists
if (toolCallIndices.has(parsed.toolCallId)) {
updateToolCall(parsed.toolCallId, { args: parsed.input || {} });
updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {} });
} else {
addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {});
addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {});
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
@ -766,7 +640,7 @@ export default function NewChatPage() {
case "tool-output-available": {
// Update the tool call with its result
updateToolCall(parsed.toolCallId, { result: parsed.output });
updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output });
// Handle podcast-specific logic
if (parsed.output?.status === "pending" && parsed.output?.podcast_id) {
// Check if this is a podcast tool by looking at the content part
@ -780,7 +654,7 @@ export default function NewChatPage() {
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
@ -843,20 +717,20 @@ export default function NewChatPage() {
}
);
if (existingIdx) {
updateToolCall(existingIdx[0], {
updateToolCall(contentPartsState, existingIdx[0], {
result: { __interrupt__: true, ...interruptData },
});
} else {
const tcId = `interrupt-${action.name}`;
addToolCall(tcId, action.name, action.args);
updateToolCall(tcId, {
addToolCall(contentPartsState, TOOLS_WITH_UI, tcId, action.name, action.args);
updateToolCall(contentPartsState, tcId, {
result: { __interrupt__: true, ...interruptData },
});
}
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
if (currentThreadId) {
@ -872,20 +746,11 @@ export default function NewChatPage() {
case "error":
throw new Error(parsed.errorText || "Server error");
}
} catch (e) {
if (e instanceof SyntaxError) continue;
throw e;
}
}
}
}
} finally {
reader.releaseLock();
}
// Persist assistant message (with thinking steps for restoration on refresh)
// Skip persistence for interrupted messages -- handleResume will persist the final version
const finalContent = buildContentForPersistence();
const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps);
if (contentParts.length > 0 && !wasInterrupted) {
try {
const savedMessage = await appendMessage(currentThreadId, {
@ -933,7 +798,7 @@ export default function NewChatPage() {
(part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName))
);
if (hasContent && currentThreadId) {
const partialContent = buildContentForPersistence();
const partialContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps);
try {
const savedMessage = await appendMessage(currentThreadId, {
role: "assistant",
@ -1019,18 +884,8 @@ export default function NewChatPage() {
(messageThinkingSteps.get(assistantMsgId) ?? []).map((s) => [s.id, s])
);
type ContentPart =
| { type: "text"; text: string }
| {
type: "tool-call";
toolCallId: string;
toolName: string;
args: Record<string, unknown>;
result?: unknown;
};
const contentParts: ContentPart[] = [];
let currentTextPartIndex = -1;
const toolCallIndices = new Map<string, number>();
const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() };
const { contentParts, toolCallIndices } = contentPartsState;
const existingMsg = messages.find((m) => m.id === assistantMsgId);
if (existingMsg && Array.isArray(existingMsg.content)) {
@ -1039,7 +894,7 @@ export default function NewChatPage() {
const p = part as Record<string, unknown>;
if (p.type === "text") {
contentParts.push({ type: "text", text: String(p.text ?? "") });
currentTextPartIndex = contentParts.length - 1;
contentPartsState.currentTextPartIndex = contentParts.length - 1;
} else if (p.type === "tool-call") {
toolCallIndices.set(String(p.toolCallId), contentParts.length);
contentParts.push({
@ -1049,7 +904,7 @@ export default function NewChatPage() {
args: (p.args as Record<string, unknown>) ?? {},
result: p.result as unknown,
});
currentTextPartIndex = -1;
contentPartsState.currentTextPartIndex = -1;
}
}
}
@ -1072,68 +927,7 @@ export default function NewChatPage() {
}
}
const appendText = (delta: string) => {
if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") {
(contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta;
} else {
contentParts.push({ type: "text", text: delta });
currentTextPartIndex = contentParts.length - 1;
}
};
const addToolCall = (toolCallId: string, toolName: string, args: Record<string, unknown>) => {
if (TOOLS_WITH_UI.has(toolName)) {
contentParts.push({
type: "tool-call",
toolCallId,
toolName,
args,
});
toolCallIndices.set(toolCallId, contentParts.length - 1);
currentTextPartIndex = -1;
}
};
const updateToolCall = (
toolCallId: string,
update: { args?: Record<string, unknown>; result?: unknown }
) => {
const index = toolCallIndices.get(toolCallId);
if (index !== undefined && contentParts[index]?.type === "tool-call") {
const tc = contentParts[index] as ContentPart & { type: "tool-call" };
if (update.args) tc.args = update.args;
if (update.result !== undefined) tc.result = update.result;
}
};
const buildContentForUI = (): ThreadMessageLike["content"] => {
const filtered = contentParts.filter((part) => {
if (part.type === "text") return part.text.length > 0;
if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName);
return false;
});
return filtered.length > 0
? (filtered as ThreadMessageLike["content"])
: [{ type: "text", text: "" }];
};
const buildContentForPersistence = (): unknown[] => {
const parts: unknown[] = [];
if (currentThinkingSteps.size > 0) {
parts.push({
type: "thinking-steps",
steps: Array.from(currentThinkingSteps.values()),
});
}
for (const part of contentParts) {
if (part.type === "text" && part.text.length > 0) {
parts.push(part);
} else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) {
parts.push(part);
}
}
return parts.length > 0 ? parts : [{ type: "text", text: "" }];
};
try {
const backendUrl = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000";
@ -1154,74 +948,48 @@ export default function NewChatPage() {
throw new Error(`Backend error: ${response.status}`);
}
if (!response.body) {
throw new Error("No response body");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const events = buffer.split(/\r?\n\r?\n/);
buffer = events.pop() || "";
for (const event of events) {
const lines = event.split(/\r?\n/);
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (!data || data === "[DONE]") continue;
try {
const parsed = JSON.parse(data);
for await (const parsed of readSSEStream(response)) {
switch (parsed.type) {
case "text-delta":
appendText(parsed.delta);
appendText(contentPartsState, parsed.delta);
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-input-start":
addToolCall(parsed.toolCallId, parsed.toolName, {});
addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {});
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-input-available":
if (toolCallIndices.has(parsed.toolCallId)) {
updateToolCall(parsed.toolCallId, {
updateToolCall(contentPartsState, parsed.toolCallId, {
args: parsed.input || {},
});
} else {
addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {});
addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {});
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-output-available":
updateToolCall(parsed.toolCallId, {
updateToolCall(contentPartsState, parsed.toolCallId, {
result: parsed.output,
});
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
@ -1253,7 +1021,7 @@ export default function NewChatPage() {
}
);
if (existingIdx) {
updateToolCall(existingIdx[0], {
updateToolCall(contentPartsState, existingIdx[0], {
result: {
__interrupt__: true,
...interruptData,
@ -1261,8 +1029,8 @@ export default function NewChatPage() {
});
} else {
const tcId = `interrupt-${action.name}`;
addToolCall(tcId, action.name, action.args);
updateToolCall(tcId, {
addToolCall(contentPartsState, TOOLS_WITH_UI, tcId, action.name, action.args);
updateToolCall(contentPartsState, tcId, {
result: {
__interrupt__: true,
...interruptData,
@ -1272,7 +1040,7 @@ export default function NewChatPage() {
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
setPendingInterrupt({
@ -1285,19 +1053,10 @@ export default function NewChatPage() {
case "error":
throw new Error(parsed.errorText || "Server error");
}
} catch (e) {
if (e instanceof SyntaxError) continue;
throw e;
}
}
}
}
} finally {
reader.releaseLock();
}
const finalContent = buildContentForPersistence();
const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps);
if (contentParts.length > 0) {
try {
const savedMessage = await appendMessage(resumeThreadId, {
@ -1456,77 +1215,10 @@ export default function NewChatPage() {
const assistantMsgId = `msg-assistant-${Date.now()}`;
const currentThinkingSteps = new Map<string, ThinkingStepData>();
// Content parts tracking (same as onNew)
type ContentPart =
| { type: "text"; text: string }
| {
type: "tool-call";
toolCallId: string;
toolName: string;
args: Record<string, unknown>;
result?: unknown;
};
const contentParts: ContentPart[] = [];
let currentTextPartIndex = -1;
const toolCallIndices = new Map<string, number>();
const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() };
const { contentParts, toolCallIndices } = contentPartsState;
const appendText = (delta: string) => {
if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") {
(contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta;
} else {
contentParts.push({ type: "text", text: delta });
currentTextPartIndex = contentParts.length - 1;
}
};
const addToolCall = (toolCallId: string, toolName: string, args: Record<string, unknown>) => {
if (TOOLS_WITH_UI.has(toolName)) {
contentParts.push({ type: "tool-call", toolCallId, toolName, args });
toolCallIndices.set(toolCallId, contentParts.length - 1);
currentTextPartIndex = -1;
}
};
const updateToolCall = (
toolCallId: string,
update: { args?: Record<string, unknown>; result?: unknown }
) => {
const index = toolCallIndices.get(toolCallId);
if (index !== undefined && contentParts[index]?.type === "tool-call") {
const tc = contentParts[index] as ContentPart & { type: "tool-call" };
if (update.args) tc.args = update.args;
if (update.result !== undefined) tc.result = update.result;
}
};
const buildContentForUI = (): ThreadMessageLike["content"] => {
const filtered = contentParts.filter((part) => {
if (part.type === "text") return part.text.length > 0;
if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName);
return false;
});
return filtered.length > 0
? (filtered as ThreadMessageLike["content"])
: [{ type: "text", text: "" }];
};
const buildContentForPersistence = (): unknown[] => {
const parts: unknown[] = [];
if (currentThinkingSteps.size > 0) {
parts.push({
type: "thinking-steps",
steps: Array.from(currentThinkingSteps.values()),
});
}
for (const part of contentParts) {
if (part.type === "text" && part.text.length > 0) {
parts.push(part);
} else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) {
parts.push(part);
}
}
return parts.length > 0 ? parts : [{ type: "text", text: "" }];
};
// Add placeholder messages to UI
// Always add back the user message (with new query for edit, or original content for reload)
@ -1570,68 +1262,41 @@ export default function NewChatPage() {
throw new Error(`Backend error: ${response.status}`);
}
if (!response.body) {
throw new Error("No response body");
}
// Parse SSE stream (same logic as onNew)
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const events = buffer.split(/\r?\n\r?\n/);
buffer = events.pop() || "";
for (const event of events) {
const lines = event.split(/\r?\n/);
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (!data || data === "[DONE]") continue;
try {
const parsed = JSON.parse(data);
for await (const parsed of readSSEStream(response)) {
switch (parsed.type) {
case "text-delta":
appendText(parsed.delta);
appendText(contentPartsState, parsed.delta);
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-input-start":
addToolCall(parsed.toolCallId, parsed.toolName, {});
addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {});
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-input-available":
if (toolCallIndices.has(parsed.toolCallId)) {
updateToolCall(parsed.toolCallId, { args: parsed.input || {} });
updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {} });
} else {
addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {});
addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {});
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
case "tool-output-available":
updateToolCall(parsed.toolCallId, { result: parsed.output });
updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output });
if (parsed.output?.status === "pending" && parsed.output?.podcast_id) {
const idx = toolCallIndices.get(parsed.toolCallId);
if (idx !== undefined) {
@ -1643,7 +1308,7 @@ export default function NewChatPage() {
}
setMessages((prev) =>
prev.map((m) =>
m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m
m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m
)
);
break;
@ -1663,20 +1328,11 @@ export default function NewChatPage() {
case "error":
throw new Error(parsed.errorText || "Server error");
}
} catch (e) {
if (e instanceof SyntaxError) continue;
throw e;
}
}
}
}
} finally {
reader.releaseLock();
}
// Persist messages after streaming completes
const finalContent = buildContentForPersistence();
const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps);
if (contentParts.length > 0) {
try {
// Persist user message (for both edit and reload modes, since backend deleted it)