nomyo-router/test/test_choose_endpoint.py
alpha nerd db6aa73903
All checks were successful
PR Tests / test (pull_request) Successful in 58s
NYX Security Scan / nyx-scan (pull_request) Successful in 6m59s
fix:
- _fetch_loaded_models_internal now writes _loaded_error_cache[endpoint] = time.time() when /api/ps or /v1/models fails, and clears the entry on success
- choose_endpoint now filters out candidates with a fresh (<300s) loaded-models error
- /health now probes both /api/version and /api/ps for Ollama endpoints
- dashboard adaptation

relates to #83
2026-05-18 13:45:06 +02:00

399 lines
16 KiB
Python

"""Tests for choose_endpoint routing logic with mocked fetch calls."""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import router
# Endpoint base URLs used as routing candidates throughout these tests.
# EP1-EP3 are passed to _make_cfg as ``endpoints``; LLAMA_EP is passed as
# ``llama_eps`` (stored on the config stub as ``llama_server_endpoints``).
EP1 = "http://ep1:11434"
EP2 = "http://ep2:11434"
EP3 = "http://ep3:11434"
LLAMA_EP = "http://llama:8080/v1"
def _make_cfg(endpoints, llama_eps=None, max_conn=2, endpoint_config=None, priority_routing=False):
cfg = MagicMock()
cfg.endpoints = endpoints
cfg.llama_server_endpoints = llama_eps or []
cfg.api_keys = {}
cfg.max_concurrent_connections = max_conn
cfg.endpoint_config = endpoint_config or {}
cfg.priority_routing = priority_routing
cfg.router_api_key = None
return cfg
@pytest.fixture(autouse=True)
def reset_usage():
    """Empty router usage counters and the loaded-error cache around each test.

    Both containers are cleared before the test body runs and again after it
    finishes, so state written by one test cannot bleed into the next.
    """

    def _wipe_router_state():
        # Look the containers up on every call rather than caching references.
        router.usage_counts.clear()
        router._loaded_error_cache.clear()

    _wipe_router_state()
    yield
    _wipe_router_state()
class TestChooseEndpointBasic:
    """Core selection behaviour of ``router.choose_endpoint``."""

    async def test_selects_single_candidate(self):
        """A lone endpoint that advertises and has the model loaded is returned."""
        model = "llama3.2:latest"
        stub_cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(return_value={model})),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value={model})),
        ):
            chosen, tracked_as = await router.choose_endpoint(model)
        assert chosen == EP1
        assert tracked_as == "llama3.2:latest"

    async def test_raises_when_no_endpoint_has_model(self):
        """When no endpoint reports any model, a ``RuntimeError`` is raised."""
        stub_cfg = _make_cfg([EP1, EP2])
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(return_value=set())),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value=set())),
            pytest.raises(RuntimeError, match="advertise the model"),
        ):
            await router.choose_endpoint("unknown-model:latest")

    async def test_prefers_loaded_endpoint(self):
        """Among endpoints advertising the model, the one with it loaded wins."""
        model = "llama3.2:latest"
        stub_cfg = _make_cfg([EP1, EP2])

        async def _advertised(endpoint, *_):
            return {model}

        async def _resident(endpoint):
            # Only EP2 reports the model as loaded.
            if endpoint == EP2:
                return {model}
            return set()

        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", side_effect=_advertised),
            patch.object(router.fetch, "loaded_models", side_effect=_resident),
        ):
            chosen, _ = await router.choose_endpoint(model)
        assert chosen == EP2

    async def test_falls_back_to_free_slot(self):
        """With the model loaded nowhere, any advertising endpoint may be picked."""
        model = "llama3.2:latest"
        stub_cfg = _make_cfg([EP1, EP2])

        async def _advertised(endpoint, *_):
            return {model}

        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", side_effect=_advertised),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value=set())),
        ):
            chosen, _ = await router.choose_endpoint(model)
        assert chosen in (EP1, EP2)

    async def test_saturated_picks_least_busy(self):
        """When every endpoint exceeds its limit, the least busy one is chosen."""
        model = "llama3.2:latest"
        stub_cfg = _make_cfg([EP1, EP2], max_conn=1)

        async def _advertised(endpoint, *_):
            return {model}

        # Saturate EP1 with 2 active connections, EP2 with 1
        for endpoint, active in ((EP1, 2), (EP2, 1)):
            router.usage_counts[endpoint][model] = active

        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", side_effect=_advertised),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value=set())),
        ):
            chosen, _ = await router.choose_endpoint(model)
        # Least-busy is EP2
        assert chosen == EP2

    async def test_excludes_endpoint_with_recent_loaded_error(self):
        """Regression for issue #83: a fresh loaded-models error excludes EP1.

        When /api/ps fails for EP1 but EP1 still advertises the model via
        /api/tags, routing must not fall back to EP1 just because it has a
        free slot.
        """
        model = "llama3.2:latest"
        stub_cfg = _make_cfg([EP1, EP2])

        async def _advertised(endpoint, *_):
            return {model}

        # EP1's /api/ps probe failed recently; EP2 is fine but the model
        # is not loaded there. Without the health filter, EP1 would be
        # picked by the free-slot fallback (step 4 in choose_endpoint).
        router._loaded_error_cache[EP1] = time.time()

        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", side_effect=_advertised),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value=set())),
        ):
            chosen, _ = await router.choose_endpoint(model)
        assert chosen == EP2

    async def test_stale_loaded_error_does_not_exclude(self):
        """Errors older than the 300s window must not exclude an endpoint forever."""
        model = "m:latest"
        stub_cfg = _make_cfg([EP1])
        # One second past the 300s window.
        router._loaded_error_cache[EP1] = time.time() - 301

        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(return_value={model})),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value={model})),
        ):
            chosen, _ = await router.choose_endpoint(model)
        assert chosen == EP1

    async def test_all_unhealthy_still_routes(self):
        """If every candidate has a fresh loaded-error, one is still tried.

        The endpoint may have recovered between the cache write and now, so
        routing to it is preferred over refusing to route.
        """
        model = "m:latest"
        stub_cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time()

        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(return_value={model})),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value=set())),
        ):
            chosen, _ = await router.choose_endpoint(model)
        assert chosen == EP1

    async def test_reserve_increments_usage(self):
        """``reserve=True`` records one active use for the chosen endpoint/model."""
        model = "model:latest"
        stub_cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(return_value={model})),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(return_value={model})),
        ):
            chosen, tracked_as = await router.choose_endpoint(model, reserve=True)
        assert router.usage_counts[chosen][tracked_as] == 1
