nomyo-router/test/test_fetch.py
alpha nerd d163fea154
fix: remove aioresponses
sec: bump aiohttp 3.14

fix: tiktoken test issue by pre-caching the vocab file
2026-06-07 13:23:35 +02:00

284 lines
12 KiB
Python

"""Tests for fetch.available_models and fetch.loaded_models.
The backend probes obtain their HTTP client via ``backends.probe.get_probe_session``
and only ever call ``async with client.get(url, headers=...) as resp``. We patch that
seam with a tiny fake session instead of mocking aiohttp's internals (aioresponses),
so the suite stays independent of aiohttp's private ClientResponse/ConnectionKey
structure across version bumps.
"""
import time
from contextlib import contextmanager
from unittest.mock import patch, MagicMock
import pytest
import router
import backends.probe as probe
from conftest import TEST_OLLAMA, TEST_LLAMA
MOCK_OLLAMA_EP = "http://mock-ollama:11434"
MOCK_LLAMA_EP = "http://mock-llama:8080/v1"


def _make_cfg(ollama_eps=None, llama_eps=None, api_keys=None):
    """Build a ``MagicMock`` that stands in for the router configuration.

    Args:
        ollama_eps: Value for ``cfg.endpoints``. ``None`` selects
            ``[MOCK_OLLAMA_EP]``; an explicit empty list is kept as-is.
        llama_eps: Value for ``cfg.llama_server_endpoints``. ``None`` selects
            ``[MOCK_LLAMA_EP]``; an explicit empty list is kept as-is.
        api_keys: Value for ``cfg.api_keys``. ``None`` selects an empty dict.

    Returns:
        The configured ``MagicMock`` with ``max_concurrent_connections`` set
        to 2 and ``router_api_key`` set to ``None``.
    """
    cfg = MagicMock()
    # Compare against None instead of relying on truthiness: callers pass
    # ``[]`` to mean "no endpoints of this kind", and ``[] or default`` would
    # silently swap that for the default endpoint.
    cfg.endpoints = [MOCK_OLLAMA_EP] if ollama_eps is None else ollama_eps
    cfg.llama_server_endpoints = [MOCK_LLAMA_EP] if llama_eps is None else llama_eps
    cfg.api_keys = {} if api_keys is None else api_keys
    cfg.max_concurrent_connections = 2
    cfg.router_api_key = None
    return cfg
# ── Fake probe session ────────────────────────────────────────────────────────
class _MockResponse:
    """Canned response object handed out by the fake probe session.

    Carries a ``status`` attribute, awaitable ``json()`` / ``text()``
    accessors, and acts as an async context manager that yields itself.
    """

    def __init__(self, *, status=200, payload=None, text=None):
        self.status = status
        self._payload = payload
        # A missing body is normalised to the empty string.
        self._text = "" if text is None else text

    async def json(self):
        """Return the payload supplied at construction, unchanged."""
        return self._payload

    async def text(self):
        """Return the text body supplied at construction (``""`` if none)."""
        return self._text

    async def __aenter__(self):
        """Yield the response itself to the ``async with`` body."""
        return self

    async def __aexit__(self, *exc_info):
        """Never suppress an exception raised inside the ``async with`` body."""
        return False
class _RaisingCtx:
    """Async context manager whose entry raises a preset exception.

    Used as the result of ``client.get(...)`` to mimic a connection that
    fails before any response is available.
    """

    def __init__(self, exc):
        # The exception instance re-raised on every ``__aenter__``.
        self._exc = exc

    async def __aenter__(self):
        """Raise the stored exception instead of yielding a response."""
        raise self._exc

    async def __aexit__(self, *exc_info):
        """Never suppress exceptions."""
        return False
