- _fetch_loaded_models_internal now writes _loaded_error_cache[endpoint] = time.time() on /api/ps or /v1/models failure, and clears the entry on success
- choose_endpoint now filters out candidates with a fresh (<300s) loaded-models error
- /health now probes both /api/version and /api/ps for Ollama endpoints
- dashboard adaptation (relates to #83)
399 lines
16 KiB
Python
399 lines
16 KiB
Python
"""Tests for choose_endpoint routing logic with mocked fetch calls."""
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
import router
|
|
|
|
# Fake endpoint URLs used throughout the tests; no network traffic is made
# because every fetch call is patched.
EP1 = "http://ep1:11434"
EP2 = "http://ep2:11434"
EP3 = "http://ep3:11434"
LLAMA_EP = "http://llama:8080/v1"
|
|
|
|
|
|
def _make_cfg(endpoints, llama_eps=None, max_conn=2, endpoint_config=None, priority_routing=False):
|
|
cfg = MagicMock()
|
|
cfg.endpoints = endpoints
|
|
cfg.llama_server_endpoints = llama_eps or []
|
|
cfg.api_keys = {}
|
|
cfg.max_concurrent_connections = max_conn
|
|
cfg.endpoint_config = endpoint_config or {}
|
|
cfg.priority_routing = priority_routing
|
|
cfg.router_api_key = None
|
|
return cfg
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_usage():
    """Clear usage_counts and error caches between tests to prevent bleed.

    The router's module-level state is cleared both before the test body
    runs and again after it finishes.
    """
    router.usage_counts.clear()
    router._loaded_error_cache.clear()
    yield
    router.usage_counts.clear()
    router._loaded_error_cache.clear()
|
|
|
|
|
|
class TestChooseEndpointBasic:
    """Core ``choose_endpoint`` selection behaviour with patched fetch calls."""

    # NOTE(review): the async tests carry no explicit asyncio marker —
    # presumably the project runs pytest-asyncio in auto mode; confirm.

    async def test_selects_single_candidate(self):
        """A single endpoint that has the model is returned with its tracking name."""
        cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"llama3.2:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, tracking = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP1
        assert tracking == "llama3.2:latest"

    async def test_raises_when_no_endpoint_has_model(self):
        """A RuntimeError is raised when no endpoint advertises the model."""
        cfg = _make_cfg([EP1, EP2])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value=set())),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            with pytest.raises(RuntimeError, match="advertise the model"):
                await router.choose_endpoint("unknown-model:latest")

    async def test_prefers_loaded_endpoint(self):
        """The endpoint that already has the model loaded is chosen."""
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        async def loaded(ep):
            # Only EP2 reports the model as loaded.
            return {"llama3.2:latest"} if ep == EP2 else set()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP2

    async def test_falls_back_to_free_slot(self):
        """With the model loaded nowhere, any advertising endpoint may be picked."""
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep in (EP1, EP2)

    async def test_saturated_picks_least_busy(self):
        """When every endpoint is at or over its limit, the least busy one wins."""
        cfg = _make_cfg([EP1, EP2], max_conn=1)

        async def available(ep, *_):
            return {"llama3.2:latest"}

        # Saturate EP1 with 2 active connections, EP2 with 1
        router.usage_counts[EP1]["llama3.2:latest"] = 2
        router.usage_counts[EP2]["llama3.2:latest"] = 1

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        # Least-busy is EP2
        assert ep == EP2

    async def test_excludes_endpoint_with_recent_loaded_error(self):
        # Regression: issue #83 — when /api/ps fails for EP1 but EP1
        # still advertises the model via /api/tags, routing must not
        # fall back to EP1 just because it has a free slot.
        cfg = _make_cfg([EP1, EP2])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        # EP1's /api/ps probe failed recently; EP2 is fine but the model
        # is not loaded there. Without the health filter, EP1 would be
        # picked by the free-slot fallback (step 4 in choose_endpoint).
        router._loaded_error_cache[EP1] = time.time()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("llama3.2:latest")
        assert ep == EP2

    async def test_stale_loaded_error_does_not_exclude(self):
        # Errors older than the 300s window must not keep an endpoint
        # excluded forever.
        cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time() - 301

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"m:latest"})),
        ):
            ep, _ = await router.choose_endpoint("m:latest")
        assert ep == EP1

    async def test_all_unhealthy_still_routes(self):
        # If every candidate has a fresh loaded-error we still try one
        # (it may have recovered between the cache write and now) rather
        # than refusing to route.
        cfg = _make_cfg([EP1])
        router._loaded_error_cache[EP1] = time.time()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
        ):
            ep, _ = await router.choose_endpoint("m:latest")
        assert ep == EP1

    async def test_reserve_increments_usage(self):
        """``reserve=True`` records one active use for the chosen endpoint/model."""
        cfg = _make_cfg([EP1])
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(return_value={"model:latest"})),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"model:latest"})),
        ):
            ep, tracking = await router.choose_endpoint("model:latest", reserve=True)
        assert router.usage_counts[ep][tracking] == 1
