"""Tests for choose_endpoint routing logic with mocked fetch calls."""

import time
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

import router

EP1 = "http://ep1:11434"
EP2 = "http://ep2:11434"
EP3 = "http://ep3:11434"
LLAMA_EP = "http://llama:8080/v1"


def _make_cfg(endpoints, llama_eps=None, swap_eps=None, max_conn=2, endpoint_config=None, priority_routing=False):
    """Build a MagicMock that stands in for ``router.config`` in these tests.

    Args:
        endpoints: list of Ollama-style endpoint URLs (``cfg.endpoints``).
        llama_eps: llama-server endpoint URLs; defaults to an empty list.
        swap_eps: llama-swap endpoint URLs; defaults to an empty list.
        max_conn: global ``max_concurrent_connections`` value.
        endpoint_config: per-endpoint overrides keyed by URL; defaults to ``{}``.
        priority_routing: value for ``cfg.priority_routing``.

    Returns:
        A MagicMock with the attributes above set explicitly, no API keys, and
        ``router_api_key`` set to None.
    """
    cfg = MagicMock()
    cfg.endpoints = endpoints
    cfg.llama_server_endpoints = llama_eps or []
    cfg.llama_swap_endpoints = swap_eps or []
    cfg.api_keys = {}
    cfg.max_concurrent_connections = max_conn
    cfg.endpoint_config = endpoint_config or {}
    cfg.priority_routing = priority_routing
    cfg.router_api_key = None
    return cfg


@pytest.fixture(autouse=True)
def reset_usage():
    """Clear usage_counts and error caches between tests to prevent bleed."""
    router.usage_counts.clear()
    router._loaded_error_cache.clear()
    yield
    router.usage_counts.clear()
    router._loaded_error_cache.clear()