class _MockProbeSession:
    """Stand-in for the aiohttp ClientSession returned by ``get_probe_session``.

    Routes are registered by exact URL via :meth:`add_get`. A registered exception
    is raised when the route is entered; otherwise a :class:`_MockResponse` is yielded.
    An unregistered GET raises ``AssertionError`` from :meth:`get` so tests
    can't silently pass on a wrong URL.
    """

    def __init__(self):
        # Exact URL -> canned _MockResponse, or the exception to raise on entry.
        self._routes = {}

    def add_get(self, url, *, status=200, payload=None, text=None, exception=None):
        """Register the outcome of a GET to *url*.

        Args:
            url: Exact URL the probe is expected to request.
            status: ``status`` attribute of the canned response.
            payload: Value returned by the canned response's ``json()``.
            text: Value returned by the canned response's ``text()``.
            exception: When given, entering the route raises this exception
                and ``status`` / ``payload`` / ``text`` are ignored.

        Registering the same URL again replaces the previous entry.
        """
        if exception is not None:
            self._routes[url] = exception
        else:
            self._routes[url] = _MockResponse(
                status=status, payload=payload, text=text
            )

    def get(self, url, **kwargs):
        """Return the async context manager registered for *url*.

        Keyword arguments (such as ``headers``) are accepted and ignored.

        Raises:
            AssertionError: If no route was registered for *url*.
        """
        if url not in self._routes:
            raise AssertionError(f"unexpected probe GET {url}")
        entry = self._routes[url]
        # Check against BaseException so errors outside the Exception branch
        # (e.g. asyncio.CancelledError) are raised on entry rather than being
        # handed back as if they were a response object.
        if isinstance(entry, BaseException):
            return _RaisingCtx(entry)
        return entry
@contextmanager
def mock_probe():
    """Swap ``probe.get_probe_session`` for a factory returning one fake session.

    Yields the fresh :class:`_MockProbeSession` so the test can register
    routes on it; the original attribute is restored when the block exits.
    """
    fake_session = _MockProbeSession()

    def _session_factory(endpoint):
        # Every endpoint shares the same fake session for the patch's lifetime.
        return fake_session

    with patch.object(probe, "get_probe_session", _session_factory):
        yield fake_session
@pytest.fixture(autouse=True)
def clear_caches(aio_session):
    """Request ``aio_session`` automatically for every test in this module.

    This fixture does no work of its own; cache clearing and app_state setup
    are expected to happen inside ``aio_session`` (defined outside this file).
    """
    yield
