feat(chat): unify error handling and logging for chat operations, enhancing clarity and consistency in error reporting

This commit is contained in:
Anish Sarkar 2026-04-30 11:56:41 +05:30
parent 222b27183f
commit d64543686f
6 changed files with 831 additions and 171 deletions

View file

@ -19,7 +19,7 @@ import re
import time
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing import Any
from typing import Any, Literal
from uuid import UUID
import anyio
@ -253,6 +253,98 @@ def _log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None:
)
def _log_chat_stream_error(
*,
flow: Literal["new", "resume", "regenerate"],
error_kind: str,
error_code: str | None,
severity: Literal["info", "warn", "error"],
is_expected: bool,
request_id: str | None,
thread_id: int | None,
search_space_id: int | None,
user_id: str | None,
message: str,
extra: dict[str, Any] | None = None,
) -> None:
payload: dict[str, Any] = {
"event": "chat_stream_error",
"flow": flow,
"error_kind": error_kind,
"error_code": error_code,
"severity": severity,
"is_expected": is_expected,
"request_id": request_id or "unknown",
"thread_id": thread_id,
"search_space_id": search_space_id,
"user_id": user_id,
"message": message,
}
if extra:
payload.update(extra)
logger = logging.getLogger(__name__)
rendered = json.dumps(payload, ensure_ascii=False)
if severity == "error":
logger.error("[chat_stream_error] %s", rendered)
elif severity == "warn":
logger.warning("[chat_stream_error] %s", rendered)
else:
logger.info("[chat_stream_error] %s", rendered)
def _parse_error_payload(message: str) -> dict[str, Any] | None:
candidates = [message]
first_brace_idx = message.find("{")
if first_brace_idx >= 0:
candidates.append(message[first_brace_idx:])
for candidate in candidates:
try:
parsed = json.loads(candidate)
if isinstance(parsed, dict):
return parsed
except Exception:
continue
return None
def _classify_stream_exception(
exc: Exception,
*,
flow_label: str,
) -> tuple[str, str, Literal["info", "warn", "error"], bool, str]:
raw = str(exc)
parsed = _parse_error_payload(raw)
provider_error_type = ""
if parsed:
top_type = parsed.get("type")
if isinstance(top_type, str):
provider_error_type = top_type.lower()
nested = parsed.get("error")
if isinstance(nested, dict):
nested_type = nested.get("type")
if isinstance(nested_type, str):
provider_error_type = nested_type.lower()
if provider_error_type == "rate_limit_error":
return (
"rate_limited",
"RATE_LIMITED",
"warn",
True,
"This model is temporarily rate-limited. Please try again in a few seconds or switch models.",
)
return (
"server_error",
"SERVER_ERROR",
"error",
False,
f"Error during {flow_label}: {raw}",
)
async def _stream_agent_events(
agent: Any,
config: dict[str, Any],
@ -1397,6 +1489,7 @@ async def stream_new_chat(
filesystem_selection: FilesystemSelection | None = None,
request_id: str | None = None,
user_image_data_urls: list[str] | None = None,
flow: Literal["new", "regenerate"] = "new",
) -> AsyncGenerator[str, None]:
"""
Stream chat responses from the new SurfSense deep agent.
@ -1448,6 +1541,30 @@ async def stream_new_chat(
_premium_reserved = 0
_premium_request_id: str | None = None
def _emit_stream_error(
*,
message: str,
error_kind: str = "server_error",
error_code: str = "SERVER_ERROR",
severity: Literal["info", "warn", "error"] = "error",
is_expected: bool = False,
extra: dict[str, Any] | None = None,
) -> str:
_log_chat_stream_error(
flow=flow,
error_kind=error_kind,
error_code=error_code,
severity=severity,
is_expected=is_expected,
request_id=request_id,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
message=message,
extra=extra,
)
return streaming_service.format_error(message, error_code=error_code)
session = async_session_maker()
try:
# Mark AI as responding to this user for live collaboration
@ -1499,13 +1616,21 @@ async def stream_new_chat(
)
).resolved_llm_config_id
except ValueError as pin_error:
yield streaming_service.format_error(str(pin_error))
yield _emit_stream_error(
message=str(pin_error),
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
if llm_load_error:
yield streaming_service.format_error(llm_load_error)
yield _emit_stream_error(
message=llm_load_error,
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
_perf_log.info(
@ -1541,13 +1666,6 @@ async def stream_new_chat(
)
_premium_reserved = reserve_amount
if not quota_result.allowed:
logging.getLogger(__name__).info(
"premium_quota_blocked_pinned_model thread_id=%s search_space_id=%s user_id=%s resolved_config_id=%s",
chat_id,
search_space_id,
user_id,
llm_config_id,
)
if requested_llm_config_id == 0:
try:
llm_config_id = (
@ -1561,34 +1679,66 @@ async def stream_new_chat(
)
).resolved_llm_config_id
except ValueError as pin_error:
yield streaming_service.format_error(str(pin_error))
yield _emit_stream_error(
message=str(pin_error),
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
if llm_load_error:
yield streaming_service.format_error(llm_load_error)
yield _emit_stream_error(
message=llm_load_error,
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
_premium_request_id = None
_premium_reserved = 0
logging.getLogger(__name__).info(
"premium_quota_auto_fallback_to_free thread_id=%s search_space_id=%s user_id=%s fallback_config_id=%s",
chat_id,
search_space_id,
user_id,
llm_config_id,
_log_chat_stream_error(
flow=flow,
error_kind="premium_quota_exhausted",
error_code="PREMIUM_QUOTA_EXHAUSTED",
severity="info",
is_expected=True,
request_id=request_id,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
message=(
"Premium quota exhausted on pinned model; auto-fallback switched to a free model"
),
extra={
"fallback_config_id": llm_config_id,
"auto_fallback": True,
},
)
else:
yield streaming_service.format_error(
"Buy more tokens to continue with this model, or switch to a free model",
yield _emit_stream_error(
message=(
"Buy more tokens to continue with this model, or switch to a free model"
),
error_kind="premium_quota_exhausted",
error_code="PREMIUM_QUOTA_EXHAUSTED",
severity="info",
is_expected=True,
extra={
"resolved_config_id": llm_config_id,
"auto_fallback": False,
},
)
yield streaming_service.format_done()
return
if not llm:
yield streaming_service.format_error("Failed to create LLM instance")
yield _emit_stream_error(
message="Failed to create LLM instance",
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
@ -2097,12 +2247,25 @@ async def stream_new_chat(
# Handle any errors
import traceback
(
error_kind,
error_code,
severity,
is_expected,
user_message,
) = _classify_stream_exception(e, flow_label="chat")
error_message = f"Error during chat: {e!s}"
print(f"[stream_new_chat] {error_message}")
print(f"[stream_new_chat] Exception type: {type(e).__name__}")
print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}")
yield streaming_service.format_error(error_message)
yield _emit_stream_error(
message=user_message,
error_kind=error_kind,
error_code=error_code,
severity=severity,
is_expected=is_expected,
)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()
yield streaming_service.format_done()
@ -2217,6 +2380,30 @@ async def stream_resume_chat(
accumulator = start_turn()
def _emit_stream_error(
*,
message: str,
error_kind: str = "server_error",
error_code: str = "SERVER_ERROR",
severity: Literal["info", "warn", "error"] = "error",
is_expected: bool = False,
extra: dict[str, Any] | None = None,
) -> str:
_log_chat_stream_error(
flow="resume",
error_kind=error_kind,
error_code=error_code,
severity=severity,
is_expected=is_expected,
request_id=request_id,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
message=message,
extra=extra,
)
return streaming_service.format_error(message, error_code=error_code)
session = async_session_maker()
try:
if user_id:
@ -2267,13 +2454,21 @@ async def stream_resume_chat(
)
).resolved_llm_config_id
except ValueError as pin_error:
yield streaming_service.format_error(str(pin_error))
yield _emit_stream_error(
message=str(pin_error),
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
if llm_load_error:
yield streaming_service.format_error(llm_load_error)
yield _emit_stream_error(
message=llm_load_error,
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
_perf_log.info(
@ -2309,13 +2504,6 @@ async def stream_resume_chat(
)
_resume_premium_reserved = reserve_amount
if not quota_result.allowed:
logging.getLogger(__name__).info(
"premium_quota_blocked_pinned_model thread_id=%s search_space_id=%s user_id=%s resolved_config_id=%s",
chat_id,
search_space_id,
user_id,
llm_config_id,
)
if requested_llm_config_id == 0:
try:
llm_config_id = (
@ -2329,34 +2517,66 @@ async def stream_resume_chat(
)
).resolved_llm_config_id
except ValueError as pin_error:
yield streaming_service.format_error(str(pin_error))
yield _emit_stream_error(
message=str(pin_error),
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
llm, agent_config, llm_load_error = await _load_llm_bundle(llm_config_id)
if llm_load_error:
yield streaming_service.format_error(llm_load_error)
yield _emit_stream_error(
message=llm_load_error,
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
_resume_premium_request_id = None
_resume_premium_reserved = 0
logging.getLogger(__name__).info(
"premium_quota_auto_fallback_to_free thread_id=%s search_space_id=%s user_id=%s fallback_config_id=%s",
chat_id,
search_space_id,
user_id,
llm_config_id,
_log_chat_stream_error(
flow="resume",
error_kind="premium_quota_exhausted",
error_code="PREMIUM_QUOTA_EXHAUSTED",
severity="info",
is_expected=True,
request_id=request_id,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
message=(
"Premium quota exhausted on pinned model; auto-fallback switched to a free model"
),
extra={
"fallback_config_id": llm_config_id,
"auto_fallback": True,
},
)
else:
yield streaming_service.format_error(
"Buy more tokens to continue with this model, or switch to a free model",
yield _emit_stream_error(
message=(
"Buy more tokens to continue with this model, or switch to a free model"
),
error_kind="premium_quota_exhausted",
error_code="PREMIUM_QUOTA_EXHAUSTED",
severity="info",
is_expected=True,
extra={
"resolved_config_id": llm_config_id,
"auto_fallback": False,
},
)
yield streaming_service.format_done()
return
if not llm:
yield streaming_service.format_error("Failed to create LLM instance")
yield _emit_stream_error(
message="Failed to create LLM instance",
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
@ -2528,10 +2748,23 @@ async def stream_resume_chat(
except Exception as e:
import traceback
(
error_kind,
error_code,
severity,
is_expected,
user_message,
) = _classify_stream_exception(e, flow_label="resume")
error_message = f"Error during resume: {e!s}"
print(f"[stream_resume_chat] {error_message}")
print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}")
yield streaming_service.format_error(error_message)
yield _emit_stream_error(
message=user_message,
error_kind=error_kind,
error_code=error_code,
severity=severity,
is_expected=is_expected,
)
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()
yield streaming_service.format_done()