class TestChooseEndpointModelNaming:
    """Model-name normalisation performed while matching endpoints."""

    async def test_strips_latest_for_openai_endpoints(self):
        """A ``:latest`` request matches a llama-server endpoint's bare name."""
        # _make_cfg already stores the empty endpoint list, so no further
        # assignment to cfg.endpoints is needed.
        cfg = _make_cfg(endpoints=[], llama_eps=[LLAMA_EP])

        async def available(ep, *_):
            # llama-server advertises without :latest
            return {"gpt-4o"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"gpt-4o"})),
        ):
            ep, _ = await router.choose_endpoint("gpt-4o:latest")
        assert ep == LLAMA_EP

    async def test_adds_latest_for_ollama_when_bare_name(self):
        """A bare model name matches an endpoint advertising ``name:latest``."""
        cfg = _make_cfg([EP1])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, _ = await router.choose_endpoint("llama3.2")
        assert ep == EP1
class TestChooseEndpointLoadBalancing:
    """Load-based selection among endpoints that all have the model loaded."""

    async def test_random_selection_among_idle(self):
        """Repeated picks among idle endpoints do not always return the same one."""
        cfg = _make_cfg([EP1, EP2, EP3])
        selected = set()

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        # The patches do not change between draws, so they are entered once
        # around the whole loop instead of once per iteration.
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            for _ in range(20):
                # Start every draw from an all-idle state.
                router.usage_counts.clear()
                ep, _ = await router.choose_endpoint("model:latest", reserve=False)
                selected.add(ep)

        # With 20 draws from 3 idle endpoints, more than one distinct endpoint
        # must have been chosen; this does not require all three to appear.
        assert len(selected) > 1

    async def test_sort_by_load_ascending(self):
        """The endpoint with fewer active connections is selected."""
        cfg = _make_cfg([EP1, EP2])
        router.usage_counts[EP1]["model:latest"] = 1
        router.usage_counts[EP2]["model:latest"] = 0

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("model:latest", reserve=False)
        # EP2 has fewer active connections → should be selected
        assert ep == EP2
# ---------------------------------------------------------------------------
# get_max_connections unit tests
# ---------------------------------------------------------------------------
class TestGetMaxConnections:
    """Unit tests for ``router.get_max_connections``."""

    def test_returns_global_default_when_no_override(self):
        """Without per-endpoint config, every endpoint gets the global limit."""
        stub_cfg = _make_cfg([EP1, EP2], max_conn=3)
        with patch.object(router, "config", new=stub_cfg):
            for endpoint in (EP1, EP2):
                assert router.get_max_connections(endpoint) == 3

    def test_returns_per_endpoint_override(self):
        """A per-endpoint limit applies only to the endpoint it is set for."""
        overrides = {EP1: {"max_concurrent_connections": 5}}
        stub_cfg = _make_cfg([EP1, EP2], max_conn=2, endpoint_config=overrides)
        with patch.object(router, "config", new=stub_cfg):
            overridden_limit = router.get_max_connections(EP1)
            assert overridden_limit == 5
            # EP2 has no override and falls back to the global value.
            default_limit = router.get_max_connections(EP2)
            assert default_limit == 2

    def test_unrecognised_endpoint_falls_back_to_global(self):
        """An endpoint missing from ``endpoint_config`` gets the global limit."""
        overrides = {EP2: {"max_concurrent_connections": 1}}
        stub_cfg = _make_cfg([EP1], max_conn=4, endpoint_config=overrides)
        with patch.object(router, "config", new=stub_cfg):
            limit = router.get_max_connections(EP3)
            assert limit == 4
