From ae3ce914653ec2372f83e4d9ceb63bf148930fdc Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 27 May 2026 23:37:26 +0530 Subject: [PATCH] feat(gateway): add configuration and metrics hooks --- surfsense_backend/.env.example | 7 + surfsense_backend/app/config/__init__.py | 6 + .../app/observability/metrics.py | 153 ++++++++++++++++++ 3 files changed, 166 insertions(+) diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index b05369412..6fef9b20e 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -15,6 +15,13 @@ REDIS_APP_URL=redis://localhost:6379/0 # Optional: TTL in seconds for connector indexing lock key # CONNECTOR_INDEXING_LOCK_TTL_SECONDS=28800 +# Telegram Gateway +# TELEGRAM_WEBHOOK_SECRET must be 1-256 chars and contain only A-Z, a-z, 0-9, _ or - +TELEGRAM_SHARED_BOT_TOKEN= +TELEGRAM_SHARED_BOT_USERNAME= +TELEGRAM_WEBHOOK_SECRET= +GATEWAY_BASE_URL=http://localhost:8000 + # Platform Web Search (SearXNG) # Set this to enable built-in web search. Docker Compose sets it automatically. # Only uncomment if running the backend outside Docker (e.g. uvicorn on host). diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 5643c048b..a7739d6c4 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -541,6 +541,12 @@ class Config: # Backend URL to override the http to https in the OAuth redirect URI BACKEND_URL = os.getenv("BACKEND_URL") + # Messaging gateway (Telegram v1) + TELEGRAM_SHARED_BOT_TOKEN = os.getenv("TELEGRAM_SHARED_BOT_TOKEN") + TELEGRAM_SHARED_BOT_USERNAME = os.getenv("TELEGRAM_SHARED_BOT_USERNAME") + TELEGRAM_WEBHOOK_SECRET = os.getenv("TELEGRAM_WEBHOOK_SECRET") + GATEWAY_BASE_URL = os.getenv("GATEWAY_BASE_URL", BACKEND_URL) + # Stripe checkout for pay-as-you-go page packs STRIPE_SECRET_KEY = os.getenv("STRIPE_SECRET_KEY") STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET") diff --git a/surfsense_backend/app/observability/metrics.py b/surfsense_backend/app/observability/metrics.py index 798a6e2f7..8098ac307 100644 --- a/surfsense_backend/app/observability/metrics.py +++ b/surfsense_backend/app/observability/metrics.py @@ -314,6 +314,103 @@ def _celery_queue_latency(): ) +@lru_cache(maxsize=1) +def _gateway_redis_fallback(): + return _get_meter().create_counter( + "surfsense.gateway.redis.fallback", + description="Count of gateway Redis fallback uses.", + ) + + +@lru_cache(maxsize=1) +def _gateway_thread_lock_contention(): + return _get_meter().create_counter( + "surfsense.gateway.thread_lock.contention", + description="Count of gateway per-thread lock contention events.", + ) + + +@lru_cache(maxsize=1) +def _gateway_inbox_writes(): + return _get_meter().create_counter( + "surfsense.gateway.inbox.writes", + description="Count of gateway inbound event inbox writes.", + ) + + +@lru_cache(maxsize=1) +def _gateway_inbox_processed(): + return _get_meter().create_counter( + "surfsense.gateway.inbox.processed", + description="Count of gateway inbound event processing outcomes.", + ) + + +@lru_cache(maxsize=1) +def _gateway_inbound_reconciled(): + return _get_meter().create_counter( + "surfsense.gateway.inbound.reconciled", + description="Count of gateway inbox events re-enqueued by reconciliation.", + ) + + +@lru_cache(maxsize=1) +def _gateway_outbound(): + return _get_meter().create_counter( + "surfsense.gateway.outbound", + description="Count of gateway outbound platform operations.", + ) + + +@lru_cache(maxsize=1) +def _gateway_turn_latency(): + return _get_meter().create_histogram( + "surfsense.gateway.turn.latency", + unit="ms", + description="Latency of gateway-routed agent turns.", + ) + + +@lru_cache(maxsize=1) +def _gateway_rate_limit_hits(): + return _get_meter().create_counter( + "surfsense.gateway.rate_limit.hits", + description="Count of gateway outbound rate limit waits.", + ) + + +@lru_cache(maxsize=1) +def _gateway_health_check_failures(): + return _get_meter().create_counter( + "surfsense.gateway.health_check.failures", + description="Count of gateway account health-check failures.", + ) + + +@lru_cache(maxsize=1) +def _gateway_auth_invariant_failures(): + return _get_meter().create_counter( + "surfsense.gateway.auth_invariant.failures", + description="Count of gateway authorization invariant failures.", + ) + + +@lru_cache(maxsize=1) +def _gateway_hitl_aborted(): + return _get_meter().create_counter( + "surfsense.gateway.hitl.aborted", + description="Count of gateway turns aborted because HITL is unsupported.", + ) + + +@lru_cache(maxsize=1) +def _gateway_active_bindings(): + return _get_meter().create_up_down_counter( + "surfsense.gateway.active_bindings", + description="Current change in active gateway bindings.", + ) + + def record_model_call_duration( duration_ms: float, *, model: str | None, provider: str | None ) -> None: @@ -569,6 +666,62 @@ def record_celery_queue_latency( ) +def record_gateway_redis_fallback() -> None: + _add(_gateway_redis_fallback(), 1, {}) + + +def record_gateway_thread_lock_contention() -> None: + _add(_gateway_thread_lock_contention(), 1, {}) + + +def record_gateway_inbox_write(*, platform: str, dedup_skipped: bool) -> None: + _add( + _gateway_inbox_writes(), + 1, + {"platform": platform, "dedup.skipped": bool(dedup_skipped)}, + ) + + +def record_gateway_inbox_processed(*, platform: str, status: str) -> None: + _add(_gateway_inbox_processed(), 1, {"platform": platform, "status": status}) + + +def record_gateway_inbound_reconciled(*, reason: str) -> None: + _add(_gateway_inbound_reconciled(), 1, {"reason": reason}) + + +def record_gateway_outbound(*, platform: str, kind: str, status: str) -> None: + _add( + _gateway_outbound(), + 1, + {"platform": platform, "kind": kind, "status": status}, + ) + + +def record_gateway_turn_latency(duration_ms: float, *, platform: str) -> None: + _record(_gateway_turn_latency(), duration_ms, {"platform": platform}) + + +def record_gateway_rate_limit_hit(*, bucket: str) -> None: + _add(_gateway_rate_limit_hits(), 1, {"bucket": bucket}) + + +def record_gateway_health_check_failure(*, platform: str) -> None: + _add(_gateway_health_check_failures(), 1, {"platform": platform}) + + +def record_gateway_auth_invariant_failure(*, cause: str) -> None: + _add(_gateway_auth_invariant_failures(), 1, {"cause": cause}) + + +def record_gateway_hitl_aborted(*, platform: str) -> None: + _add(_gateway_hitl_aborted(), 1, {"platform": platform}) + + +def record_gateway_active_bindings_delta(delta: int, *, platform: str) -> None: + _add(_gateway_active_bindings(), delta, {"platform": platform}) + + def _runtime_snapshot_value(key: str, transform: Any = None) -> list[Any]: from opentelemetry.metrics import Observation