class TestChooseEndpointBasic:
    """Candidate selection, loaded-model preference, and health filtering."""

    async def test_selects_single_candidate(self):
        cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"llama3.2:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, tracking = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP1
        assert tracking == "llama3.2:latest"

    async def test_llama_swap_endpoint_is_a_candidate(self):
        swap_ep = "http://swap:8080/v1"
        cfg = _make_cfg([EP1], swap_eps=[swap_ep])

        async def available(ep, *_):
            # Only the llama-swap backend advertises this model
            return {"org/model:Q4_K_M"} if ep == swap_ep else set()

        async def loaded(ep):
            return {"org/model:Q4_K_M"} if ep == swap_ep else set()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, tracking = await router.choose_endpoint("org/model:Q4_K_M")
        assert ep == swap_ep
        # llama-swap models are tracked under their normalized name
        assert tracking == "model"

    async def test_raises_when_no_endpoint_has_model(self):
        cfg = _make_cfg([EP1, EP2])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value=set())),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            with pytest.raises(RuntimeError, match="advertise the model"):
                await router.choose_endpoint("unknown-model:latest")

    async def test_prefers_loaded_endpoint(self):
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        async def loaded(ep):
            return {"llama3.2:latest"} if ep == EP2 else set()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP2

    async def test_falls_back_to_free_slot(self):
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep in (EP1, EP2)

    async def test_cold_model_avoids_backend_busy_with_other_model(self):
        # Regression: heterogeneous cluster. A cold model B (loaded nowhere)
        # must not be routed to a backend already serving a *different* model
        # while other backends sit idle. The step-4 idle check used to look at
        # per-model usage (zero everywhere for B) and discard the total-load
        # ranking, so B could land on the busy backend at random.
        cfg = _make_cfg([EP1, EP2, EP3], max_conn=4)

        async def available(ep, *_):
            return {"model-a:latest", "model-b:latest"}

        # EP3 is busy with model A; EP1 and EP2 are completely idle. Model B
        # is loaded nowhere.
        router.usage_counts[EP3]["model-a:latest"] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            # Run repeatedly: the busy backend must be excluded every time,
            # the idle two share the load at random.
            for _attempt in range(50):
                ep, _ = await router.choose_endpoint("model-b:latest", reserve=False)
                assert ep in (EP1, EP2)
                assert ep != EP3

    async def test_two_cold_models_spread_across_backends(self):
        # Regression: 3 backends all advertise all models. Two *different*
        # cold models requested back-to-back must land on *different*
        # backends. Once model-a is resident on the chosen backend (infinite
        # keep-alive), its in-flight count drops back to 0 — so only the
        # resident-model count distinguishes the backends. Without it, the
        # second cold model would randomly re-collide on the busy backend.
        cfg = _make_cfg([EP1, EP2, EP3], max_conn=4)

        async def available(ep, *_):
            return {"model-a:latest", "model-b:latest"}

        # model-a finished loading on EP1 and stays resident; its request has
        # completed so EP1 has zero in-flight load, same as EP2/EP3.
        loaded = {EP1: {"model-a:latest"}, EP2: set(), EP3: set()}

        async def loaded_models(ep):
            return loaded[ep]

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded_models),
        ):
            # A cold model-b must avoid EP1 (which already holds model-a) and
            # go to one of the empty backends, every time.
            for _attempt in range(50):
                ep, _ = await router.choose_endpoint("model-b:latest", reserve=False)
                assert ep in (EP2, EP3)
                assert ep != EP1

    async def test_saturated_picks_least_busy(self):
        cfg = _make_cfg([EP1, EP2], max_conn=1)

        async def available(ep, *_):
            return {"llama3.2:latest"}

        # Saturate EP1 with 2 active connections, EP2 with 1
        router.usage_counts[EP1]["llama3.2:latest"] = 2
        router.usage_counts[EP2]["llama3.2:latest"] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        # Least-busy is EP2
        assert ep == EP2

    async def test_excludes_endpoint_with_recent_loaded_error(self):
        # Regression: issue #83 — when /api/ps fails for EP1 but EP1
        # still advertises the model via /api/tags, routing must not
        # fall back to EP1 just because it has a free slot.
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        # EP1's /api/ps probe failed recently; EP2 is fine but the model
        # is not loaded there. Without the health filter, EP1 would be
        # picked by the free-slot fallback (step 4 in choose_endpoint).
        router._loaded_error_cache[EP1] = time.time()
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP2

    async def test_stale_loaded_error_does_not_exclude(self):
        # Errors older than the 300s window must not keep an endpoint
        # excluded forever.
        cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time() - 301
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"m:latest"})),
        ):
            ep, _ = await router.choose_endpoint("m:latest")
        assert ep == EP1

    async def test_all_unhealthy_still_routes(self):
        # If every candidate has a fresh loaded-error we still try one
        # (it may have recovered between the cache write and now) rather
        # than refusing to route.
        cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time()
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("m:latest")
        assert ep == EP1

    async def test_reserve_increments_usage(self):
        cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"model:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"model:latest"})),
        ):
            ep, tracking = await router.choose_endpoint("model:latest", reserve=True)
        assert router.usage_counts[ep][tracking] == 1


class TestChooseEndpointModelNaming:
    """Handling of the ``:latest`` tag across Ollama and OpenAI-style backends."""

    async def test_strips_latest_for_openai_endpoints(self):
        cfg = _make_cfg(endpoints=[], llama_eps=[LLAMA_EP])

        async def available(ep, *_):
            # llama-server advertises without :latest
            return {"gpt-4o"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"gpt-4o"})),
        ):
            ep, _ = await router.choose_endpoint("gpt-4o:latest")
        assert ep == LLAMA_EP

    async def test_adds_latest_for_ollama_when_bare_name(self):
        cfg = _make_cfg([EP1])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, _ = await router.choose_endpoint("llama3.2")
        assert ep == EP1


class TestChooseEndpointLoadBalancing:
    """Load spreading among equally eligible endpoints (non-priority mode)."""

    async def test_random_selection_among_idle(self):
        cfg = _make_cfg([EP1, EP2, EP3])
        selected = set()

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            for _attempt in range(20):
                router.usage_counts.clear()
                ep, _ = await router.choose_endpoint("model:latest", reserve=False)
                selected.add(ep)
        # With 20 draws from 3 idle endpoints, more than one endpoint must
        # appear. Requiring all three would fail by chance roughly 0.1% of
        # the time; the all-three check lives in
        # TestPriorityRoutingDisabled, which uses 30 draws.
        assert len(selected) > 1

    async def test_sort_by_load_ascending(self):
        cfg = _make_cfg([EP1, EP2])
        router.usage_counts[EP1]["model:latest"] = 1
        router.usage_counts[EP2]["model:latest"] = 0

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("model:latest", reserve=False)
        # EP2 has fewer active connections → should be selected
        assert ep == EP2


# ---------------------------------------------------------------------------
# get_max_connections unit tests
# ---------------------------------------------------------------------------


class TestGetMaxConnections:
    """Global default vs. per-endpoint ``max_concurrent_connections`` overrides."""

    def test_returns_global_default_when_no_override(self):
        cfg = _make_cfg([EP1, EP2], max_conn=3)
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP1) == 3
            assert router.get_max_connections(EP2) == 3

    def test_returns_per_endpoint_override(self):
        cfg = _make_cfg(
            [EP1, EP2],
            max_conn=2,
            endpoint_config={EP1: {"max_concurrent_connections": 5}},
        )
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP1) == 5
            assert router.get_max_connections(EP2) == 2  # falls back to global

    def test_unrecognised_endpoint_falls_back_to_global(self):
        cfg = _make_cfg([EP1], max_conn=4, endpoint_config={EP2: {"max_concurrent_connections": 1}})
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP3) == 4


# ---------------------------------------------------------------------------
# Priority / WRR routing tests
# ---------------------------------------------------------------------------

MODEL = "model:latest"


def _all_loaded(ep, *_):
    """Fake for both fetch.available_models and fetch.loaded_models.

    Every endpoint advertises MODEL and has it loaded. Extra positional
    arguments are accepted and ignored, like the ``available(ep, *_)`` fakes
    elsewhere in this file, so this works however many arguments the router
    passes.
    """
    return {MODEL}


class TestPriorityRouting:
    """Tests for priority_routing=True (WRR + config-order tiebreaking)."""

    async def test_idle_picks_first_in_config_order(self):
        """When all endpoints are idle, priority picks the first listed endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3], priority_routing=True)
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP1

    async def test_lower_utilization_preferred_over_priority(self):
        """An endpoint with lower ratio is preferred even if it has lower priority."""
        cfg = _make_cfg([EP1, EP2], priority_routing=True)
        # EP1 (priority 0) is busier: 1/2 = 0.5; EP2 (priority 1) is idle: 0/2 = 0.0
        router.usage_counts[EP1][MODEL] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP2

    async def test_wrr_distribution_matches_expected_sequence(self):
        """
        Full WRR sequence with heterogeneous capacities, mirroring the issue example:
        EP1 max=2, EP2 max=2, EP3 max=1
        Expected routing order for 5 sequential requests: EP1, EP2, EP3, EP1, EP2
        """
        cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=2,
            endpoint_config={EP3: {"max_concurrent_connections": 1}},
            priority_routing=True,
        )
        expected = [EP1, EP2, EP3, EP1, EP2]
        actual = []
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            for _request in expected:
                # reserve=True so each pick raises that endpoint's utilization
                ep, _ = await router.choose_endpoint(MODEL, reserve=True)
                actual.append(ep)
        assert actual == expected

    async def test_saturated_picks_lowest_ratio_then_priority(self):
        """When all endpoints are saturated, pick lowest utilization ratio; break ties by priority."""
        cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=1,
            endpoint_config={EP3: {"max_concurrent_connections": 2}},
            priority_routing=True,
        )
        # EP1 usage=1/1=1.0, EP2 usage=1/1=1.0, EP3 usage=1/2=0.5 → EP3 wins
        router.usage_counts[EP1][MODEL] = 1
        router.usage_counts[EP2][MODEL] = 1
        router.usage_counts[EP3][MODEL] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP3

    async def test_saturated_ties_broken_by_priority(self):
        """When all are saturated with equal ratio, config order wins."""
        cfg = _make_cfg([EP1, EP2, EP3], max_conn=1, priority_routing=True)
        router.usage_counts[EP1][MODEL] = 1
        router.usage_counts[EP2][MODEL] = 1
        router.usage_counts[EP3][MODEL] = 1
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP1


class TestPriorityRoutingDisabled:
    """Verify that priority_routing=False keeps the original random behaviour."""

    async def test_idle_endpoints_are_randomised(self):
        """Without priority routing, all-idle selection must eventually pick each endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3], priority_routing=False)
        selected = set()
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            for _attempt in range(30):
                router.usage_counts.clear()
                ep, _ = await router.choose_endpoint(MODEL, reserve=False)
                selected.add(ep)
        # With 30 draws from 3 equally-idle endpoints, all three must appear
        assert selected == {EP1, EP2, EP3}