diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py
index b5560d90d..0189dd139 100644
--- a/surfsense_backend/app/routes/new_chat_routes.py
+++ b/surfsense_backend/app/routes/new_chat_routes.py
@@ -1524,6 +1524,7 @@ async def regenerate_response(
         filesystem_selection=filesystem_selection,
         request_id=getattr(http_request.state, "request_id", "unknown"),
         user_image_data_urls=regenerate_image_urls or None,
+        flow="regenerate",
     ):
         yield chunk
     streaming_completed = True
diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py
index ecc727b47..a0be55c1b 100644
--- a/surfsense_backend/app/tasks/chat/stream_new_chat.py
+++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py
@@ -19,7 +19,7 @@ import re
 import time
 from collections.abc import AsyncGenerator
 from dataclasses import dataclass, field
-from typing import Any
+from typing import Any, Literal
 from uuid import UUID
 
 import anyio
@@ -253,6 +253,98 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None:
     )
 
 
+def _log_chat_stream_error(
+    *,
+    flow: Literal["new", "resume", "regenerate"],
+    error_kind: str,
+    error_code: str | None,
+    severity: Literal["info", "warn", "error"],
+    is_expected: bool,
+    request_id: str | None,
+    thread_id: int | None,
+    search_space_id: int | None,
+    user_id: str | None,
+    message: str,
+    extra: dict[str, Any] | None = None,
+) -> None:
+    payload: dict[str, Any] = {
+        "event": "chat_stream_error",
+        "flow": flow,
+        "error_kind": error_kind,
+        "error_code": error_code,
+        "severity": severity,
+        "is_expected": is_expected,
+        "request_id": request_id or "unknown",
+        "thread_id": thread_id,
+        "search_space_id": search_space_id,
+        "user_id": user_id,
+        "message": message,
+    }
+    if extra:
+        payload.update(extra)
+
+    logger = logging.getLogger(__name__)
+    rendered = json.dumps(payload, ensure_ascii=False)
+    if severity == "error":
+        logger.error("[chat_stream_error] %s", rendered)
+    elif severity == "warn":
+        logger.warning("[chat_stream_error] %s", rendered)
+    else:
+        logger.info("[chat_stream_error] %s", rendered)
+
+
+def _parse_error_payload(message: str) -> dict[str, Any] | None:
+    candidates = [message]
+    first_brace_idx = message.find("{")
+    if first_brace_idx >= 0:
+        candidates.append(message[first_brace_idx:])
+
+    for candidate in candidates:
+        try:
+            parsed = json.loads(candidate)
+            if isinstance(parsed, dict):
+                return parsed
+        except Exception:
+            continue
+    return None
+
+
+def _classify_stream_exception(
+    exc: Exception,
+    *,
+    flow_label: str,
+) -> tuple[str, str, Literal["info", "warn", "error"], bool, str]:
+    raw = str(exc)
+    parsed = _parse_error_payload(raw)
+    provider_error_type = ""
+    if parsed:
+        top_type = parsed.get("type")
+        if isinstance(top_type, str):
+            provider_error_type = top_type.lower()
+        nested = parsed.get("error")
+        if isinstance(nested, dict):
+            nested_type = nested.get("type")
+            if isinstance(nested_type, str):
+                provider_error_type = nested_type.lower()
+
+    if provider_error_type == "rate_limit_error":
+        return (
+            "rate_limited",
+            "RATE_LIMITED",
+            "warn",
+            True,
+            "This model is temporarily rate-limited. Please try again in a few seconds or switch models.",
+        )
+
+    return (
+        "server_error",
+        "SERVER_ERROR",
+        "error",
+        False,
+        f"Error during {flow_label}: {raw}",
+    )
+
+
 async def _stream_agent_events(
     agent: Any,
     config: dict[str, Any],
@@ -1397,6 +1489,7 @@ async def stream_new_chat(
     filesystem_selection: FilesystemSelection | None = None,
     request_id: str | None = None,
     user_image_data_urls: list[str] | None = None,
+    flow: Literal["new", "regenerate"] = "new",
 ) -> AsyncGenerator[str, None]:
     """
     Stream chat responses from the new SurfSense deep agent.
@@ -1448,6 +1541,30 @@ async def stream_new_chat(
     _premium_reserved = 0
     _premium_request_id: str | None = None
 
+    def _emit_stream_error(
+        *,
+        message: str,
+        error_kind: str = "server_error",
+        error_code: str = "SERVER_ERROR",
+        severity: Literal["info", "warn", "error"] = "error",
+        is_expected: bool = False,
+        extra: dict[str, Any] | None = None,
+    ) -> str:
+        _log_chat_stream_error(
+            flow=flow,
+            error_kind=error_kind,
+            error_code=error_code,
+            severity=severity,
+            is_expected=is_expected,
+            request_id=request_id,
+            thread_id=chat_id,
+            search_space_id=search_space_id,
+            user_id=user_id,
+            message=message,
+            extra=extra,
+        )
+        return streaming_service.format_error(message, error_code=error_code)
+
     session = async_session_maker()
     try:
         # Mark AI as responding to this user for live collaboration
@@ -1499,13 +1616,21 @@ async def stream_new_chat(
                     )
                 ).resolved_llm_config_id
             except ValueError as pin_error:
-                yield streaming_service.format_error(str(pin_error))
+                yield _emit_stream_error(
+                    message=str(pin_error),
+                    error_kind="server_error",
+                    error_code="SERVER_ERROR",
+                )
                 yield streaming_service.format_done()
                 return
 
         llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
         if llm_load_error:
-            yield streaming_service.format_error(llm_load_error)
+            yield _emit_stream_error(
+                message=llm_load_error,
+                error_kind="server_error",
+                error_code="SERVER_ERROR",
+            )
             yield streaming_service.format_done()
             return
         _perf_log.info(
@@ -1541,13 +1666,6 @@ async def stream_new_chat(
             )
             _premium_reserved = reserve_amount
             if not quota_result.allowed:
-                logging.getLogger(__name__).info(
-                    "premium_quota_blocked_pinned_model thread_id=%s search_space_id=%s user_id=%s resolved_config_id=%s",
-                    chat_id,
-                    search_space_id,
-                    user_id,
-                    llm_config_id,
-                )
                 if requested_llm_config_id == 0:
                     try:
                         llm_config_id = (
@@ -1561,34 +1679,66 @@ async def stream_new_chat(
                             )
                         ).resolved_llm_config_id
                     except ValueError as pin_error:
-                        yield streaming_service.format_error(str(pin_error))
+                        yield _emit_stream_error(
+                            message=str(pin_error),
+                            error_kind="server_error",
+                            error_code="SERVER_ERROR",
+                        )
                         yield streaming_service.format_done()
                         return
                     llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
                     if llm_load_error:
-                        yield streaming_service.format_error(llm_load_error)
+                        yield _emit_stream_error(
+                            message=llm_load_error,
+                            error_kind="server_error",
+                            error_code="SERVER_ERROR",
+                        )
                        yield streaming_service.format_done()
                         return
                     _premium_request_id = None
                     _premium_reserved = 0
-                    logging.getLogger(__name__).info(
-                        "premium_quota_auto_fallback_to_free thread_id=%s search_space_id=%s user_id=%s fallback_config_id=%s",
-                        chat_id,
-                        search_space_id,
-                        user_id,
-                        llm_config_id,
+                    _log_chat_stream_error(
+                        flow=flow,
+                        error_kind="premium_quota_exhausted",
+                        error_code="PREMIUM_QUOTA_EXHAUSTED",
+                        severity="info",
+                        is_expected=True,
+                        request_id=request_id,
+                        thread_id=chat_id,
+                        search_space_id=search_space_id,
+                        user_id=user_id,
+                        message=(
+                            "Premium quota exhausted on pinned model; auto-fallback switched to a free model"
+                        ),
+                        extra={
+                            "fallback_config_id": llm_config_id,
+                            "auto_fallback": True,
+                        },
                     )
                 else:
-                    yield streaming_service.format_error(
-                        "Buy more tokens to continue with this model, or switch to a free model",
+                    yield _emit_stream_error(
+                        message=(
+                            "Buy more tokens to continue with this model, or switch to a free model"
+                        ),
+                        error_kind="premium_quota_exhausted",
                         error_code="PREMIUM_QUOTA_EXHAUSTED",
+                        severity="info",
+                        is_expected=True,
+                        extra={
+                            "resolved_config_id": llm_config_id,
+                            "auto_fallback": False,
+                        },
                     )
                     yield streaming_service.format_done()
                     return
 
         if not llm:
-            yield streaming_service.format_error("Failed to create LLM instance")
+            yield _emit_stream_error(
+                message="Failed to create LLM instance",
+                error_kind="server_error",
+                error_code="SERVER_ERROR",
+            )
             yield streaming_service.format_done()
             return
@@ -2097,12 +2247,25 @@ async def stream_new_chat(
         # Handle any errors
         import traceback
 
+        (
+            error_kind,
+            error_code,
+            severity,
+            is_expected,
+            user_message,
+        ) = _classify_stream_exception(e, flow_label="chat")
         error_message = f"Error during chat: {e!s}"
         print(f"[stream_new_chat] {error_message}")
         print(f"[stream_new_chat] Exception type: {type(e).__name__}")
         print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}")
-        yield streaming_service.format_error(error_message)
+        yield _emit_stream_error(
+            message=user_message,
+            error_kind=error_kind,
+            error_code=error_code,
+            severity=severity,
+            is_expected=is_expected,
+        )
         yield streaming_service.format_finish_step()
         yield streaming_service.format_finish()
         yield streaming_service.format_done()
@@ -2217,6 +2380,30 @@ async def stream_resume_chat(
 
     accumulator = start_turn()
 
+    def _emit_stream_error(
+        *,
+        message: str,
+        error_kind: str = "server_error",
+        error_code: str = "SERVER_ERROR",
+        severity: Literal["info", "warn", "error"] = "error",
+        is_expected: bool = False,
+        extra: dict[str, Any] | None = None,
+    ) -> str:
+        _log_chat_stream_error(
+            flow="resume",
+            error_kind=error_kind,
+            error_code=error_code,
+            severity=severity,
+            is_expected=is_expected,
+            request_id=request_id,
+            thread_id=chat_id,
+            search_space_id=search_space_id,
+            user_id=user_id,
+            message=message,
+            extra=extra,
+        )
+        return streaming_service.format_error(message, error_code=error_code)
+
     session = async_session_maker()
     try:
         if user_id:
@@ -2267,13 +2454,21 @@ async def stream_resume_chat(
                     )
                 ).resolved_llm_config_id
             except ValueError as pin_error:
-                yield streaming_service.format_error(str(pin_error))
+                yield _emit_stream_error(
+                    message=str(pin_error),
+                    error_kind="server_error",
+                    error_code="SERVER_ERROR",
+                )
                 yield streaming_service.format_done()
                 return
 
         llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
         if llm_load_error:
-            yield streaming_service.format_error(llm_load_error)
+            yield _emit_stream_error(
+                message=llm_load_error,
+                error_kind="server_error",
+                error_code="SERVER_ERROR",
+            )
             yield streaming_service.format_done()
             return
         _perf_log.info(
@@ -2309,13 +2504,6 @@ async def stream_resume_chat(
             )
             _resume_premium_reserved = reserve_amount
             if not quota_result.allowed:
-                logging.getLogger(__name__).info(
-                    "premium_quota_blocked_pinned_model thread_id=%s search_space_id=%s user_id=%s resolved_config_id=%s",
-                    chat_id,
-                    search_space_id,
-                    user_id,
-                    llm_config_id,
-                )
                 if requested_llm_config_id == 0:
                     try:
                         llm_config_id = (
@@ -2329,34 +2517,66 @@ async def stream_resume_chat(
                             )
                         ).resolved_llm_config_id
                     except ValueError as pin_error:
-                        yield streaming_service.format_error(str(pin_error))
+                        yield _emit_stream_error(
+                            message=str(pin_error),
+                            error_kind="server_error",
+                            error_code="SERVER_ERROR",
+                        )
                         yield streaming_service.format_done()
                         return
                     llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
                     if llm_load_error:
-                        yield streaming_service.format_error(llm_load_error)
+                        yield _emit_stream_error(
+                            message=llm_load_error,
+                            error_kind="server_error",
+                            error_code="SERVER_ERROR",
+                        )
                         yield streaming_service.format_done()
                         return
                     _resume_premium_request_id = None
                     _resume_premium_reserved = 0
-                    logging.getLogger(__name__).info(
-                        "premium_quota_auto_fallback_to_free thread_id=%s search_space_id=%s user_id=%s fallback_config_id=%s",
-                        chat_id,
-                        search_space_id,
-                        user_id,
-                        llm_config_id,
+                    _log_chat_stream_error(
+                        flow="resume",
+                        error_kind="premium_quota_exhausted",
+                        error_code="PREMIUM_QUOTA_EXHAUSTED",
+                        severity="info",
+                        is_expected=True,
+                        request_id=request_id,
+                        thread_id=chat_id,
+                        search_space_id=search_space_id,
+                        user_id=user_id,
+                        message=(
+                            "Premium quota exhausted on pinned model; auto-fallback switched to a free model"
+                        ),
+                        extra={
+                            "fallback_config_id": llm_config_id,
+                            "auto_fallback": True,
+                        },
                     )
                 else:
-                    yield streaming_service.format_error(
-                        "Buy more tokens to continue with this model, or switch to a free model",
+                    yield _emit_stream_error(
+                        message=(
+                            "Buy more tokens to continue with this model, or switch to a free model"
+                        ),
+                        error_kind="premium_quota_exhausted",
                         error_code="PREMIUM_QUOTA_EXHAUSTED",
+                        severity="info",
+                        is_expected=True,
+                        extra={
+                            "resolved_config_id": llm_config_id,
+                            "auto_fallback": False,
+                        },
                     )
                     yield streaming_service.format_done()
                     return
 
         if not llm:
-            yield streaming_service.format_error("Failed to create LLM instance")
+            yield _emit_stream_error(
+                message="Failed to create LLM instance",
+                error_kind="server_error",
+                error_code="SERVER_ERROR",
+            )
             yield streaming_service.format_done()
             return
@@ -2528,10 +2748,23 @@ async def stream_resume_chat(
     except Exception as e:
         import traceback
 
+        (
+            error_kind,
+            error_code,
+            severity,
+            is_expected,
+            user_message,
+        ) = _classify_stream_exception(e, flow_label="resume")
         error_message = f"Error during resume: {e!s}"
         print(f"[stream_resume_chat] {error_message}")
         print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}")
-        yield streaming_service.format_error(error_message)
+        yield _emit_stream_error(
+            message=user_message,
+            error_kind=error_kind,
+            error_code=error_code,
+            severity=severity,
+            is_expected=is_expected,
+        )
         yield streaming_service.format_finish_step()
         yield streaming_service.format_finish()
         yield streaming_service.format_done()
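
Reviewer note (non-blocking): with this change every backend error path funnels through `_emit_stream_error`, so each streamed error frame should carry a machine-readable `error_code` next to the human-readable message. A minimal sketch of how a stream consumer could branch on that code — assuming `streaming_service.format_error` serializes a frame roughly shaped like `{ message, errorCode }` (the actual frame shape is owned by `streaming_service` and is an assumption here):

```ts
// Hypothetical frame shape -- the real serialization is defined by
// streaming_service.format_error, not by this sketch.
interface ErrorFrame {
	message: string;
	errorCode?: string;
}

// Transient errors can be retried automatically; the rest need user action.
function isRetryableError(frame: ErrorFrame): boolean {
	switch (frame.errorCode) {
		case "RATE_LIMITED":
			return true; // expected and transient per _classify_stream_exception
		case "PREMIUM_QUOTA_EXHAUSTED": // blocked until the user switches models or buys tokens
		case "SERVER_ERROR":
		default:
			return false;
	}
}
```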
diff --git a/surfsense_backend/tests/unit/test_stream_new_chat_contract.py b/surfsense_backend/tests/unit/test_stream_new_chat_contract.py
index 034aa484c..1f8168837 100644
--- a/surfsense_backend/tests/unit/test_stream_new_chat_contract.py
+++ b/surfsense_backend/tests/unit/test_stream_new_chat_contract.py
@@ -1,9 +1,18 @@
+import inspect
+import json
+import logging
+from pathlib import Path
+import re
+
 import pytest
 
+import app.tasks.chat.stream_new_chat as stream_new_chat_module
 from app.tasks.chat.stream_new_chat import (
     StreamResult,
+    _classify_stream_exception,
     _contract_enforcement_active,
     _evaluate_file_contract_outcome,
+    _log_chat_stream_error,
     _tool_output_has_error,
 )
@@ -45,3 +54,113 @@ def test_contract_enforcement_local_only():
     result.filesystem_mode = "cloud"
     assert not _contract_enforcement_active(result)
+
+
+def _extract_chat_stream_payload(record_message: str) -> dict:
+    prefix = "[chat_stream_error] "
+    assert record_message.startswith(prefix)
+    return json.loads(record_message[len(prefix) :])
+
+
+def test_unified_chat_stream_error_log_schema(caplog):
+    with caplog.at_level(logging.INFO, logger="app.tasks.chat.stream_new_chat"):
+        _log_chat_stream_error(
+            flow="new",
+            error_kind="server_error",
+            error_code="SERVER_ERROR",
+            severity="warn",
+            is_expected=False,
+            request_id="req-123",
+            thread_id=101,
+            search_space_id=202,
+            user_id="user-1",
+            message="Error during chat: boom",
+        )
+
+    record = next(r for r in caplog.records if "[chat_stream_error]" in r.message)
+    payload = _extract_chat_stream_payload(record.message)
+
+    required_keys = {
+        "event",
+        "flow",
+        "error_kind",
+        "error_code",
+        "severity",
+        "is_expected",
+        "request_id",
+        "thread_id",
+        "search_space_id",
+        "user_id",
+        "message",
+    }
+    assert required_keys.issubset(payload.keys())
+    assert payload["event"] == "chat_stream_error"
+    assert payload["flow"] == "new"
+    assert payload["error_code"] == "SERVER_ERROR"
+
+
+def test_premium_quota_uses_unified_chat_stream_log_shape(caplog):
+    with caplog.at_level(logging.INFO, logger="app.tasks.chat.stream_new_chat"):
+        _log_chat_stream_error(
+            flow="resume",
+            error_kind="premium_quota_exhausted",
+            error_code="PREMIUM_QUOTA_EXHAUSTED",
+            severity="info",
+            is_expected=True,
+            request_id="req-premium",
+            thread_id=303,
+            search_space_id=404,
+            user_id="user-2",
+            message="Buy more tokens to continue with this model, or switch to a free model",
+            extra={"auto_fallback": False},
+        )
+
+    record = next(r for r in caplog.records if "[chat_stream_error]" in r.message)
+    payload = _extract_chat_stream_payload(record.message)
+    assert payload["event"] == "chat_stream_error"
+    assert payload["error_kind"] == "premium_quota_exhausted"
+    assert payload["error_code"] == "PREMIUM_QUOTA_EXHAUSTED"
+    assert payload["flow"] == "resume"
+    assert payload["is_expected"] is True
+    assert payload["auto_fallback"] is False
+
+
+def test_stream_error_emission_keeps_machine_error_codes():
+    source = inspect.getsource(stream_new_chat_module)
+    format_error_calls = re.findall(r"format_error\(", source)
+    emitted_error_codes = set(re.findall(r'error_code="([A-Z_]+)"', source))
+
+    # Both new/resume stream paths now route through local emitters that always
+    # pass a machine-readable error_code.
+    assert len(format_error_calls) == 2
+    assert {
+        "PREMIUM_QUOTA_EXHAUSTED",
+        "SERVER_ERROR",
+    }.issubset(emitted_error_codes)
+    assert 'flow: Literal["new", "regenerate"] = "new"' in source
+    assert "flow=flow" in source
+    assert 'flow="resume"' in source
+
+
+def test_stream_exception_classifies_rate_limited():
+    exc = Exception(
+        '{"error":{"type":"rate_limit_error","message":"Rate limited. Please try again later."}}'
+    )
+    kind, code, severity, is_expected, user_message = _classify_stream_exception(
+        exc, flow_label="chat"
+    )
+    assert kind == "rate_limited"
+    assert code == "RATE_LIMITED"
+    assert severity == "warn"
+    assert is_expected is True
+    assert "temporarily rate-limited" in user_message
+
+
+def test_premium_classification_is_error_code_driven():
+    classifier_path = Path(__file__).resolve().parents[3] / "surfsense_web/lib/chat/chat-error-classifier.ts"
+    source = classifier_path.read_text(encoding="utf-8")
+
+    assert "PREMIUM_KEYWORDS" not in source
+    assert "RATE_LIMIT_KEYWORDS" not in source
+    assert "normalized.includes(" not in source
+    assert 'if (errorCode === "PREMIUM_QUOTA_EXHAUSTED") {' in source
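
Reviewer note (non-blocking): the two caplog tests above effectively freeze the log contract as a `[chat_stream_error] ` prefix followed by one JSON object. A sketch of a log-pipeline consumer built against exactly the fields those tests assert (the transport that delivers the line is an assumption):

```ts
// Field names mirror the payload asserted in test_unified_chat_stream_error_log_schema.
interface ChatStreamErrorLog {
	event: "chat_stream_error";
	flow: "new" | "resume" | "regenerate";
	error_kind: string;
	error_code: string | null;
	severity: "info" | "warn" | "error";
	is_expected: boolean;
	request_id: string;
	thread_id: number | null;
	search_space_id: number | null;
	user_id: string | null;
	message: string;
}

const LOG_PREFIX = "[chat_stream_error] ";

// Returns the structured payload, or null for unrelated log lines.
function parseChatStreamErrorLine(line: string): ChatStreamErrorLog | null {
	const idx = line.indexOf(LOG_PREFIX);
	if (idx < 0) return null;
	try {
		return JSON.parse(line.slice(idx + LOG_PREFIX.length)) as ChatStreamErrorLog;
	} catch {
		return null;
	}
}
```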
diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx
index a2985ab0c..ffd58e660 100644
--- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx
+++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx
@@ -49,6 +49,10 @@ import { useMessagesSync } from "@/hooks/use-messages-sync";
 import { getAgentFilesystemSelection } from "@/lib/agent-filesystem";
 import { documentsApiService } from "@/lib/apis/documents-api.service";
 import { getBearerToken } from "@/lib/auth-utils";
+import {
+	classifyChatError,
+	type ChatFlow,
+} from "@/lib/chat/chat-error-classifier";
 import { convertToThreadMessage } from "@/lib/chat/message-utils";
 import {
 	isPodcastGenerating,
@@ -84,7 +88,8 @@ import { NotFoundError } from "@/lib/error";
 import {
 	trackChatCreated,
-	trackChatError,
+	trackChatBlocked,
+	trackChatErrorDetailed,
 	trackChatMessageSent,
 	trackChatResponseReceived,
 } from "@/lib/posthog/events";
@@ -201,26 +206,6 @@ const BASE_TOOLS_WITH_UI = new Set([
 	// "write_todos", // Disabled for now
 ]);
 
-const PREMIUM_QUOTA_ASSISTANT_MESSAGE =
-	"I can’t continue with the current premium model because your premium tokens are exhausted. Switch to a free model or buy more tokens to continue.";
-
-function getPinnedPremiumQuotaErrorMessage(error: unknown): string | null {
-	if (!(error instanceof Error)) return null;
-	const withCode = error as Error & { errorCode?: string };
-	if (withCode.errorCode === "PREMIUM_QUOTA_EXHAUSTED") {
-		return error.message;
-	}
-	const normalized = error.message.toLowerCase();
-	if (
-		!normalized.includes("premium tokens exhausted")
-		&& !normalized.includes("premium token quota exceeded")
-		&& !normalized.includes("buy more tokens")
-	) {
-		return null;
-	}
-	return error.message;
-}
-
 export default function NewChatPage() {
 	const params = useParams();
 	const queryClient = useQueryClient();
@@ -378,6 +363,81 @@ export default function NewChatPage() {
 		return Number.isNaN(parsed) ? 0 : parsed;
 	}, [params.chat_id]);
 
+	const handleChatFailure = useCallback(
+		async ({
+			error,
+			flow,
+			threadId,
+			assistantMsgId,
+		}: {
+			error: unknown;
+			flow: ChatFlow;
+			threadId: number | null;
+			assistantMsgId: string;
+		}) => {
+			const normalized = classifyChatError({
+				error,
+				flow,
+				context: {
+					searchSpaceId,
+					threadId,
+				},
+			});
+
+			const logger =
+				normalized.severity === "error"
+					? console.error
+					: normalized.severity === "warn"
+						? console.warn
+						: console.info;
+			logger(`[NewChatPage] ${flow} ${normalized.kind}:`, error);
+
+			const telemetryPayload = {
+				flow,
+				kind: normalized.kind,
+				error_code: normalized.errorCode,
+				severity: normalized.severity,
+				is_expected: normalized.isExpected,
+				message: normalized.userMessage,
+			};
+			if (normalized.telemetryEvent === "chat_blocked") {
+				trackChatBlocked(searchSpaceId, threadId, telemetryPayload);
+			} else {
+				trackChatErrorDetailed(searchSpaceId, threadId, telemetryPayload);
+			}
+
+			if (normalized.channel === "silent") {
+				return;
+			}
+
+			if (normalized.channel === "pinned_inline") {
+				if (threadId) {
+					setPremiumAlertForThread({
+						threadId,
+						message: normalized.userMessage,
+						userId: currentUser?.id ?? null,
+					});
+				}
+				if (normalized.assistantMessage) {
+					await persistAssistantErrorMessage({
+						threadId,
+						assistantMsgId,
+						text: normalized.assistantMessage,
+					});
+				}
+				return;
+			}
+
+			toast.error(normalized.userMessage);
+		},
+		[
+			currentUser?.id,
+			persistAssistantErrorMessage,
+			searchSpaceId,
+			setPremiumAlertForThread,
+		]
+	);
+
 	// Initialize thread and load messages
 	// For new chats (no urlChatId), we use lazy creation - thread is created on first message
 	const initializeThread = useCallback(async () => {
@@ -1018,36 +1078,11 @@ export default function NewChatPage() {
 				}
 				return;
 			}
-			const premiumQuotaAlertMessage = getPinnedPremiumQuotaErrorMessage(error);
-			if (premiumQuotaAlertMessage) {
-				console.info("[NewChatPage] Premium quota exhausted:", error);
-			} else {
-				console.error("[NewChatPage] Chat error:", error);
-			}
-
-			// Track chat error
-			trackChatError(
-				searchSpaceId,
-				currentThreadId,
-				error instanceof Error ? error.message : "Unknown error"
-			);
-
-			if (premiumQuotaAlertMessage) {
-				setPremiumAlertForThread({
-					threadId: currentThreadId,
-					message: premiumQuotaAlertMessage,
-					userId: currentUser?.id ?? null,
-				});
-			} else {
-				toast.error("Failed to get response. Please try again.");
-			}
-			await persistAssistantErrorMessage({
+			await handleChatFailure({
+				error,
+				flow: "new",
 				threadId: currentThreadId,
 				assistantMsgId,
-				text:
-					(premiumQuotaAlertMessage
-						? PREMIUM_QUOTA_ASSISTANT_MESSAGE
-						: undefined) ?? "Sorry, there was an error. Please try again.",
 			});
 		} finally {
 			setIsRunning(false);
@@ -1071,8 +1106,7 @@ export default function NewChatPage() {
 			pendingUserImageUrls,
 			setPendingUserImageUrls,
 			toolsWithUI,
-			setPremiumAlertForThread,
-			persistAssistantErrorMessage,
+			handleChatFailure,
 		]
 	);
@@ -1333,28 +1367,11 @@ export default function NewChatPage() {
 			if (error instanceof Error && error.name === "AbortError") {
 				return;
 			}
-			const premiumQuotaAlertMessage = getPinnedPremiumQuotaErrorMessage(error);
-			if (premiumQuotaAlertMessage) {
-				console.info("[NewChatPage] Premium quota exhausted during resume:", error);
-			} else {
-				console.error("[NewChatPage] Resume error:", error);
-			}
-			if (premiumQuotaAlertMessage) {
-				setPremiumAlertForThread({
-					threadId: resumeThreadId,
-					message: premiumQuotaAlertMessage,
-					userId: currentUser?.id ?? null,
-				});
-			} else {
-				toast.error("Failed to resume. Please try again.");
-			}
-			await persistAssistantErrorMessage({
+			await handleChatFailure({
+				error,
+				flow: "resume",
 				threadId: resumeThreadId,
 				assistantMsgId,
-				text:
-					(premiumQuotaAlertMessage
-						? PREMIUM_QUOTA_ASSISTANT_MESSAGE
-						: undefined) ?? "Sorry, there was an error. Please try again.",
 			});
 		} finally {
 			setIsRunning(false);
@@ -1365,11 +1382,9 @@ export default function NewChatPage() {
 		pendingInterrupt,
 		messages,
 		searchSpaceId,
-		currentUser?.id,
 		tokenUsageStore,
 		toolsWithUI,
-		setPremiumAlertForThread,
-		persistAssistantErrorMessage,
+		handleChatFailure,
 	]
 );
@@ -1491,15 +1506,6 @@ export default function NewChatPage() {
 			userQueryToDisplay = newUserQuery;
 		}
 
-		// Remove the last two messages (user + assistant) from the UI immediately
-		// The backend will also delete them from the database
-		setMessages((prev) => {
-			if (prev.length >= 2) {
-				return prev.slice(0, -2);
-			}
-			return prev;
-		});
-
 		// Start streaming
 		setIsRunning(true);
 		const controller = new AbortController();
@@ -1530,19 +1536,9 @@ export default function NewChatPage() {
 				createdAt: new Date(),
 				metadata: isEdit ? undefined : originalUserMessageMetadata,
 			};
-			setMessages((prev) => [...prev, userMessage]);
-
-			// Add placeholder assistant message
-			setMessages((prev) => [
-				...prev,
-				{
-					id: assistantMsgId,
-					role: "assistant",
-					content: [{ type: "text", text: "" }],
-					createdAt: new Date(),
-				},
-			]);
-
+			const userContentToPersist = isEdit
+				? (editExtras?.userMessageContent ?? [{ type: "text", text: newUserQuery ?? "" }])
+				: originalUserMessageContent || [{ type: "text", text: userQueryToDisplay || "" }];
 			try {
 				const selection = await getAgentFilesystemSelection(searchSpaceId);
 				const requestBody: Record<string, unknown> = {
@@ -1570,6 +1566,22 @@ export default function NewChatPage() {
 					throw new Error(`Backend error: ${response.status}`);
 				}
 
+				// Only switch UI to regenerated placeholder messages after the backend accepts
+				// regenerate. This avoids local message loss when regenerate fails early (e.g. 400).
+				setMessages((prev) => {
+					const base = prev.length >= 2 ? prev.slice(0, -2) : prev;
+					return [
+						...base,
+						userMessage,
+						{
+							id: assistantMsgId,
+							role: "assistant",
+							content: [{ type: "text", text: "" }],
+							createdAt: new Date(),
+						},
+					];
+				});
+
 				const flushMessages = () => {
 					setMessages((prev) =>
 						prev.map((m) =>
@@ -1654,10 +1666,6 @@ export default function NewChatPage() {
 				if (contentParts.length > 0) {
 					try {
 						// Persist user message (for both edit and reload modes, since backend deleted it)
-						const userContentToPersist = isEdit
-							? (editExtras?.userMessageContent ?? [{ type: "text", text: newUserQuery ?? "" }])
-							: originalUserMessageContent || [{ type: "text", text: userQueryToDisplay || "" }];
-
 						const savedUserMessage = await appendMessage(threadId, {
 							role: "user",
 							content: userContentToPersist,
@@ -1692,33 +1700,11 @@ export default function NewChatPage() {
 					return;
 				}
 				batcher.dispose();
-				const premiumQuotaAlertMessage = getPinnedPremiumQuotaErrorMessage(error);
-				if (premiumQuotaAlertMessage) {
-					console.info("[NewChatPage] Premium quota exhausted during regeneration:", error);
-				} else {
-					console.error("[NewChatPage] Regeneration error:", error);
-				}
-				trackChatError(
-					searchSpaceId,
-					threadId,
-					error instanceof Error ? error.message : "Unknown error"
-				);
-				if (premiumQuotaAlertMessage) {
-					setPremiumAlertForThread({
-						threadId,
-						message: premiumQuotaAlertMessage,
-						userId: currentUser?.id ?? null,
-					});
-				} else {
-					toast.error("Failed to regenerate response. Please try again.");
-				}
-				await persistAssistantErrorMessage({
+				await handleChatFailure({
+					error,
+					flow: "regenerate",
 					threadId,
 					assistantMsgId,
-					text:
-						(premiumQuotaAlertMessage
-							? PREMIUM_QUOTA_ASSISTANT_MESSAGE
-							: undefined) ?? "Sorry, there was an error. Please try again.",
 				});
 			} finally {
 				setIsRunning(false);
@@ -1730,11 +1716,9 @@ export default function NewChatPage() {
 			searchSpaceId,
 			messages,
 			disabledTools,
-			currentUser?.id,
 			tokenUsageStore,
 			toolsWithUI,
-			setPremiumAlertForThread,
-			persistAssistantErrorMessage,
+			handleChatFailure,
 		]
 	);
diff --git a/surfsense_web/lib/chat/chat-error-classifier.ts b/surfsense_web/lib/chat/chat-error-classifier.ts
new file mode 100644
index 000000000..dc9bb09df
--- /dev/null
+++ b/surfsense_web/lib/chat/chat-error-classifier.ts
@@ -0,0 +1,273 @@
+export type ChatFlow = "new" | "resume" | "regenerate";
+
+export type ChatErrorKind =
+	| "premium_quota_exhausted"
+	| "auth_expired"
+	| "rate_limited"
+	| "network_offline"
+	| "stream_interrupted"
+	| "stream_parse_error"
+	| "tool_execution_error"
+	| "persist_message_failed"
+	| "server_error"
+	| "unknown";
+
+export type ChatErrorChannel = "pinned_inline" | "toast" | "silent";
+export type ChatTelemetryEvent = "chat_blocked" | "chat_error";
+export type ChatErrorSeverity = "info" | "warn" | "error";
+
+export interface NormalizedChatError {
+	kind: ChatErrorKind;
+	channel: ChatErrorChannel;
+	severity: ChatErrorSeverity;
+	telemetryEvent: ChatTelemetryEvent;
+	isExpected: boolean;
+	userMessage: string;
+	assistantMessage?: string;
+	rawMessage?: string;
+	errorCode?: string;
+	details?: Record<string, unknown>;
+}
+
+export interface RawChatErrorInput {
+	error: unknown;
+	flow: ChatFlow;
+	context?: {
+		searchSpaceId?: number;
+		threadId?: number | null;
+	};
+}
+
+export const PREMIUM_QUOTA_ASSISTANT_MESSAGE =
+	"I can’t continue with the current premium model because your premium tokens are exhausted. Switch to a free model or buy more tokens to continue.";
+
+function getErrorMessage(error: unknown): string {
+	if (error instanceof Error) return error.message;
+	if (typeof error === "string") return error;
+	try {
+		return JSON.stringify(error);
+	} catch {
+		return "Unknown error";
+	}
+}
+
+function getErrorCode(
+	error: unknown,
+	parsedJson: Record<string, unknown> | null
+): string | undefined {
+	if (error instanceof Error) {
+		const withCode = error as Error & { errorCode?: string };
+		if (withCode.errorCode) return withCode.errorCode;
+	}
+
+	if (typeof error === "object" && error !== null) {
+		const withCode = error as { errorCode?: unknown };
+		if (typeof withCode.errorCode === "string" && withCode.errorCode) {
+			return withCode.errorCode;
+		}
+	}
+
+	if (parsedJson) {
+		const topLevelCode = parsedJson.errorCode;
+		if (typeof topLevelCode === "string" && topLevelCode) {
+			return topLevelCode;
+		}
+	}
+
+	return undefined;
+}
+
+function parseEmbeddedJson(text: string): Record<string, unknown> | null {
+	const candidates = [text];
+	const firstBraceIdx = text.indexOf("{");
+	if (firstBraceIdx >= 0) {
+		candidates.push(text.slice(firstBraceIdx));
+	}
+	for (const candidate of candidates) {
+		try {
+			const parsed = JSON.parse(candidate);
+			if (typeof parsed === "object" && parsed !== null) {
+				return parsed as Record<string, unknown>;
+			}
+		} catch {
+			// noop
+		}
+	}
+	return null;
+}
+
+function inferProviderErrorType(parsedJson: Record<string, unknown> | null): string | undefined {
+	if (!parsedJson) return undefined;
+	const topLevelType = parsedJson.type;
+	if (typeof topLevelType === "string" && topLevelType) return topLevelType;
+	const nestedError = parsedJson.error;
+	if (typeof nestedError === "object" && nestedError !== null) {
+		const nestedType = (nestedError as Record<string, unknown>).type;
+		if (typeof nestedType === "string" && nestedType) return nestedType;
+	}
+	return undefined;
+}
+
+export function classifyChatError(input: RawChatErrorInput): NormalizedChatError {
+	const { error } = input;
+	const rawMessage = getErrorMessage(error);
+	const parsedJson = parseEmbeddedJson(rawMessage);
+	const errorCode = getErrorCode(error, parsedJson);
+	const providerErrorType = inferProviderErrorType(parsedJson);
+	const providerTypeNormalized = providerErrorType?.toLowerCase() ?? "";
+	const errorName = error instanceof Error ? error.name : undefined;
+
+	if (errorName === "AbortError") {
+		return {
+			kind: "stream_interrupted",
+			channel: "silent",
+			severity: "info",
+			telemetryEvent: "chat_error",
+			isExpected: true,
+			userMessage: "Request canceled.",
+			rawMessage,
+			errorCode,
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "PREMIUM_QUOTA_EXHAUSTED") {
+		return {
+			kind: "premium_quota_exhausted",
+			channel: "pinned_inline",
+			severity: "info",
+			telemetryEvent: "chat_blocked",
+			isExpected: true,
+			userMessage: "Buy more tokens to continue with this model, or switch to a free model.",
+			assistantMessage: PREMIUM_QUOTA_ASSISTANT_MESSAGE,
+			rawMessage,
+			errorCode: errorCode ?? "PREMIUM_QUOTA_EXHAUSTED",
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "AUTH_EXPIRED" || errorCode === "UNAUTHORIZED") {
+		return {
+			kind: "auth_expired",
+			channel: "toast",
+			severity: "warn",
+			telemetryEvent: "chat_error",
+			isExpected: true,
+			userMessage: "Your session expired. Please sign in again.",
+			rawMessage,
+			errorCode: errorCode ?? "AUTH_EXPIRED",
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "RATE_LIMITED" || providerTypeNormalized === "rate_limit_error") {
+		return {
+			kind: "rate_limited",
+			channel: "toast",
+			severity: "warn",
+			telemetryEvent: "chat_blocked",
+			isExpected: true,
+			userMessage:
+				"This model is temporarily rate-limited. Please try again in a few seconds or switch models.",
+			rawMessage,
+			errorCode: errorCode ?? "RATE_LIMITED",
+			details: { flow: input.flow, providerErrorType },
+		};
+	}
+
+	if (errorCode === "NETWORK_ERROR") {
+		return {
+			kind: "network_offline",
+			channel: "toast",
+			severity: "warn",
+			telemetryEvent: "chat_error",
+			isExpected: true,
+			userMessage: "Connection issue detected. Check your internet and try again.",
+			rawMessage,
+			errorCode: errorCode ?? "NETWORK_ERROR",
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "STREAM_PARSE_ERROR") {
+		return {
+			kind: "stream_parse_error",
+			channel: "toast",
+			severity: "error",
+			telemetryEvent: "chat_error",
+			isExpected: false,
+			userMessage: "We hit a response formatting issue. Please try again.",
+			rawMessage,
+			errorCode: errorCode ?? "STREAM_PARSE_ERROR",
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "TOOL_EXECUTION_ERROR") {
+		return {
+			kind: "tool_execution_error",
+			channel: "toast",
+			severity: "error",
+			telemetryEvent: "chat_error",
+			isExpected: false,
+			userMessage: "A tool failed while processing your request. Please try again.",
+			rawMessage,
+			errorCode: errorCode ?? "TOOL_EXECUTION_ERROR",
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "PERSIST_MESSAGE_FAILED") {
+		return {
+			kind: "persist_message_failed",
+			channel: "toast",
+			severity: "error",
+			telemetryEvent: "chat_error",
+			isExpected: false,
+			userMessage: "Response generated, but saving failed. Please retry once.",
+			rawMessage,
+			errorCode: errorCode ?? "PERSIST_MESSAGE_FAILED",
+			details: { flow: input.flow },
+		};
+	}
+
+	if (errorCode === "SERVER_ERROR") {
+		return {
+			kind: "server_error",
+			channel: "toast",
+			severity: "error",
+			telemetryEvent: "chat_error",
+			isExpected: false,
+			userMessage: "We couldn’t complete this response right now. Please try again.",
+			rawMessage,
+			errorCode: errorCode ?? "SERVER_ERROR",
+			details: { flow: input.flow, providerErrorType },
+		};
+	}
+
+	return {
+		kind: "unknown",
+		channel: "toast",
+		severity: "error",
+		telemetryEvent: "chat_error",
+		isExpected: false,
+		userMessage: "We couldn’t complete this response right now. Please try again.",
+		rawMessage,
+		errorCode,
+		details: { flow: input.flow, providerErrorType },
+	};
+}
diff --git a/surfsense_web/lib/posthog/events.ts b/surfsense_web/lib/posthog/events.ts
index 34ed3044d..30e58215a 100644
--- a/surfsense_web/lib/posthog/events.ts
+++ b/surfsense_web/lib/posthog/events.ts
@@ -1,5 +1,6 @@
 import posthog from "posthog-js";
 import { getConnectorTelemetryMeta } from "@/components/assistant-ui/connector-popup/constants/connector-constants";
+import type { ChatErrorKind, ChatFlow, ChatErrorSeverity } from "@/lib/chat/chat-error-classifier";
 
 /**
  * PostHog Analytics Event Definitions
@@ -139,6 +140,55 @@ export function trackChatError(searchSpaceId: number, chatId: number, error?: string) {
 	});
 }
 
+export interface ChatFailureTelemetry {
+	flow: ChatFlow;
+	kind: ChatErrorKind;
+	error_code?: string;
+	severity: ChatErrorSeverity;
+	is_expected: boolean;
+	message?: string;
+}
+
+export function trackChatBlocked(
+	searchSpaceId: number,
+	chatId: number | null,
+	payload: ChatFailureTelemetry
+) {
+	safeCapture(
+		"chat_blocked",
+		compact({
+			search_space_id: searchSpaceId,
+			chat_id: chatId ?? undefined,
+			flow: payload.flow,
+			kind: payload.kind,
+			error_code: payload.error_code,
+			severity: payload.severity,
+			is_expected: payload.is_expected,
+			message: payload.message,
+		})
+	);
+}
+
+export function trackChatErrorDetailed(
+	searchSpaceId: number,
+	chatId: number | null,
+	payload: ChatFailureTelemetry
+) {
+	safeCapture(
+		"chat_error",
+		compact({
+			search_space_id: searchSpaceId,
+			chat_id: chatId ?? undefined,
+			flow: payload.flow,
+			kind: payload.kind,
+			error_code: payload.error_code,
+			severity: payload.severity,
+			is_expected: payload.is_expected,
+			message: payload.message,
+		})
+	);
+}
+
 /**
  * Track a message sent from the unauthenticated "free" / anonymous chat
  * flow. This is intentionally a separate event from `chat_message_sent`