class TestFetchAvailableModels:
    """Tests for ``router.fetch.available_models`` using the fake probe session."""

    async def test_ollama_tags(self):
        """Names listed in the ``/api/tags`` payload come back as a set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [
                    {"name": "llama3.2:latest"},
                    {"name": "qwen2.5:7b"},
                ]},
            )
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == {"llama3.2:latest", "qwen2.5:7b"}

    async def test_openai_compatible_models_endpoint(self):
        """Ids listed in the ``/models`` payload are reported for a llama endpoint."""
        cfg = _make_cfg(llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_LLAMA_EP}/models",
                payload={"data": [{"id": "unsloth/model:Q8_0"}]},
            )
            models = await router.fetch.available_models(MOCK_LLAMA_EP, api_key="tok")
        assert "unsloth/model:Q8_0" in models

    async def test_caches_successful_result(self):
        """A second call returns the same models without another probe GET."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            # The fake route stays registered, so a second request would be
            # served without complaint. Spy on ``get`` to observe whether one
            # is actually issued.
            with patch.object(m, "get", wraps=m.get) as spy:
                first = await router.fetch.available_models(MOCK_OLLAMA_EP)
                gets_after_first = spy.call_count
                second = await router.fetch.available_models(MOCK_OLLAMA_EP)
                gets_after_second = spy.call_count
        assert first == second == {"llama3.2:latest"}
        # Sanity check: the first call really went through the spied session.
        assert gets_after_first >= 1
        # second call must be served from cache without a second HTTP request
        assert gets_after_second == gets_after_first

    async def test_returns_empty_on_http_500(self):
        """An HTTP 500 from ``/api/tags`` yields an empty set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(f"{MOCK_OLLAMA_EP}/api/tags", status=500, payload={"error": "oops"})
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == set()

    async def test_returns_empty_on_connection_error(self):
        """A connection error while entering the request yields an empty set."""
        import aiohttp

        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                exception=aiohttp.ClientConnectionError(
                    "Cannot connect to host mock-ollama:11434 [Connection refused]"
                ),
            )
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == set()

    async def test_stale_cache_returned_while_refresh_runs(self):
        """An aged cache entry is still returned to the caller."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            await router.fetch.available_models(MOCK_OLLAMA_EP)
        # Manually age cache into stale-but-valid window (300-600s)
        async with router._models_cache_lock:
            models, _ = router._models_cache[MOCK_OLLAMA_EP]
            router._models_cache[MOCK_OLLAMA_EP] = (models, time.time() - 400)
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            # Should return stale data immediately
            stale = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert "llama3.2:latest" in stale

    async def test_error_cache_short_circuits(self):
        """A recent error-cache entry returns an empty set without probing."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        # Seed error cache with a very recent error
        async with router._available_error_cache_lock:
            router._available_error_cache[MOCK_OLLAMA_EP] = time.time()
        with patch.object(router, "config", cfg), mock_probe() as m:
            # No route registered — if a call happens it raises AssertionError.
            # The spy also covers the case where the code under test swallows
            # that error and still returns an empty set.
            with patch.object(m, "get", wraps=m.get) as spy:
                models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == set()
        spy.assert_not_called()
class TestFetchLoadedModels:
    """Tests for ``router.fetch.loaded_models`` using the fake probe session."""

    async def test_ollama_ps(self):
        """Names listed in the ``/api/ps`` payload come back as a set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/ps",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            models = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert models == {"llama3.2:latest"}

    async def test_llama_server_filters_loaded(self):
        """Only entries whose status value is ``loaded`` are reported."""
        cfg = _make_cfg(llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_LLAMA_EP}/models",
                payload={"data": [
                    {"id": "model-a", "status": {"value": "loaded"}},
                    {"id": "model-b", "status": {"value": "unloaded"}},
                ]},
            )
            models = await router.fetch.loaded_models(MOCK_LLAMA_EP)
        assert models == {"model-a"}

    async def test_llama_server_no_status_field_always_loaded(self):
        """An entry without a ``status`` field counts as loaded."""
        cfg = _make_cfg(llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_LLAMA_EP}/models",
                payload={"data": [{"id": "always-on-model"}]},
            )
            models = await router.fetch.loaded_models(MOCK_LLAMA_EP)
        assert "always-on-model" in models

    async def test_returns_empty_on_error(self):
        """An HTTP 503 from ``/api/ps`` yields an empty set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(f"{MOCK_OLLAMA_EP}/api/ps", status=503, payload={})
            models = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert models == set()

    async def test_ext_openai_always_empty(self):
        """The external OpenAI URL reports no loaded models."""
        ext_ep = "https://api.openai.com/v1"
        cfg = _make_cfg(ollama_eps=[ext_ep], llama_eps=[])
        with patch.object(router, "config", cfg):
            models = await router.fetch.loaded_models(ext_ep)
        assert models == set()

    async def test_caches_result(self):
        """A second call returns the same models without another probe GET."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/ps",
                payload={"models": [{"name": "qwen:7b"}]},
            )
            # ``first == second`` alone also holds when nothing is cached,
            # because the fake route stays registered. Spy on ``get`` so a
            # repeated request is detected.
            with patch.object(m, "get", wraps=m.get) as spy:
                first = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
                gets_after_first = spy.call_count
                second = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
                gets_after_second = spy.call_count
        assert first == second == {"qwen:7b"}
        # Sanity check: the first call really went through the spied session.
        assert gets_after_first >= 1
        assert gets_after_second == gets_after_first

    async def test_records_error_in_loaded_error_cache_on_failure(self):
        """A failed ``/api/ps`` probe leaves an entry in the loaded-error cache."""
        # Regression: issue #83 — /api/ps failures must be recorded so
        # `choose_endpoint` can exclude unhealthy backends from routing.
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(f"{MOCK_OLLAMA_EP}/api/ps", status=502, payload={})
            await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert MOCK_OLLAMA_EP in router._loaded_error_cache

    async def test_records_error_for_llama_server_on_failure(self):
        """A failed ``/models`` probe leaves an entry in the loaded-error cache."""
        cfg = _make_cfg(ollama_eps=[], llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(f"{MOCK_LLAMA_EP}/models", status=502, payload={})
            await router.fetch.loaded_models(MOCK_LLAMA_EP)
        assert MOCK_LLAMA_EP in router._loaded_error_cache

    async def test_clears_error_cache_on_subsequent_success(self):
        """A successful probe removes the endpoint's old error-cache entry."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        # Pre-seed an old error so loaded_models() falls through to the
        # network probe instead of short-circuiting on the error cache.
        async with router._loaded_error_cache_lock:
            router._loaded_error_cache[MOCK_OLLAMA_EP] = time.time() - 301
        with patch.object(router, "config", cfg), mock_probe() as m:
            m.add_get(
                f"{MOCK_OLLAMA_EP}/api/ps",
                payload={"models": [{"name": "qwen:7b"}]},
            )
            await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert MOCK_OLLAMA_EP not in router._loaded_error_cache