Merge pull request #1332 from AnishSarkar22/feat/model-pinnning-mode

feat: Auto-pin quality scoring, OpenRouter tier refactor and live usage sidebar
Rohan Verma committed 451a98936e on 2026-05-01 15:57:19 -07:00 (committed via GitHub)
35 changed files with 3975 additions and 319 deletions


@@ -118,3 +118,37 @@ async def test_end_turn_force_clears_lock_and_cancel_state() -> None:
assert not manager.lock_for(thread_id).locked()
assert not get_cancel_event(thread_id).is_set()
assert is_cancel_requested(thread_id) is False
@pytest.mark.asyncio
async def test_busy_mutex_stale_aafter_does_not_release_new_attempt_lock() -> None:
"""A stale aafter call from attempt A must not unlock attempt B.
Repro flow:
1) attempt A acquires thread lock
2) forced end_turn clears A so retry can proceed
3) attempt B acquires same thread lock
4) stale attempt-A aafter runs late
Expected: B lock remains held.
"""
thread_id = "stale-aafter-lock"
runtime = _Runtime(thread_id)
attempt_a = BusyMutexMiddleware()
attempt_b = BusyMutexMiddleware()
await attempt_a.abefore_agent({}, runtime)
lock = manager.lock_for(thread_id)
assert lock.locked()
end_turn(thread_id)
assert not lock.locked()
await attempt_b.abefore_agent({}, runtime)
assert lock.locked()
# Stale cleanup from attempt A must not release attempt B's lock.
await attempt_a.aafter_agent({}, runtime)
assert lock.locked()
await attempt_b.aafter_agent({}, runtime)
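
# One way the middleware can satisfy this invariant is an ownership token
# checked on release. A minimal self-contained sketch of that pattern (names
# and structure are assumptions, not the actual BusyMutexMiddleware code):
import asyncio

class _TokenLock:
    """Lock whose release is guarded by a generation token."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._generation = 0

    async def acquire(self) -> int:
        await self._lock.acquire()
        return self._generation  # caller keeps this as its ownership token

    def force_release(self) -> None:
        # Forced end_turn: bump the generation so stale holders can no
        # longer release, then free the lock for the retry attempt.
        self._generation += 1
        if self._lock.locked():
            self._lock.release()

    def release_if_owner(self, token: int) -> None:
        # A stale aafter_agent arriving with an old token is a no-op.
        if token == self._generation and self._lock.locked():
            self._lock.release()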


@@ -6,13 +6,26 @@ from types import SimpleNamespace
import pytest
from app.services.auto_model_pin_service import (
AUTO_FASTEST_MODE,
clear_healthy,
clear_runtime_cooldown,
is_recently_healthy,
mark_healthy,
mark_runtime_cooldown,
resolve_or_get_pinned_llm_config_id,
)
pytestmark = pytest.mark.unit
@pytest.fixture(autouse=True)
def _clear_runtime_cooldown_map():
clear_runtime_cooldown()
clear_healthy()
yield
clear_runtime_cooldown()
clear_healthy()
@dataclass
class _FakeQuotaResult:
allowed: bool
@@ -45,14 +58,11 @@ def _thread(
*,
search_space_id: int = 10,
pinned_llm_config_id: int | None = None,
pinned_auto_mode: str | None = None,
):
return SimpleNamespace(
id=1,
search_space_id=search_space_id,
pinned_llm_config_id=pinned_llm_config_id,
pinned_auto_mode=pinned_auto_mode,
pinned_at=None,
)
@@ -93,8 +103,6 @@ async def test_auto_first_turn_pins_one_model(monkeypatch):
)
assert result.resolved_llm_config_id in {-1, -2}
assert session.thread.pinned_llm_config_id == result.resolved_llm_config_id
assert session.thread.pinned_auto_mode == AUTO_FASTEST_MODE
assert session.thread.pinned_at is not None
assert session.commit_count == 1
@@ -102,9 +110,7 @@ async def test_auto_first_turn_pins_one_model(monkeypatch):
async def test_next_turn_reuses_existing_pin(monkeypatch):
from app.config import config
session = _FakeSession(
_thread(pinned_llm_config_id=-1, pinned_auto_mode=AUTO_FASTEST_MODE)
)
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
@@ -228,9 +234,7 @@ async def test_premium_ineligible_auto_pins_free_only(monkeypatch):
async def test_pinned_premium_stays_premium_after_quota_exhaustion(monkeypatch):
from app.config import config
session = _FakeSession(
_thread(pinned_llm_config_id=-1, pinned_auto_mode=AUTO_FASTEST_MODE)
)
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
@@ -275,9 +279,7 @@ async def test_pinned_premium_stays_premium_after_quota_exhaustion(monkeypatch):
async def test_force_repin_free_switches_auto_premium_pin_to_free(monkeypatch):
from app.config import config
session = _FakeSession(
_thread(pinned_llm_config_id=-1, pinned_auto_mode=AUTO_FASTEST_MODE)
)
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
@@ -325,9 +327,7 @@ async def test_force_repin_free_switches_auto_premium_pin_to_free(monkeypatch):
async def test_explicit_user_model_change_clears_pin(monkeypatch):
from app.config import config
session = _FakeSession(
_thread(pinned_llm_config_id=-2, pinned_auto_mode=AUTO_FASTEST_MODE)
)
session = _FakeSession(_thread(pinned_llm_config_id=-2))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
@@ -345,8 +345,6 @@ async def test_explicit_user_model_change_clears_pin(monkeypatch):
)
assert result.resolved_llm_config_id == 7
assert session.thread.pinned_llm_config_id is None
assert session.thread.pinned_auto_mode is None
assert session.thread.pinned_at is None
assert session.commit_count == 1
@@ -354,9 +352,7 @@ async def test_explicit_user_model_change_clears_pin(monkeypatch):
async def test_invalid_pinned_config_repairs_with_new_pin(monkeypatch):
from app.config import config
session = _FakeSession(
_thread(pinned_llm_config_id=-999, pinned_auto_mode=AUTO_FASTEST_MODE)
)
session = _FakeSession(_thread(pinned_llm_config_id=-999))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
@@ -383,3 +379,543 @@ async def test_invalid_pinned_config_repairs_with_new_pin(monkeypatch):
assert result.resolved_llm_config_id == -2
assert session.thread.pinned_llm_config_id == -2
assert session.commit_count == 1
# ---------------------------------------------------------------------------
# Quality-aware pin selection (Auto Fastest upgrade)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_health_gated_config_is_excluded_from_selection(monkeypatch):
"""A cfg flagged ``health_gated`` must never be picked even if it has
the highest score among eligible cfgs."""
from app.config import config
session = _FakeSession(_thread())
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "OPENROUTER",
"model_name": "venice/dead-model",
"api_key": "k1",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 95,
"health_gated": True,
},
{
"id": -2,
"provider": "OPENROUTER",
"model_name": "google/gemini-flash",
"api_key": "k1",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 60,
"health_gated": False,
},
],
)
async def _blocked(*_args, **_kwargs):
return _FakeQuotaResult(allowed=False)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_blocked,
)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -2
@pytest.mark.asyncio
async def test_tier_a_locks_first_premium_user_skips_or(monkeypatch):
"""Premium-eligible users with Tier A available should never spill to
Tier B even if a B cfg ranks higher by ``quality_score``."""
from app.config import config
session = _FakeSession(_thread())
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"api_key": "k-yaml",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 70,
"health_gated": False,
},
{
"id": -2,
"provider": "OPENROUTER",
"model_name": "openai/gpt-5",
"api_key": "k-or",
"billing_tier": "premium",
"auto_pin_tier": "B",
"quality_score": 95,
"health_gated": False,
},
],
)
async def _allowed(*_args, **_kwargs):
return _FakeQuotaResult(allowed=True)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_allowed,
)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -1
assert result.resolved_tier == "premium"
@pytest.mark.asyncio
async def test_tier_a_falls_through_to_or_when_a_pool_empty_for_user(monkeypatch):
"""Free-only user with no Tier A free cfg should pick from Tier C."""
from app.config import config
session = _FakeSession(_thread())
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"api_key": "k-yaml",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 100,
"health_gated": False,
},
{
"id": -2,
"provider": "OPENROUTER",
"model_name": "google/gemini-flash:free",
"api_key": "k-or",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 60,
"health_gated": False,
},
],
)
async def _blocked(*_args, **_kwargs):
return _FakeQuotaResult(allowed=False)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_blocked,
)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -2
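
# Taken together, the two tests above pin down a tier-first selection order; a
# sketch of the assumed pool construction (helper name and details hypothetical):
def _ordered_pool(cfgs: list[dict], *, premium_eligible: bool) -> list[dict]:
    eligible = [
        c for c in cfgs
        if not c.get("health_gated")
        and (premium_eligible or c.get("billing_tier") == "free")
    ]
    for tier in ("A", "B", "C"):
        pool = [c for c in eligible if c.get("auto_pin_tier") == tier]
        if pool:
            return pool  # the highest non-empty tier wins outright
    return []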
@pytest.mark.asyncio
async def test_top_k_picks_only_high_score_models(monkeypatch):
"""Different thread IDs should spread across top-K, never pick the
obvious low-quality cfg even when it sits in the candidate list."""
from app.config import config
high_score_cfgs = [
{
"id": -i,
"provider": "AZURE_OPENAI",
"model_name": f"gpt-x-{i}",
"api_key": "k",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 90,
"health_gated": False,
}
for i in range(1, 6) # 5 high-quality Tier A cfgs
]
low_score_trap = {
"id": -99,
"provider": "AZURE_OPENAI",
"model_name": "tiny-legacy",
"api_key": "k",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 10,
"health_gated": False,
}
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[*high_score_cfgs, low_score_trap],
)
async def _allowed(*_args, **_kwargs):
return _FakeQuotaResult(allowed=True)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_allowed,
)
high_score_ids = {c["id"] for c in high_score_cfgs}
seen = set()
for thread_id in range(1, 50):
session = _FakeSession(_thread())
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=thread_id,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
seen.add(result.resolved_llm_config_id)
assert result.resolved_llm_config_id != -99, (
"low-score trap cfg should never be picked"
)
assert result.resolved_llm_config_id in high_score_ids
# Spread across at least a couple of top-K cfgs.
assert len(seen) > 1
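
# A plausible shape for the spread this test exercises: rank by score, keep a
# short top-K list, then hash the thread id into it (K and the hash function
# are illustrative assumptions):
import hashlib

def _pick_for_thread(thread_id: int, candidates: list[dict], top_k: int = 5) -> dict:
    ranked = sorted(candidates, key=lambda c: c["quality_score"], reverse=True)
    shortlist = ranked[:top_k]  # the low-score trap never makes the shortlist
    digest = hashlib.sha256(str(thread_id).encode()).digest()
    return shortlist[digest[0] % len(shortlist)]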
@pytest.mark.asyncio
async def test_pin_reuse_survives_health_gating_for_existing_pin(monkeypatch):
"""An *already* pinned cfg that later flips to ``health_gated`` should
still not be reused: gated cfgs are filtered out of the candidate
pool, which forces a repair to a healthy cfg.
This guards the no-silent-tier-switch invariant: we don't keep using
a known-broken model just because the thread happened to be pinned
to it before the gate fired."""
from app.config import config
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "OPENROUTER",
"model_name": "venice/dead-model",
"api_key": "k",
"billing_tier": "premium",
"auto_pin_tier": "B",
"quality_score": 50,
"health_gated": True,
},
{
"id": -2,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"api_key": "k",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 90,
"health_gated": False,
},
],
)
async def _allowed(*_args, **_kwargs):
return _FakeQuotaResult(allowed=True)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_allowed,
)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -2
assert result.from_existing_pin is False
@pytest.mark.asyncio
async def test_pin_reuse_regression_existing_healthy_pin(monkeypatch):
"""Existing pin reuse must short-circuit the new tier/score logic."""
from app.config import config
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"api_key": "k",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 50, # lower than -2
"health_gated": False,
},
{
"id": -2,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5-pro",
"api_key": "k",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score": 99,
"health_gated": False,
},
],
)
async def _must_not_call(*_args, **_kwargs):
raise AssertionError("premium_get_usage should not run on pin reuse")
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_must_not_call,
)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -1
assert result.from_existing_pin is True
assert session.commit_count == 0
@pytest.mark.asyncio
async def test_runtime_cooled_down_pin_is_not_reused(monkeypatch):
"""A runtime-cooled config should be excluded from candidate reuse.
This enables one-shot recovery from transient provider 429 bursts: we can
mark the pinned cfg as cooled down and force a repair to another eligible
cfg on the next resolution.
"""
from app.config import config
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "OPENROUTER",
"model_name": "google/gemma-4-26b-a4b-it:free",
"api_key": "k",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 90,
"health_gated": False,
},
{
"id": -2,
"provider": "OPENROUTER",
"model_name": "google/gemini-2.5-flash:free",
"api_key": "k",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 80,
"health_gated": False,
},
],
)
async def _blocked(*_args, **_kwargs):
return _FakeQuotaResult(allowed=False)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_blocked,
)
mark_runtime_cooldown(-1, reason="provider_rate_limited", cooldown_seconds=600)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -2
assert result.from_existing_pin is False
@pytest.mark.asyncio
async def test_clearing_runtime_cooldown_restores_pin_reuse(monkeypatch):
from app.config import config
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "OPENROUTER",
"model_name": "google/gemma-4-26b-a4b-it:free",
"api_key": "k",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 90,
"health_gated": False,
},
],
)
async def _must_not_call(*_args, **_kwargs):
raise AssertionError("premium_get_usage should not run on healthy pin reuse")
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_must_not_call,
)
mark_runtime_cooldown(-1, reason="provider_rate_limited", cooldown_seconds=600)
clear_runtime_cooldown(-1)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
)
assert result.resolved_llm_config_id == -1
assert result.from_existing_pin is True
@pytest.mark.asyncio
async def test_auto_pin_repin_excludes_previous_config_on_runtime_retry(monkeypatch):
"""Runtime retry should never repin the just-failed config."""
from app.config import config
session = _FakeSession(_thread(pinned_llm_config_id=-1))
monkeypatch.setattr(
config,
"GLOBAL_LLM_CONFIGS",
[
{
"id": -1,
"provider": "OPENROUTER",
"model_name": "google/gemma-4-26b-a4b-it:free",
"api_key": "k",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 90,
"health_gated": False,
},
{
"id": -2,
"provider": "OPENROUTER",
"model_name": "google/gemini-2.5-flash:free",
"api_key": "k",
"billing_tier": "free",
"auto_pin_tier": "C",
"quality_score": 80,
"health_gated": False,
},
],
)
async def _blocked(*_args, **_kwargs):
return _FakeQuotaResult(allowed=False)
monkeypatch.setattr(
"app.services.auto_model_pin_service.TokenQuotaService.premium_get_usage",
_blocked,
)
result = await resolve_or_get_pinned_llm_config_id(
session,
thread_id=1,
search_space_id=10,
user_id="00000000-0000-0000-0000-000000000001",
selected_llm_config_id=0,
exclude_config_ids={-1},
)
assert result.resolved_llm_config_id == -2
assert result.from_existing_pin is False
# ---------------------------------------------------------------------------
# Healthy-status cache (preflight TTL companion)
# ---------------------------------------------------------------------------
def test_mark_healthy_then_is_recently_healthy_true_within_ttl():
mark_healthy(-42, ttl_seconds=60)
assert is_recently_healthy(-42) is True
def test_healthy_expires_after_ttl(monkeypatch):
import app.services.auto_model_pin_service as svc
real_time = svc.time.time
base = real_time()
monkeypatch.setattr(svc.time, "time", lambda: base)
mark_healthy(-7, ttl_seconds=10)
assert is_recently_healthy(-7) is True
monkeypatch.setattr(svc.time, "time", lambda: base + 11)
assert is_recently_healthy(-7) is False
def test_mark_runtime_cooldown_invalidates_healthy_cache():
mark_healthy(-9, ttl_seconds=60)
assert is_recently_healthy(-9) is True
mark_runtime_cooldown(-9, reason="test", cooldown_seconds=60)
assert is_recently_healthy(-9) is False
def test_clear_healthy_removes_single_entry():
mark_healthy(-11, ttl_seconds=60)
mark_healthy(-12, ttl_seconds=60)
clear_healthy(-11)
assert is_recently_healthy(-11) is False
assert is_recently_healthy(-12) is True
def test_clear_healthy_no_args_drops_all_entries():
mark_healthy(-21, ttl_seconds=60)
mark_healthy(-22, ttl_seconds=60)
clear_healthy()
assert is_recently_healthy(-21) is False
assert is_recently_healthy(-22) is False
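
# These fixtures treat both caches as in-process TTL maps; a minimal sketch
# consistent with the behavior asserted above (internal names are assumptions):
import time

_healthy_until: dict[int, float] = {}
_cooldown_until: dict[int, float] = {}

def mark_healthy(config_id: int, *, ttl_seconds: int) -> None:
    _healthy_until[config_id] = time.time() + ttl_seconds

def is_recently_healthy(config_id: int) -> bool:
    return time.time() < _healthy_until.get(config_id, 0.0)

def mark_runtime_cooldown(config_id: int, *, reason: str, cooldown_seconds: int) -> None:
    _cooldown_until[config_id] = time.time() + cooldown_seconds
    # A cooldown is direct evidence the config is unhealthy right now.
    _healthy_until.pop(config_id, None)

def clear_healthy(config_id: int | None = None) -> None:
    if config_id is None:
        _healthy_until.clear()
    else:
        _healthy_until.pop(config_id, None)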


@@ -0,0 +1,226 @@
"""LLMRouterService pool-filter / rebuild tests.
These tests focus on the *config plumbing* (which configs enter the router
pool, and whether rebuild resets state correctly). They stub out the underlying
``litellm.Router`` so we don't need real API keys or network access.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from app.services.llm_router_service import LLMRouterService
pytestmark = pytest.mark.unit
def _fake_yaml_config(
*,
id: int,
model_name: str,
billing_tier: str = "free",
) -> dict:
return {
"id": id,
"name": f"yaml-{id}",
"provider": "OPENAI",
"model_name": model_name,
"api_key": "sk-test",
"api_base": "",
"billing_tier": billing_tier,
"rpm": 100,
"tpm": 100_000,
"litellm_params": {},
}
def _fake_openrouter_config(
*,
id: int,
model_name: str,
billing_tier: str,
router_pool_eligible: bool | None = None,
) -> dict:
"""Build a synthetic dynamic-OR config dict for router-pool tests.
Defaults mirror Strategy 3: premium OR enters the pool, free OR stays
out. Callers can override ``router_pool_eligible`` to simulate legacy
configs or to regression-test the filter mechanics directly.
"""
if router_pool_eligible is None:
router_pool_eligible = billing_tier == "premium"
return {
"id": id,
"name": f"or-{id}",
"provider": "OPENROUTER",
"model_name": model_name,
"api_key": "sk-or-test",
"api_base": "",
"billing_tier": billing_tier,
"rpm": 20 if billing_tier == "free" else 200,
"tpm": 100_000 if billing_tier == "free" else 1_000_000,
"litellm_params": {},
"router_pool_eligible": router_pool_eligible,
}
def _reset_router_singleton() -> None:
instance = LLMRouterService.get_instance()
instance._initialized = False
instance._router = None
instance._model_list = []
instance._premium_model_strings = set()
def test_router_pool_includes_or_premium_excludes_or_free():
"""Strategy 3: premium OR joins the pool, free OR stays out.
Dynamic OpenRouter premium entries opt into load balancing alongside
curated YAML configs. Dynamic OR free entries are intentionally kept
out because OpenRouter's free tier enforces a single account-global
quota bucket that per-deployment router accounting can't represent.
"""
_reset_router_singleton()
configs = [
_fake_yaml_config(id=-1, model_name="gpt-4o", billing_tier="premium"),
_fake_yaml_config(id=-2, model_name="gpt-4o-mini", billing_tier="free"),
_fake_openrouter_config(
id=-10_001, model_name="openai/gpt-4o", billing_tier="premium"
),
_fake_openrouter_config(
id=-10_002,
model_name="meta-llama/llama-3.3-70b:free",
billing_tier="free",
),
]
with (
patch("app.services.llm_router_service.Router") as mock_router,
patch(
"app.services.llm_router_service.LLMRouterService._build_context_fallback_groups"
) as mock_ctx_fb,
):
mock_ctx_fb.side_effect = lambda ml: (ml, None)
mock_router.return_value = object()
LLMRouterService.initialize(configs)
pool_models = {
dep["litellm_params"]["model"]
for dep in LLMRouterService.get_instance()._model_list
}
# YAML premium + YAML free + dynamic OR premium are all in the pool.
# Dynamic OR free is NOT (shared-bucket rate limits can't be load-balanced).
assert pool_models == {
"openai/gpt-4o",
"openai/gpt-4o-mini",
"openrouter/openai/gpt-4o",
}
prem = LLMRouterService.get_instance()._premium_model_strings
# YAML premium is fingerprinted under both its model_string and its
# ``base_model`` form (existing behavior we don't want to regress).
assert "openai/gpt-4o" in prem
# Dynamic OR premium is now fingerprinted as premium so pool-level
# calls through the router are billed against premium quota.
assert "openrouter/openai/gpt-4o" in prem
assert LLMRouterService.is_premium_model("openrouter/openai/gpt-4o") is True
# Dynamic OR free never enters the pool, so it's never counted as premium.
assert (
LLMRouterService.is_premium_model("openrouter/meta-llama/llama-3.3-70b:free")
is False
)
def test_router_pool_filter_mechanics_respect_override():
"""The ``router_pool_eligible`` filter itself works independently of tier.
Regression guard: if a future refactor ever sets the flag False on a
premium config (e.g. for maintenance), that config MUST be skipped by
``initialize`` even though its tier is premium.
"""
_reset_router_singleton()
configs = [
_fake_yaml_config(id=-1, model_name="gpt-4o", billing_tier="premium"),
_fake_openrouter_config(
id=-10_001,
model_name="openai/gpt-4o",
billing_tier="premium",
router_pool_eligible=False, # opt out despite being premium
),
]
with (
patch("app.services.llm_router_service.Router") as mock_router,
patch(
"app.services.llm_router_service.LLMRouterService._build_context_fallback_groups"
) as mock_ctx_fb,
):
mock_ctx_fb.side_effect = lambda ml: (ml, None)
mock_router.return_value = object()
LLMRouterService.initialize(configs)
pool_models = {
dep["litellm_params"]["model"]
for dep in LLMRouterService.get_instance()._model_list
}
assert pool_models == {"openai/gpt-4o"}
assert LLMRouterService.is_premium_model("openrouter/openai/gpt-4o") is False
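
# The filter under test is presumably a one-line gate applied while building
# the deployment list; a sketch of the assumed mechanics:
def _pool_eligible(cfg: dict) -> bool:
    # Dynamic OR free sets the flag False at generation time; YAML configs
    # typically omit the key and default to eligible.
    return bool(cfg.get("router_pool_eligible", True))

# e.g. model_list = [_to_deployment(c) for c in configs if _pool_eligible(c)]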
def test_rebuild_refreshes_pool_after_configs_change():
_reset_router_singleton()
configs_v1 = [
_fake_yaml_config(id=-1, model_name="gpt-4o", billing_tier="premium"),
]
configs_v2 = [
*configs_v1,
_fake_yaml_config(id=-2, model_name="gpt-4o-mini", billing_tier="free"),
]
with (
patch("app.services.llm_router_service.Router") as mock_router,
patch(
"app.services.llm_router_service.LLMRouterService._build_context_fallback_groups"
) as mock_ctx_fb,
):
mock_ctx_fb.side_effect = lambda ml: (ml, None)
mock_router.return_value = object()
LLMRouterService.initialize(configs_v1)
assert len(LLMRouterService.get_instance()._model_list) == 1
# ``initialize`` should be a no-op here (already initialized).
LLMRouterService.initialize(configs_v2)
assert len(LLMRouterService.get_instance()._model_list) == 1
# ``rebuild`` must clear the guard and re-run with the new configs.
LLMRouterService.rebuild(configs_v2)
assert len(LLMRouterService.get_instance()._model_list) == 2
def test_auto_model_pin_candidates_include_dynamic_openrouter():
"""Dynamic OR configs must remain Auto-mode thread-pin candidates.
Guards against a future regression where someone adds the
``router_pool_eligible`` filter to ``auto_model_pin_service._global_candidates``.
"""
from app.config import config
from app.services.auto_model_pin_service import _global_candidates
or_premium = _fake_openrouter_config(
id=-10_001, model_name="openai/gpt-4o", billing_tier="premium"
)
or_free = _fake_openrouter_config(
id=-10_002,
model_name="meta-llama/llama-3.3-70b:free",
billing_tier="free",
)
original = config.GLOBAL_LLM_CONFIGS
try:
config.GLOBAL_LLM_CONFIGS = [or_premium, or_free]
candidate_ids = {c["id"] for c in _global_candidates()}
assert candidate_ids == {-10_001, -10_002}
finally:
config.GLOBAL_LLM_CONFIGS = original


@@ -0,0 +1,216 @@
"""Unit tests for the dynamic OpenRouter integration."""
from __future__ import annotations
import pytest
from app.services.openrouter_integration_service import (
_OPENROUTER_DYNAMIC_MARKER,
_generate_configs,
_openrouter_tier,
_stable_config_id,
)
pytestmark = pytest.mark.unit
def _minimal_openrouter_model(
*,
model_id: str,
pricing: dict | None = None,
name: str | None = None,
) -> dict:
"""Return a synthetic OpenRouter /api/v1/models entry.
The real API payload includes a lot of fields; we only populate what
``_generate_configs`` actually inspects (architecture, tool support,
context, pricing, id).
"""
return {
"id": model_id,
"name": name or model_id,
"architecture": {"output_modalities": ["text"]},
"supported_parameters": ["tools"],
"context_length": 200_000,
"pricing": pricing or {"prompt": "0.000003", "completion": "0.000015"},
}
# ---------------------------------------------------------------------------
# _openrouter_tier
# ---------------------------------------------------------------------------
def test_openrouter_tier_free_suffix():
assert _openrouter_tier({"id": "foo/bar:free"}) == "free"
def test_openrouter_tier_zero_pricing():
model = {
"id": "foo/bar",
"pricing": {"prompt": "0", "completion": "0"},
}
assert _openrouter_tier(model) == "free"
def test_openrouter_tier_paid():
model = {
"id": "foo/bar",
"pricing": {"prompt": "0.000003", "completion": "0.000015"},
}
assert _openrouter_tier(model) == "premium"
def test_openrouter_tier_missing_pricing_is_premium():
assert _openrouter_tier({"id": "foo/bar"}) == "premium"
assert _openrouter_tier({"id": "foo/bar", "pricing": {}}) == "premium"
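
# Together these cases pin down a small decision function; a sketch of the
# assumed logic (the shipped helper may parse pricing differently):
def _openrouter_tier_sketch(model: dict) -> str:
    if str(model.get("id", "")).endswith(":free"):
        return "free"
    pricing = model.get("pricing") or {}
    if not pricing:
        return "premium"  # unknown pricing: don't hand out free quota
    try:
        if float(pricing["prompt"]) == 0 and float(pricing["completion"]) == 0:
            return "free"
    except (KeyError, TypeError, ValueError):
        pass
    return "premium"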
# ---------------------------------------------------------------------------
# _stable_config_id
# ---------------------------------------------------------------------------
def test_stable_config_id_deterministic():
taken1: set[int] = set()
taken2: set[int] = set()
a = _stable_config_id("openai/gpt-4o", -10_000, taken1)
b = _stable_config_id("openai/gpt-4o", -10_000, taken2)
assert a == b
assert a < 0
def test_stable_config_id_collision_decrements():
"""When two model_ids hash to the same slot, the second should decrement."""
taken: set[int] = set()
a = _stable_config_id("openai/gpt-4o", -10_000, taken)
# Force a collision by pre-populating ``taken`` with a slot we know will be
# picked.
taken_forced = {a}
b = _stable_config_id("openai/gpt-4o", -10_000, taken_forced)
assert b != a
assert b == a - 1
assert b in taken_forced
def test_stable_config_id_different_models_different_ids():
taken: set[int] = set()
ids = {
_stable_config_id("openai/gpt-4o", -10_000, taken),
_stable_config_id("anthropic/claude-3.5-sonnet", -10_000, taken),
_stable_config_id("google/gemini-2.0-flash", -10_000, taken),
}
assert len(ids) == 3
def test_stable_config_id_survives_catalogue_churn():
"""Removing a model should not shift other models' IDs (the bug we fix)."""
taken1: set[int] = set()
id_a1 = _stable_config_id("openai/gpt-4o", -10_000, taken1)
_ = _stable_config_id("anthropic/claude-3-haiku", -10_000, taken1)
id_c1 = _stable_config_id("google/gemini-2.0-flash", -10_000, taken1)
taken2: set[int] = set()
id_a2 = _stable_config_id("openai/gpt-4o", -10_000, taken2)
id_c2 = _stable_config_id("google/gemini-2.0-flash", -10_000, taken2)
assert id_a1 == id_a2
assert id_c1 == id_c2
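
# The churn test is the key property: IDs derive from the model_id alone,
# never from catalogue position. A sketch of one hashing scheme that satisfies
# all four tests (slot range and digest width are assumptions):
import hashlib

def _stable_config_id_sketch(model_id: str, offset: int, taken: set[int]) -> int:
    span = 8_000  # hypothetical number of slots below the offset
    digest = hashlib.sha256(model_id.encode()).digest()
    cid = offset - int.from_bytes(digest[:4], "big") % span
    while cid in taken:
        cid -= 1  # collisions walk downward deterministically
    taken.add(cid)
    return cid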
# ---------------------------------------------------------------------------
# _generate_configs
# ---------------------------------------------------------------------------
_SETTINGS_BASE: dict = {
"api_key": "sk-or-test",
"id_offset": -10_000,
"rpm": 200,
"tpm": 1_000_000,
"free_rpm": 20,
"free_tpm": 100_000,
"anonymous_enabled_paid": False,
"anonymous_enabled_free": True,
"quota_reserve_tokens": 4000,
}
def test_generate_configs_respects_tier():
"""Premium OR models opt into the router pool; free OR models stay out.
Strategy-3 split: premium participates in LiteLLM Router load balancing,
free stays excluded because OpenRouter enforces a shared global free-tier
bucket that per-deployment router accounting can't represent.
"""
raw = [
_minimal_openrouter_model(model_id="openai/gpt-4o"),
_minimal_openrouter_model(
model_id="meta-llama/llama-3.3-70b-instruct:free",
pricing={"prompt": "0", "completion": "0"},
),
]
cfgs = _generate_configs(raw, dict(_SETTINGS_BASE))
by_model = {c["model_name"]: c for c in cfgs}
paid = by_model["openai/gpt-4o"]
assert paid["billing_tier"] == "premium"
assert paid["rpm"] == 200
assert paid["tpm"] == 1_000_000
assert paid["anonymous_enabled"] is False
assert paid["router_pool_eligible"] is True
assert paid[_OPENROUTER_DYNAMIC_MARKER] is True
free = by_model["meta-llama/llama-3.3-70b-instruct:free"]
assert free["billing_tier"] == "free"
assert free["rpm"] == 20
assert free["tpm"] == 100_000
assert free["anonymous_enabled"] is True
assert free["router_pool_eligible"] is False
def test_generate_configs_excludes_upstream_openrouter_free_router():
"""OpenRouter's own ``openrouter/free`` meta-router must never become a card.
The upstream API returns this as a first-class zero-priced model, so
without an explicit blocklist entry it would slip through every other
filter (text output, tool calling, 200k context, non-Amazon) and land
in the selector as a duplicate of the concrete ``:free`` cards. The
exclusion in ``_EXCLUDED_MODEL_IDS`` prevents that.
"""
raw = [
_minimal_openrouter_model(model_id="openai/gpt-4o"),
_minimal_openrouter_model(
model_id="openrouter/free",
pricing={"prompt": "0", "completion": "0"},
),
]
cfgs = _generate_configs(raw, dict(_SETTINGS_BASE))
model_names = {c["model_name"] for c in cfgs}
assert "openrouter/free" not in model_names
assert "openai/gpt-4o" in model_names
def test_generate_configs_drops_non_text_and_non_tool_models():
raw = [
_minimal_openrouter_model(model_id="openai/gpt-4o"),
{ # image-output model
"id": "openai/dall-e",
"architecture": {"output_modalities": ["image"]},
"supported_parameters": ["tools"],
"context_length": 200_000,
"pricing": {"prompt": "0.01", "completion": "0.01"},
},
{ # text but no tool calling
"id": "openai/completion-only",
"architecture": {"output_modalities": ["text"]},
"supported_parameters": [],
"context_length": 200_000,
"pricing": {"prompt": "0.01", "completion": "0.01"},
},
]
cfgs = _generate_configs(raw, dict(_SETTINGS_BASE))
model_names = [c["model_name"] for c in cfgs]
assert "openai/gpt-4o" in model_names
assert "openai/dall-e" not in model_names
assert "openai/completion-only" not in model_names


@@ -0,0 +1,108 @@
"""Tests for deprecated-key warnings and back-compat in
``load_openrouter_integration_settings``.
"""
from __future__ import annotations
from pathlib import Path
import pytest
pytestmark = pytest.mark.unit
def _write_yaml(tmp_path: Path, body: str) -> Path:
cfg_dir = tmp_path / "app" / "config"
cfg_dir.mkdir(parents=True)
cfg_path = cfg_dir / "global_llm_config.yaml"
cfg_path.write_text(body, encoding="utf-8")
return cfg_path
def _patch_base_dir(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
from app import config as config_module
monkeypatch.setattr(config_module, "BASE_DIR", tmp_path)
def test_legacy_billing_tier_emits_warning(monkeypatch, tmp_path, capsys):
_write_yaml(
tmp_path,
"""
openrouter_integration:
enabled: true
api_key: "sk-or-test"
billing_tier: "premium"
""".lstrip(),
)
_patch_base_dir(monkeypatch, tmp_path)
from app.config import load_openrouter_integration_settings
settings = load_openrouter_integration_settings()
captured = capsys.readouterr().out
assert settings is not None
assert "billing_tier is deprecated" in captured
def test_legacy_anonymous_enabled_back_compat(monkeypatch, tmp_path, capsys):
_write_yaml(
tmp_path,
"""
openrouter_integration:
enabled: true
api_key: "sk-or-test"
anonymous_enabled: true
""".lstrip(),
)
_patch_base_dir(monkeypatch, tmp_path)
from app.config import load_openrouter_integration_settings
settings = load_openrouter_integration_settings()
captured = capsys.readouterr().out
assert settings is not None
assert settings["anonymous_enabled_paid"] is True
assert settings["anonymous_enabled_free"] is True
assert "anonymous_enabled is" in captured
assert "deprecated" in captured
def test_new_keys_take_priority_over_legacy_back_compat(monkeypatch, tmp_path, capsys):
"""If both legacy and new keys are present, new keys win (setdefault)."""
_write_yaml(
tmp_path,
"""
openrouter_integration:
enabled: true
api_key: "sk-or-test"
anonymous_enabled: true
anonymous_enabled_paid: false
anonymous_enabled_free: false
""".lstrip(),
)
_patch_base_dir(monkeypatch, tmp_path)
from app.config import load_openrouter_integration_settings
settings = load_openrouter_integration_settings()
capsys.readouterr()
assert settings is not None
assert settings["anonymous_enabled_paid"] is False
assert settings["anonymous_enabled_free"] is False
def test_disabled_integration_returns_none(monkeypatch, tmp_path):
_write_yaml(
tmp_path,
"""
openrouter_integration:
enabled: false
api_key: "sk-or-test"
""".lstrip(),
)
_patch_base_dir(monkeypatch, tmp_path)
from app.config import load_openrouter_integration_settings
assert load_openrouter_integration_settings() is None
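
# The back-compat rule these tests encode is a setdefault mapping from the
# legacy key onto both new keys; a sketch of the assumed loader fragment:
def _apply_anonymous_back_compat(settings: dict) -> None:
    if "anonymous_enabled" in settings:
        print(
            "openrouter_integration: anonymous_enabled is deprecated; "
            "use anonymous_enabled_paid / anonymous_enabled_free"
        )
        legacy = bool(settings.pop("anonymous_enabled"))
        # setdefault means explicit new keys always win over the legacy value.
        settings.setdefault("anonymous_enabled_paid", legacy)
        settings.setdefault("anonymous_enabled_free", legacy)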


@@ -0,0 +1,331 @@
"""Unit tests for the OpenRouter ``_enrich_health`` background task."""
from __future__ import annotations
from typing import Any
import pytest
from app.services.openrouter_integration_service import (
OpenRouterIntegrationService,
)
from app.services.quality_score import (
_HEALTH_FAIL_RATIO_FALLBACK,
)
pytestmark = pytest.mark.unit
def _or_cfg(
*,
cid: int,
model_name: str,
tier: str = "premium",
static_score: int = 50,
) -> dict:
return {
"id": cid,
"provider": "OPENROUTER",
"model_name": model_name,
"billing_tier": tier,
"auto_pin_tier": "B" if tier == "premium" else "C",
"quality_score_static": static_score,
"quality_score_health": None,
"quality_score": static_score,
"health_gated": False,
}
class _StubResponse:
def __init__(self, *, payload: dict, status_code: int = 200):
self._payload = payload
self.status_code = status_code
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def json(self) -> dict:
return self._payload
class _StubAsyncClient:
"""Minimal drop-in for ``httpx.AsyncClient`` used by ``_fetch_endpoints``."""
def __init__(self, responder):
self._responder = responder
self.requests: list[str] = []
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def get(self, url: str, headers: dict | None = None) -> _StubResponse:
self.requests.append(url)
return self._responder(url)
def _patch_async_client(monkeypatch, responder) -> _StubAsyncClient:
"""Replace ``httpx.AsyncClient`` for the duration of the test."""
client = _StubAsyncClient(responder)
monkeypatch.setattr(
"app.services.openrouter_integration_service.httpx.AsyncClient",
lambda *_args, **_kwargs: client,
)
return client
def _healthy_payload() -> dict:
return {
"data": {
"endpoints": [
{
"status": 0,
"uptime_last_30m": 0.99,
"uptime_last_1d": 0.995,
"uptime_last_5m": 0.99,
}
]
}
}
def _unhealthy_payload() -> dict:
return {
"data": {
"endpoints": [
{
"status": 0,
"uptime_last_30m": 0.55,
"uptime_last_1d": 0.62,
"uptime_last_5m": 0.50,
}
]
}
}
# ---------------------------------------------------------------------------
# Bounded fan-out + happy path
# ---------------------------------------------------------------------------
async def test_enrich_health_marks_healthy_and_gates_unhealthy(monkeypatch):
cfgs = [
_or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70),
_or_cfg(cid=-2, model_name="venice/dead-model", static_score=60),
]
def responder(url: str) -> _StubResponse:
if "anthropic" in url:
return _StubResponse(payload=_healthy_payload())
return _StubResponse(payload=_unhealthy_payload())
_patch_async_client(monkeypatch, responder)
service = OpenRouterIntegrationService()
service._settings = {"api_key": ""}
await service._enrich_health(cfgs)
healthy = next(c for c in cfgs if c["id"] == -1)
gated = next(c for c in cfgs if c["id"] == -2)
assert healthy["health_gated"] is False
assert healthy["quality_score_health"] is not None
assert healthy["quality_score"] >= healthy["quality_score_static"]
assert gated["health_gated"] is True
assert gated["quality_score"] == gated["quality_score_static"]
async def test_enrich_health_only_touches_or_provider(monkeypatch):
"""YAML cfgs that aren't OPENROUTER must be skipped entirely."""
yaml_cfg = {
"id": -1,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"billing_tier": "premium",
"auto_pin_tier": "A",
"quality_score_static": 80,
"quality_score": 80,
"health_gated": False,
}
or_cfg = _or_cfg(cid=-2, model_name="anthropic/claude-haiku")
requests: list[str] = []
def responder(url: str) -> _StubResponse:
requests.append(url)
return _StubResponse(payload=_healthy_payload())
_patch_async_client(monkeypatch, responder)
service = OpenRouterIntegrationService()
service._settings = {}
await service._enrich_health([yaml_cfg, or_cfg])
assert all("anthropic/claude-haiku" in r for r in requests)
# YAML cfg is untouched.
assert yaml_cfg["quality_score"] == 80
assert yaml_cfg["health_gated"] is False
# ---------------------------------------------------------------------------
# Failure ratio fallback
# ---------------------------------------------------------------------------
async def test_enrich_health_falls_back_to_last_good_when_failure_ratio_high(
monkeypatch,
):
"""If >= 25% of fetches fail, keep last-good cache instead of writing
partial data."""
cfgs = [
_or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70),
_or_cfg(cid=-2, model_name="openai/gpt-5", static_score=80),
_or_cfg(cid=-3, model_name="google/gemini-flash", static_score=65),
_or_cfg(cid=-4, model_name="venice/something", static_score=50),
]
service = OpenRouterIntegrationService()
service._settings = {}
# Pre-seed last-good cache with a known-healthy snapshot.
service._health_cache = {
"anthropic/claude-haiku": {"gated": False, "score": 95.0},
}
def all_fail(_url: str) -> _StubResponse:
return _StubResponse(payload={}, status_code=500)
_patch_async_client(monkeypatch, all_fail)
await service._enrich_health(cfgs)
# Above threshold ⇒ degraded; last-good cache wins for the cached cfg.
cached_hit = next(c for c in cfgs if c["model_name"] == "anthropic/claude-haiku")
assert cached_hit["quality_score_health"] == 95.0
assert cached_hit["health_gated"] is False
# Confirm the threshold constant we're testing against is real.
assert _HEALTH_FAIL_RATIO_FALLBACK <= 1.0
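
# A sketch of the assumed fallback decision (cache shape and threshold wiring
# are assumptions; only the behavior asserted above is load-bearing):
def _apply_health_results(cfgs, results, cache, fail_ratio=0.25):
    failures = sum(1 for r in results.values() if r is None)
    degraded = failures / max(len(results), 1) >= fail_ratio
    for cfg in cfgs:
        fetched = results.get(cfg["model_name"])
        if degraded or fetched is None:
            last_good = cache.get(cfg["model_name"])
            if last_good is not None:  # reuse last-good instead of partial data
                cfg["health_gated"] = last_good["gated"]
                cfg["quality_score_health"] = last_good["score"]
            continue
        gated, score = fetched
        cfg["health_gated"], cfg["quality_score_health"] = gated, score
        cache[cfg["model_name"]] = {"gated": gated, "score": score}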
async def test_enrich_health_keeps_static_only_with_no_cache_and_failures(
monkeypatch,
):
"""If a fetch fails and there's no last-good cache, the cfg keeps its
static-only ``quality_score`` and is *not* gated by default."""
cfgs = [
_or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70),
]
def fail(_url: str) -> _StubResponse:
return _StubResponse(payload={}, status_code=500)
_patch_async_client(monkeypatch, fail)
service = OpenRouterIntegrationService()
service._settings = {}
await service._enrich_health(cfgs)
cfg = cfgs[0]
assert cfg["health_gated"] is False
assert cfg["quality_score"] == cfg["quality_score_static"]
assert cfg["quality_score_health"] is None
# ---------------------------------------------------------------------------
# Last-good cache: success populates, next failure reuses
# ---------------------------------------------------------------------------
async def test_enrich_health_populates_cache_on_success_then_reuses_on_failure(
monkeypatch,
):
cfg = _or_cfg(cid=-1, model_name="anthropic/claude-haiku", static_score=70)
service = OpenRouterIntegrationService()
service._settings = {}
def healthy(_url: str) -> _StubResponse:
return _StubResponse(payload=_healthy_payload())
_patch_async_client(monkeypatch, healthy)
await service._enrich_health([cfg])
assert "anthropic/claude-haiku" in service._health_cache
cached_score = service._health_cache["anthropic/claude-haiku"]["score"]
assert cached_score is not None
# Next cycle: enough other healthy cfgs so failure ratio stays below
# the 25% threshold even when this one fails individually.
other_cfgs = [
_or_cfg(cid=-2 - i, model_name=f"healthy/m-{i}", static_score=60)
for i in range(10)
]
cfg["quality_score_health"] = None
cfg["quality_score"] = cfg["quality_score_static"]
def mixed(url: str) -> _StubResponse:
if "anthropic" in url:
return _StubResponse(payload={}, status_code=500)
return _StubResponse(payload=_healthy_payload())
_patch_async_client(monkeypatch, mixed)
await service._enrich_health([cfg, *other_cfgs])
assert cfg["quality_score_health"] == cached_score
assert cfg["health_gated"] is False
# ---------------------------------------------------------------------------
# Bounded fan-out: respects top-N caps
# ---------------------------------------------------------------------------
async def test_enrich_health_bounds_premium_fanout(monkeypatch):
"""Top-N premium cap is honoured even when many cfgs are present."""
from app.services.quality_score import _HEALTH_ENRICH_TOP_N_PREMIUM
cfgs = [
_or_cfg(
cid=-i, model_name=f"openai/m-{i}", tier="premium", static_score=100 - i
)
for i in range(1, _HEALTH_ENRICH_TOP_N_PREMIUM + 20)
]
seen: list[str] = []
def responder(url: str) -> _StubResponse:
seen.append(url)
return _StubResponse(payload=_healthy_payload())
_patch_async_client(monkeypatch, responder)
service = OpenRouterIntegrationService()
service._settings = {}
await service._enrich_health(cfgs)
assert len(seen) == _HEALTH_ENRICH_TOP_N_PREMIUM
async def test_enrich_health_no_or_cfgs_is_noop(monkeypatch):
"""When the catalogue has no OR cfgs at all, no HTTP calls fire."""
yaml_cfg: dict[str, Any] = {
"id": -1,
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"billing_tier": "premium",
}
requests: list[str] = []
def responder(url: str) -> _StubResponse:
requests.append(url)
return _StubResponse(payload=_healthy_payload())
_patch_async_client(monkeypatch, responder)
service = OpenRouterIntegrationService()
service._settings = {}
await service._enrich_health([yaml_cfg])
assert requests == []


@@ -0,0 +1,345 @@
"""Unit tests for the Auto (Fastest) quality scoring module."""
from __future__ import annotations
import time
import pytest
from app.services.quality_score import (
_HEALTH_GATE_UPTIME_PCT,
_OPERATOR_TRUST_BONUS,
aggregate_health,
capabilities_signal,
context_signal,
created_recency_signal,
pricing_band,
slug_penalty,
static_score_or,
static_score_yaml,
)
pytestmark = pytest.mark.unit
# ---------------------------------------------------------------------------
# created_recency_signal
# ---------------------------------------------------------------------------
def test_created_recency_signal_recent_model_scores_high():
now = 1_750_000_000 # ~mid-2025
one_month_ago = now - (30 * 86_400)
assert created_recency_signal(one_month_ago, now) == 20
def test_created_recency_signal_old_model_scores_zero():
now = 1_750_000_000
five_years_ago = now - (5 * 365 * 86_400)
assert created_recency_signal(five_years_ago, now) == 0
def test_created_recency_signal_missing_timestamp_is_neutral():
now = 1_750_000_000
assert created_recency_signal(None, now) == 0
assert created_recency_signal(0, now) == 0
def test_created_recency_signal_monotonic_decay():
now = 1_750_000_000
scores = [
created_recency_signal(now - days * 86_400, now)
for days in (30, 120, 300, 500, 700, 1000, 1500)
]
assert scores == sorted(scores, reverse=True)
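
# One banded decay that satisfies all four tests; the band edges below are
# illustrative assumptions, not the shipped constants:
def created_recency_signal_sketch(created_ts, now_ts):
    if not created_ts:
        return 0  # missing/zero timestamp is neutral
    age_days = max(0, (now_ts - created_ts) / 86_400)
    bands = ((90, 20), (180, 16), (365, 12), (548, 8), (730, 6), (1_095, 3))
    for limit, points in bands:
        if age_days <= limit:
            return points
    return 0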
# ---------------------------------------------------------------------------
# pricing_band
# ---------------------------------------------------------------------------
def test_pricing_band_free_returns_zero():
assert pricing_band("0", "0") == 0
assert pricing_band(0.0, 0.0) == 0
assert pricing_band(None, None) == 0
def test_pricing_band_handles_unparseable():
assert pricing_band("not-a-number", "0") == 0
assert pricing_band({}, []) == 0 # type: ignore[arg-type]
def test_pricing_band_premium_tiers_increase_with_price():
cheap = pricing_band("0.0000003", "0.0000005")
mid = pricing_band("0.000003", "0.000015")
flagship = pricing_band("0.00001", "0.00005")
assert 0 < cheap < mid < flagship
# ---------------------------------------------------------------------------
# context_signal
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
"ctx,expected",
[
(1_500_000, 10),
(1_000_000, 10),
(500_000, 8),
(200_000, 6),
(128_000, 4),
(100_000, 2),
(50_000, 0),
(0, 0),
(None, 0),
],
)
def test_context_signal_bands(ctx, expected):
assert context_signal(ctx) == expected
# ---------------------------------------------------------------------------
# capabilities_signal
# ---------------------------------------------------------------------------
def test_capabilities_signal_caps_at_five():
assert (
capabilities_signal(
["tools", "structured_outputs", "reasoning", "include_reasoning"]
)
<= 5
)
def test_capabilities_signal_tools_only():
assert capabilities_signal(["tools"]) == 2
def test_capabilities_signal_empty():
assert capabilities_signal(None) == 0
assert capabilities_signal([]) == 0
# ---------------------------------------------------------------------------
# slug_penalty
# ---------------------------------------------------------------------------
def test_slug_penalty_demotes_tiny_models():
assert slug_penalty("meta-llama/llama-3.2-1b-instruct") < 0
assert slug_penalty("liquid/lfm-7b") < 0
assert slug_penalty("google/gemma-3n-e4b-it") < 0
def test_slug_penalty_skips_capable_mini_nano_lite_models():
"""Critical Option C+ regression: don't penalise modern frontier
models named ``-nano`` / ``-mini`` / ``-lite`` (gpt-5-mini, etc.)."""
assert slug_penalty("openai/gpt-5-mini") == 0
assert slug_penalty("openai/gpt-5-nano") == 0
assert slug_penalty("google/gemini-2.5-flash-lite") == 0
assert slug_penalty("anthropic/claude-haiku-4.5") == 0
def test_slug_penalty_demotes_legacy_variants():
assert slug_penalty("openai/o1-preview") < 0
assert slug_penalty("foo/bar-base") < 0
assert slug_penalty("foo/bar-distill") < 0
def test_slug_penalty_empty_input():
assert slug_penalty("") == 0
# ---------------------------------------------------------------------------
# static_score_or
# ---------------------------------------------------------------------------
def _or_model(
*,
model_id: str,
created: int | None = None,
prompt: str = "0.000003",
completion: str = "0.000015",
context: int = 200_000,
params: list[str] | None = None,
) -> dict:
return {
"id": model_id,
"created": created,
"pricing": {"prompt": prompt, "completion": completion},
"context_length": context,
"supported_parameters": params if params is not None else ["tools"],
}
def test_static_score_or_frontier_premium_beats_free_tiny():
now = 1_750_000_000
frontier = _or_model(
model_id="openai/gpt-5",
created=now - (60 * 86_400),
prompt="0.000005",
completion="0.000020",
context=400_000,
params=["tools", "structured_outputs", "reasoning"],
)
tiny_free = _or_model(
model_id="meta-llama/llama-3.2-1b-instruct:free",
created=now - (5 * 365 * 86_400),
prompt="0",
completion="0",
context=128_000,
params=["tools"],
)
assert static_score_or(frontier, now_ts=now) > static_score_or(
tiny_free, now_ts=now
)
def test_static_score_or_score_is_clamped_0_to_100():
now = int(time.time())
score = static_score_or(_or_model(model_id="openai/gpt-4o"), now_ts=now)
assert 0 <= score <= 100
def test_static_score_or_unknown_provider_is_neutral_not_zero():
now = int(time.time())
score = static_score_or(
_or_model(model_id="some-new-lab/some-model"),
now_ts=now,
)
assert score > 0
def test_static_score_or_recent_release_beats_year_old_same_provider():
now = 1_750_000_000
fresh = _or_model(model_id="openai/gpt-5", created=now - (60 * 86_400))
old = _or_model(model_id="openai/gpt-4-turbo", created=now - (700 * 86_400))
assert static_score_or(fresh, now_ts=now) > static_score_or(old, now_ts=now)
# ---------------------------------------------------------------------------
# static_score_yaml
# ---------------------------------------------------------------------------
def test_static_score_yaml_includes_operator_bonus():
cfg = {
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"litellm_params": {"base_model": "azure/gpt-5"},
}
score = static_score_yaml(cfg)
assert score >= _OPERATOR_TRUST_BONUS
def test_static_score_yaml_unknown_provider_still_carries_bonus():
cfg = {
"provider": "SOME_NEW_PROVIDER",
"model_name": "weird-model",
}
score = static_score_yaml(cfg)
assert score >= _OPERATOR_TRUST_BONUS
def test_static_score_yaml_clamped_0_to_100():
cfg = {
"provider": "AZURE_OPENAI",
"model_name": "gpt-5",
"litellm_params": {"base_model": "azure/gpt-5"},
}
assert 0 <= static_score_yaml(cfg) <= 100
# ---------------------------------------------------------------------------
# aggregate_health
# ---------------------------------------------------------------------------
def test_aggregate_health_gates_when_uptime_below_threshold():
"""Live data showed Venice-routed cfgs at 53-68%; this guards that the
90% gate excludes them."""
venice_endpoints = [
{
"status": 0,
"uptime_last_30m": 0.55,
"uptime_last_1d": 0.60,
"uptime_last_5m": 0.50,
},
{
"status": 0,
"uptime_last_30m": 0.65,
"uptime_last_1d": 0.68,
"uptime_last_5m": 0.62,
},
]
gated, score = aggregate_health(venice_endpoints)
assert gated is True
assert score is None
def test_aggregate_health_passes_for_healthy_provider():
healthy = [
{
"status": 0,
"uptime_last_30m": 0.99,
"uptime_last_1d": 0.995,
"uptime_last_5m": 0.99,
},
]
gated, score = aggregate_health(healthy)
assert gated is False
assert score is not None
assert score >= _HEALTH_GATE_UPTIME_PCT
def test_aggregate_health_picks_best_endpoint_across_multiple():
"""Multi-endpoint aggregation should reward the best non-null uptime."""
mixed = [
{"status": 0, "uptime_last_30m": 0.55},
{"status": 0, "uptime_last_30m": 0.97}, # this one passes the gate
]
gated, score = aggregate_health(mixed)
assert gated is False
assert score is not None
def test_aggregate_health_empty_endpoints_gated():
gated, score = aggregate_health([])
assert gated is True
assert score is None
def test_aggregate_health_no_status_zero_gated():
"""Even with high uptime, no OK status means the cfg is broken upstream."""
endpoints = [
{"status": 1, "uptime_last_30m": 0.99},
{"status": 2, "uptime_last_30m": 0.98},
]
gated, score = aggregate_health(endpoints)
assert gated is True
assert score is None
def test_aggregate_health_all_uptime_null_gated():
endpoints = [
{"status": 0, "uptime_last_30m": None, "uptime_last_1d": None},
]
gated, score = aggregate_health(endpoints)
assert gated is True
assert score is None
def test_aggregate_health_pct_normalisation():
"""OpenRouter returns 0-1 fractions; some endpoints surface 0-100%
percentages. Both should reach the same gate decision."""
fraction_form = [{"status": 0, "uptime_last_30m": 0.95}]
pct_form = [{"status": 0, "uptime_last_30m": 95.0}]
g1, s1 = aggregate_health(fraction_form)
g2, s2 = aggregate_health(pct_form)
assert g1 == g2 == False # noqa: E712
assert s1 is not None and s2 is not None
assert abs(s1 - s2) < 0.5
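
# A sketch of the aggregation these tests pin down (the 90.0 default stands in
# for _HEALTH_GATE_UPTIME_PCT; weighting details are assumptions):
def aggregate_health_sketch(endpoints, gate_pct=90.0):
    best = None
    for ep in endpoints:
        if ep.get("status") != 0:
            continue  # only OK-status endpoints count
        for key in ("uptime_last_30m", "uptime_last_1d", "uptime_last_5m"):
            uptime = ep.get(key)
            if uptime is None:
                continue
            pct = uptime * 100 if uptime <= 1 else uptime  # 0-1 vs 0-100 forms
            best = pct if best is None else max(best, pct)
    if best is None or best < gate_pct:
        return True, None  # gated: broken upstream or below the uptime gate
    return False, best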


@@ -14,6 +14,7 @@ from app.tasks.chat.stream_new_chat import (
_classify_stream_exception,
_contract_enforcement_active,
_evaluate_file_contract_outcome,
_extract_resolved_file_path,
_log_chat_stream_error,
_tool_output_has_error,
)
@@ -28,6 +29,39 @@ def test_tool_output_error_detection():
assert not _tool_output_has_error({"result": "Updated file /notes.md"})
def test_extract_resolved_file_path_prefers_structured_path():
assert (
_extract_resolved_file_path(
tool_name="write_file",
tool_output={"status": "completed", "path": "/docs/note.md"},
tool_input=None,
)
== "/docs/note.md"
)
def test_extract_resolved_file_path_falls_back_to_tool_input():
assert (
_extract_resolved_file_path(
tool_name="edit_file",
tool_output={"status": "completed", "result": "updated"},
tool_input={"file_path": "/docs/edited.md"},
)
== "/docs/edited.md"
)
def test_extract_resolved_file_path_does_not_parse_result_text():
assert (
_extract_resolved_file_path(
tool_name="write_file",
tool_output={"result": "Updated file /docs/from-text.md"},
tool_input=None,
)
is None
)
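
# A sketch of the resolution order these three cases encode (key names beyond
# ``path`` and ``file_path`` are assumptions):
def _extract_resolved_file_path_sketch(tool_name, tool_output, tool_input):
    if isinstance(tool_output, dict):
        path = tool_output.get("path")
        if isinstance(path, str) and path:
            return path  # structured output path wins
    if isinstance(tool_input, dict):
        path = tool_input.get("file_path")
        if isinstance(path, str) and path:
            return path  # fall back to the tool's input argument
    return None  # deliberately no free-text parsing of ``result`` strings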
def test_file_write_contract_outcome_reasons():
result = StreamResult(intent_detected="file_write")
passed, reason = _evaluate_file_contract_outcome(result)
@@ -159,6 +193,84 @@ def test_stream_exception_classifies_rate_limited():
assert extra is None
def test_stream_exception_classifies_openrouter_429_payload():
exc = Exception(
'OpenrouterException - {"error":{"message":"Provider returned error","code":429,'
'"metadata":{"raw":"foo is temporarily rate-limited upstream"}}}'
)
kind, code, severity, is_expected, user_message, extra = _classify_stream_exception(
exc, flow_label="chat"
)
assert kind == "rate_limited"
assert code == "RATE_LIMITED"
assert severity == "warn"
assert is_expected is True
assert "temporarily rate-limited" in user_message
assert extra is None
@pytest.mark.asyncio
async def test_preflight_swallows_non_rate_limit_errors_and_re_raises_429(monkeypatch):
"""``_preflight_llm`` is best-effort.
- On rate-limit shaped exceptions (provider 429) it MUST re-raise so the
caller can drive the cooldown/repin branch.
- On any other transient failure it MUST swallow the error so the normal
stream path continues without surfacing preflight noise to the user.
"""
from types import SimpleNamespace
from app.tasks.chat.stream_new_chat import _preflight_llm
class _RateLimitedError(Exception):
"""Class-name carries 'RateLimit' so _is_provider_rate_limited triggers."""
rate_calls: list[dict] = []
other_calls: list[dict] = []
async def _fake_acompletion_429(**kwargs):
rate_calls.append(kwargs)
raise _RateLimitedError("simulated 429")
async def _fake_acompletion_other(**kwargs):
other_calls.append(kwargs)
raise RuntimeError("some unrelated transient failure")
fake_llm = SimpleNamespace(
model="openrouter/google/gemma-4-31b-it:free",
api_key="test",
api_base=None,
)
import litellm # type: ignore[import-not-found]
monkeypatch.setattr(litellm, "acompletion", _fake_acompletion_429)
with pytest.raises(_RateLimitedError):
await _preflight_llm(fake_llm)
assert len(rate_calls) == 1
assert rate_calls[0]["max_tokens"] == 1
assert rate_calls[0]["stream"] is False
monkeypatch.setattr(litellm, "acompletion", _fake_acompletion_other)
# MUST NOT raise: non-rate-limit failures are swallowed.
await _preflight_llm(fake_llm)
assert len(other_calls) == 1
@pytest.mark.asyncio
async def test_preflight_skipped_for_auto_router_model():
"""Router-mode ``model='auto'`` has no single deployment to ping; the
LiteLLM router itself owns per-deployment rate-limit accounting, so the
preflight helper must short-circuit instead of issuing a probe."""
from types import SimpleNamespace
from app.tasks.chat.stream_new_chat import _preflight_llm
fake_llm = SimpleNamespace(model="auto", api_key="x", api_base=None)
# Should return without raising or making any LiteLLM call.
await _preflight_llm(fake_llm)
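
# The two preflight tests above suggest a shape like this (a hedged sketch;
# the probe message and the rate-limit heuristic are assumptions):
import litellm

def _is_provider_rate_limited_sketch(exc: Exception) -> bool:
    # Class-name heuristic, matching the _RateLimitedError trick above; the
    # real check may also inspect status codes and payloads.
    return "ratelimit" in type(exc).__name__.lower() or "429" in str(exc)

async def _preflight_llm_sketch(llm) -> None:
    if llm.model == "auto":
        return  # router mode: no single deployment to probe
    try:
        await litellm.acompletion(
            model=llm.model,
            api_key=llm.api_key,
            api_base=llm.api_base,
            messages=[{"role": "user", "content": "ping"}],
            max_tokens=1,
            stream=False,
        )
    except Exception as exc:
        if _is_provider_rate_limited_sketch(exc):
            raise  # let the caller drive the cooldown/repin branch
        return  # best-effort: swallow non-rate-limit failures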
def test_stream_exception_classifies_thread_busy():
exc = BusyError(request_id="thread-123")
kind, code, severity, is_expected, user_message, extra = _classify_stream_exception(