"""Tests for fetch.available_models and fetch.loaded_models using aioresponses mocking."""
import time
from unittest.mock import patch, MagicMock

import aiohttp
import pytest
from aioresponses import aioresponses

import router
from conftest import TEST_OLLAMA, TEST_LLAMA

MOCK_OLLAMA_EP = "http://mock-ollama:11434"
MOCK_LLAMA_EP = "http://mock-llama:8080/v1"


def _make_cfg(ollama_eps=None, llama_eps=None, api_keys=None):
    """Build a ``MagicMock`` standing in for ``router.config``.

    Args:
        ollama_eps: Value for ``cfg.endpoints``. ``None`` selects the default
            ``[MOCK_OLLAMA_EP]``; an explicit empty list is kept as-is.
        llama_eps: Value for ``cfg.llama_server_endpoints``. ``None`` selects
            the default ``[MOCK_LLAMA_EP]``; an explicit empty list is kept.
        api_keys: Value for ``cfg.api_keys``. ``None`` selects ``{}``.

    Returns:
        The configured ``MagicMock``.
    """
    cfg = MagicMock()
    # Compare against None instead of relying on truthiness: `eps or default`
    # would silently replace an explicitly requested empty list with the
    # default, configuring an endpoint the caller asked to leave out.
    cfg.endpoints = [MOCK_OLLAMA_EP] if ollama_eps is None else ollama_eps
    cfg.llama_server_endpoints = [MOCK_LLAMA_EP] if llama_eps is None else llama_eps
    cfg.api_keys = {} if api_keys is None else api_keys
    cfg.max_concurrent_connections = 2
    cfg.router_api_key = None
    return cfg


@pytest.fixture(autouse=True)
def clear_caches(aio_session):
    """aio_session fixture already clears caches and sets up app_state.

    This autouse wrapper only requests ``aio_session`` so that every test in
    this module runs with it active.
    """
    yield


class TestFetchAvailableModels:
    """Tests for ``router.fetch.available_models``."""

    async def test_ollama_tags(self):
        """Model names from ``/api/tags`` are returned as a set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [
                    {"name": "llama3.2:latest"},
                    {"name": "qwen2.5:7b"},
                ]},
            )
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == {"llama3.2:latest", "qwen2.5:7b"}

    async def test_openai_compatible_models_endpoint(self):
        """Model ids from an OpenAI-style ``/models`` listing are returned."""
        cfg = _make_cfg(llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_LLAMA_EP}/models",
                payload={"data": [{"id": "unsloth/model:Q8_0"}]},
            )
            models = await router.fetch.available_models(MOCK_LLAMA_EP, api_key="tok")
        assert "unsloth/model:Q8_0" in models

    async def test_caches_successful_result(self):
        """A second call returns the same models from a single mocked response."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            # Only one response is registered for the URL.
            m.get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            first = await router.fetch.available_models(MOCK_OLLAMA_EP)
            second = await router.fetch.available_models(MOCK_OLLAMA_EP)
        # second call must be served from cache without a second HTTP request
        assert first == second == {"llama3.2:latest"}

    async def test_returns_empty_on_http_500(self):
        """An HTTP 500 from the backend yields an empty set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(f"{MOCK_OLLAMA_EP}/api/tags", status=500, payload={"error": "oops"})
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == set()

    async def test_returns_empty_on_connection_error(self):
        """A refused connection yields an empty set instead of raising."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                exception=aiohttp.ClientConnectorError(
                    connection_key=MagicMock(host="mock-ollama", port=11434),
                    os_error=OSError(111, "refused"),
                ),
            )
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert models == set()

    async def test_stale_cache_returned_while_refresh_runs(self):
        """An aged cache entry is still served to the caller."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            await router.fetch.available_models(MOCK_OLLAMA_EP)

        # Manually age cache into stale-but-valid window (300-600s)
        async with router._models_cache_lock:
            models, _ = router._models_cache[MOCK_OLLAMA_EP]
            router._models_cache[MOCK_OLLAMA_EP] = (models, time.time() - 400)

        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_OLLAMA_EP}/api/tags",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            # Should return stale data immediately
            stale = await router.fetch.available_models(MOCK_OLLAMA_EP)
        assert "llama3.2:latest" in stale

    async def test_error_cache_short_circuits(self):
        """A fresh error-cache entry returns an empty set without any HTTP call."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        # Seed error cache with a very recent error
        async with router._available_error_cache_lock:
            router._available_error_cache[MOCK_OLLAMA_EP] = time.time()

        with patch.object(router, "config", cfg), aioresponses() as m:
            # No HTTP mock registered. An attempted request would fail with a
            # connection error, but failures are reported as an empty set
            # (see the HTTP-500 / connection-error tests above), so the result
            # alone cannot prove the short-circuit. Check the request log too.
            models = await router.fetch.available_models(MOCK_OLLAMA_EP)
            assert not m.requests
        assert models == set()


class TestFetchLoadedModels:
    """Tests for ``router.fetch.loaded_models``."""

    async def test_ollama_ps(self):
        """Model names from ``/api/ps`` are returned as a set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_OLLAMA_EP}/api/ps",
                payload={"models": [{"name": "llama3.2:latest"}]},
            )
            models = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert models == {"llama3.2:latest"}

    async def test_llama_server_filters_loaded(self):
        """Only entries whose status value is ``"loaded"`` are reported."""
        cfg = _make_cfg(llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_LLAMA_EP}/models",
                payload={"data": [
                    {"id": "model-a", "status": {"value": "loaded"}},
                    {"id": "model-b", "status": {"value": "unloaded"}},
                ]},
            )
            models = await router.fetch.loaded_models(MOCK_LLAMA_EP)
        assert models == {"model-a"}

    async def test_llama_server_no_status_field_always_loaded(self):
        """An entry without a ``status`` field is treated as loaded."""
        cfg = _make_cfg(llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_LLAMA_EP}/models",
                payload={"data": [{"id": "always-on-model"}]},
            )
            models = await router.fetch.loaded_models(MOCK_LLAMA_EP)
        assert "always-on-model" in models

    async def test_returns_empty_on_error(self):
        """An HTTP 503 from ``/api/ps`` yields an empty set."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(f"{MOCK_OLLAMA_EP}/api/ps", status=503, payload={})
            models = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert models == set()

    async def test_ext_openai_always_empty(self):
        """An external OpenAI endpoint reports no loaded models."""
        ext_ep = "https://api.openai.com/v1"
        cfg = _make_cfg(ollama_eps=[ext_ep], llama_eps=[])
        with patch.object(router, "config", cfg):
            models = await router.fetch.loaded_models(ext_ep)
        assert models == set()

    async def test_caches_result(self):
        """A second call returns the same models from a single mocked response."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            # Only one response is registered for the URL.
            m.get(
                f"{MOCK_OLLAMA_EP}/api/ps",
                payload={"models": [{"name": "qwen:7b"}]},
            )
            first = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
            second = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        # Pin the value as well as the equality: two failed calls would both
        # return an empty set and satisfy a bare `first == second`.
        assert first == second == {"qwen:7b"}

    async def test_records_error_in_loaded_error_cache_on_failure(self):
        """A failing ``/api/ps`` probe is recorded in the loaded-error cache."""
        # Regression: issue #83 — /api/ps failures must be recorded so
        # `choose_endpoint` can exclude unhealthy backends from routing.
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(f"{MOCK_OLLAMA_EP}/api/ps", status=502, payload={})
            await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert MOCK_OLLAMA_EP in router._loaded_error_cache

    async def test_records_error_for_llama_server_on_failure(self):
        """A failing llama-server ``/models`` probe is recorded as well."""
        cfg = _make_cfg(ollama_eps=[], llama_eps=[MOCK_LLAMA_EP])
        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(f"{MOCK_LLAMA_EP}/models", status=502, payload={})
            await router.fetch.loaded_models(MOCK_LLAMA_EP)
        assert MOCK_LLAMA_EP in router._loaded_error_cache

    async def test_clears_error_cache_on_subsequent_success(self):
        """A successful probe removes the endpoint from the loaded-error cache."""
        cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
        # Pre-seed an old error so loaded_models() falls through to the
        # network probe instead of short-circuiting on the error cache.
        async with router._loaded_error_cache_lock:
            router._loaded_error_cache[MOCK_OLLAMA_EP] = time.time() - 301

        with patch.object(router, "config", cfg), aioresponses() as m:
            m.get(
                f"{MOCK_OLLAMA_EP}/api/ps",
                payload={"models": [{"name": "qwen:7b"}]},
            )
            await router.fetch.loaded_models(MOCK_OLLAMA_EP)
        assert MOCK_OLLAMA_EP not in router._loaded_error_cache