feat(openrouter): blend per-model /endpoints health into quality score

This commit is contained in:
Anish Sarkar 2026-05-01 23:38:40 +05:30
parent c229b4356a
commit 1eedcaa551
2 changed files with 562 additions and 0 deletions

View file

@ -14,13 +14,28 @@ import asyncio
import hashlib
import logging
import threading
import time
from typing import Any
import httpx
from app.services.quality_score import (
_HEALTH_BLEND_WEIGHT,
_HEALTH_ENRICH_CONCURRENCY,
_HEALTH_ENRICH_TOP_N_FREE,
_HEALTH_ENRICH_TOP_N_PREMIUM,
_HEALTH_FAIL_RATIO_FALLBACK,
_HEALTH_FETCH_TIMEOUT_SEC,
aggregate_health,
static_score_or,
)
logger = logging.getLogger(__name__)
OPENROUTER_API_URL = "https://openrouter.ai/api/v1/models"
OPENROUTER_ENDPOINTS_URL_TEMPLATE = (
"https://openrouter.ai/api/v1/models/{model_id}/endpoints"
)
# Sentinel value stored on each generated config so we can distinguish
# dynamic OpenRouter entries from hand-written YAML entries during refresh.
@ -217,12 +232,15 @@ def _generate_configs(
configs: list[dict] = []
taken: set[int] = set()
now_ts = int(time.time())
for model in text_models:
model_id: str = model["id"]
name: str = model.get("name", model_id)
tier = _openrouter_tier(model)
static_q = static_score_or(model, now_ts=now_ts)
cfg: dict[str, Any] = {
"id": _stable_config_id(model_id, id_offset, taken),
"name": name,
@ -249,6 +267,15 @@ def _generate_configs(
# there — it just drains the shared bucket faster.
"router_pool_eligible": tier == "premium",
_OPENROUTER_DYNAMIC_MARKER: True,
# Auto (Fastest) ranking metadata. ``quality_score`` is initialised
# to the static score and gets re-blended with health on the next
# ``_enrich_health`` pass (synchronous on refresh, deferred on cold
# start so startup latency is unchanged).
"auto_pin_tier": "B" if tier == "premium" else "C",
"quality_score_static": static_q,
"quality_score_health": None,
"quality_score": static_q,
"health_gated": False,
}
configs.append(cfg)
@ -267,6 +294,12 @@ class OpenRouterIntegrationService:
self._configs_by_id: dict[int, dict] = {}
self._initialized = False
self._refresh_task: asyncio.Task | None = None
# Last-good per-model health snapshot. Survives across refresh
# cycles so a transient OpenRouter /endpoints outage doesn't drop
# every cfg back to static-only scoring.
# Shape: {model_name: {"gated": bool, "score": float | None}}
self._health_cache: dict[str, dict[str, Any]] = {}
self._enrich_task: asyncio.Task | None = None
@classmethod
def get_instance(cls) -> "OpenRouterIntegrationService":
@ -307,6 +340,20 @@ class OpenRouterIntegrationService:
tier_counts["free"],
tier_counts["premium"],
)
# Schedule the first health-enrichment pass as a deferred task so
# cold-start latency is unchanged. Only valid when an event loop is
# already running (e.g. FastAPI lifespan); Celery worker init is
# fully sync so we silently skip — its first refresh tick (or the
# next refresh from the web process) will populate health data.
try:
loop = asyncio.get_running_loop()
self._enrich_task = loop.create_task(
self._enrich_health_safely(self._configs)
)
except RuntimeError:
pass
return self._configs
# ------------------------------------------------------------------
@ -343,6 +390,13 @@ class OpenRouterIntegrationService:
tier_counts["premium"],
)
# Re-blend health scores against the freshly fetched catalogue. Also
# re-stamps health for any YAML-curated cfg with provider==OPENROUTER
# so a hand-picked dead OR model is gated like a dynamic one.
await self._enrich_health_safely(
static_configs + new_configs, log_summary=True
)
# Rebuild the LiteLLM router so freshly fetched configs flow through
# (dynamic OR premium entries now opt into the pool, free ones stay
# out; a refresh also needs to pick up any static-config edits and
@ -373,6 +427,183 @@ class OpenRouterIntegrationService:
counts[tier] += 1
return counts
# ------------------------------------------------------------------
# Auto (Fastest) health enrichment
# ------------------------------------------------------------------
async def _enrich_health_safely(
    self, configs: list[dict], *, log_summary: bool = True
) -> None:
    """Wrapper around ``_enrich_health`` that swallows all errors.

    Health enrichment is best-effort: a failure here is logged and must
    never break refresh / startup.

    Args:
        configs: cfg dicts forwarded unchanged to ``_enrich_health``.
        log_summary: forwarded unchanged to ``_enrich_health``.
    """
    try:
        await self._enrich_health(configs, log_summary=log_summary)
    except Exception:
        # Deliberately broad: this is the outermost boundary for the
        # enrichment pass. ``logger.exception`` records the traceback.
        # Only ``Exception`` subclasses are caught, so anything deriving
        # straight from ``BaseException`` still propagates.
        logger.exception("OpenRouter health enrichment failed")
async def _enrich_health(
    self, configs: list[dict], *, log_summary: bool = True
) -> None:
    """Blend live ``/endpoints`` health into OpenRouter quality scores.

    Only cfgs whose ``provider`` equals ``OPENROUTER`` (case-insensitive)
    take part; every other entry of ``configs`` is left untouched. The
    cfg dicts are mutated in place.

    Steps:
      1. Pick the top ``_HEALTH_ENRICH_TOP_N_PREMIUM`` premium and top
         ``_HEALTH_ENRICH_TOP_N_FREE`` free cfgs by
         ``quality_score_static`` and de-duplicate them by ``id``.
      2. Fetch ``/endpoints`` for each pick concurrently, throttled by
         ``asyncio.Semaphore(_HEALTH_ENRICH_CONCURRENCY)``.
      3. If the share of failed fetches reaches
         ``_HEALTH_FAIL_RATIO_FALLBACK`` the run is "degraded": no fresh
         result is applied and every pick falls back to
         ``self._health_cache``. Otherwise successful fetches update both
         the cfg and the cache, and failed ones read the cache.
      4. Recompute ``quality_score`` for every OpenRouter cfg: a
         ``_HEALTH_BLEND_WEIGHT``-weighted mix of health and static score
         when health is known and the cfg is not gated, else the static
         score alone.

    Args:
        configs: cfg dicts to inspect and update.
        log_summary: when true, emit the ``auto_pin_health_gated`` info
            line at the end of the pass.
    """
    openrouter_cfgs = [
        c for c in configs if str(c.get("provider", "")).upper() == "OPENROUTER"
    ]
    if not openrouter_cfgs:
        return

    def _top_by_static(tier: str, limit: int) -> list[dict]:
        """Return up to ``limit`` cfgs of ``tier``, best static score first."""
        members = [
            c
            for c in openrouter_cfgs
            if str(c.get("billing_tier", "")).lower() == tier
        ]
        # ``reverse=True`` keeps the sort stable, so ties stay in input order.
        members.sort(
            key=lambda c: int(c.get("quality_score_static") or 0),
            reverse=True,
        )
        return members[:limit]

    premium_picks = _top_by_static("premium", _HEALTH_ENRICH_TOP_N_PREMIUM)
    free_picks = _top_by_static("free", _HEALTH_ENRICH_TOP_N_FREE)

    # A cfg should never sit in both tiers, but de-duplicating by id is a
    # cheap guard; first occurrence wins so ordering is preserved.
    selected: list[dict] = []
    picked_ids: set[int] = set()
    for candidate in [*premium_picks, *free_picks]:
        candidate_id = int(candidate.get("id", 0))
        if candidate_id not in picked_ids:
            picked_ids.add(candidate_id)
            selected.append(candidate)
    if not selected:
        return

    api_key = str(self._settings.get("api_key") or "")
    throttle = asyncio.Semaphore(_HEALTH_ENRICH_CONCURRENCY)
    async with httpx.AsyncClient(timeout=_HEALTH_FETCH_TIMEOUT_SEC) as client:
        fetches = [
            self._fetch_endpoints(client, throttle, api_key, cfg)
            for cfg in selected
        ]
        results = await asyncio.gather(*fetches)

    fail_count = sum(err is not None for _, _, err in results)
    fail_ratio = fail_count / len(results) if results else 0.0
    degraded = fail_ratio >= _HEALTH_FAIL_RATIO_FALLBACK
    if degraded:
        logger.warning(
            "auto_pin_health_enrich_degraded fail_ratio=%.2f total=%d "
            "using_last_good_cache=true",
            fail_ratio,
            len(results),
        )

    # Per-cfg health update.
    for cfg, endpoints, err in results:
        cache_key = str(cfg.get("model_name", ""))
        fresh = not degraded and err is None and endpoints is not None
        if fresh:
            gated, h_score = aggregate_health(endpoints)
            cfg["health_gated"] = bool(gated)
            cfg["quality_score_health"] = h_score
            self._health_cache[cache_key] = {
                "gated": bool(gated),
                "score": h_score,
            }
            continue
        last_good = self._health_cache.get(cache_key)
        if last_good is None:
            # Nothing cached: the cfg keeps whatever values it already has.
            continue
        cfg["health_gated"] = bool(last_good.get("gated", False))
        cfg["quality_score_health"] = last_good.get("score")

    # Re-score every OpenRouter cfg, including those outside the enriched
    # top-N (with no health value they fall through to the static score).
    gated_count = 0
    by_provider: dict[str, int] = {}
    for cfg in openrouter_cfgs:
        static_q = int(cfg.get("quality_score_static") or 0)
        health = cfg.get("quality_score_health")
        is_gated = cfg.get("health_gated")
        if health is None or is_gated:
            cfg["quality_score"] = static_q
        else:
            cfg["quality_score"] = round(
                _HEALTH_BLEND_WEIGHT * float(health)
                + (1 - _HEALTH_BLEND_WEIGHT) * static_q
            )
        if not is_gated:
            continue
        gated_count += 1
        # The text before the first "/" of the model name is the provider slug.
        slug, slash, _rest = str(cfg.get("model_name", "")).partition("/")
        if not slash:
            slug = "unknown"
        by_provider[slug] = by_provider.get(slug, 0) + 1

    if not log_summary:
        return
    logger.info(
        "auto_pin_health_gated count=%d by_provider=%s fail_ratio=%.2f "
        "total_enriched=%d",
        gated_count,
        dict(sorted(by_provider.items(), key=lambda kv: kv[1], reverse=True)),
        fail_ratio,
        len(selected),
    )
@staticmethod
async def _fetch_endpoints(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    api_key: str,
    cfg: dict,
) -> tuple[dict, list[dict] | None, Exception | None]:
    """GET the ``/endpoints`` document for the model named by ``cfg``.

    Never raises for ordinary failures; the outcome is reported through
    the returned ``(cfg, endpoints, err)`` triple so a batched caller can
    match results to cfgs:

    * no ``model_name`` on the cfg -> ``(cfg, None, ValueError)``
    * the request, ``raise_for_status`` or JSON decoding raises an
      ``Exception`` -> ``(cfg, None, that exception)``
    * the body is not a dict, or its ``"data"`` value is not a dict ->
      ``(cfg, None, ValueError)``
    * ``"endpoints"`` is not a list -> ``(cfg, [], None)``
    * otherwise -> ``(cfg, endpoints, None)``

    The ``Authorization`` header is sent only when ``api_key`` is
    non-empty. The HTTP call is made while holding ``semaphore``.
    """
    model_name = str(cfg.get("model_name", ""))
    if not model_name:
        return cfg, None, ValueError("missing model_name")

    request_headers = {}
    if api_key:
        request_headers["Authorization"] = f"Bearer {api_key}"
    target = OPENROUTER_ENDPOINTS_URL_TEMPLATE.format(model_id=model_name)

    async with semaphore:
        try:
            response = await client.get(target, headers=request_headers)
            response.raise_for_status()
            body = response.json()
        except Exception as exc:
            # Hand the failure back instead of raising so one bad model
            # cannot abort the surrounding gather().
            return cfg, None, exc

    inner = body.get("data") if isinstance(body, dict) else None
    if not isinstance(inner, dict):
        return cfg, None, ValueError("malformed endpoints payload")

    listed = inner.get("endpoints")
    if isinstance(listed, list):
        return cfg, listed, None
    return cfg, [], None
async def _refresh_loop(self, interval_hours: float) -> None:
interval_sec = interval_hours * 3600
while True:

View file

@ -0,0 +1,331 @@
"""Unit tests for the OpenRouter ``_enrich_health`` background task."""
from __future__ import annotations
from typing import Any
import pytest
from app.services.openrouter_integration_service import (
OpenRouterIntegrationService,
)
from app.services.quality_score import (
_HEALTH_FAIL_RATIO_FALLBACK,
)
pytestmark = pytest.mark.unit
def _or_cfg(
*,
cid: int,
model_name: str,
tier: str = "premium",
static_score: int = 50,
) -> dict:
return {
"id": cid,
"provider": "OPENROUTER",
"model_name": model_name,
"billing_tier": tier,
"auto_pin_tier": "B" if tier == "premium" else "C",
"quality_score_static": static_score,
"quality_score_health": None,
"quality_score": static_score,
"health_gated": False,
}
class _StubResponse:
def __init__(self, *, payload: dict, status_code: int = 200):
self._payload = payload
self.status_code = status_code
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def json(self) -> dict:
return self._payload
class _StubAsyncClient:
"""Minimal drop-in for ``httpx.AsyncClient`` used by ``_fetch_endpoints``."""
def __init__(self, responder):
self._responder = responder
self.requests: list[str] = []
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def get(self, url: str, headers: dict | None = None) -> _StubResponse:
self.requests.append(url)
return self._responder(url)
def _patch_async_client(monkeypatch, responder) -> _StubAsyncClient:
    """Swap ``httpx.AsyncClient`` for a factory returning one shared stub.

    The patch is applied through ``monkeypatch``. The stub is returned so
    a test can inspect the URLs it recorded.
    """
    stub = _StubAsyncClient(responder)

    def _factory(*_args, **_kwargs):
        # Constructor arguments are accepted and ignored.
        return stub

    monkeypatch.setattr(
        "app.services.openrouter_integration_service.httpx.AsyncClient",
        _factory,
    )
    return stub
def _healthy_payload() -> dict:
return {
"data": {
"endpoints": [
{
"status": 0,
"uptime_last_30m": 0.99,
"uptime_last_1d": 0.995,
"uptime_last_5m": 0.99,
}
]
}
}
def _unhealthy_payload() -> dict:
return {
"data": {
"endpoints": [
{
"status": 0,
"uptime_last_30m": 0.55,
"uptime_last_1d": 0.62,
"uptime_last_5m": 0.50,
}
]
}
}
# ---------------------------------------------------------------------------
# Bounded fan-out + happy path
# ---------------------------------------------------------------------------
async def test_enrich_health_marks_healthy_and_gates_unhealthy(monkeypatch):
    """A healthy model stays ungated with a health score; an unhealthy one
    is gated and keeps its static-only ``quality_score``."""
    healthy = _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70)
    gated = _or_cfg(cid=-2, model_name="venice/dead-model", static_score=60)

    def responder(url: str) -> _StubResponse:
        # Route by provider slug in the URL: anthropic is up, the rest is down.
        if "anthropic" not in url:
            return _StubResponse(payload=_unhealthy_payload())
        return _StubResponse(payload=_healthy_payload())

    _patch_async_client(monkeypatch, responder)

    service = OpenRouterIntegrationService()
    service._settings = {"api_key": ""}
    await service._enrich_health([healthy, gated])

    assert healthy["health_gated"] is False
    assert healthy["quality_score_health"] is not None
    assert healthy["quality_score"] >= healthy["quality_score_static"]

    assert gated["health_gated"] is True
    assert gated["quality_score"] == gated["quality_score_static"]
async def test_enrich_health_only_touches_or_provider(monkeypatch):
    """YAML cfgs that aren't OPENROUTER must be skipped entirely.

    The OpenRouter cfg passed alongside must still be fetched, otherwise
    the "only OpenRouter URLs were requested" check proves nothing.
    """
    yaml_cfg = {
        "id": -1,
        "provider": "AZURE_OPENAI",
        "model_name": "gpt-5",
        "billing_tier": "premium",
        "auto_pin_tier": "A",
        "quality_score_static": 80,
        "quality_score": 80,
        "health_gated": False,
    }
    or_cfg = _or_cfg(cid=-2, model_name="anthropic/claude-haiku")

    requests: list[str] = []

    def responder(url: str) -> _StubResponse:
        requests.append(url)
        return _StubResponse(payload=_healthy_payload())

    _patch_async_client(monkeypatch, responder)

    service = OpenRouterIntegrationService()
    service._settings = {}
    await service._enrich_health([yaml_cfg, or_cfg])

    # ``all()`` over an empty list is vacuously true, so first make sure
    # the OpenRouter cfg was actually fetched.
    assert requests, "expected at least one /endpoints request for the OR cfg"
    assert all("anthropic/claude-haiku" in r for r in requests)
    # YAML cfg is untouched.
    assert yaml_cfg["quality_score"] == 80
    assert yaml_cfg["health_gated"] is False
# ---------------------------------------------------------------------------
# Failure ratio fallback
# ---------------------------------------------------------------------------
async def test_enrich_health_falls_back_to_last_good_when_failure_ratio_high(
    monkeypatch,
):
    """When the failure ratio reaches the fallback threshold, the last-good
    cache wins instead of partial fresh data.

    The cached model's own fetch *succeeds* here, with a payload that
    would gate it, while every other fetch fails. If the degraded branch
    were missing, the fresh result would overwrite the cached snapshot,
    so this separates run-level fallback from per-model error fallback.
    """
    # Precondition: at least half of the enriched fetches fail below, so
    # the run is degraded whenever the threshold is at most 0.5.
    assert _HEALTH_FAIL_RATIO_FALLBACK <= 0.5

    cfgs = [
        _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70),
        _or_cfg(cid=-2, model_name="openai/gpt-5", static_score=80),
        _or_cfg(cid=-3, model_name="google/gemini-flash", static_score=65),
        _or_cfg(cid=-4, model_name="venice/something", static_score=50),
    ]

    service = OpenRouterIntegrationService()
    service._settings = {}
    # Pre-seed last-good cache with a known-healthy snapshot.
    service._health_cache = {
        "anthropic/claude-haiku": {"gated": False, "score": 95.0},
    }

    def mostly_fail(url: str) -> _StubResponse:
        if "anthropic" in url:
            # Succeeds, but contradicts the cached healthy snapshot.
            return _StubResponse(payload=_unhealthy_payload())
        return _StubResponse(payload={}, status_code=500)

    _patch_async_client(monkeypatch, mostly_fail)
    await service._enrich_health(cfgs)

    # Degraded run: the cached snapshot is applied and left unmodified.
    cached_hit = next(c for c in cfgs if c["model_name"] == "anthropic/claude-haiku")
    assert cached_hit["quality_score_health"] == 95.0
    assert cached_hit["health_gated"] is False
    assert service._health_cache == {
        "anthropic/claude-haiku": {"gated": False, "score": 95.0},
    }

    # Models with no cache entry stay static-only and ungated.
    for cfg in cfgs:
        if cfg is cached_hit:
            continue
        assert cfg["quality_score_health"] is None
        assert cfg["health_gated"] is False
        assert cfg["quality_score"] == cfg["quality_score_static"]
