mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-05 13:52:40 +02:00
refactor(chat): introduce new stream handling utilities and restructure event processing for improved performance and maintainability
This commit is contained in:
parent
6465ea181a
commit
86f6b285ce
3 changed files with 217 additions and 198 deletions
19
surfsense_web/lib/chat/stream-flush.ts
Normal file
19
surfsense_web/lib/chat/stream-flush.ts
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
import { FrameBatchedUpdater } from "@/lib/chat/streaming-state";
|
||||
|
||||
export function createStreamFlushHelpers(flushMessages: () => void): {
|
||||
batcher: FrameBatchedUpdater;
|
||||
scheduleFlush: () => void;
|
||||
forceFlush: () => void;
|
||||
} {
|
||||
const batcher = new FrameBatchedUpdater();
|
||||
const scheduleFlush = () => batcher.schedule(flushMessages);
|
||||
// Force-flush helper: ``batcher.flush()`` is a no-op when
|
||||
// ``dirty=false`` (e.g. a tool starts before any text streamed).
|
||||
// ``scheduleFlush(); batcher.flush()`` sets the dirty bit first so
|
||||
// terminal events render promptly without the throttle delay.
|
||||
const forceFlush = () => {
|
||||
scheduleFlush();
|
||||
batcher.flush();
|
||||
};
|
||||
return { batcher, scheduleFlush, forceFlush };
|
||||
}
|
||||
191
surfsense_web/lib/chat/stream-pipeline.ts
Normal file
191
surfsense_web/lib/chat/stream-pipeline.ts
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
import {
|
||||
addStepSeparator,
|
||||
addToolCall,
|
||||
appendReasoning,
|
||||
appendText,
|
||||
appendToolInputDelta,
|
||||
type ContentPartsState,
|
||||
endReasoning,
|
||||
readSSEStream,
|
||||
type SSEEvent,
|
||||
type ThinkingStepData,
|
||||
type ToolUIGate,
|
||||
updateThinkingSteps,
|
||||
updateToolCall,
|
||||
} from "@/lib/chat/streaming-state";
|
||||
|
||||
export type SharedStreamEventContext = {
|
||||
contentPartsState: ContentPartsState;
|
||||
toolsWithUI: ToolUIGate;
|
||||
currentThinkingSteps: Map<string, ThinkingStepData>;
|
||||
scheduleFlush: () => void;
|
||||
forceFlush: () => void;
|
||||
onTokenUsage?: (data: Extract<SSEEvent, { type: "data-token-usage" }>["data"]) => void;
|
||||
onToolOutputAvailable?: (
|
||||
event: Extract<SSEEvent, { type: "tool-output-available" }>,
|
||||
context: {
|
||||
contentPartsState: ContentPartsState;
|
||||
toolCallIndices: Map<string, number>;
|
||||
}
|
||||
) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* After a tool produces output, mark any previously-decided interrupt tool
|
||||
* calls as completed so the ApprovalCard can transition from shimmer to done.
|
||||
*/
|
||||
export function markInterruptsCompleted(
|
||||
contentParts: Array<{ type: string; result?: unknown }>
|
||||
): void {
|
||||
for (const part of contentParts) {
|
||||
if (
|
||||
part.type === "tool-call" &&
|
||||
typeof part.result === "object" &&
|
||||
part.result !== null &&
|
||||
(part.result as Record<string, unknown>).__interrupt__ === true &&
|
||||
(part.result as Record<string, unknown>).__decided__ &&
|
||||
!(part.result as Record<string, unknown>).__completed__
|
||||
) {
|
||||
part.result = { ...(part.result as Record<string, unknown>), __completed__: true };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function hasPersistableContent(
|
||||
contentParts: ContentPartsState["contentParts"],
|
||||
toolsWithUI: ToolUIGate
|
||||
) {
|
||||
return contentParts.some(
|
||||
(part) =>
|
||||
(part.type === "text" && part.text.length > 0) ||
|
||||
(part.type === "reasoning" && part.text.length > 0) ||
|
||||
(part.type === "tool-call" && (toolsWithUI === "all" || toolsWithUI.has(part.toolName)))
|
||||
);
|
||||
}
|
||||
|
||||
function toStreamTerminalError(
|
||||
event: Extract<SSEEvent, { type: "error" }>
|
||||
): Error & { errorCode?: string } {
|
||||
return Object.assign(new Error(event.errorText || "Server error"), {
|
||||
errorCode: event.errorCode,
|
||||
});
|
||||
}
|
||||
|
||||
export function processSharedStreamEvent(parsed: SSEEvent, context: SharedStreamEventContext): boolean {
|
||||
const { contentPartsState, toolsWithUI, currentThinkingSteps, scheduleFlush, forceFlush } = context;
|
||||
const { contentParts, toolCallIndices } = contentPartsState;
|
||||
|
||||
switch (parsed.type) {
|
||||
case "text-delta":
|
||||
appendText(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "reasoning-delta":
|
||||
appendReasoning(contentPartsState, parsed.delta);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "reasoning-end":
|
||||
endReasoning(contentPartsState);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "start-step":
|
||||
addStepSeparator(contentPartsState);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "finish-step":
|
||||
return true;
|
||||
|
||||
case "tool-input-start":
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
{},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
forceFlush();
|
||||
return true;
|
||||
|
||||
case "tool-input-delta":
|
||||
// High-frequency event: deltas can fire dozens of times per call,
|
||||
// so use throttled scheduleFlush (NOT forceFlush) to coalesce.
|
||||
appendToolInputDelta(contentPartsState, parsed.toolCallId, parsed.inputTextDelta);
|
||||
scheduleFlush();
|
||||
return true;
|
||||
|
||||
case "tool-input-available": {
|
||||
const finalArgsText = JSON.stringify(parsed.input ?? {}, null, 2);
|
||||
if (toolCallIndices.has(parsed.toolCallId)) {
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
args: parsed.input || {},
|
||||
argsText: finalArgsText,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
} else {
|
||||
addToolCall(
|
||||
contentPartsState,
|
||||
toolsWithUI,
|
||||
parsed.toolCallId,
|
||||
parsed.toolName,
|
||||
parsed.input || {},
|
||||
false,
|
||||
parsed.langchainToolCallId
|
||||
);
|
||||
// addToolCall doesn't accept argsText today; backfill via
|
||||
// updateToolCall so the new card renders pretty-printed JSON.
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
argsText: finalArgsText,
|
||||
});
|
||||
}
|
||||
forceFlush();
|
||||
return true;
|
||||
}
|
||||
|
||||
case "tool-output-available":
|
||||
updateToolCall(contentPartsState, parsed.toolCallId, {
|
||||
result: parsed.output,
|
||||
langchainToolCallId: parsed.langchainToolCallId,
|
||||
});
|
||||
markInterruptsCompleted(contentParts);
|
||||
context.onToolOutputAvailable?.(parsed, { contentPartsState, toolCallIndices });
|
||||
forceFlush();
|
||||
return true;
|
||||
|
||||
case "data-thinking-step": {
|
||||
const stepData = parsed.data as ThinkingStepData;
|
||||
if (stepData?.id) {
|
||||
currentThinkingSteps.set(stepData.id, stepData);
|
||||
const didUpdate = updateThinkingSteps(contentPartsState, currentThinkingSteps);
|
||||
if (didUpdate) {
|
||||
scheduleFlush();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
case "data-token-usage":
|
||||
context.onTokenUsage?.(parsed.data);
|
||||
return true;
|
||||
|
||||
case "error":
|
||||
throw toStreamTerminalError(parsed);
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export async function consumeSseEvents(
|
||||
response: Response,
|
||||
onEvent: (event: SSEEvent) => void | Promise<void>
|
||||
): Promise<void> {
|
||||
for await (const parsed of readSSEStream(response)) {
|
||||
await onEvent(parsed);
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue