diff --git a/surfsense_backend/app/observability/metrics.py b/surfsense_backend/app/observability/metrics.py index fb99b4cc8..53beb959d 100644 --- a/surfsense_backend/app/observability/metrics.py +++ b/surfsense_backend/app/observability/metrics.py @@ -195,6 +195,73 @@ def _perf_elapsed(): ) +@lru_cache(maxsize=1) +def _chat_request_duration(): + return _get_meter().create_histogram( + "surfsense.chat.request.duration", + unit="ms", + description="Duration of SurfSense streamed chat requests.", + ) + + +@lru_cache(maxsize=1) +def _chat_request_outcome(): + return _get_meter().create_counter( + "surfsense.chat.request.outcome", + description="Count of SurfSense chat request outcomes.", + ) + + +@lru_cache(maxsize=1) +def _subagent_invoke_duration(): + return _get_meter().create_histogram( + "surfsense.subagent.invoke.duration", + unit="ms", + description="Duration of SurfSense subagent invocations.", + ) + + +@lru_cache(maxsize=1) +def _subagent_invoke_outcome(): + return _get_meter().create_counter( + "surfsense.subagent.invoke.outcome", + description="Count of SurfSense subagent invocation outcomes.", + ) + + +@lru_cache(maxsize=1) +def _etl_extract_duration(): + return _get_meter().create_histogram( + "surfsense.etl.extract.duration", + unit="s", + description="Duration of SurfSense ETL extraction.", + ) + + +@lru_cache(maxsize=1) +def _etl_extract_outcome(): + return _get_meter().create_counter( + "surfsense.etl.extract.outcome", + description="Count of SurfSense ETL extraction outcomes.", + ) + + +@lru_cache(maxsize=1) +def _celery_heartbeat_refreshes(): + return _get_meter().create_counter( + "surfsense.celery.heartbeat.refreshes", + description="Count of SurfSense Celery heartbeat refreshes.", + ) + + +@lru_cache(maxsize=1) +def _celery_heartbeat_failures(): + return _get_meter().create_counter( + "surfsense.celery.heartbeat.failures", + description="Count of SurfSense Celery heartbeat failures.", + ) + + def record_model_call_duration( duration_ms: float, *, model: str | None, provider: str | None ) -> None: @@ -312,6 +379,111 @@ def record_perf_elapsed(duration_ms: float, *, label: str) -> None: _record(_perf_elapsed(), duration_ms, {"label": label}) +def record_chat_request_duration( + duration_ms: float, + *, + flow: str, + outcome: str, + agent_mode: str | None = None, +) -> None: + _record( + _chat_request_duration(), + duration_ms, + {"chat.flow": flow, "outcome": outcome, "agent.mode": agent_mode}, + ) + + +def record_chat_request_outcome( + *, + flow: str, + outcome: str, + agent_mode: str | None = None, +) -> None: + _add( + _chat_request_outcome(), + 1, + {"chat.flow": flow, "outcome": outcome, "agent.mode": agent_mode}, + ) + + +def record_subagent_invoke_duration( + duration_ms: float, + *, + subagent_type: str, + path: str | None, + outcome: str, +) -> None: + _record( + _subagent_invoke_duration(), + duration_ms, + { + "subagent.type": subagent_type, + "subagent.path": path or "unknown", + "outcome": outcome, + }, + ) + + +def record_subagent_invoke_outcome( + *, + subagent_type: str, + path: str | None, + outcome: str, +) -> None: + _add( + _subagent_invoke_outcome(), + 1, + { + "subagent.type": subagent_type, + "subagent.path": path or "unknown", + "outcome": outcome, + }, + ) + + +def record_etl_extract_duration( + duration_s: float, + *, + etl_service: str | None, + content_type: str | None, + status: str, +) -> None: + _record( + _etl_extract_duration(), + duration_s, + { + "etl.service": etl_service or "unknown", + "content.type": content_type or "unknown", + "status": status, + }, + ) + + +def record_etl_extract_outcome( + *, + etl_service: str | None, + content_type: str | None, + status: str, +) -> None: + _add( + _etl_extract_outcome(), + 1, + { + "etl.service": etl_service or "unknown", + "content.type": content_type or "unknown", + "status": status, + }, + ) + + +def record_celery_heartbeat_refresh(*, heartbeat_type: str) -> None: + _add(_celery_heartbeat_refreshes(), 1, {"heartbeat.type": heartbeat_type}) + + +def record_celery_heartbeat_failure(*, heartbeat_type: str) -> None: + _add(_celery_heartbeat_failures(), 1, {"heartbeat.type": heartbeat_type}) + + def _runtime_snapshot_value(key: str, transform: Any = None) -> list[Any]: from opentelemetry.metrics import Observation @@ -398,9 +570,15 @@ def register_runtime_observables() -> None: __all__ = [ "record_auth_failure", + "record_celery_heartbeat_failure", + "record_celery_heartbeat_refresh", + "record_chat_request_duration", + "record_chat_request_outcome", "record_compaction_run", "record_connector_sync_duration", "record_connector_sync_outcome", + "record_etl_extract_duration", + "record_etl_extract_outcome", "record_indexing_document_duration", "record_indexing_document_outcome", "record_interrupt", @@ -410,6 +588,8 @@ __all__ = [ "record_perf_elapsed", "record_permission_ask", "record_rate_limit_rejection", + "record_subagent_invoke_duration", + "record_subagent_invoke_outcome", "record_tool_call_duration", "record_tool_call_error", "register_runtime_observables", diff --git a/surfsense_backend/app/observability/otel.py b/surfsense_backend/app/observability/otel.py index f39cfe535..e4d4a1fd9 100644 --- a/surfsense_backend/app/observability/otel.py +++ b/surfsense_backend/app/observability/otel.py @@ -244,6 +244,152 @@ def kb_persist_span( return span("kb.persist", attributes=attrs) +def chat_request_span( + *, + chat_id: int | None = None, + search_space_id: int | None = None, + flow: str | None = None, + request_id: str | None = None, + turn_id: str | None = None, + filesystem_mode: str | None = None, + client_platform: str | None = None, + agent_mode: str | None = None, + extra: dict[str, Any] | None = None, +): + """Parent span for a single streamed chat or resume turn.""" + attrs: dict[str, Any] = {} + if chat_id is not None: + attrs["chat.id"] = int(chat_id) + if search_space_id is not None: + attrs["search_space.id"] = int(search_space_id) + if flow: + attrs["chat.flow"] = flow + if request_id: + attrs["request.id"] = request_id + if turn_id: + attrs["turn.id"] = turn_id + if filesystem_mode: + attrs["filesystem.mode"] = filesystem_mode + if client_platform: + attrs["client.platform"] = client_platform + if agent_mode: + attrs["agent.mode"] = agent_mode + if extra: + attrs.update(extra) + return span("chat.request", attributes=attrs) + + +def subagent_invoke_span( + *, + subagent_type: str, + path: str | None = None, + extra: dict[str, Any] | None = None, +): + """Span around invoking a delegated subagent from the main agent.""" + attrs: dict[str, Any] = {"subagent.type": subagent_type} + if path: + attrs["subagent.path"] = path + if extra: + attrs.update(extra) + return span("subagent.invoke", attributes=attrs) + + +def connector_sync_span( + *, + connector_type: str | None, + extra: dict[str, Any] | None = None, +): + """Business-level span around connector indexing task execution.""" + attrs: dict[str, Any] = {"connector.type": connector_type or "unknown"} + if extra: + attrs.update(extra) + return span("connector.sync", attributes=attrs) + + +def etl_extract_span( + *, + content_type: str | None = None, + file_extension: str | None = None, + processing_mode: str | None = None, + extra: dict[str, Any] | None = None, +): + """Span around top-level ETL extraction for a file.""" + attrs: dict[str, Any] = {} + if content_type: + attrs["content.type"] = content_type + if file_extension: + attrs["file.extension"] = file_extension + if processing_mode: + attrs["processing.mode"] = processing_mode + if extra: + attrs.update(extra) + return span("etl.extract", attributes=attrs) + + +def etl_parse_span( + *, + etl_service: str | None, + content_type: str | None = None, + file_extension: str | None = None, + processing_mode: str | None = None, + extra: dict[str, Any] | None = None, +): + """Span around a concrete ETL parser/backend call.""" + attrs: dict[str, Any] = {"etl.service": etl_service or "unknown"} + if content_type: + attrs["content.type"] = content_type + if file_extension: + attrs["file.extension"] = file_extension + if processing_mode: + attrs["processing.mode"] = processing_mode + if extra: + attrs.update(extra) + return span("etl.parse", attributes=attrs) + + +def etl_ocr_span( + *, + etl_service: str | None, + file_extension: str | None = None, + extra: dict[str, Any] | None = None, +): + """Span around OCR extraction from image content.""" + attrs: dict[str, Any] = {"etl.service": etl_service or "unknown"} + if file_extension: + attrs["file.extension"] = file_extension + if extra: + attrs.update(extra) + return span("etl.ocr", attributes=attrs) + + +def etl_picture_describe_span( + *, + image_count: int | None = None, + extra: dict[str, Any] | None = None, +): + """Span around describing embedded images in a document.""" + attrs: dict[str, Any] = {} + if image_count is not None: + attrs["image.count"] = int(image_count) + if extra: + attrs.update(extra) + return span("etl.picture.describe", attributes=attrs) + + +def etl_picture_ocr_span( + *, + file_extension: str | None = None, + extra: dict[str, Any] | None = None, +): + """Span around per-image OCR during picture description.""" + attrs: dict[str, Any] = {} + if file_extension: + attrs["file.extension"] = file_extension + if extra: + attrs.update(extra) + return span("etl.picture.ocr", attributes=attrs) + + def compaction_span( *, reason: str | None = None, @@ -306,7 +452,14 @@ def reload_for_tests() -> bool: __all__ = [ + "chat_request_span", "compaction_span", + "connector_sync_span", + "etl_extract_span", + "etl_ocr_span", + "etl_parse_span", + "etl_picture_describe_span", + "etl_picture_ocr_span", "interrupt_span", "is_enabled", "kb_persist_span", @@ -315,5 +468,6 @@ __all__ = [ "permission_asked_span", "reload_for_tests", "span", + "subagent_invoke_span", "tool_call_span", ]