mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-16 21:05:20 +02:00
feat(chat): enhance error classification and handling for thread busy scenarios, improving user feedback and response management
This commit is contained in:
parent
fd4d0817d1
commit
35ea0eae53
6 changed files with 322 additions and 111 deletions
|
|
@ -19,6 +19,7 @@ import re
|
|||
import time
|
||||
from collections.abc import AsyncGenerator
|
||||
from dataclasses import dataclass, field
|
||||
from functools import partial
|
||||
from typing import Any, Literal
|
||||
from uuid import UUID
|
||||
|
||||
|
|
@ -30,6 +31,7 @@ from sqlalchemy.orm import selectinload
|
|||
|
||||
from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent
|
||||
from app.agents.new_chat.checkpointer import get_checkpointer
|
||||
from app.agents.new_chat.errors import BusyError
|
||||
from app.agents.new_chat.filesystem_selection import FilesystemMode, FilesystemSelection
|
||||
from app.agents.new_chat.llm_config import (
|
||||
AgentConfig,
|
||||
|
|
@ -315,6 +317,15 @@ def _classify_stream_exception(
|
|||
flow_label: str,
|
||||
) -> tuple[str, str, Literal["info", "warn", "error"], bool, str]:
|
||||
raw = str(exc)
|
||||
if isinstance(exc, BusyError) or "Thread is busy with another request" in raw:
|
||||
return (
|
||||
"thread_busy",
|
||||
"THREAD_BUSY",
|
||||
"warn",
|
||||
True,
|
||||
"Another response is still finishing for this thread. Please try again in a moment.",
|
||||
)
|
||||
|
||||
parsed = _parse_error_payload(raw)
|
||||
provider_error_type = ""
|
||||
if parsed:
|
||||
|
|
@ -345,6 +356,37 @@ def _classify_stream_exception(
|
|||
)
|
||||
|
||||
|
||||
def _emit_stream_terminal_error(
|
||||
*,
|
||||
streaming_service: VercelStreamingService,
|
||||
flow: str,
|
||||
request_id: str | None,
|
||||
thread_id: int,
|
||||
search_space_id: int,
|
||||
user_id: str | None,
|
||||
message: str,
|
||||
error_kind: str = "server_error",
|
||||
error_code: str = "SERVER_ERROR",
|
||||
severity: Literal["info", "warn", "error"] = "error",
|
||||
is_expected: bool = False,
|
||||
extra: dict[str, Any] | None = None,
|
||||
) -> str:
|
||||
_log_chat_stream_error(
|
||||
flow=flow,
|
||||
error_kind=error_kind,
|
||||
error_code=error_code,
|
||||
severity=severity,
|
||||
is_expected=is_expected,
|
||||
request_id=request_id,
|
||||
thread_id=thread_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
message=message,
|
||||
extra=extra,
|
||||
)
|
||||
return streaming_service.format_error(message, error_code=error_code)
|
||||
|
||||
|
||||
async def _stream_agent_events(
|
||||
agent: Any,
|
||||
config: dict[str, Any],
|
||||
|
|
@ -1541,29 +1583,15 @@ async def stream_new_chat(
|
|||
_premium_reserved = 0
|
||||
_premium_request_id: str | None = None
|
||||
|
||||
def _emit_stream_error(
|
||||
*,
|
||||
message: str,
|
||||
error_kind: str = "server_error",
|
||||
error_code: str = "SERVER_ERROR",
|
||||
severity: Literal["info", "warn", "error"] = "error",
|
||||
is_expected: bool = False,
|
||||
extra: dict[str, Any] | None = None,
|
||||
) -> str:
|
||||
_log_chat_stream_error(
|
||||
flow=flow,
|
||||
error_kind=error_kind,
|
||||
error_code=error_code,
|
||||
severity=severity,
|
||||
is_expected=is_expected,
|
||||
request_id=request_id,
|
||||
thread_id=chat_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
message=message,
|
||||
extra=extra,
|
||||
)
|
||||
return streaming_service.format_error(message, error_code=error_code)
|
||||
_emit_stream_error = partial(
|
||||
_emit_stream_terminal_error,
|
||||
streaming_service=streaming_service,
|
||||
flow=flow,
|
||||
request_id=request_id,
|
||||
thread_id=chat_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
session = async_session_maker()
|
||||
try:
|
||||
|
|
@ -2380,29 +2408,15 @@ async def stream_resume_chat(
|
|||
|
||||
accumulator = start_turn()
|
||||
|
||||
def _emit_stream_error(
|
||||
*,
|
||||
message: str,
|
||||
error_kind: str = "server_error",
|
||||
error_code: str = "SERVER_ERROR",
|
||||
severity: Literal["info", "warn", "error"] = "error",
|
||||
is_expected: bool = False,
|
||||
extra: dict[str, Any] | None = None,
|
||||
) -> str:
|
||||
_log_chat_stream_error(
|
||||
flow="resume",
|
||||
error_kind=error_kind,
|
||||
error_code=error_code,
|
||||
severity=severity,
|
||||
is_expected=is_expected,
|
||||
request_id=request_id,
|
||||
thread_id=chat_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
message=message,
|
||||
extra=extra,
|
||||
)
|
||||
return streaming_service.format_error(message, error_code=error_code)
|
||||
_emit_stream_error = partial(
|
||||
_emit_stream_terminal_error,
|
||||
streaming_service=streaming_service,
|
||||
flow="resume",
|
||||
request_id=request_id,
|
||||
thread_id=chat_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
session = async_session_maker()
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -1,12 +1,13 @@
|
|||
import inspect
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
import app.tasks.chat.stream_new_chat as stream_new_chat_module
|
||||
from app.agents.new_chat.errors import BusyError
|
||||
from app.tasks.chat.stream_new_chat import (
|
||||
StreamResult,
|
||||
_classify_stream_exception,
|
||||
|
|
@ -130,14 +131,14 @@ def test_stream_error_emission_keeps_machine_error_codes():
|
|||
format_error_calls = re.findall(r"format_error\(", source)
|
||||
emitted_error_codes = set(re.findall(r'error_code="([A-Z_]+)"', source))
|
||||
|
||||
# Both new/resume stream paths now route through local emitters that always
|
||||
# pass a machine-readable error_code.
|
||||
assert len(format_error_calls) == 2
|
||||
# All stream paths should route through one shared terminal error emitter.
|
||||
assert len(format_error_calls) == 1
|
||||
assert {
|
||||
"PREMIUM_QUOTA_EXHAUSTED",
|
||||
"SERVER_ERROR",
|
||||
}.issubset(emitted_error_codes)
|
||||
assert 'flow: Literal["new", "regenerate"] = "new"' in source
|
||||
assert "_emit_stream_terminal_error" in source
|
||||
assert "flow=flow" in source
|
||||
assert 'flow="resume"' in source
|
||||
|
||||
|
|
@ -156,6 +157,30 @@ def test_stream_exception_classifies_rate_limited():
|
|||
assert "temporarily rate-limited" in user_message
|
||||
|
||||
|
||||
def test_stream_exception_classifies_thread_busy():
|
||||
exc = BusyError(request_id="thread-123")
|
||||
kind, code, severity, is_expected, user_message = _classify_stream_exception(
|
||||
exc, flow_label="chat"
|
||||
)
|
||||
assert kind == "thread_busy"
|
||||
assert code == "THREAD_BUSY"
|
||||
assert severity == "warn"
|
||||
assert is_expected is True
|
||||
assert "still finishing for this thread" in user_message
|
||||
|
||||
|
||||
def test_stream_exception_classifies_thread_busy_from_message():
|
||||
exc = Exception("Thread is busy with another request")
|
||||
kind, code, severity, is_expected, user_message = _classify_stream_exception(
|
||||
exc, flow_label="chat"
|
||||
)
|
||||
assert kind == "thread_busy"
|
||||
assert code == "THREAD_BUSY"
|
||||
assert severity == "warn"
|
||||
assert is_expected is True
|
||||
assert "still finishing for this thread" in user_message
|
||||
|
||||
|
||||
def test_premium_classification_is_error_code_driven():
|
||||
classifier_path = Path(__file__).resolve().parents[3] / "surfsense_web/lib/chat/chat-error-classifier.ts"
|
||||
source = classifier_path.read_text(encoding="utf-8")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue