"""Custom OpenTelemetry metrics for SurfSense.

This module owns all SurfSense-specific metric instruments. Callers use the
small helper functions below instead of constructing instruments directly so
attribute names and cardinality stay consistent across the backend.

The ``record_*`` helpers are best-effort: they do nothing while OTel is
disabled, and any exception raised while creating an instrument or recording
a value is suppressed so telemetry can never break the calling code path.
"""

from __future__ import annotations

import contextlib
import gc
import logging
from functools import lru_cache
from importlib import metadata
from typing import Any

from app.observability import otel

logger = logging.getLogger(__name__)

_INSTRUMENTATION_NAME = "surfsense.platform"
# Set once all runtime observable gauges have been registered successfully.
_OBSERVABLES_REGISTERED = False


def _package_version() -> str:
    """Return the installed ``surf-new-backend`` version, or ``"unknown"``."""
    with contextlib.suppress(metadata.PackageNotFoundError):
        return metadata.version("surf-new-backend")
    return "unknown"


def _is_enabled() -> bool:
    """Return whether OTel is enabled, as reported by ``app.observability.otel``."""
    return otel.is_enabled()


def _clean_attrs(attrs: dict[str, Any]) -> dict[str, str | int | float | bool]:
    """Drop empty values and coerce low-cardinality attrs to OTel-safe scalars.

    ``None`` values and values whose ``str()`` is empty are dropped. Numbers
    and bools pass through unchanged, and everything else is stringified.
    """
    cleaned: dict[str, str | int | float | bool] = {}
    for key, value in attrs.items():
        if value is None:
            continue
        if isinstance(value, bool | int | float):
            cleaned[key] = value
            continue
        text = str(value)
        if text:
            cleaned[key] = text
    return cleaned


def _resolve_instrument(instrument_or_factory: Any, method: str) -> Any:
    """Return an instrument, building it first if given a zero-arg factory.

    An object that already exposes ``method`` (``record`` / ``add``) is treated
    as the instrument itself. Anything else is called to produce one. Passing
    the factory lets instrument creation happen lazily and inside the caller's
    exception guard.
    """
    if hasattr(instrument_or_factory, method):
        return instrument_or_factory
    return instrument_or_factory()


def _record(callable_obj: Any, value: int | float, attrs: dict[str, Any]) -> None:
    """Record ``value`` on a histogram, best-effort.

    Args:
        callable_obj: A histogram, or a zero-arg factory that returns one.
        value: Measurement to record.
        attrs: Raw attributes; normalised via ``_clean_attrs``.
    """
    if not _is_enabled():
        return
    with contextlib.suppress(Exception):
        instrument = _resolve_instrument(callable_obj, "record")
        instrument.record(value, _clean_attrs(attrs))


def _add(callable_obj: Any, value: int, attrs: dict[str, Any]) -> None:
    """Add ``value`` to a counter, best-effort.

    Args:
        callable_obj: A counter, or a zero-arg factory that returns one.
        value: Increment to add.
        attrs: Raw attributes; normalised via ``_clean_attrs``.
    """
    if not _is_enabled():
        return
    with contextlib.suppress(Exception):
        instrument = _resolve_instrument(callable_obj, "add")
        instrument.add(value, _clean_attrs(attrs))


@lru_cache(maxsize=1)
def _get_meter():
    """Return the (cached) SurfSense meter from the global meter provider."""
    from opentelemetry import metrics

    return metrics.get_meter(_INSTRUMENTATION_NAME, _package_version())


# --- Instrument factories ---------------------------------------------------
# Each factory is cached so an instrument is created at most once per process.


@lru_cache(maxsize=1)
def _model_call_duration():
    return _get_meter().create_histogram(
        "surfsense.model.call.duration",
        unit="ms",
        description="Duration of SurfSense LLM model calls.",
    )


@lru_cache(maxsize=1)
def _model_token_usage():
    return _get_meter().create_histogram(
        "gen_ai.client.token.usage",
        unit="{token}",
        description="Token usage reported by GenAI model responses.",
    )


@lru_cache(maxsize=1)
def _tool_call_duration():
    return _get_meter().create_histogram(
        "surfsense.tool.call.duration",
        unit="ms",
        description="Duration of SurfSense agent tool calls.",
    )


@lru_cache(maxsize=1)
def _tool_call_errors():
    return _get_meter().create_counter(
        "surfsense.tool.call.errors",
        description="Count of SurfSense agent tool call errors.",
    )


@lru_cache(maxsize=1)
def _kb_search_duration():
    return _get_meter().create_histogram(
        "surfsense.kb.search.duration",
        unit="ms",
        description="Duration of SurfSense knowledge-base search calls.",
    )


@lru_cache(maxsize=1)
def _compaction_runs():
    return _get_meter().create_counter(
        "surfsense.compaction.runs",
        description="Count of SurfSense conversation compaction runs.",
    )


@lru_cache(maxsize=1)
def _permission_asks():
    return _get_meter().create_counter(
        "surfsense.permission.asks",
        description="Count of SurfSense permission asks.",
    )


@lru_cache(maxsize=1)
def _interrupts():
    return _get_meter().create_counter(
        "surfsense.interrupt.raised",
        description="Count of SurfSense interrupts raised.",
    )


@lru_cache(maxsize=1)
def _indexing_document_duration():
    return _get_meter().create_histogram(
        "surfsense.indexing.document.duration",
        unit="s",
        description="Duration of SurfSense document indexing.",
    )


@lru_cache(maxsize=1)
def _indexing_document_outcome():
    return _get_meter().create_counter(
        "surfsense.indexing.document.outcome",
        description="Count of SurfSense document indexing outcomes.",
    )


@lru_cache(maxsize=1)
def _connector_sync_duration():
    return _get_meter().create_histogram(
        "surfsense.connector.sync.duration",
        unit="s",
        description="Duration of SurfSense connector sync tasks.",
    )


@lru_cache(maxsize=1)
def _connector_sync_outcome():
    return _get_meter().create_counter(
        "surfsense.connector.sync.outcome",
        description="Count of SurfSense connector sync outcomes.",
    )


@lru_cache(maxsize=1)
def _auth_failures():
    return _get_meter().create_counter(
        "surfsense.auth.failures",
        description="Count of SurfSense authentication failures.",
    )


@lru_cache(maxsize=1)
def _rate_limit_rejections():
    return _get_meter().create_counter(
        "surfsense.rate_limit.rejections",
        description="Count of SurfSense rate-limit rejections.",
    )


@lru_cache(maxsize=1)
def _perf_elapsed():
    return _get_meter().create_histogram(
        "surfsense.perf.elapsed_ms",
        unit="ms",
        description="Elapsed time recorded by SurfSense perf timers.",
    )


# --- Public recording helpers -----------------------------------------------
# Factories (not instruments) are passed to _record/_add so creation is lazy
# and any creation error is swallowed together with recording errors.


def record_model_call_duration(
    duration_ms: float, *, model: str | None, provider: str | None
) -> None:
    """Record the duration of one LLM model call, in milliseconds."""
    _record(
        _model_call_duration,
        duration_ms,
        {
            "gen_ai.request.model": model,
            "gen_ai.provider.name": provider,
        },
    )


def record_model_token_usage(
    *,
    input_tokens: int | None,
    output_tokens: int | None,
    model: str | None,
    provider: str | None,
) -> None:
    """Record input and/or output token counts for one model response.

    Each non-``None`` count is recorded as a separate point tagged with
    ``gen_ai.token.type`` = ``"input"`` or ``"output"``.
    """
    base = {
        "gen_ai.request.model": model,
        "gen_ai.provider.name": provider,
        "gen_ai.operation.name": "chat",
    }
    if input_tokens is not None:
        _record(
            _model_token_usage,
            int(input_tokens),
            {**base, "gen_ai.token.type": "input"},
        )
    if output_tokens is not None:
        _record(
            _model_token_usage,
            int(output_tokens),
            {**base, "gen_ai.token.type": "output"},
        )


def record_tool_call_duration(duration_ms: float, *, tool_name: str) -> None:
    """Record the duration of one agent tool call, in milliseconds."""
    _record(_tool_call_duration, duration_ms, {"tool.name": tool_name})


def record_tool_call_error(*, tool_name: str) -> None:
    """Count one agent tool call error."""
    _add(_tool_call_errors, 1, {"tool.name": tool_name})


def record_kb_search_duration(
    duration_ms: float, *, search_space_id: int | None, surface: str
) -> None:
    """Record the duration of one knowledge-base search, in milliseconds."""
    _record(
        _kb_search_duration,
        duration_ms,
        {"search_space.id": search_space_id, "search.surface": surface},
    )


def record_compaction_run(*, reason: str | None) -> None:
    """Count one conversation compaction run; a missing reason is ``"unknown"``."""
    _add(_compaction_runs, 1, {"compaction.reason": reason or "unknown"})


def record_permission_ask(*, permission: str) -> None:
    """Count one permission ask."""
    _add(_permission_asks, 1, {"permission.permission": permission})


def record_interrupt(*, interrupt_type: str) -> None:
    """Count one raised interrupt."""
    _add(_interrupts, 1, {"interrupt.type": interrupt_type})


def record_indexing_document_duration(
    duration_s: float, *, document_type: str | None
) -> None:
    """Record the duration of indexing one document, in seconds."""
    _record(
        _indexing_document_duration,
        duration_s,
        {"document.type": document_type or "unknown"},
    )


def record_indexing_document_outcome(*, document_type: str | None, status: str) -> None:
    """Count one document indexing outcome, tagged with its ``status``."""
    _add(
        _indexing_document_outcome,
        1,
        {"document.type": document_type or "unknown", "status": status},
    )


def record_connector_sync_duration(
    duration_s: float, *, connector_type: str | None
) -> None:
    """Record the duration of one connector sync task, in seconds."""
    _record(
        _connector_sync_duration,
        duration_s,
        {"connector.type": connector_type or "unknown"},
    )