|
|
|
|
|
|
class TestChooseEndpointModelNaming:
    """Model-name normalisation (``:latest`` suffix) inside ``choose_endpoint``."""

    async def test_strips_latest_for_openai_endpoints(self):
        """A ``:latest`` request matches a llama-server model advertised bare."""
        # _make_cfg already sets cfg.endpoints to the empty list passed here.
        cfg = _make_cfg(endpoints=[], llama_eps=[LLAMA_EP])

        async def available(ep, *_):
            # llama-server advertises without :latest
            return {"gpt-4o"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"gpt-4o"})),
        ):
            ep, _ = await router.choose_endpoint("gpt-4o:latest")
        assert ep == LLAMA_EP

    async def test_adds_latest_for_ollama_when_bare_name(self):
        """A bare model name matches an endpoint advertising ``name:latest``."""
        cfg = _make_cfg([EP1])

        async def available(ep, *_):
            return {"llama3.2:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"llama3.2:latest"})),
        ):
            ep, _ = await router.choose_endpoint("llama3.2")
        assert ep == EP1
|
|
|
|
|
|
class TestChooseEndpointLoadBalancing:
    """Load distribution across endpoints that all have the model loaded."""

    async def test_random_selection_among_idle(self):
        """Repeated selection among idle endpoints does not stick to one endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3])
        selected = set()

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        # The patches are identical on every iteration, so apply them once.
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            for _attempt in range(20):
                router.usage_counts.clear()
                ep, _ = await router.choose_endpoint("model:latest", reserve=False)
                selected.add(ep)

        # With 20 draws from 3 idle endpoints, selection must not collapse
        # onto a single endpoint (only "more than one" is asserted here).
        assert len(selected) > 1

    async def test_sort_by_load_ascending(self):
        """The endpoint with fewer active connections is selected."""
        cfg = _make_cfg([EP1, EP2])
        router.usage_counts[EP1]["model:latest"] = 1
        router.usage_counts[EP2]["model:latest"] = 0

        async def available(ep, *_):
            return {"model:latest"}

        async def loaded(ep):
            return {"model:latest"}

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", side_effect=available),
            patch.object(router.fetch, "loaded_models", side_effect=loaded),
        ):
            ep, _ = await router.choose_endpoint("model:latest", reserve=False)
        # EP2 has fewer active connections → should be selected
        assert ep == EP2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_max_connections unit tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetMaxConnections:
    """``get_max_connections`` resolves per-endpoint overrides against the global default."""

    def test_returns_global_default_when_no_override(self):
        """Without ``endpoint_config`` entries every endpoint gets the global limit."""
        cfg = _make_cfg([EP1, EP2], max_conn=3)
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP1) == 3
            assert router.get_max_connections(EP2) == 3

    def test_returns_per_endpoint_override(self):
        """An ``endpoint_config`` entry overrides the global limit for that endpoint only."""
        cfg = _make_cfg(
            [EP1, EP2],
            max_conn=2,
            endpoint_config={EP1: {"max_concurrent_connections": 5}},
        )
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP1) == 5
            assert router.get_max_connections(EP2) == 2  # falls back to global

    def test_unrecognised_endpoint_falls_back_to_global(self):
        """An endpoint absent from ``endpoint_config`` gets the global limit."""
        cfg = _make_cfg([EP1], max_conn=4, endpoint_config={EP2: {"max_concurrent_connections": 1}})
        with patch.object(router, "config", cfg):
            assert router.get_max_connections(EP3) == 4
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Priority / WRR routing tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
MODEL = "model:latest"
|
|
|
|
|
|
def _all_loaded(ep):
|
|
"""Side-effect: every endpoint advertises and has MODEL loaded."""
|
|
return {MODEL}
|
|
|
|
|
|
class TestPriorityRouting:
    """Tests for priority_routing=True (WRR + config-order tiebreaking)."""

    async def test_idle_picks_first_in_config_order(self):
        """When all endpoints are idle, priority picks the first listed endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3], priority_routing=True)
        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP1

    async def test_lower_utilization_preferred_over_priority(self):
        """An endpoint with lower ratio is preferred even if it has lower priority."""
        cfg = _make_cfg([EP1, EP2], priority_routing=True)
        # EP1 (priority 0) is busier: 1/2 = 0.5; EP2 (priority 1) is idle: 0/2 = 0.0
        router.usage_counts[EP1][MODEL] = 1

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP2

    async def test_wrr_distribution_matches_expected_sequence(self):
        """
        Full WRR sequence with heterogeneous capacities, mirroring the issue example:
          EP1 max=2, EP2 max=2, EP3 max=1

        Expected routing order for 5 sequential requests:
          EP1, EP2, EP3, EP1, EP2
        """
        cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=2,
            endpoint_config={EP3: {"max_concurrent_connections": 1}},
            priority_routing=True,
        )

        expected = [EP1, EP2, EP3, EP1, EP2]
        actual = []

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            # reserve=True so each call raises the usage count seen by the next one.
            for _ in expected:
                ep, _tracking = await router.choose_endpoint(MODEL, reserve=True)
                actual.append(ep)

        assert actual == expected

    async def test_saturated_picks_lowest_ratio_then_priority(self):
        """With every endpoint busy, the lowest utilization ratio wins."""
        cfg = _make_cfg(
            [EP1, EP2, EP3],
            max_conn=1,
            endpoint_config={EP3: {"max_concurrent_connections": 2}},
            priority_routing=True,
        )
        # EP1 usage=1/1=1.0, EP2 usage=1/1=1.0, EP3 usage=1/2=0.5 → EP3 wins
        router.usage_counts[EP1][MODEL] = 1
        router.usage_counts[EP2][MODEL] = 1
        router.usage_counts[EP3][MODEL] = 1

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP3

    async def test_saturated_ties_broken_by_priority(self):
        """When all are saturated with equal ratio, config order wins."""
        cfg = _make_cfg([EP1, EP2, EP3], max_conn=1, priority_routing=True)
        router.usage_counts[EP1][MODEL] = 1
        router.usage_counts[EP2][MODEL] = 1
        router.usage_counts[EP3][MODEL] = 1

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            ep, _ = await router.choose_endpoint(MODEL, reserve=False)
        assert ep == EP1
|
|
|
|
|
|
class TestPriorityRoutingDisabled:
    """Verify that priority_routing=False keeps the original random behaviour."""

    async def test_idle_endpoints_are_randomised(self):
        """Without priority routing, all-idle selection must eventually pick each endpoint."""
        cfg = _make_cfg([EP1, EP2, EP3], priority_routing=False)
        selected = set()

        with (
            patch.object(router, "config", cfg),
            patch.object(router.fetch, "available_models", AsyncMock(side_effect=_all_loaded)),
            patch.object(router.fetch, "loaded_models", AsyncMock(side_effect=_all_loaded)),
        ):
            for _attempt in range(30):
                # Reset counts so every draw starts from an all-idle state.
                router.usage_counts.clear()
                ep, _ = await router.choose_endpoint(MODEL, reserve=False)
                selected.add(ep)

        # With 30 draws from 3 equally-idle endpoints, all three must appear
        assert selected == {EP1, EP2, EP3}
|