# ---------------------------------------------------------------------------
# Priority / WRR routing tests
# ---------------------------------------------------------------------------
MODEL = "model:latest"
def _all_loaded(ep):
"""Side-effect: every endpoint advertises and has MODEL loaded."""
return {MODEL}
class TestPriorityRouting:
    """Routing with ``priority_routing=True`` (WRR + config-order tiebreaking)."""

    async def test_idle_picks_first_in_config_order(self):
        """With every endpoint idle, the first configured endpoint is chosen."""
        stub_cfg = _make_cfg([EP1, EP2, EP3], priority_routing=True)
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(side_effect=_all_loaded)),
        ):
            chosen, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert chosen == EP1

    async def test_lower_utilization_preferred_over_priority(self):
        """A lower utilisation ratio beats an earlier position in the config."""
        stub_cfg = _make_cfg([EP1, EP2], priority_routing=True)
        # EP1 (priority 0) is busier: 1/2 = 0.5; EP2 (priority 1) is idle: 0/2 = 0.0
        router.usage_counts[EP1][MODEL] = 1
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(side_effect=_all_loaded)),
        ):
            chosen, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert chosen == EP2

    async def test_wrr_distribution_matches_expected_sequence(self):
        """Check the full WRR sequence with heterogeneous capacities.

        Mirrors the issue example:
          EP1 max=2, EP2 max=2, EP3 max=1
        Expected routing order for 5 sequential requests:
          EP1, EP2, EP3, EP1, EP2
        """
        per_endpoint = {EP3: {"max_concurrent_connections": 1}}
        stub_cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=2,
            endpoint_config=per_endpoint,
            priority_routing=True,
        )
        expected_order = [EP1, EP2, EP3, EP1, EP2]
        observed_order = []
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(side_effect=_all_loaded)),
        ):
            # One reserving request per expected entry.
            for _step in expected_order:
                chosen, _tracking = await router.choose_endpoint(MODEL, reserve=True)
                observed_order.append(chosen)
        assert observed_order == expected_order

    async def test_saturated_picks_lowest_ratio_then_priority(self):
        """When all are saturated, pick lowest utilization ratio; ties go to priority."""
        per_endpoint = {EP3: {"max_concurrent_connections": 2}}
        stub_cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=1,
            endpoint_config=per_endpoint,
            priority_routing=True,
        )
        # EP1 usage=1/1=1.0, EP2 usage=1/1=1.0, EP3 usage=1/2=0.5 → EP3 wins
        for endpoint in (EP1, EP2, EP3):
            router.usage_counts[endpoint][MODEL] = 1
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(side_effect=_all_loaded)),
        ):
            chosen, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert chosen == EP3

    async def test_saturated_ties_broken_by_priority(self):
        """When all are saturated with equal ratio, config order wins."""
        stub_cfg = _make_cfg([EP1, EP2, EP3], max_conn=1, priority_routing=True)
        for endpoint in (EP1, EP2, EP3):
            router.usage_counts[endpoint][MODEL] = 1
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(side_effect=_all_loaded)),
        ):
            chosen, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert chosen == EP1
class TestPriorityRoutingDisabled:
    """Check that ``priority_routing=False`` keeps the original random behaviour."""

    async def test_idle_endpoints_are_randomised(self):
        """Without priority routing, all-idle selection eventually picks each endpoint."""
        stub_cfg = _make_cfg([EP1, EP2, EP3], priority_routing=False)
        seen = set()
        with (
            patch.object(router, "config", new=stub_cfg),
            patch.object(router.fetch, "available_models", new=AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", new=AsyncMock(side_effect=_all_loaded)),
        ):
            for _draw in range(30):
                # Reset counters so each draw starts from an all-idle state.
                router.usage_counts.clear()
                chosen, _tracking = await router.choose_endpoint(MODEL, reserve=False)
                seen.add(chosen)
        # With 30 draws from 3 equally-idle endpoints, all three must appear
        assert seen == {EP1, EP2, EP3}