def record_connector_sync_outcome(*, connector_type: str | None, status: str) -> None:
    """Count one connector sync outcome, tagged with its ``status``."""
    _add(
        _connector_sync_outcome,
        1,
        {"connector.type": connector_type or "unknown", "status": status},
    )


def record_auth_failure(*, reason: str) -> None:
    """Count one authentication failure."""
    _add(_auth_failures, 1, {"reason": reason})


def record_rate_limit_rejection(*, scope: str) -> None:
    """Count one rate-limit rejection."""
    _add(_rate_limit_rejections, 1, {"scope": scope})


def record_perf_elapsed(duration_ms: float, *, label: str) -> None:
    """Record one perf-timer measurement, in milliseconds."""
    _record(_perf_elapsed, duration_ms, {"label": label})


# --- Runtime observables ----------------------------------------------------


def _runtime_snapshot_value(key: str, transform: Any = None) -> list[Any]:
    """Return a single-point observation for ``key`` of ``system_snapshot()``.

    Returns an empty list (no data point) when the value is missing,
    non-numeric, or negative. ``transform``, if given, is applied before
    the value is wrapped in an ``Observation``.
    """
    from opentelemetry.metrics import Observation

    from app.utils.perf import system_snapshot

    snap = system_snapshot()
    value = snap.get(key)
    if not isinstance(value, int | float) or value < 0:
        return []
    if transform is not None:
        value = transform(value)
    return [Observation(value)]


def _observe_gc_collections(_options: Any) -> list[Any]:
    """Observe cumulative CPython GC collection counts, one point per generation.

    Reads ``gc.get_stats()[i]["collections"]`` (collections since interpreter
    start). ``gc.get_count()`` is deliberately not used: its values are
    allocation/threshold counters, not collection counts, and would not match
    this metric's name or ``{collection}`` unit.
    """
    from opentelemetry.metrics import Observation

    return [
        Observation(stats.get("collections", 0), {"generation": str(generation)})
        for generation, stats in enumerate(gc.get_stats())
    ]


def register_runtime_observables() -> None:
    """Register process/runtime observable gauges once per process.

    No-op if already registered or if OTel is disabled. On a registration
    error a warning is logged and the flag stays unset, so a later call
    will try again.
    """
    global _OBSERVABLES_REGISTERED
    if _OBSERVABLES_REGISTERED or not _is_enabled():
        return

    meter = _get_meter()
    try:
        # Each callback returns the value for a single gauge except GC, whose
        # callback carries a generation attribute.
        meter.create_observable_gauge(
            "process.runtime.cpython.memory.rss",
            callbacks=[
                # NOTE(review): assumes "rss_mb" is MiB (1024*1024 bytes) — confirm
                # against app.utils.perf.system_snapshot.
                lambda _options: _runtime_snapshot_value(
                    "rss_mb", lambda v: float(v) * 1024 * 1024
                )
            ],
            unit="By",
            description="Resident set size of the SurfSense backend process.",
        )
        meter.create_observable_gauge(
            "process.runtime.cpython.cpu.utilization",
            callbacks=[
                # Percent -> ratio, to match unit "1".
                lambda _options: _runtime_snapshot_value(
                    "cpu_percent", lambda v: float(v) / 100.0
                )
            ],
            unit="1",
            description="CPU utilization of the SurfSense backend process.",
        )
        meter.create_observable_gauge(
            "process.runtime.cpython.threads",
            callbacks=[lambda _options: _runtime_snapshot_value("threads")],
            unit="{thread}",
            description="Thread count of the SurfSense backend process.",
        )
        meter.create_observable_gauge(
            "process.runtime.cpython.open_fds",
            callbacks=[lambda _options: _runtime_snapshot_value("open_fds")],
            unit="{fd}",
            description="Open file descriptor count of the SurfSense backend process.",
        )
        meter.create_observable_gauge(
            "python.asyncio.tasks",
            callbacks=[lambda _options: _runtime_snapshot_value("asyncio_tasks")],
            unit="{task}",
            description="Live asyncio task count in the current event loop.",
        )
        meter.create_observable_gauge(
            "process.runtime.cpython.gc.collections",
            callbacks=[_observe_gc_collections],
            unit="{collection}",
            description="Cumulative CPython GC collections by generation.",
        )
    except Exception:
        logger.warning("Failed to register OTel runtime observables", exc_info=True)
        return

    _OBSERVABLES_REGISTERED = True


__all__ = [
    "record_auth_failure",
    "record_compaction_run",
    "record_connector_sync_duration",
    "record_connector_sync_outcome",
    "record_indexing_document_duration",
    "record_indexing_document_outcome",
    "record_interrupt",
    "record_kb_search_duration",
    "record_model_call_duration",
    "record_model_token_usage",
    "record_perf_elapsed",
    "record_permission_ask",
    "record_rate_limit_rejection",
    "record_tool_call_duration",
    "record_tool_call_error",
    "register_runtime_observables",
]