From dbb652d4f87f502e3224cc40ee50e2d80ea4ec2b Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 22 May 2026 17:48:01 +0530 Subject: [PATCH] feat(observability): add telemetry error and event helpers --- .../app/observability/metrics.py | 104 ++++++++++++++++-- surfsense_backend/app/observability/otel.py | 44 ++++++++ .../tests/unit/observability/test_helpers.py | 101 +++++++++++++++++ 3 files changed, 241 insertions(+), 8 deletions(-) create mode 100644 surfsense_backend/tests/unit/observability/test_helpers.py diff --git a/surfsense_backend/app/observability/metrics.py b/surfsense_backend/app/observability/metrics.py index 53beb959d..798a6e2f7 100644 --- a/surfsense_backend/app/observability/metrics.py +++ b/surfsense_backend/app/observability/metrics.py @@ -20,6 +20,19 @@ logger = logging.getLogger(__name__) _INSTRUMENTATION_NAME = "surfsense.platform" _OBSERVABLES_REGISTERED = False +_ERROR_CATEGORY_UNKNOWN = "unknown" + +_ERROR_CATEGORY_HINTS: tuple[tuple[str, tuple[str, ...]], ...] = ( + ("rate_limited", ("ratelimit", "rate_limit", "toomanyrequests", "429")), + ("auth_failed", ("authentication", "auth", "unauthorized", "forbidden")), + ("quota_exhausted", ("quota", "insufficient", "credit", "billing")), + ("timeout", ("timeout", "timedout", "deadline")), + ("network_failed", ("connection", "connect", "network", "dns", "socket")), + ("server_error", ("internalserver", "serviceunavailable", "badgateway", "gateway")), + ("lock_contention", ("lock", "busy", "contention", "alreadyrunning")), + ("unsupported_format", ("unsupported", "format", "filetype")), + ("provider_error", ("provider", "apierror", "apistatus", "badrequest")), +) def _package_version() -> str: @@ -47,6 +60,36 @@ def _clean_attrs(attrs: dict[str, Any]) -> dict[str, str | int | float | bool]: return cleaned +def _attrs_with_optional_error_category( + attrs: dict[str, Any], error_category: str | None +) -> dict[str, Any]: + if error_category: + return {**attrs, "error.category": error_category} + return attrs + + +def categorize_exception(exc: BaseException | None) -> str: + """Return a low-cardinality category for an exception.""" + if exc is None: + return _ERROR_CATEGORY_UNKNOWN + haystack = " ".join( + cls.__name__.replace("-", "").replace("_", "").lower() + for cls in type(exc).__mro__ + ) + for category, hints in _ERROR_CATEGORY_HINTS: + if any(hint in haystack for hint in hints): + return category + return _ERROR_CATEGORY_UNKNOWN + + +def parse_celery_task_label(task_name: str | None) -> str: + """Return the operation token from a Celery task name.""" + if not task_name: + return "unknown" + operation = str(task_name).split("_", 1)[0].strip() + return operation or "unknown" + + def _record(callable_obj: Any, value: int | float, attrs: dict[str, Any]) -> None: if not _is_enabled(): return @@ -262,6 +305,15 @@ def _celery_heartbeat_failures(): ) +@lru_cache(maxsize=1) +def _celery_queue_latency(): + return _get_meter().create_histogram( + "surfsense.celery.queue.latency", + unit="s", + description="Time SurfSense Celery tasks spend waiting in queue.", + ) + + def record_model_call_duration( duration_ms: float, *, model: str | None, provider: str | None ) -> None: @@ -359,11 +411,16 @@ def record_connector_sync_duration( ) -def record_connector_sync_outcome(*, connector_type: str | None, status: str) -> None: +def record_connector_sync_outcome( + *, connector_type: str | None, status: str, error_category: str | None = None +) -> None: _add( _connector_sync_outcome(), 1, - {"connector.type": connector_type or "unknown", "status": status}, + _attrs_with_optional_error_category( + {"connector.type": connector_type or "unknown", "status": status}, + error_category, + ), ) @@ -398,11 +455,15 @@ def record_chat_request_outcome( flow: str, outcome: str, agent_mode: str | None = None, + error_category: str | None = None, ) -> None: _add( _chat_request_outcome(), 1, - {"chat.flow": flow, "outcome": outcome, "agent.mode": agent_mode}, + _attrs_with_optional_error_category( + {"chat.flow": flow, "outcome": outcome, "agent.mode": agent_mode}, + error_category, + ), ) @@ -464,15 +525,19 @@ def record_etl_extract_outcome( etl_service: str | None, content_type: str | None, status: str, + error_category: str | None = None, ) -> None: _add( _etl_extract_outcome(), 1, - { - "etl.service": etl_service or "unknown", - "content.type": content_type or "unknown", - "status": status, - }, + _attrs_with_optional_error_category( + { + "etl.service": etl_service or "unknown", + "content.type": content_type or "unknown", + "status": status, + }, + error_category, + ), ) @@ -484,6 +549,26 @@ def record_celery_heartbeat_failure(*, heartbeat_type: str) -> None: _add(_celery_heartbeat_failures(), 1, {"heartbeat.type": heartbeat_type}) +def record_celery_queue_latency( + duration_s: float, + *, + task_name: str | None, + queue: str | None, + scheduled: bool, + operation: str | None, +) -> None: + _record( + _celery_queue_latency(), + duration_s, + { + "task.name": task_name or "unknown", + "task.queue": queue or "unknown", + "task.scheduled": bool(scheduled), + "operation": operation or "unknown", + }, + ) + + def _runtime_snapshot_value(key: str, transform: Any = None) -> list[Any]: from opentelemetry.metrics import Observation @@ -569,9 +654,12 @@ def register_runtime_observables() -> None: __all__ = [ + "categorize_exception", + "parse_celery_task_label", "record_auth_failure", "record_celery_heartbeat_failure", "record_celery_heartbeat_refresh", + "record_celery_queue_latency", "record_chat_request_duration", "record_chat_request_outcome", "record_compaction_run", diff --git a/surfsense_backend/app/observability/otel.py b/surfsense_backend/app/observability/otel.py index e4d4a1fd9..ad2178f39 100644 --- a/surfsense_backend/app/observability/otel.py +++ b/surfsense_backend/app/observability/otel.py @@ -92,6 +92,48 @@ def is_enabled() -> bool: return _ENABLED +def _clean_event_attrs(attrs: dict[str, Any]) -> dict[str, str | int | float | bool]: + """Coerce event attributes to OTel-safe scalar values.""" + cleaned: dict[str, str | int | float | bool] = {} + for key, value in attrs.items(): + if value is None: + continue + if isinstance(value, bool | int | float): + cleaned[key] = value + continue + text = str(value) + if text: + cleaned[key] = text + return cleaned + + +def add_event(name: str, attributes: dict[str, Any] | None = None) -> None: + """Attach an event to the current active span. + + This is intentionally no-op and exception-safe when OTel is disabled, + unavailable, or no span is currently recording. + """ + if not _ENABLED or _ot_trace is None: + return + with contextlib.suppress(Exception): + sp = _ot_trace.get_current_span() + if sp is None or not sp.is_recording(): + return + sp.add_event( + name, + attributes=_clean_event_attrs(attributes) if attributes else None, + ) + + +def record_error(span_obj: Any, exc: BaseException) -> None: + """Record an exception and mark a span as errored without re-raising.""" + if not _ENABLED: + return + with contextlib.suppress(Exception): + span_obj.record_exception(exc) + span_obj.set_status(_OtStatus(_OtStatusCode.ERROR, str(exc))) + + def _get_tracer(): if not _OTEL_AVAILABLE: return None @@ -452,6 +494,7 @@ def reload_for_tests() -> bool: __all__ = [ + "add_event", "chat_request_span", "compaction_span", "connector_sync_span", @@ -466,6 +509,7 @@ __all__ = [ "kb_search_span", "model_call_span", "permission_asked_span", + "record_error", "reload_for_tests", "span", "subagent_invoke_span", diff --git a/surfsense_backend/tests/unit/observability/test_helpers.py b/surfsense_backend/tests/unit/observability/test_helpers.py new file mode 100644 index 000000000..ae60c1939 --- /dev/null +++ b/surfsense_backend/tests/unit/observability/test_helpers.py @@ -0,0 +1,101 @@ +"""Tests for pure observability helper functions.""" + +from __future__ import annotations + +import pytest + +from app.observability import metrics as ot_metrics, otel as ot + +pytestmark = pytest.mark.unit + + +@pytest.fixture(autouse=True) +def _disable_otel(monkeypatch: pytest.MonkeyPatch): + monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False) + monkeypatch.setenv("SURFSENSE_DISABLE_OTEL", "true") + ot.reload_for_tests() + yield + ot.reload_for_tests() + + +@pytest.mark.parametrize( + ("task_name", "expected"), + [ + ("reindex_document", "reindex"), + ("delete_document_background", "delete"), + ("delete_folder_documents_background", "delete"), + ("delete_search_space_background", "delete"), + ("process_extension_document", "process"), + ("process_youtube_video", "process"), + ("process_file_upload", "process"), + ("process_file_upload_with_document", "process"), + ("process_circleback_meeting", "process"), + ("generate_video_presentation", "generate"), + ("generate_content_podcast", "generate"), + ("cleanup_stale_indexing_notifications", "cleanup"), + ("reconcile_pending_stripe_page_purchases", "reconcile"), + ("reconcile_pending_stripe_token_purchases", "reconcile"), + ("check_periodic_schedules", "check"), + ("ai_sort_search_space", "ai"), + ("index_notion_pages", "index"), + ("index_github_repos", "index"), + ("index_google_drive_files", "index"), + ("index_composio_connector", "index"), + ("index_obsidian_attachment", "index"), + ("index_local_folder", "index"), + ("index_uploaded_folder_files", "index"), + ("noseparator", "noseparator"), + ("", "unknown"), + ], +) +def test_parse_celery_task_label(task_name: str, expected: str) -> None: + assert ot_metrics.parse_celery_task_label(task_name) == expected + + +def test_parse_celery_task_label_handles_none() -> None: + assert ot_metrics.parse_celery_task_label(None) == "unknown" + + +@pytest.mark.parametrize( + ("exc", "expected"), + [ + (type("RateLimitError", (Exception,), {})(), "rate_limited"), + (type("AuthenticationError", (Exception,), {})(), "auth_failed"), + (type("QuotaInsufficientError", (Exception,), {})(), "quota_exhausted"), + (TimeoutError(), "timeout"), + (type("APIConnectionError", (Exception,), {})(), "network_failed"), + (type("ServiceUnavailableError", (Exception,), {})(), "server_error"), + (type("LockContentionError", (Exception,), {})(), "lock_contention"), + (type("UnsupportedFormatError", (Exception,), {})(), "unsupported_format"), + (type("ProviderError", (Exception,), {})(), "provider_error"), + (RuntimeError("plain"), "unknown"), + ], +) +def test_categorize_exception(exc: BaseException, expected: str) -> None: + assert ot_metrics.categorize_exception(exc) == expected + + +def test_record_celery_queue_latency_noops_when_disabled() -> None: + ot_metrics.record_celery_queue_latency( + 0.5, + task_name="index_notion_pages", + queue="surfsense.connectors", + scheduled=False, + operation="index", + ) + + +def test_add_event_noops_when_disabled() -> None: + ot.add_event("test.event", {"value": 1}) + + +def test_add_event_noops_without_current_span(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeTrace: + @staticmethod + def get_current_span(): + return None + + monkeypatch.setattr(ot, "_ENABLED", True) + monkeypatch.setattr(ot, "_ot_trace", FakeTrace()) + + ot.add_event("test.event", {"value": 1})