mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-06 06:12:40 +02:00
refactor(chat): streamline NewChatPage component by removing unused functions and integrating new stream handling utilities for improved performance
This commit is contained in:
parent
1d6d7e3eb1
commit
6465ea181a
1 changed files with 255 additions and 370 deletions
|
|
@ -252,6 +252,168 @@ function tagPreAcceptSendFailure(error: unknown): unknown {
|
|||
});
|
||||
}
|
||||
|
||||
type SharedStreamEventContext = {
|
||||
contentPartsState: ContentPartsState;
|
||||
toolsWithUI: ToolUIGate;
|
||||
currentThinkingSteps: Map<string, ThinkingStepData>;
|
||||
scheduleFlush: () => void;
|
||||
forceFlush: () => void;
|
||||
onTokenUsage?: (data: TokenUsageData) => void;
|
||||
onToolOutputAvailable?: (
|
||||
event: Extract<SSEEvent, { type: "tool-output-available" }>,
|
||||
context: {
|
||||
contentPartsState: ContentPartsState;
|
||||
toolCallIndices: Map<string, number>;
|
||||
}
|
||||
) => void;
|
||||
};
|
||||
|
||||
function createStreamFlushHelpers(flushMessages: () => void): {
|
||||
batcher: FrameBatchedUpdater;
|
||||
scheduleFlush: () => void;
|
||||
forceFlush: () => void;
|
||||
} {
|
||||
const batcher = new FrameBatchedUpdater();
|
||||
const scheduleFlush = () => batcher.schedule(flushMessages);
|
||||
// Force-flush helper: ``batcher.flush()`` is a no-op when
|
||||
// ``dirty=false`` (e.g. a tool starts before any text streamed).
|
||||
// ``scheduleFlush(); batcher.flush()`` sets the dirty bit first so
|
||||
// terminal events render promptly without the throttle delay.
|
||||
const forceFlush = () => {
|
||||
scheduleFlush();
|
||||
batcher.flush();
|
||||
};
|
||||
return { batcher, scheduleFlush, forceFlush };
|
||||
}
|
||||
|
||||
function hasPersistableContent(contentParts: ContentPartsState["contentParts"], toolsWithUI: ToolUIGate) {
|
||||
return contentParts.some(
|
||||
(part) =>
|
||||
(part.type === "text" && part.text.length > 0) ||
|
||||
(part.type === "reasoning" && part.text.length > 0) ||
|
||||
(part.type === "tool-call" && (toolsWithUI === "all" || toolsWithUI.has(part.toolName)))
|
||||
);
|
||||
}
|
||||
|
||||
function processSharedStreamEvent(parsed: SSEEvent, context: SharedStreamEventContext): boolean {
|
||||
const { contentPartsState, toolsWithUI, currentThinkingSteps, scheduleFlush, forceFlush } = context;
|
||||
const { contentParts, toolCallIndices } = contentPartsState;
|
||||
|
||||
switch (parsed.type) {
|
||||
case "text-delta":
|
||||
appendText(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "reasoning-delta":
|
||||
appendReasoning(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "reasoning-end":
|
||||
endReasoning(contentPartsState);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "start-step":
|
||||
addStepSeparator(contentPartsState);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "finish-step":
|
||||
return true;
|
||||
|
||||
case "tool-input-start":
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
{},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
forceFlush();
|
||||
return true;
|
||||
|
||||
case "tool-input-delta":
|
||||
// High-frequency event: deltas can fire dozens of times per call,
|
||||
// so use throttled scheduleFlush (NOT forceFlush) to coalesce.
|
||||
appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "tool-input-available": {
|
||||
const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2);
|
||||
if (toolCallIndices.has(parsed.toolCallId)) {
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
args: parsed.input || {},
|
||||
argsText: finalArgsText,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
} else {
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
parsed.input || {},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
// addToolCall doesn't accept argsText today; backfill via
|
||||
// updateToolCall so the new card renders pretty-printed JSON.
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
argsText: finalArgsText,
|
||||
});
|
||||
}
|
||||
forceFlush();
|
||||
return true;
|
||||
}
|
||||
|
||||
case "tool-output-available":
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
result: parsed.output,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
markInterruptsCompleted(contentParts);
|
||||
context.onToolOutputAvailable?.(parsed, { contentPartsState, toolCallIndices });
|
||||
forceFlush();
|
||||
return true;
|
||||
|
||||
case "data-thinking-step": {
|
||||
const stepData = parsed.data as ThinkingStepData;
|
||||
if (stepData?.id) {
|
||||
currentThinkingSteps.set(stepData.id, stepData);
|
||||
const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps);
|
||||
if (didUpdate) {
|
||||
scheduleFlush();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
case "data-token-usage":
|
||||
context.onTokenUsage?.(parsed.data as TokenUsageData);
|
||||
return true;
|
||||
|
||||
case "error":
|
||||
throw toStreamTerminalError(parsed);
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function consumeSseEvents(
|
||||
response: Response,
|
||||
onEvent: (event: SSEEvent) => void | Promise<void>
|
||||
): Promise<void> {
|
||||
for await (const parsed of readSSEStream(response)) {
|
||||
await onEvent(parsed);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Zod schema for mentioned document info (for type-safe parsing)
|
||||
*/
|
||||
|
|
@ -456,7 +618,7 @@ export default function NewChatPage() {
|
|||
threadId: number | null;
|
||||
assistantMsgId: string;
|
||||
content: unknown;
|
||||
tokenUsage?: Record<string, unknown>;
|
||||
tokenUsage?: TokenUsageData;
|
||||
turnId?: string | null;
|
||||
logContext: string;
|
||||
onRemapped?: (newMsgId: string) => void;
|
||||
|
|
@ -1055,8 +1217,6 @@ export default function NewChatPage() {
|
|||
// Prepare assistant message
|
||||
const assistantMsgId = `msg-assistant-${Date.now()}`;
|
||||
const currentThinkingSteps = new Map<string, ThinkingStepData>();
|
||||
const batcher = new FrameBatchedUpdater();
|
||||
|
||||
const contentPartsState: ContentPartsState = {
|
||||
contentParts: [],
|
||||
currentTextPartIndex: -1,
|
||||
|
|
@ -1065,11 +1225,12 @@ export default function NewChatPage() {
|
|||
};
|
||||
const { contentParts, toolCallIndices } = contentPartsState;
|
||||
let wasInterrupted = false;
|
||||
let tokenUsageData: Record<string, unknown> | null = null;
|
||||
let tokenUsageData: TokenUsageData | null = null;
|
||||
let newAccepted = false;
|
||||
let userPersisted = false;
|
||||
// Captured from ``data-turn-info`` at stream start.
|
||||
let streamedChatTurnId: string | null = null;
|
||||
let streamBatcher: FrameBatchedUpdater | null = null;
|
||||
|
||||
try {
|
||||
const backendUrl = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000";
|
||||
|
|
@ -1152,123 +1313,37 @@ export default function NewChatPage() {
|
|||
)
|
||||
);
|
||||
};
|
||||
const scheduleFlush = () => batcher.schedule(flushMessages);
|
||||
// Force-flush helper: ``batcher.flush()`` is a no-op when
|
||||
// ``dirty=false`` (e.g. a tool starts before any text
|
||||
// streamed). ``scheduleFlush(); batcher.flush()`` sets
|
||||
// the dirty bit FIRST so terminal events render
|
||||
// promptly without the 50ms throttle delay.
|
||||
const forceFlush = () => {
|
||||
scheduleFlush();
|
||||
batcher.flush();
|
||||
};
|
||||
const { batcher, scheduleFlush, forceFlush } = createStreamFlushHelpers(flushMessages);
|
||||
streamBatcher = batcher;
|
||||
|
||||
for await (const parsed of readSSEStream(response)) {
|
||||
switch (parsed.type) {
|
||||
case "text-delta":
|
||||
appendText(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "reasoning-delta":
|
||||
appendReasoning(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "reasoning-end":
|
||||
endReasoning(contentPartsState);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "start-step":
|
||||
addStepSeparator(contentPartsState);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "finish-step":
|
||||
break;
|
||||
|
||||
case "tool-input-start":
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
{},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
forceFlush();
|
||||
break;
|
||||
|
||||
case "tool-input-delta":
|
||||
// High-frequency event: deltas can fire dozens
|
||||
// of times per call, so use throttled
|
||||
// scheduleFlush (NOT forceFlush) to coalesce.
|
||||
appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "tool-input-available": {
|
||||
const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2);
|
||||
if (toolCallIndices.has(parsed.toolCallId)) {
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
args: parsed.input || {},
|
||||
argsText: finalArgsText,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
} else {
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
parsed.input || {},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
// addToolCall doesn't accept argsText today;
|
||||
// backfill via updateToolCall so the new card
|
||||
// renders pretty-printed JSON.
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
argsText: finalArgsText,
|
||||
});
|
||||
}
|
||||
forceFlush();
|
||||
break;
|
||||
}
|
||||
|
||||
case "tool-output-available": {
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
result: parsed.output,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
markInterruptsCompleted(contentParts);
|
||||
if (parsed.output?.status === "pending" && parsed.output?.podcast_id) {
|
||||
const idx = toolCallIndices.get(parsed.toolCallId);
|
||||
if (idx !== undefined) {
|
||||
const part = contentParts[idx];
|
||||
if (part?.type === "tool-call" && part.toolName === "generate_podcast") {
|
||||
setActivePodcastTaskId(String(parsed.output.podcast_id));
|
||||
await consumeSseEvents(response, async (parsed) => {
|
||||
if (
|
||||
processSharedStreamEvent(parsed, {
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
currentThinkingSteps,
|
||||
scheduleFlush,
|
||||
forceFlush,
|
||||
onTokenUsage: (data) => {
|
||||
tokenUsageData = data;
|
||||
tokenUsageStore.set(assistantMsgId, data);
|
||||
},
|
||||
onToolOutputAvailable: (event, sharedCtx) => {
|
||||
if (event.output?.status === "pending" && event.output?.podcast_id) {
|
||||
const idx = sharedCtx.toolCallIndices.get(event.toolCallId);
|
||||
if (idx !== undefined) {
|
||||
const part = sharedCtx.contentPartsState.contentParts[idx];
|
||||
if (part?.type === "tool-call" && part.toolName === "generate_podcast") {
|
||||
setActivePodcastTaskId(String(event.output.podcast_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
forceFlush();
|
||||
break;
|
||||
}
|
||||
|
||||
case "data-thinking-step": {
|
||||
const stepData = parsed.data as ThinkingStepData;
|
||||
if (stepData?.id) {
|
||||
currentThinkingSteps.set(stepData.id, stepData);
|
||||
const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps);
|
||||
if (didUpdate) {
|
||||
scheduleFlush();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
},
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
switch (parsed.type) {
|
||||
case "data-thread-title-update": {
|
||||
const titleData = parsed.data as { threadId: number; title: string };
|
||||
if (titleData?.title && titleData?.threadId === currentThreadId) {
|
||||
|
|
@ -1374,16 +1449,8 @@ export default function NewChatPage() {
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "data-token-usage":
|
||||
tokenUsageData = parsed.data;
|
||||
tokenUsageStore.set(assistantMsgId, parsed.data as TokenUsageData);
|
||||
break;
|
||||
|
||||
case "error":
|
||||
throw toStreamTerminalError(parsed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
batcher.flush();
|
||||
|
||||
|
|
@ -1425,7 +1492,7 @@ export default function NewChatPage() {
|
|||
trackChatResponseReceived(searchSpaceId, currentThreadId);
|
||||
}
|
||||
} catch (error) {
|
||||
batcher.dispose();
|
||||
streamBatcher?.dispose();
|
||||
await handleStreamTerminalError({
|
||||
error,
|
||||
flow: "new",
|
||||
|
|
@ -1448,13 +1515,7 @@ export default function NewChatPage() {
|
|||
}
|
||||
}
|
||||
|
||||
const hasContent = contentParts.some(
|
||||
(part) =>
|
||||
(part.type === "text" && part.text.length > 0) ||
|
||||
(part.type === "reasoning" && part.text.length > 0) ||
|
||||
(part.type === "tool-call" &&
|
||||
(toolsWithUI === "all" || toolsWithUI.has(part.toolName)))
|
||||
);
|
||||
const hasContent = hasPersistableContent(contentParts, toolsWithUI);
|
||||
if (hasContent && currentThreadId) {
|
||||
const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI);
|
||||
await persistAssistantTurn({
|
||||
|
|
@ -1543,7 +1604,6 @@ export default function NewChatPage() {
|
|||
abortControllerRef.current = controller;
|
||||
|
||||
const currentThinkingSteps = new Map<string, ThinkingStepData>();
|
||||
const batcher = new FrameBatchedUpdater();
|
||||
|
||||
const contentPartsState: ContentPartsState = {
|
||||
contentParts: [],
|
||||
|
|
@ -1552,10 +1612,11 @@ export default function NewChatPage() {
|
|||
toolCallIndices: new Map(),
|
||||
};
|
||||
const { contentParts, toolCallIndices } = contentPartsState;
|
||||
let tokenUsageData: Record<string, unknown> | null = null;
|
||||
let tokenUsageData: TokenUsageData | null = null;
|
||||
let resumeAccepted = false;
|
||||
// Captured from ``data-turn-info`` at stream start.
|
||||
let streamedChatTurnId: string | null = null;
|
||||
let streamBatcher: FrameBatchedUpdater | null = null;
|
||||
|
||||
const existingMsg = messages.find((m) => m.id === assistantMsgId);
|
||||
if (existingMsg && Array.isArray(existingMsg.content)) {
|
||||
|
|
@ -1664,102 +1725,26 @@ export default function NewChatPage() {
|
|||
)
|
||||
);
|
||||
};
|
||||
const scheduleFlush = () => batcher.schedule(flushMessages);
|
||||
const forceFlush = () => {
|
||||
scheduleFlush();
|
||||
batcher.flush();
|
||||
};
|
||||
const { batcher, scheduleFlush, forceFlush } = createStreamFlushHelpers(flushMessages);
|
||||
streamBatcher = batcher;
|
||||
|
||||
for await (const parsed of readSSEStream(response)) {
|
||||
await consumeSseEvents(response, async (parsed) => {
|
||||
if (
|
||||
processSharedStreamEvent(parsed, {
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
currentThinkingSteps,
|
||||
scheduleFlush,
|
||||
forceFlush,
|
||||
onTokenUsage: (data) => {
|
||||
tokenUsageData = data;
|
||||
tokenUsageStore.set(assistantMsgId, data);
|
||||
},
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
switch (parsed.type) {
|
||||
case "text-delta":
|
||||
appendText(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "reasoning-delta":
|
||||
appendReasoning(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "reasoning-end":
|
||||
endReasoning(contentPartsState);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "start-step":
|
||||
addStepSeparator(contentPartsState);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "finish-step":
|
||||
break;
|
||||
|
||||
case "tool-input-start":
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
{},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
forceFlush();
|
||||
break;
|
||||
|
||||
case "tool-input-delta":
|
||||
appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "tool-input-available": {
|
||||
const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2);
|
||||
if (toolCallIndices.has(parsed.toolCallId)) {
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
args: parsed.input || {},
|
||||
argsText: finalArgsText,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
} else {
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
parsed.input || {},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
argsText: finalArgsText,
|
||||
});
|
||||
}
|
||||
forceFlush();
|
||||
break;
|
||||
}
|
||||
|
||||
case "tool-output-available":
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
result: parsed.output,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
markInterruptsCompleted(contentParts);
|
||||
forceFlush();
|
||||
break;
|
||||
|
||||
case "data-thinking-step": {
|
||||
const stepData = parsed.data as ThinkingStepData;
|
||||
if (stepData?.id) {
|
||||
currentThinkingSteps.set(stepData.id, stepData);
|
||||
const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps);
|
||||
if (didUpdate) {
|
||||
scheduleFlush();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "data-interrupt-request": {
|
||||
const interruptData = parsed.data as Record<string, unknown>;
|
||||
const actionRequests = (interruptData.action_requests ?? []) as Array<{
|
||||
|
|
@ -1830,16 +1815,8 @@ export default function NewChatPage() {
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "data-token-usage":
|
||||
tokenUsageData = parsed.data;
|
||||
tokenUsageStore.set(assistantMsgId, parsed.data as TokenUsageData);
|
||||
break;
|
||||
|
||||
case "error":
|
||||
throw toStreamTerminalError(parsed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
batcher.flush();
|
||||
|
||||
|
|
@ -1855,7 +1832,7 @@ export default function NewChatPage() {
|
|||
});
|
||||
}
|
||||
} catch (error) {
|
||||
batcher.dispose();
|
||||
streamBatcher?.dispose();
|
||||
await handleStreamTerminalError({
|
||||
error,
|
||||
flow: "resume",
|
||||
|
|
@ -1864,13 +1841,7 @@ export default function NewChatPage() {
|
|||
accepted: resumeAccepted,
|
||||
onAbort: async () => {
|
||||
if (!resumeAccepted) return;
|
||||
const hasContent = contentParts.some(
|
||||
(part) =>
|
||||
(part.type === "text" && part.text.length > 0) ||
|
||||
(part.type === "reasoning" && part.text.length > 0) ||
|
||||
(part.type === "tool-call" &&
|
||||
(toolsWithUI === "all" || toolsWithUI.has(part.toolName)))
|
||||
);
|
||||
const hasContent = hasPersistableContent(contentParts, toolsWithUI);
|
||||
if (!hasContent) return;
|
||||
const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI);
|
||||
await persistAssistantTurn({
|
||||
|
|
@ -1891,6 +1862,7 @@ export default function NewChatPage() {
|
|||
pendingInterrupt,
|
||||
messages,
|
||||
searchSpaceId,
|
||||
queryClient,
|
||||
tokenUsageStore,
|
||||
handleStreamTerminalError,
|
||||
persistAssistantTurn,
|
||||
|
|
@ -2045,15 +2017,15 @@ export default function NewChatPage() {
|
|||
currentReasoningPartIndex: -1,
|
||||
toolCallIndices: new Map(),
|
||||
};
|
||||
const { contentParts, toolCallIndices } = contentPartsState;
|
||||
const batcher = new FrameBatchedUpdater();
|
||||
let tokenUsageData: Record<string, unknown> | null = null;
|
||||
const { contentParts } = contentPartsState;
|
||||
let tokenUsageData: TokenUsageData | null = null;
|
||||
let regenerateAccepted = false;
|
||||
let userPersisted = false;
|
||||
// Captured from ``data-turn-info`` at stream start; stamped
|
||||
// onto persisted messages so future edits can locate the
|
||||
// right LangGraph checkpoint.
|
||||
let streamedChatTurnId: string | null = null;
|
||||
let streamBatcher: FrameBatchedUpdater | null = null;
|
||||
|
||||
// Add placeholder messages to UI
|
||||
// Always add back the user message (with new query for edit, or original content for reload)
|
||||
|
|
@ -2155,111 +2127,37 @@ export default function NewChatPage() {
|
|||
)
|
||||
);
|
||||
};
|
||||
const scheduleFlush = () => batcher.schedule(flushMessages);
|
||||
const forceFlush = () => {
|
||||
scheduleFlush();
|
||||
batcher.flush();
|
||||
};
|
||||
const { batcher, scheduleFlush, forceFlush } = createStreamFlushHelpers(flushMessages);
|
||||
streamBatcher = batcher;
|
||||
|
||||
for await (const parsed of readSSEStream(response)) {
|
||||
switch (parsed.type) {
|
||||
case "text-delta":
|
||||
appendText(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "reasoning-delta":
|
||||
appendReasoning(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "reasoning-end":
|
||||
endReasoning(contentPartsState);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "start-step":
|
||||
addStepSeparator(contentPartsState);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "finish-step":
|
||||
break;
|
||||
|
||||
case "tool-input-start":
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
{},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
forceFlush();
|
||||
break;
|
||||
|
||||
case "tool-input-delta":
|
||||
appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta);
|
||||
scheduleFlush();
|
||||
break;
|
||||
|
||||
case "tool-input-available": {
|
||||
const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2);
|
||||
if (toolCallIndices.has(parsed.toolCallId)) {
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
args: parsed.input || {},
|
||||
argsText: finalArgsText,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
} else {
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
parsed.input || {},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
argsText: finalArgsText,
|
||||
});
|
||||
}
|
||||
forceFlush();
|
||||
break;
|
||||
}
|
||||
|
||||
case "tool-output-available":
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
result: parsed.output,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
markInterruptsCompleted(contentParts);
|
||||
if (parsed.output?.status === "pending" && parsed.output?.podcast_id) {
|
||||
const idx = toolCallIndices.get(parsed.toolCallId);
|
||||
if (idx !== undefined) {
|
||||
const part = contentParts[idx];
|
||||
if (part?.type === "tool-call" && part.toolName === "generate_podcast") {
|
||||
setActivePodcastTaskId(String(parsed.output.podcast_id));
|
||||
await consumeSseEvents(response, async (parsed) => {
|
||||
if (
|
||||
processSharedStreamEvent(parsed, {
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
currentThinkingSteps,
|
||||
scheduleFlush,
|
||||
forceFlush,
|
||||
onTokenUsage: (data) => {
|
||||
tokenUsageData = data;
|
||||
tokenUsageStore.set(assistantMsgId, data);
|
||||
},
|
||||
onToolOutputAvailable: (event, sharedCtx) => {
|
||||
if (event.output?.status === "pending" && event.output?.podcast_id) {
|
||||
const idx = sharedCtx.toolCallIndices.get(event.toolCallId);
|
||||
if (idx !== undefined) {
|
||||
const part = sharedCtx.contentPartsState.contentParts[idx];
|
||||
if (part?.type === "tool-call" && part.toolName === "generate_podcast") {
|
||||
setActivePodcastTaskId(String(event.output.podcast_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
forceFlush();
|
||||
break;
|
||||
|
||||
case "data-thinking-step": {
|
||||
const stepData = parsed.data as ThinkingStepData;
|
||||
if (stepData?.id) {
|
||||
currentThinkingSteps.set(stepData.id, stepData);
|
||||
const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps);
|
||||
if (didUpdate) {
|
||||
scheduleFlush();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
},
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
switch (parsed.type) {
|
||||
case "data-action-log": {
|
||||
if (threadId !== null) {
|
||||
applyActionLogSse(queryClient, threadId, searchSpaceId, parsed.data);
|
||||
|
|
@ -2326,16 +2224,8 @@ export default function NewChatPage() {
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "data-token-usage":
|
||||
tokenUsageData = parsed.data;
|
||||
tokenUsageStore.set(assistantMsgId, parsed.data as TokenUsageData);
|
||||
break;
|
||||
|
||||
case "error":
|
||||
throw toStreamTerminalError(parsed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
batcher.flush();
|
||||
|
||||
|
|
@ -2364,7 +2254,7 @@ export default function NewChatPage() {
|
|||
trackChatResponseReceived(searchSpaceId, threadId);
|
||||
}
|
||||
} catch (error) {
|
||||
batcher.dispose();
|
||||
streamBatcher?.dispose();
|
||||
await handleStreamTerminalError({
|
||||
error,
|
||||
flow: "regenerate",
|
||||
|
|
@ -2384,13 +2274,7 @@ export default function NewChatPage() {
|
|||
});
|
||||
userPersisted = Boolean(persistedUserMsgId);
|
||||
}
|
||||
const hasContent = contentParts.some(
|
||||
(part) =>
|
||||
(part.type === "text" && part.text.length > 0) ||
|
||||
(part.type === "reasoning" && part.text.length > 0) ||
|
||||
(part.type === "tool-call" &&
|
||||
(toolsWithUI === "all" || toolsWithUI.has(part.toolName)))
|
||||
);
|
||||
const hasContent = hasPersistableContent(contentParts, toolsWithUI);
|
||||
if (!hasContent) return;
|
||||
const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI);
|
||||
await persistAssistantTurn({
|
||||
|
|
@ -2428,6 +2312,7 @@ export default function NewChatPage() {
|
|||
disabledTools,
|
||||
messageDocumentsMap,
|
||||
setMessageDocumentsMap,
|
||||
queryClient,
|
||||
tokenUsageStore,
|
||||
handleStreamTerminalError,
|
||||
persistAssistantTurn,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue