From 5a6b92c2b64a48ecb4a51ab32dd674bb4ac219ee Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 22 May 2026 13:48:19 +0530 Subject: [PATCH] feat(chat): instrument streamed chat request telemetry --- .../app/tasks/chat/stream_new_chat.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index c9faa1691..b961733b4 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -58,6 +58,7 @@ from app.db import ( async_session_maker, shielded_async_session, ) +from app.observability import metrics as ot_metrics, otel as ot from app.prompts import TITLE_GENERATION_PROMPT from app.services.auto_model_pin_service import ( mark_runtime_cooldown, @@ -883,6 +884,19 @@ async def stream_new_chat( stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}" stream_result.filesystem_mode = fs_mode stream_result.client_platform = fs_platform + chat_agent_mode = "unknown" + chat_outcome = "success" + chat_span_cm = ot.chat_request_span( + chat_id=chat_id, + search_space_id=search_space_id, + flow=flow, + request_id=request_id, + turn_id=stream_result.turn_id, + filesystem_mode=fs_mode, + client_platform=fs_platform, + agent_mode=chat_agent_mode, + ) + chat_span = chat_span_cm.__enter__() _log_file_contract("turn_start", stream_result) _perf_log.info( "[stream_new_chat] filesystem_mode=%s client_platform=%s", @@ -1189,6 +1203,9 @@ async def stream_new_chat( from app.config import config as _app_config use_multi_agent = bool(_app_config.MULTI_AGENT_CHAT_ENABLED) + chat_agent_mode = "multi" if use_multi_agent else "single" + with contextlib.suppress(Exception): + chat_span.set_attribute("agent.mode", chat_agent_mode) _t0 = time.perf_counter() agent_factory = ( @@ -2011,6 +2028,10 @@ async def stream_new_chat( user_message, error_extra, ) = _classify_stream_exception(e, flow_label="chat") + chat_outcome = error_code or error_kind or "error" + with contextlib.suppress(Exception): + chat_span.set_attribute("chat.outcome", chat_outcome) + chat_span.record_exception(e) error_message = f"Error during chat: {e!s}" print(f"[stream_new_chat] {error_message}") print(f"[stream_new_chat] Exception type: {type(e).__name__}") @@ -2201,6 +2222,20 @@ async def stream_new_chat( ) trim_native_heap() log_system_snapshot("stream_new_chat_END") + with contextlib.suppress(Exception): + chat_span.set_attribute("chat.outcome", chat_outcome) + ot_metrics.record_chat_request_duration( + (time.perf_counter() - _t_total) * 1000, + flow=flow, + outcome=chat_outcome, + agent_mode=chat_agent_mode, + ) + ot_metrics.record_chat_request_outcome( + flow=flow, + outcome=chat_outcome, + agent_mode=chat_agent_mode, + ) + chat_span_cm.__exit__(None, None, None) async def stream_resume_chat( @@ -2225,6 +2260,19 @@ async def stream_resume_chat( stream_result.turn_id = f"{chat_id}:{int(time.time() * 1000)}" stream_result.filesystem_mode = fs_mode stream_result.client_platform = fs_platform + chat_agent_mode = "unknown" + chat_outcome = "success" + chat_span_cm = ot.chat_request_span( + chat_id=chat_id, + search_space_id=search_space_id, + flow="resume", + request_id=request_id, + turn_id=stream_result.turn_id, + filesystem_mode=fs_mode, + client_platform=fs_platform, + agent_mode=chat_agent_mode, + ) + chat_span = chat_span_cm.__enter__() _log_file_contract("turn_start", stream_result) _perf_log.info( "[stream_resume] filesystem_mode=%s client_platform=%s", @@ -2454,6 +2502,11 @@ async def stream_resume_chat( visibility = thread_visibility or ChatVisibility.PRIVATE from app.config import config as _app_config + chat_agent_mode = ( + "multi" if _app_config.MULTI_AGENT_CHAT_ENABLED else "single" + ) + with contextlib.suppress(Exception): + chat_span.set_attribute("agent.mode", chat_agent_mode) _t0 = time.perf_counter() agent_factory = ( create_multi_agent_chat_deep_agent @@ -2815,6 +2868,10 @@ async def stream_resume_chat( user_message, error_extra, ) = _classify_stream_exception(e, flow_label="resume") + chat_outcome = error_code or error_kind or "error" + with contextlib.suppress(Exception): + chat_span.set_attribute("chat.outcome", chat_outcome) + chat_span.record_exception(e) error_message = f"Error during resume: {e!s}" print(f"[stream_resume_chat] {error_message}") print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}") @@ -2964,3 +3021,17 @@ async def stream_resume_chat( ) trim_native_heap() log_system_snapshot("stream_resume_chat_END") + with contextlib.suppress(Exception): + chat_span.set_attribute("chat.outcome", chat_outcome) + ot_metrics.record_chat_request_duration( + (time.perf_counter() - _t_total) * 1000, + flow="resume", + outcome=chat_outcome, + agent_mode=chat_agent_mode, + ) + ot_metrics.record_chat_request_outcome( + flow="resume", + outcome=chat_outcome, + agent_mode=chat_agent_mode, + ) + chat_span_cm.__exit__(None, None, None)