From 35ea0eae53a24875e368111f294b366f48f2d9fa Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:03:09 +0530 Subject: [PATCH] feat(chat): enhance error classification and handling for thread busy scenarios, improving user feedback and response management --- .../app/tasks/chat/stream_new_chat.py | 106 +++++---- .../unit/test_stream_new_chat_contract.py | 33 ++- .../new-chat/[[...chat_id]]/page.tsx | 209 +++++++++++++----- .../components/free-chat/anonymous-chat.tsx | 16 +- .../components/free-chat/free-chat-page.tsx | 52 ++++- .../lib/chat/chat-error-classifier.ts | 17 ++ 6 files changed, 322 insertions(+), 111 deletions(-) diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index a0be55c1b..d6ca5418c 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -19,6 +19,7 @@ import re import time from collections.abc import AsyncGenerator from dataclasses import dataclass, field +from functools import partial from typing import Any, Literal from uuid import UUID @@ -30,6 +31,7 @@ from sqlalchemy.orm import selectinload from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent from app.agents.new_chat.checkpointer import get_checkpointer +from app.agents.new_chat.errors import BusyError from app.agents.new_chat.filesystem_selection import FilesystemMode, FilesystemSelection from app.agents.new_chat.llm_config import ( AgentConfig, @@ -315,6 +317,15 @@ def _classify_stream_exception( flow_label: str, ) -> tuple[str, str, Literal["info", "warn", "error"], bool, str]: raw = str(exc) + if isinstance(exc, BusyError) or "Thread is busy with another request" in raw: + return ( + "thread_busy", + "THREAD_BUSY", + "warn", + True, + "Another response is still finishing for this thread. Please try again in a moment.", + ) + parsed = _parse_error_payload(raw) provider_error_type = "" if parsed: @@ -345,6 +356,37 @@ def _classify_stream_exception( ) +def _emit_stream_terminal_error( + *, + streaming_service: VercelStreamingService, + flow: str, + request_id: str | None, + thread_id: int, + search_space_id: int, + user_id: str | None, + message: str, + error_kind: str = "server_error", + error_code: str = "SERVER_ERROR", + severity: Literal["info", "warn", "error"] = "error", + is_expected: bool = False, + extra: dict[str, Any] | None = None, +) -> str: + _log_chat_stream_error( + flow=flow, + error_kind=error_kind, + error_code=error_code, + severity=severity, + is_expected=is_expected, + request_id=request_id, + thread_id=thread_id, + search_space_id=search_space_id, + user_id=user_id, + message=message, + extra=extra, + ) + return streaming_service.format_error(message, error_code=error_code) + + async def _stream_agent_events( agent: Any, config: dict[str, Any], @@ -1541,29 +1583,15 @@ async def stream_new_chat( _premium_reserved = 0 _premium_request_id: str | None = None - def _emit_stream_error( - *, - message: str, - error_kind: str = "server_error", - error_code: str = "SERVER_ERROR", - severity: Literal["info", "warn", "error"] = "error", - is_expected: bool = False, - extra: dict[str, Any] | None = None, - ) -> str: - _log_chat_stream_error( - flow=flow, - error_kind=error_kind, - error_code=error_code, - severity=severity, - is_expected=is_expected, - request_id=request_id, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - message=message, - extra=extra, - ) - return streaming_service.format_error(message, error_code=error_code) + _emit_stream_error = partial( + _emit_stream_terminal_error, + streaming_service=streaming_service, + flow=flow, + request_id=request_id, + thread_id=chat_id, + search_space_id=search_space_id, + user_id=user_id, + ) session = async_session_maker() try: @@ -2380,29 +2408,15 @@ async def stream_resume_chat( accumulator = start_turn() - def _emit_stream_error( - *, - message: str, - error_kind: str = "server_error", - error_code: str = "SERVER_ERROR", - severity: Literal["info", "warn", "error"] = "error", - is_expected: bool = False, - extra: dict[str, Any] | None = None, - ) -> str: - _log_chat_stream_error( - flow="resume", - error_kind=error_kind, - error_code=error_code, - severity=severity, - is_expected=is_expected, - request_id=request_id, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - message=message, - extra=extra, - ) - return streaming_service.format_error(message, error_code=error_code) + _emit_stream_error = partial( + _emit_stream_terminal_error, + streaming_service=streaming_service, + flow="resume", + request_id=request_id, + thread_id=chat_id, + search_space_id=search_space_id, + user_id=user_id, + ) session = async_session_maker() try: diff --git a/surfsense_backend/tests/unit/test_stream_new_chat_contract.py b/surfsense_backend/tests/unit/test_stream_new_chat_contract.py index 1f8168837..125177084 100644 --- a/surfsense_backend/tests/unit/test_stream_new_chat_contract.py +++ b/surfsense_backend/tests/unit/test_stream_new_chat_contract.py @@ -1,12 +1,13 @@ import inspect import json import logging -from pathlib import Path import re +from pathlib import Path import pytest import app.tasks.chat.stream_new_chat as stream_new_chat_module +from app.agents.new_chat.errors import BusyError from app.tasks.chat.stream_new_chat import ( StreamResult, _classify_stream_exception, @@ -130,14 +131,14 @@ def test_stream_error_emission_keeps_machine_error_codes(): format_error_calls = re.findall(r"format_error\(", source) emitted_error_codes = set(re.findall(r'error_code="([A-Z_]+)"', source)) - # Both new/resume stream paths now route through local emitters that always - # pass a machine-readable error_code. - assert len(format_error_calls) == 2 + # All stream paths should route through one shared terminal error emitter. + assert len(format_error_calls) == 1 assert { "PREMIUM_QUOTA_EXHAUSTED", "SERVER_ERROR", }.issubset(emitted_error_codes) assert 'flow: Literal["new", "regenerate"] = "new"' in source + assert "_emit_stream_terminal_error" in source assert "flow=flow" in source assert 'flow="resume"' in source @@ -156,6 +157,30 @@ def test_stream_exception_classifies_rate_limited(): assert "temporarily rate-limited" in user_message +def test_stream_exception_classifies_thread_busy(): + exc = BusyError(request_id="thread-123") + kind, code, severity, is_expected, user_message = _classify_stream_exception( + exc, flow_label="chat" + ) + assert kind == "thread_busy" + assert code == "THREAD_BUSY" + assert severity == "warn" + assert is_expected is True + assert "still finishing for this thread" in user_message + + +def test_stream_exception_classifies_thread_busy_from_message(): + exc = Exception("Thread is busy with another request") + kind, code, severity, is_expected, user_message = _classify_stream_exception( + exc, flow_label="chat" + ) + assert kind == "thread_busy" + assert code == "THREAD_BUSY" + assert severity == "warn" + assert is_expected is True + assert "still finishing for this thread" in user_message + + def test_premium_classification_is_error_code_driven(): classifier_path = Path(__file__).resolve().parents[3] / "surfsense_web/lib/chat/chat-error-classifier.ts" source = classifier_path.read_text(encoding="utf-8") diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index b6afaf131..70e188612 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -67,6 +67,7 @@ import { type ContentPartsState, FrameBatchedUpdater, readSSEStream, + type SSEEvent, type ThinkingStepData, updateThinkingSteps, updateToolCall, @@ -136,6 +137,75 @@ function markInterruptsCompleted(contentParts: Array<{ type: string; result?: un } } +function toStreamTerminalError( + event: Extract +): Error & { errorCode?: string } { + return Object.assign(new Error(event.errorText || "Server error"), { + errorCode: event.errorCode, + }); +} + +async function toHttpResponseError(response: Response): Promise { + const statusDefaultCode = + response.status === 409 + ? "THREAD_BUSY" + : response.status === 429 + ? "RATE_LIMITED" + : response.status === 401 || response.status === 403 + ? "AUTH_EXPIRED" + : "SERVER_ERROR"; + + let rawBody = ""; + try { + rawBody = await response.text(); + } catch { + // noop + } + + let parsedBody: Record | null = null; + if (rawBody) { + try { + const parsed = JSON.parse(rawBody); + if (typeof parsed === "object" && parsed !== null) { + parsedBody = parsed as Record; + } + } catch { + // noop + } + } + + const detail = parsedBody?.detail; + const detailObject = + typeof detail === "object" && detail !== null ? (detail as Record) : null; + const detailMessage = typeof detail === "string" ? detail : undefined; + const topLevelMessage = + typeof parsedBody?.message === "string" ? (parsedBody.message as string) : undefined; + const detailNestedMessage = + typeof detailObject?.message === "string" ? (detailObject.message as string) : undefined; + + const topLevelCode = + typeof parsedBody?.errorCode === "string" + ? parsedBody.errorCode + : typeof parsedBody?.error_code === "string" + ? parsedBody.error_code + : undefined; + const detailCode = + typeof detailObject?.errorCode === "string" + ? detailObject.errorCode + : typeof detailObject?.error_code === "string" + ? detailObject.error_code + : undefined; + + const errorCode = detailCode ?? topLevelCode ?? statusDefaultCode; + const message = + detailNestedMessage ?? + detailMessage ?? + topLevelMessage ?? + `Backend error: ${response.status}`; + + return Object.assign(new Error(message), { errorCode }); +} + /** * Zod schema for mentioned document info (for type-safe parsing) */ @@ -532,6 +602,43 @@ export default function NewChatPage() { ] ); + const handleStreamTerminalError = useCallback( + async ({ + error, + flow, + threadId, + assistantMsgId, + accepted, + onAbort, + onAcceptedStreamError, + }: { + error: unknown; + flow: ChatFlow; + threadId: number | null; + assistantMsgId: string; + accepted: boolean; + onAbort?: () => Promise; + onAcceptedStreamError?: () => Promise; + }) => { + if (error instanceof Error && error.name === "AbortError") { + await onAbort?.(); + return; + } + + if (accepted) { + await onAcceptedStreamError?.(); + } + + await handleChatFailure({ + error, + flow, + threadId, + assistantMsgId: accepted ? assistantMsgId : "no-persist-assistant", + }); + }, + [handleChatFailure] + ); + // Initialize thread and load messages // For new chats (no urlChatId), we use lazy creation - thread is created on first message const initializeThread = useCallback(async () => { @@ -880,6 +987,7 @@ export default function NewChatPage() { const { contentParts, toolCallIndices } = contentPartsState; let wasInterrupted = false; let tokenUsageData: Record | null = null; + let newAccepted = false; // Add placeholder assistant message setMessages((prev) => [ @@ -951,8 +1059,9 @@ export default function NewChatPage() { }); if (!response.ok) { - throw new Error(`Backend error: ${response.status}`); + throw await toHttpResponseError(response); } + newAccepted = true; const flushMessages = () => { setMessages((prev) => @@ -1106,9 +1215,7 @@ export default function NewChatPage() { break; case "error": - throw Object.assign(new Error(parsed.errorText || "Server error"), { - errorCode: parsed.errorCode, - }); + throw toStreamTerminalError(parsed); } } @@ -1137,29 +1244,29 @@ export default function NewChatPage() { } } catch (error) { batcher.dispose(); - if (error instanceof Error && error.name === "AbortError") { - // Request was cancelled by user - persist partial response if any content was received - const hasContent = contentParts.some( - (part) => - (part.type === "text" && part.text.length > 0) || - (part.type === "tool-call" && toolsWithUI.has(part.toolName)) - ); - if (hasContent && currentThreadId) { - const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI); - await persistAssistantTurn({ - threadId: currentThreadId, - assistantMsgId, - content: partialContent, - logContext: "partial new chat", - }); - } - return; - } - await handleChatFailure({ + await handleStreamTerminalError({ error, flow: "new", threadId: currentThreadId, assistantMsgId, + accepted: newAccepted, + onAbort: async () => { + // Request was cancelled by user - persist partial response if any content was received + const hasContent = contentParts.some( + (part) => + (part.type === "text" && part.text.length > 0) || + (part.type === "tool-call" && toolsWithUI.has(part.toolName)) + ); + if (hasContent && currentThreadId) { + const partialContent = buildContentForPersistence(contentPartsState, toolsWithUI); + await persistAssistantTurn({ + threadId: currentThreadId, + assistantMsgId, + content: partialContent, + logContext: "partial new chat", + }); + } + }, }); } finally { setIsRunning(false); @@ -1183,7 +1290,7 @@ export default function NewChatPage() { pendingUserImageUrls, setPendingUserImageUrls, toolsWithUI, - handleChatFailure, + handleStreamTerminalError, persistAssistantTurn, ] ); @@ -1221,6 +1328,7 @@ export default function NewChatPage() { }; const { contentParts, toolCallIndices } = contentPartsState; let tokenUsageData: Record | null = null; + let resumeAccepted = false; const existingMsg = messages.find((m) => m.id === assistantMsgId); if (existingMsg && Array.isArray(existingMsg.content)) { @@ -1302,8 +1410,9 @@ export default function NewChatPage() { }); if (!response.ok) { - throw new Error(`Backend error: ${response.status}`); + throw await toHttpResponseError(response); } + resumeAccepted = true; const flushMessages = () => { setMessages((prev) => @@ -1415,9 +1524,7 @@ export default function NewChatPage() { break; case "error": - throw Object.assign(new Error(parsed.errorText || "Server error"), { - errorCode: parsed.errorCode, - }); + throw toStreamTerminalError(parsed); } } @@ -1435,14 +1542,12 @@ export default function NewChatPage() { } } catch (error) { batcher.dispose(); - if (error instanceof Error && error.name === "AbortError") { - return; - } - await handleChatFailure({ + await handleStreamTerminalError({ error, flow: "resume", threadId: resumeThreadId, assistantMsgId, + accepted: resumeAccepted, }); } finally { setIsRunning(false); @@ -1455,7 +1560,7 @@ export default function NewChatPage() { searchSpaceId, tokenUsageStore, toolsWithUI, - handleChatFailure, + handleStreamTerminalError, persistAssistantTurn, ] ); @@ -1644,7 +1749,7 @@ export default function NewChatPage() { }); if (!response.ok) { - throw new Error(`Backend error: ${response.status}`); + throw await toHttpResponseError(response); } regenerateAccepted = true; @@ -1741,9 +1846,7 @@ export default function NewChatPage() { break; case "error": - throw Object.assign(new Error(parsed.errorText || "Server error"), { - errorCode: parsed.errorCode, - }); + throw toStreamTerminalError(parsed); } } @@ -1772,25 +1875,25 @@ export default function NewChatPage() { trackChatResponseReceived(searchSpaceId, threadId); } } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - return; - } batcher.dispose(); - if (regenerateAccepted && !userPersisted) { - const persistedUserMsgId = await persistUserTurn({ - threadId, - userMsgId, - content: userContentToPersist, - mentionedDocs: sourceMentionedDocs, - logContext: "regenerated (stream error)", - }); - userPersisted = Boolean(persistedUserMsgId); - } - await handleChatFailure({ + await handleStreamTerminalError({ error, flow: "regenerate", threadId, - assistantMsgId: regenerateAccepted ? assistantMsgId : "no-persist-assistant", + assistantMsgId, + accepted: regenerateAccepted, + onAcceptedStreamError: async () => { + if (!userPersisted) { + const persistedUserMsgId = await persistUserTurn({ + threadId, + userMsgId, + content: userContentToPersist, + mentionedDocs: sourceMentionedDocs, + logContext: "regenerated (stream error)", + }); + userPersisted = Boolean(persistedUserMsgId); + } + }, }); } finally { setIsRunning(false); @@ -1806,7 +1909,7 @@ export default function NewChatPage() { setMessageDocumentsMap, tokenUsageStore, toolsWithUI, - handleChatFailure, + handleStreamTerminalError, persistAssistantTurn, persistUserTurn, ] diff --git a/surfsense_web/components/free-chat/anonymous-chat.tsx b/surfsense_web/components/free-chat/anonymous-chat.tsx index b286c5316..3de2ca434 100644 --- a/surfsense_web/components/free-chat/anonymous-chat.tsx +++ b/surfsense_web/components/free-chat/anonymous-chat.tsx @@ -104,7 +104,13 @@ export function AnonymousChat({ model }: AnonymousChatProps) { setMessages((prev) => prev.filter((m) => m.id !== assistantId)); return; } - throw new Error(`Stream error: ${response.status}`); + const body = await response.text().catch(() => ""); + const errorCode = response.status === 409 ? "THREAD_BUSY" : "SERVER_ERROR"; + const message = + errorCode === "THREAD_BUSY" + ? "A previous response is still stopping. Please try again in a moment." + : `Stream error: ${response.status}`; + throw Object.assign(new Error(body || message), { errorCode }); } for await (const event of readSSEStream(response)) { @@ -115,10 +121,12 @@ export function AnonymousChat({ model }: AnonymousChatProps) { prev.map((m) => (m.id === assistantId ? { ...m, content: m.content + event.delta } : m)) ); } else if (event.type === "error") { + const message = + event.errorCode === "THREAD_BUSY" + ? "A previous response is still stopping. Please try again in a moment." + : event.errorText; setMessages((prev) => - prev.map((m) => - m.id === assistantId ? { ...m, content: m.content || event.errorText } : m - ) + prev.map((m) => (m.id === assistantId ? { ...m, content: m.content || message } : m)) ); } else if ("type" in event && event.type === "data-token-usage") { // After streaming completes, refresh quota diff --git a/surfsense_web/components/free-chat/free-chat-page.tsx b/surfsense_web/components/free-chat/free-chat-page.tsx index deac1fd00..dd6693b35 100644 --- a/surfsense_web/components/free-chat/free-chat-page.tsx +++ b/surfsense_web/components/free-chat/free-chat-page.tsx @@ -48,6 +48,48 @@ function parseCaptchaError(status: number, body: string): string | null { return null; } +function normalizeFreeChatErrorMessage(error: unknown): string { + if (!(error instanceof Error)) return "An unexpected error occurred"; + const code = (error as Error & { errorCode?: string }).errorCode; + if (code === "THREAD_BUSY") { + return "A previous response is still stopping. Please try again in a moment."; + } + return error.message || "An unexpected error occurred"; +} + +function toFreeChatHttpError(status: number, body: string): Error & { errorCode?: string } { + let errorCode: string | undefined; + let message = body || `Server error: ${status}`; + try { + const parsed = JSON.parse(body) as Record; + const detail = + typeof parsed.detail === "object" && parsed.detail !== null + ? (parsed.detail as Record) + : null; + errorCode = + (typeof detail?.error_code === "string" ? detail.error_code : undefined) ?? + (typeof detail?.errorCode === "string" ? detail.errorCode : undefined) ?? + (typeof parsed.error_code === "string" ? parsed.error_code : undefined) ?? + (typeof parsed.errorCode === "string" ? parsed.errorCode : undefined); + message = + (typeof detail?.message === "string" ? detail.message : undefined) ?? + (typeof parsed.message === "string" ? parsed.message : undefined) ?? + (typeof parsed.detail === "string" ? parsed.detail : undefined) ?? + message; + } catch { + // non-json response + } + + if (!errorCode) { + if (status === 409) errorCode = "THREAD_BUSY"; + else if (status === 429) errorCode = "RATE_LIMITED"; + else if (status === 401 || status === 403) errorCode = "AUTH_EXPIRED"; + else errorCode = "SERVER_ERROR"; + } + + return Object.assign(new Error(message), { errorCode }); +} + export function FreeChatPage() { const anonMode = useAnonymousMode(); const modelSlug = anonMode.isAnonymous ? anonMode.modelSlug : ""; @@ -117,7 +159,7 @@ export function FreeChatPage() { const body = await response.text().catch(() => ""); const captchaCode = parseCaptchaError(response.status, body); if (captchaCode) return "captcha"; - throw new Error(body || `Server error: ${response.status}`); + throw toFreeChatHttpError(response.status, body); } const currentThinkingSteps = new Map(); @@ -187,7 +229,9 @@ export function FreeChatPage() { break; case "error": - throw new Error(parsed.errorText || "Server error"); + throw Object.assign(new Error(parsed.errorText || "Server error"), { + errorCode: parsed.errorCode, + }); } } batcher.flush(); @@ -277,7 +321,7 @@ export function FreeChatPage() { } catch (error) { if (error instanceof Error && error.name === "AbortError") return; console.error("[FreeChatPage] Chat error:", error); - const errorText = error instanceof Error ? error.message : "An unexpected error occurred"; + const errorText = normalizeFreeChatErrorMessage(error); setMessages((prev) => prev.map((m) => m.id === assistantMsgId @@ -336,7 +380,7 @@ export function FreeChatPage() { } catch (error) { if (error instanceof Error && error.name === "AbortError") return; console.error("[FreeChatPage] Retry error:", error); - const errorText = error instanceof Error ? error.message : "An unexpected error occurred"; + const errorText = normalizeFreeChatErrorMessage(error); setMessages((prev) => prev.map((m) => m.id === assistantMsgId diff --git a/surfsense_web/lib/chat/chat-error-classifier.ts b/surfsense_web/lib/chat/chat-error-classifier.ts index dc9bb09df..4341f7dc5 100644 --- a/surfsense_web/lib/chat/chat-error-classifier.ts +++ b/surfsense_web/lib/chat/chat-error-classifier.ts @@ -2,6 +2,7 @@ export type ChatFlow = "new" | "resume" | "regenerate"; export type ChatErrorKind = | "premium_quota_exhausted" + | "thread_busy" | "auth_expired" | "rate_limited" | "network_offline" @@ -144,6 +145,22 @@ export function classifyChatError(input: RawChatErrorInput): NormalizedChatError }; } + if ( + errorCode === "THREAD_BUSY" + ) { + return { + kind: "thread_busy", + channel: "toast", + severity: "warn", + telemetryEvent: "chat_blocked", + isExpected: true, + userMessage: "A previous response is still stopping. Please try again in a moment.", + rawMessage, + errorCode: errorCode ?? "THREAD_BUSY", + details: { flow: input.flow }, + }; + } + if ( errorCode === "AUTH_EXPIRED" || errorCode === "UNAUTHORIZED"