diff --git a/surfsense_backend/app/services/openrouter_integration_service.py b/surfsense_backend/app/services/openrouter_integration_service.py index 06b7becdc..9c3eaa5ea 100644 --- a/surfsense_backend/app/services/openrouter_integration_service.py +++ b/surfsense_backend/app/services/openrouter_integration_service.py @@ -14,13 +14,28 @@ import asyncio import hashlib import logging import threading +import time from typing import Any import httpx +from app.services.quality_score import ( + _HEALTH_BLEND_WEIGHT, + _HEALTH_ENRICH_CONCURRENCY, + _HEALTH_ENRICH_TOP_N_FREE, + _HEALTH_ENRICH_TOP_N_PREMIUM, + _HEALTH_FAIL_RATIO_FALLBACK, + _HEALTH_FETCH_TIMEOUT_SEC, + aggregate_health, + static_score_or, +) + logger = logging.getLogger(__name__) OPENROUTER_API_URL = "https://openrouter.ai/api/v1/models" +OPENROUTER_ENDPOINTS_URL_TEMPLATE = ( + "https://openrouter.ai/api/v1/models/{model_id}/endpoints" +) # Sentinel value stored on each generated config so we can distinguish # dynamic OpenRouter entries from hand-written YAML entries during refresh. @@ -217,12 +232,15 @@ def _generate_configs( configs: list[dict] = [] taken: set[int] = set() + now_ts = int(time.time()) for model in text_models: model_id: str = model["id"] name: str = model.get("name", model_id) tier = _openrouter_tier(model) + static_q = static_score_or(model, now_ts=now_ts) + cfg: dict[str, Any] = { "id": _stable_config_id(model_id, id_offset, taken), "name": name, @@ -249,6 +267,15 @@ def _generate_configs( # there — it just drains the shared bucket faster. "router_pool_eligible": tier == "premium", _OPENROUTER_DYNAMIC_MARKER: True, + # Auto (Fastest) ranking metadata. ``quality_score`` is initialised + # to the static score and gets re-blended with health on the next + # ``_enrich_health`` pass (synchronous on refresh, deferred on cold + # start so startup latency is unchanged). + "auto_pin_tier": "B" if tier == "premium" else "C", + "quality_score_static": static_q, + "quality_score_health": None, + "quality_score": static_q, + "health_gated": False, } configs.append(cfg) @@ -267,6 +294,12 @@ class OpenRouterIntegrationService: self._configs_by_id: dict[int, dict] = {} self._initialized = False self._refresh_task: asyncio.Task | None = None + # Last-good per-model health snapshot. Survives across refresh + # cycles so a transient OpenRouter /endpoints outage doesn't drop + # every cfg back to static-only scoring. + # Shape: {model_name: {"gated": bool, "score": float | None}} + self._health_cache: dict[str, dict[str, Any]] = {} + self._enrich_task: asyncio.Task | None = None @classmethod def get_instance(cls) -> "OpenRouterIntegrationService": @@ -307,6 +340,20 @@ class OpenRouterIntegrationService: tier_counts["free"], tier_counts["premium"], ) + + # Schedule the first health-enrichment pass as a deferred task so + # cold-start latency is unchanged. Only valid when an event loop is + # already running (e.g. FastAPI lifespan); Celery worker init is + # fully sync so we silently skip — its first refresh tick (or the + # next refresh from the web process) will populate health data. + try: + loop = asyncio.get_running_loop() + self._enrich_task = loop.create_task( + self._enrich_health_safely(self._configs) + ) + except RuntimeError: + pass + return self._configs # ------------------------------------------------------------------ @@ -343,6 +390,13 @@ class OpenRouterIntegrationService: tier_counts["premium"], ) + # Re-blend health scores against the freshly fetched catalogue. Also + # re-stamps health for any YAML-curated cfg with provider==OPENROUTER + # so a hand-picked dead OR model is gated like a dynamic one. + await self._enrich_health_safely( + static_configs + new_configs, log_summary=True + ) + # Rebuild the LiteLLM router so freshly fetched configs flow through # (dynamic OR premium entries now opt into the pool, free ones stay # out; a refresh also needs to pick up any static-config edits and @@ -373,6 +427,183 @@ class OpenRouterIntegrationService: counts[tier] += 1 return counts + # ------------------------------------------------------------------ + # Auto (Fastest) health enrichment + # ------------------------------------------------------------------ + + async def _enrich_health_safely( + self, configs: list[dict], *, log_summary: bool = True + ) -> None: + """Wrapper around ``_enrich_health`` that swallows all errors. + + Health enrichment is best-effort: any failure must leave cfgs in + their static-only state and never break refresh / startup. + """ + try: + await self._enrich_health(configs, log_summary=log_summary) + except Exception: + logger.exception("OpenRouter health enrichment failed") + + async def _enrich_health( + self, configs: list[dict], *, log_summary: bool = True + ) -> None: + """Fetch per-model ``/endpoints`` data for the top OR cfgs and blend + the resulting health score into ``cfg["quality_score"]``. + + Bounded fan-out: top-N per tier by ``quality_score_static`` only, + with ``asyncio.Semaphore(_HEALTH_ENRICH_CONCURRENCY)`` guarding the + outbound HTTP. Misses fall back to a per-model last-good cache; if + the failure ratio crosses ``_HEALTH_FAIL_RATIO_FALLBACK`` we keep + the entire previous cycle's cache for this run. + """ + or_cfgs = [ + c for c in configs if str(c.get("provider", "")).upper() == "OPENROUTER" + ] + if not or_cfgs: + return + + premium_pool = sorted( + [ + c + for c in or_cfgs + if str(c.get("billing_tier", "")).lower() == "premium" + ], + key=lambda c: -int(c.get("quality_score_static") or 0), + )[:_HEALTH_ENRICH_TOP_N_PREMIUM] + free_pool = sorted( + [ + c + for c in or_cfgs + if str(c.get("billing_tier", "")).lower() == "free" + ], + key=lambda c: -int(c.get("quality_score_static") or 0), + )[:_HEALTH_ENRICH_TOP_N_FREE] + # De-duplicate while preserving order: a cfg shouldn't fall in both + # tiers, but defensive code is cheap here. + seen_ids: set[int] = set() + selected: list[dict] = [] + for cfg in premium_pool + free_pool: + cid = int(cfg.get("id", 0)) + if cid in seen_ids: + continue + seen_ids.add(cid) + selected.append(cfg) + + if not selected: + return + + api_key = str(self._settings.get("api_key") or "") + semaphore = asyncio.Semaphore(_HEALTH_ENRICH_CONCURRENCY) + + async with httpx.AsyncClient( + timeout=_HEALTH_FETCH_TIMEOUT_SEC + ) as client: + results = await asyncio.gather( + *( + self._fetch_endpoints(client, semaphore, api_key, cfg) + for cfg in selected + ) + ) + + fail_count = sum(1 for _, _, err in results if err is not None) + fail_ratio = fail_count / len(results) if results else 0.0 + degraded = fail_ratio >= _HEALTH_FAIL_RATIO_FALLBACK + if degraded: + logger.warning( + "auto_pin_health_enrich_degraded fail_ratio=%.2f total=%d " + "using_last_good_cache=true", + fail_ratio, + len(results), + ) + + # Per-cfg health update. + for cfg, endpoints, err in results: + model_name = str(cfg.get("model_name", "")) + if not degraded and err is None and endpoints is not None: + gated, h_score = aggregate_health(endpoints) + cfg["health_gated"] = bool(gated) + cfg["quality_score_health"] = h_score + self._health_cache[model_name] = { + "gated": bool(gated), + "score": h_score, + } + else: + cached = self._health_cache.get(model_name) + if cached is not None: + cfg["health_gated"] = bool(cached.get("gated", False)) + cfg["quality_score_health"] = cached.get("score") + # else: keep current values (initial defaults from + # _generate_configs / load_global_llm_configs). + + # Blend health into the final score for every OR cfg, including + # those outside the enriched top-N (they fall through to static). + gated_count = 0 + by_provider: dict[str, int] = {} + for cfg in or_cfgs: + static_q = int(cfg.get("quality_score_static") or 0) + h = cfg.get("quality_score_health") + if h is not None and not cfg.get("health_gated"): + blended = ( + _HEALTH_BLEND_WEIGHT * float(h) + + (1 - _HEALTH_BLEND_WEIGHT) * static_q + ) + cfg["quality_score"] = round(blended) + else: + cfg["quality_score"] = static_q + + if cfg.get("health_gated"): + gated_count += 1 + model_id = str(cfg.get("model_name", "")) + provider_slug = ( + model_id.split("/", 1)[0] if "/" in model_id else "unknown" + ) + by_provider[provider_slug] = by_provider.get(provider_slug, 0) + 1 + + if log_summary: + logger.info( + "auto_pin_health_gated count=%d by_provider=%s fail_ratio=%.2f " + "total_enriched=%d", + gated_count, + dict(sorted(by_provider.items(), key=lambda kv: -kv[1])), + fail_ratio, + len(selected), + ) + + @staticmethod + async def _fetch_endpoints( + client: httpx.AsyncClient, + semaphore: asyncio.Semaphore, + api_key: str, + cfg: dict, + ) -> tuple[dict, list[dict] | None, Exception | None]: + """Fetch ``/api/v1/models/{id}/endpoints`` for one cfg. + + Returns ``(cfg, endpoints, err)`` so the caller can keep batched + results aligned with their cfgs without raising. + """ + model_id = str(cfg.get("model_name", "")) + if not model_id: + return cfg, None, ValueError("missing model_name") + + url = OPENROUTER_ENDPOINTS_URL_TEMPLATE.format(model_id=model_id) + headers = {"Authorization": f"Bearer {api_key}"} if api_key else {} + + async with semaphore: + try: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + data = resp.json() + except Exception as exc: + return cfg, None, exc + + payload = data.get("data") if isinstance(data, dict) else None + if not isinstance(payload, dict): + return cfg, None, ValueError("malformed endpoints payload") + endpoints = payload.get("endpoints") + if not isinstance(endpoints, list): + return cfg, [], None + return cfg, endpoints, None + async def _refresh_loop(self, interval_hours: float) -> None: interval_sec = interval_hours * 3600 while True: diff --git a/surfsense_backend/tests/unit/services/test_or_health_enrichment.py b/surfsense_backend/tests/unit/services/test_or_health_enrichment.py new file mode 100644 index 000000000..1c74aa928 --- /dev/null +++ b/surfsense_backend/tests/unit/services/test_or_health_enrichment.py @@ -0,0 +1,331 @@ +"""Unit tests for the OpenRouter ``_enrich_health`` background task.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from app.services.openrouter_integration_service import ( + OpenRouterIntegrationService, +) +from app.services.quality_score import ( + _HEALTH_FAIL_RATIO_FALLBACK, +) + +pytestmark = pytest.mark.unit + + +def _or_cfg( + *, + cid: int, + model_name: str, + tier: str = "premium", + static_score: int = 50, +) -> dict: + return { + "id": cid, + "provider": "OPENROUTER", + "model_name": model_name, + "billing_tier": tier, + "auto_pin_tier": "B" if tier == "premium" else "C", + "quality_score_static": static_score, + "quality_score_health": None, + "quality_score": static_score, + "health_gated": False, + } + + +class _StubResponse: + def __init__(self, *, payload: dict, status_code: int = 200): + self._payload = payload + self.status_code = status_code + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise RuntimeError(f"HTTP {self.status_code}") + + def json(self) -> dict: + return self._payload + + +class _StubAsyncClient: + """Minimal drop-in for ``httpx.AsyncClient`` used by ``_fetch_endpoints``.""" + + def __init__(self, responder): + self._responder = responder + self.requests: list[str] = [] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def get(self, url: str, headers: dict | None = None) -> _StubResponse: + self.requests.append(url) + return self._responder(url) + + +def _patch_async_client(monkeypatch, responder) -> _StubAsyncClient: + """Replace ``httpx.AsyncClient`` for the duration of the test.""" + client = _StubAsyncClient(responder) + monkeypatch.setattr( + "app.services.openrouter_integration_service.httpx.AsyncClient", + lambda *_args, **_kwargs: client, + ) + return client + + +def _healthy_payload() -> dict: + return { + "data": { + "endpoints": [ + { + "status": 0, + "uptime_last_30m": 0.99, + "uptime_last_1d": 0.995, + "uptime_last_5m": 0.99, + } + ] + } + } + + +def _unhealthy_payload() -> dict: + return { + "data": { + "endpoints": [ + { + "status": 0, + "uptime_last_30m": 0.55, + "uptime_last_1d": 0.62, + "uptime_last_5m": 0.50, + } + ] + } + } + + +# --------------------------------------------------------------------------- +# Bounded fan-out + happy path +# --------------------------------------------------------------------------- + + +async def test_enrich_health_marks_healthy_and_gates_unhealthy(monkeypatch): + cfgs = [ + _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70), + _or_cfg(cid=-2, model_name="venice/dead-model", static_score=60), + ] + + def responder(url: str) -> _StubResponse: + if "anthropic" in url: + return _StubResponse(payload=_healthy_payload()) + return _StubResponse(payload=_unhealthy_payload()) + + _patch_async_client(monkeypatch, responder) + + service = OpenRouterIntegrationService() + service._settings = {"api_key": ""} + await service._enrich_health(cfgs) + + healthy = next(c for c in cfgs if c["id"] == -1) + gated = next(c for c in cfgs if c["id"] == -2) + + assert healthy["health_gated"] is False + assert healthy["quality_score_health"] is not None + assert healthy["quality_score"] >= healthy["quality_score_static"] + + assert gated["health_gated"] is True + assert gated["quality_score"] == gated["quality_score_static"] + + +async def test_enrich_health_only_touches_or_provider(monkeypatch): + """YAML cfgs that aren't OPENROUTER must be skipped entirely.""" + yaml_cfg = { + "id": -1, + "provider": "AZURE_OPENAI", + "model_name": "gpt-5", + "billing_tier": "premium", + "auto_pin_tier": "A", + "quality_score_static": 80, + "quality_score": 80, + "health_gated": False, + } + or_cfg = _or_cfg(cid=-2, model_name="anthropic/claude-haiku") + + requests: list[str] = [] + + def responder(url: str) -> _StubResponse: + requests.append(url) + return _StubResponse(payload=_healthy_payload()) + + _patch_async_client(monkeypatch, responder) + + service = OpenRouterIntegrationService() + service._settings = {} + await service._enrich_health([yaml_cfg, or_cfg]) + + assert all("anthropic/claude-haiku" in r for r in requests) + # YAML cfg is untouched. + assert yaml_cfg["quality_score"] == 80 + assert yaml_cfg["health_gated"] is False + + +# --------------------------------------------------------------------------- +# Failure ratio fallback +# --------------------------------------------------------------------------- + + +async def test_enrich_health_falls_back_to_last_good_when_failure_ratio_high( + monkeypatch, +): + """If >= 25% of fetches fail, keep last-good cache instead of writing + partial data.""" + cfgs = [ + _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70), + _or_cfg(cid=-2, model_name="openai/gpt-5", static_score=80), + _or_cfg(cid=-3, model_name="google/gemini-flash", static_score=65), + _or_cfg(cid=-4, model_name="venice/something", static_score=50), + ] + + service = OpenRouterIntegrationService() + service._settings = {} + # Pre-seed last-good cache with a known-healthy snapshot. + service._health_cache = { + "anthropic/claude-haiku": {"gated": False, "score": 95.0}, + } + + def all_fail(_url: str) -> _StubResponse: + return _StubResponse(payload={}, status_code=500) + + _patch_async_client(monkeypatch, all_fail) + await service._enrich_health(cfgs) + + # Above threshold ⇒ degraded; last-good cache wins for the cached cfg. + cached_hit = next(c for c in cfgs if c["model_name"] == "anthropic/claude-haiku") + assert cached_hit["quality_score_health"] == 95.0 + assert cached_hit["health_gated"] is False + # Confirm the threshold constant we're testing against is real. + assert _HEALTH_FAIL_RATIO_FALLBACK <= 1.0 + + +async def test_enrich_health_keeps_static_only_with_no_cache_and_failures( + monkeypatch, +): + """If a fetch fails and there's no last-good cache, the cfg keeps its + static-only ``quality_score`` and is *not* gated by default.""" + cfgs = [ + _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70), + ] + + def fail(_url: str) -> _StubResponse: + return _StubResponse(payload={}, status_code=500) + + _patch_async_client(monkeypatch, fail) + + service = OpenRouterIntegrationService() + service._settings = {} + await service._enrich_health(cfgs) + + cfg = cfgs[0] + assert cfg["health_gated"] is False + assert cfg["quality_score"] == cfg["quality_score_static"] + assert cfg["quality_score_health"] is None + + +# --------------------------------------------------------------------------- +# Last-good cache: success populates, next failure reuses +# --------------------------------------------------------------------------- + + +async def test_enrich_health_populates_cache_on_success_then_reuses_on_failure( + monkeypatch, +): + cfg = _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70) + + service = OpenRouterIntegrationService() + service._settings = {} + + def healthy(_url: str) -> _StubResponse: + return _StubResponse(payload=_healthy_payload()) + + _patch_async_client(monkeypatch, healthy) + await service._enrich_health([cfg]) + + assert "anthropic/claude-haiku" in service._health_cache + cached_score = service._health_cache["anthropic/claude-haiku"]["score"] + assert cached_score is not None + + # Next cycle: enough other healthy cfgs so failure ratio stays below + # the 25% threshold even when this one fails individually. + other_cfgs = [ + _or_cfg(cid=-2 - i, model_name=f"healthy/m-{i}", static_score=60) + for i in range(10) + ] + cfg["quality_score_health"] = None + cfg["quality_score"] = cfg["quality_score_static"] + + def mixed(url: str) -> _StubResponse: + if "anthropic" in url: + return _StubResponse(payload={}, status_code=500) + return _StubResponse(payload=_healthy_payload()) + + _patch_async_client(monkeypatch, mixed) + await service._enrich_health([cfg, *other_cfgs]) + + assert cfg["quality_score_health"] == cached_score + assert cfg["health_gated"] is False + + +# --------------------------------------------------------------------------- +# Bounded fan-out: respects top-N caps +# --------------------------------------------------------------------------- + + +async def test_enrich_health_bounds_premium_fanout(monkeypatch): + """Top-N premium cap is honoured even when many cfgs are present.""" + from app.services.quality_score import _HEALTH_ENRICH_TOP_N_PREMIUM + + cfgs = [ + _or_cfg( + cid=-i, model_name=f"openai/m-{i}", tier="premium", static_score=100 - i + ) + for i in range(1, _HEALTH_ENRICH_TOP_N_PREMIUM + 20) + ] + + seen: list[str] = [] + + def responder(url: str) -> _StubResponse: + seen.append(url) + return _StubResponse(payload=_healthy_payload()) + + _patch_async_client(monkeypatch, responder) + + service = OpenRouterIntegrationService() + service._settings = {} + await service._enrich_health(cfgs) + + assert len(seen) == _HEALTH_ENRICH_TOP_N_PREMIUM + + +async def test_enrich_health_no_or_cfgs_is_noop(monkeypatch): + """When the catalogue has no OR cfgs at all, no HTTP calls fire.""" + yaml_cfg: dict[str, Any] = { + "id": -1, + "provider": "AZURE_OPENAI", + "model_name": "gpt-5", + "billing_tier": "premium", + } + requests: list[str] = [] + + def responder(url: str) -> _StubResponse: + requests.append(url) + return _StubResponse(payload=_healthy_payload()) + + _patch_async_client(monkeypatch, responder) + + service = OpenRouterIntegrationService() + service._settings = {} + await service._enrich_health([yaml_cfg]) + assert requests == []