async def test_enrich_health_keeps_static_only_with_no_cache_and_failures(
    monkeypatch,
):
    """A failed fetch with an empty last-good cache leaves the cfg on its
    static-only ``quality_score``, with no health value and not gated."""
    only_cfg = _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70)

    _patch_async_client(
        monkeypatch,
        lambda _url: _StubResponse(payload={}, status_code=500),
    )

    service = OpenRouterIntegrationService()
    service._settings = {}
    await service._enrich_health([only_cfg])

    assert only_cfg["health_gated"] is False
    assert only_cfg["quality_score"] == only_cfg["quality_score_static"]
    assert only_cfg["quality_score_health"] is None
# ---------------------------------------------------------------------------
# Last-good cache: success populates, next failure reuses
# ---------------------------------------------------------------------------
async def test_enrich_health_populates_cache_on_success_then_reuses_on_failure(
    monkeypatch,
):
    """A successful pass stores a snapshot in the last-good cache; a later
    pass in which only that model fails reads the snapshot back."""
    target_name = "anthropic/claude-haiku"
    target = _or_cfg(cid=-1, model_name=target_name, static_score=70)
    service = OpenRouterIntegrationService()
    service._settings = {}

    # Cycle 1: everything is healthy, so the cache gets populated.
    _patch_async_client(
        monkeypatch,
        lambda _url: _StubResponse(payload=_healthy_payload()),
    )
    await service._enrich_health([target])
    assert "anthropic/claude-haiku" in service._health_cache
    cached_score = service._health_cache[target_name]["score"]
    assert cached_score is not None

    # Cycle 2: ten healthy neighbours keep the failure ratio at 1 in 11,
    # below the 25% threshold, while the target alone fails.
    neighbours = []
    for i in range(10):
        neighbours.append(
            _or_cfg(cid=-2 - i, model_name=f"healthy/m-{i}", static_score=60)
        )
    # Reset the target to its pre-enrichment state.
    target["quality_score"] = target["quality_score_static"]
    target["quality_score_health"] = None

    def mixed(url: str) -> _StubResponse:
        if "anthropic" not in url:
            return _StubResponse(payload=_healthy_payload())
        return _StubResponse(payload={}, status_code=500)

    _patch_async_client(monkeypatch, mixed)
    await service._enrich_health([target, *neighbours])

    assert target["quality_score_health"] == cached_score
    assert target["health_gated"] is False
# ---------------------------------------------------------------------------
# Bounded fan-out: respects top-N caps
# ---------------------------------------------------------------------------
async def test_enrich_health_bounds_premium_fanout(monkeypatch):
    """With more premium cfgs than the top-N cap, exactly
    ``_HEALTH_ENRICH_TOP_N_PREMIUM`` requests are issued."""
    from app.services.quality_score import _HEALTH_ENRICH_TOP_N_PREMIUM

    # 19 more premium cfgs than the cap, with strictly decreasing scores.
    cfgs = []
    for i in range(1, _HEALTH_ENRICH_TOP_N_PREMIUM + 20):
        cfgs.append(
            _or_cfg(
                cid=-i,
                model_name=f"openai/m-{i}",
                tier="premium",
                static_score=100 - i,
            )
        )

    seen: list[str] = []

    def responder(url: str) -> _StubResponse:
        seen.append(url)
        return _StubResponse(payload=_healthy_payload())

    _patch_async_client(monkeypatch, responder)

    service = OpenRouterIntegrationService()
    service._settings = {}
    await service._enrich_health(cfgs)

    assert len(seen) == _HEALTH_ENRICH_TOP_N_PREMIUM
async def test_enrich_health_no_or_cfgs_is_noop(monkeypatch):
    """With no OPENROUTER cfg in the input, no HTTP request is made."""
    yaml_cfg: dict[str, Any] = dict(
        id=-1,
        provider="AZURE_OPENAI",
        model_name="gpt-5",
        billing_tier="premium",
    )

    requests: list[str] = []

    def responder(url: str) -> _StubResponse:
        # Any call at all is recorded so the final assertion can catch it.
        requests.append(url)
        return _StubResponse(payload=_healthy_payload())

    _patch_async_client(monkeypatch, responder)

    service = OpenRouterIntegrationService()
    service._settings = {}
    await service._enrich_health([yaml_cfg])

    assert requests == []