"""Tests for choose_endpoint routing logic with mocked fetch calls."""

import time
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

import router

EP1 = "http://ep1:11434"
EP2 = "http://ep2:11434"
EP3 = "http://ep3:11434"
LLAMA_EP = "http://llama:8080/v1"


def _make_cfg(endpoints, llama_eps=None, max_conn=2, endpoint_config=None, priority_routing=False):
    """Build a MagicMock standing in for ``router.config``.

    Args:
        endpoints: Ollama-style endpoint URLs (``cfg.endpoints``).
        llama_eps: llama-server endpoint URLs; ``None`` means no such endpoints.
        max_conn: Global ``max_concurrent_connections`` default.
        endpoint_config: Per-endpoint overrides keyed by endpoint URL;
            ``None`` means no overrides.
        priority_routing: Value for ``cfg.priority_routing``.

    Returns:
        A MagicMock with the attributes above plus empty ``api_keys`` and
        ``router_api_key=None``.
    """
    cfg = MagicMock()
    cfg.endpoints = endpoints
    cfg.llama_server_endpoints = llama_eps or []
    cfg.api_keys = {}
    cfg.max_concurrent_connections = max_conn
    cfg.endpoint_config = endpoint_config or {}
    cfg.priority_routing = priority_routing
    cfg.router_api_key = None
    return cfg


@pytest.fixture(autouse=True)
def reset_usage():
    """Clear usage_counts and error caches between tests to prevent bleed."""
    router.usage_counts.clear()
    router._loaded_error_cache.clear()
    yield
    router.usage_counts.clear()
    router._loaded_error_cache.clear()


class TestChooseEndpointBasic:
    """Candidate filtering and fallback order of ``router.choose_endpoint``."""

    async def test_selects_single_candidate(self):
        """A single endpoint that advertises and has the model loaded is chosen."""
        cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"llama3.2:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, tracking = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP1
        assert tracking == "llama3.2:latest"

    async def test_raises_when_no_endpoint_has_model(self):
        """A RuntimeError is raised when no endpoint advertises the model."""
        cfg = _make_cfg([EP1, EP2])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value=set())),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            with pytest.raises(RuntimeError, match="advertise the model"):
                await router.choose_endpoint("unknown-model:latest")

    async def test_prefers_loaded_endpoint(self):
        """An endpoint with the model already loaded beats one that only advertises it."""
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        async def loaded(ep):
            return {"llama3.2:latest"} if ep == EP2 else set()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP2

    async def test_falls_back_to_free_slot(self):
        """With the model loaded nowhere, any advertising endpoint may be chosen."""
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep in (EP1, EP2)

    async def test_saturated_picks_least_busy(self):
        """When every endpoint is at or over capacity, the least busy one wins."""
        cfg = _make_cfg([EP1, EP2], max_conn=1)

        async def available(ep, *_):
            return {"llama3.2:latest"}

        # Saturate EP1 with 2 active connections, EP2 with 1
        router.usage_counts[EP1]["llama3.2:latest"] = 2
        router.usage_counts[EP2]["llama3.2:latest"] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        # Least-busy is EP2
        assert ep == EP2

    async def test_excludes_endpoint_with_recent_loaded_error(self):
        # Regression: issue #83 — when /api/ps fails for EP1 but EP1
        # still advertises the model via /api/tags, routing must not
        # fall back to EP1 just because it has a free slot.
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        # EP1's /api/ps probe failed recently; EP2 is fine but the model
        # is not loaded there. Without the health filter, EP1 would be
        # picked by the free-slot fallback (step 4 in choose_endpoint).
        router._loaded_error_cache[EP1] = time.time()
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP2

    async def test_stale_loaded_error_does_not_exclude(self):
        # Errors older than the 300s window must not keep an endpoint
        # excluded forever.
        cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time() - 301
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"m:latest"})),
        ):
            ep, _ = await router.choose_endpoint("m:latest")
        assert ep == EP1

    async def test_all_unhealthy_still_routes(self):
        # If every candidate has a fresh loaded-error we still try one
        # (it may have recovered between the cache write and now) rather
        # than refusing to route.
        cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time()
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("m:latest")
        assert ep == EP1

    async def test_reserve_increments_usage(self):
        """``reserve=True`` records one active connection for the chosen endpoint."""
        cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"model:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"model:latest"})),
        ):
            ep, tracking = await router.choose_endpoint("model:latest", reserve=True)
        assert router.usage_counts[ep][tracking] == 1


class TestChooseEndpointModelNaming:
    """``:latest`` tag normalisation between OpenAI-style and Ollama endpoints."""

    async def test_strips_latest_for_openai_endpoints(self):
        """A ``:latest`` request still matches a llama-server model listed without the tag."""
        # _make_cfg(endpoints=[]) already leaves cfg.endpoints empty.
        cfg = _make_cfg(endpoints=[], llama_eps=[LLAMA_EP])

        async def available(ep, *_):
            # llama-server advertises without :latest
            return {"gpt-4o"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"gpt-4o"})),
        ):
            ep, _ = await router.choose_endpoint("gpt-4o:latest")
        assert ep == LLAMA_EP

    async def test_adds_latest_for_ollama_when_bare_name(self):
        """A bare model name matches an Ollama model advertised as ``name:latest``."""
        cfg = _make_cfg([EP1])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, _ = await router.choose_endpoint("llama3.2")
        assert ep == EP1


class TestChooseEndpointLoadBalancing:
    """Spreading load across endpoints that all have the model loaded."""

    async def test_random_selection_among_idle(self):
        """Idle endpoints are not always resolved to the same one."""
        cfg = _make_cfg([EP1, EP2, EP3])
        selected = set()

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        # The side-effect helpers are stateless, so one patch context can
        # serve every draw.
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            for _draw in range(20):
                router.usage_counts.clear()
                ep, _tracking = await router.choose_endpoint("model:latest", reserve=False)
                selected.add(ep)
        # With 20 draws from 3 equally idle endpoints, more than one distinct
        # endpoint must appear. Requiring all three would fail about 0.09% of
        # the time under uniform choice, so only non-determinism is checked.
        assert len(selected) > 1

    async def test_sort_by_load_ascending(self):
        """Among endpoints with the model loaded, the one with fewer active connections wins."""
        cfg = _make_cfg([EP1, EP2])
        router.usage_counts[EP1]["model:latest"] = 1
        router.usage_counts[EP2]["model:latest"] = 0

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("model:latest", reserve=False)
        # EP2 has fewer active connections → should be selected
        assert ep == EP2


# ---------------------------------------------------------------------------
# get_max_connections unit tests
# ---------------------------------------------------------------------------


class TestGetMaxConnections:
    """Per-endpoint ``max_concurrent_connections`` override resolution."""

    def test_returns_global_default_when_no_override(self):
        cfg = _make_cfg([EP1, EP2], max_conn=3)
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP1) == 3
            assert router.get_max_connections(EP2) == 3

    def test_returns_per_endpoint_override(self):
        cfg = _make_cfg(
            [EP1, EP2],
            max_conn=2,
            endpoint_config={EP1: {"max_concurrent_connections": 5}},
        )
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP1) == 5
            assert router.get_max_connections(EP2) == 2  # falls back to global

    def test_unrecognised_endpoint_falls_back_to_global(self):
        cfg = _make_cfg([EP1], max_conn=4, endpoint_config={EP2: {"max_concurrent_connections": 1}})
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP3) == 4


# ---------------------------------------------------------------------------
# Priority / WRR routing tests
# ---------------------------------------------------------------------------

MODEL = "model:latest"


def _all_loaded(ep, *_args, **_kwargs):
    """Side-effect: every endpoint advertises and has MODEL loaded.

    Used as the side effect for both ``fetch.available_models`` and
    ``fetch.loaded_models``. Extra positional and keyword arguments are
    accepted and ignored, so the helper keeps working if either fetch
    function is called with more than just the endpoint URL.

    Returns:
        A new single-element set ``{MODEL}`` regardless of ``ep``.
    """
    return {MODEL}


class TestPriorityRouting:
    """Tests for priority_routing=True (WRR + config-order tiebreaking)."""

    async def test_idle_picks_first_in_config_order(self):
        """When all endpoints are idle, priority picks the first listed endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3], priority_routing=True)
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP1

    async def test_lower_utilization_preferred_over_priority(self):
        """An endpoint with lower ratio is preferred even if it has lower priority."""
        cfg = _make_cfg([EP1, EP2], priority_routing=True)
        # EP1 (priority 0) is busier: 1/2 = 0.5; EP2 (priority 1) is idle: 0/2 = 0.0
        router.usage_counts[EP1][MODEL] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP2

    async def test_wrr_distribution_matches_expected_sequence(self):
        """
        Full WRR sequence with heterogeneous capacities, mirroring the issue example:

            EP1 max=2, EP2 max=2, EP3 max=1

        Expected routing order for 5 sequential requests:
            EP1, EP2, EP3, EP1, EP2
        """
        cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=2,
            endpoint_config={EP3: {"max_concurrent_connections": 1}},
            priority_routing=True,
        )
        expected = [EP1, EP2, EP3, EP1, EP2]
        actual = []
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            # reserve=True so each pick raises that endpoint's usage and
            # shifts the ratios seen by the next request.
            for _request in expected:
                ep, _tracking = await router.choose_endpoint(MODEL, reserve=True)
                actual.append(ep)
        assert actual == expected

    async def test_saturated_picks_lowest_ratio_then_priority(self):
        """When all endpoints are saturated, pick lowest utilization ratio; break ties by priority."""
        cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=1,
            endpoint_config={EP3: {"max_concurrent_connections": 2}},
            priority_routing=True,
        )
        # EP1 usage=1/1=1.0, EP2 usage=1/1=1.0, EP3 usage=1/2=0.5 → EP3 wins
        router.usage_counts[EP1][MODEL] = 1
        router.usage_counts[EP2][MODEL] = 1
        router.usage_counts[EP3][MODEL] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP3

    async def test_saturated_ties_broken_by_priority(self):
        """When all are saturated with equal ratio, config order wins."""
        cfg = _make_cfg([EP1, EP2, EP3], max_conn=1, priority_routing=True)
        router.usage_counts[EP1][MODEL] = 1
        router.usage_counts[EP2][MODEL] = 1
        router.usage_counts[EP3][MODEL] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP1


class TestPriorityRoutingDisabled:
    """Verify that priority_routing=False keeps the original random behaviour."""

    async def test_idle_endpoints_are_randomised(self):
        """Without priority routing, all-idle selection must eventually pick each endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3], priority_routing=False)
        selected = set()
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            for _draw in range(30):
                router.usage_counts.clear()
                ep, _tracking = await router.choose_endpoint(MODEL, reserve=False)
                selected.add(ep)
        # With 30 draws from 3 equally-idle endpoints, all three must appear
        # (under uniform choice the chance of missing one is about 1.6e-5).
        assert selected == {EP1, EP2, EP3}