fix:
All checks were successful
PR Tests / test (pull_request) Successful in 58s
NYX Security Scan / nyx-scan (pull_request) Successful in 6m59s

-  _fetch_loaded_models_internal now writes _loaded_error_cache[endpoint] = time.time() on /api/ps or /v1/models failure, and clears the entry on success
- choose_endpoint now filters out candidates with a fresh (<300s) loaded-models error.
-  /health now probes both /api/version and /api/ps for Ollama endpoints
-  dashboard adaption

relates to #83
This commit is contained in:
Alpha Nerd 2026-05-18 13:45:06 +02:00
parent 0b64a84e96
commit db6aa73903
Signed by: alpha-nerd
SSH key fingerprint: SHA256:QkkAgVoYi9TQ0UKPkiKSfnerZy2h4qhi3SVPXJmBN+M
4 changed files with 251 additions and 90 deletions

195
router.py
View file

@ -1000,7 +1000,7 @@ class fetch:
async with client.get(f"{endpoint}/models") as resp:
await _ensure_success(resp)
data = await resp.json()
# Filter for loaded models only
items = data.get("data", [])
models = {
@ -1012,11 +1012,19 @@ class fetch:
# Update cache with lock protection
async with _loaded_models_cache_lock:
_loaded_models_cache[endpoint] = (models, time.time())
# Probe succeeded — clear any stale error so the endpoint
# becomes routable again.
async with _loaded_error_cache_lock:
_loaded_error_cache.pop(endpoint, None)
return models
except Exception as e:
# If anything goes wrong we simply assume the endpoint has no models
message = _format_connection_issue(f"{endpoint}/models", e)
print(f"[fetch.loaded_models] {message}")
# Record the failure so `choose_endpoint` can avoid routing
# to an unhealthy backend and repeated probes short-circuit.
async with _loaded_error_cache_lock:
_loaded_error_cache[endpoint] = time.time()
return set()
else:
# Original Ollama /api/ps logic
@ -1031,11 +1039,15 @@ class fetch:
# Update cache with lock protection
async with _loaded_models_cache_lock:
_loaded_models_cache[endpoint] = (models, time.time())
async with _loaded_error_cache_lock:
_loaded_error_cache.pop(endpoint, None)
return models
except Exception as e:
# If anything goes wrong we simply assume the endpoint has no models
message = _format_connection_issue(f"{endpoint}/api/ps", e)
print(f"[fetch.loaded_models] {message}")
async with _loaded_error_cache_lock:
_loaded_error_cache[endpoint] = time.time()
return set()
async def _refresh_loaded_models(endpoint: str) -> None:
@ -1853,6 +1865,28 @@ async def choose_endpoint(model: str, reserve: bool = True,
load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints]
loaded_sets = await asyncio.gather(*load_tasks)
# 3⃣.5 Exclude endpoints whose loaded-model probe has been failing
# recently. Without this filter, an endpoint where `/api/ps` returns 5xx
# would appear with an empty loaded set but pass through to the
# free-slot fallback (step 4) — sending completion calls to an
# unhealthy backend. See issue #83.
async with _loaded_error_cache_lock:
unhealthy = {
ep for ep, ts in _loaded_error_cache.items()
if _is_fresh(ts, 300)
}
if unhealthy:
filtered = [
(ep, models) for ep, models in zip(candidate_endpoints, loaded_sets)
if ep not in unhealthy
]
if filtered:
candidate_endpoints = [ep for ep, _ in filtered]
loaded_sets = [models for _, models in filtered]
# If *every* candidate is unhealthy we still fall through with the
# original list — refusing to route is worse than retrying a
# possibly-recovered backend.
# Look up a possible affinity hint *before* taking usage_lock. The two
# locks are never held together to avoid lock-ordering issues.
affine_ep: Optional[str] = None
@ -3154,44 +3188,103 @@ async def usage_proxy(request: Request):
"token_usage_counts": token_usage_counts}
# -------------------------------------------------------------
# 20. Proxy config route for monitoring and frontent usage
# 20. Endpoint health probes (shared by /api/config and /health)
# -------------------------------------------------------------
async def _raw_probe(
ep: str,
route: str,
api_key: Optional[str] = None,
timeout: Optional[float] = None,
) -> tuple[bool, object]:
"""Direct HTTP probe that distinguishes success from failure
(unlike `fetch.endpoint_details`, which returns [] on either).
Returns `(ok, payload_or_error_message)`.
"""
headers = {"Referer": default_headers.get("HTTP-Referer", "https://nomyo.ai")}
if api_key is not None:
headers["Authorization"] = "Bearer " + api_key
url = f"{ep.rstrip('/')}/{route.lstrip('/')}"
req_kwargs = {}
if timeout is not None:
req_kwargs["timeout"] = aiohttp.ClientTimeout(total=timeout)
try:
client: aiohttp.ClientSession = get_session(ep)
async with client.get(url, headers=headers, **req_kwargs) as resp:
await _ensure_success(resp)
data = await resp.json()
return True, data
except Exception as exc:
return False, _format_connection_issue(url, exc)
async def _endpoint_health(ep: str, *, timeout: Optional[float] = None) -> dict:
"""Probe an endpoint and return `{status, version?, detail?}`.
Ollama endpoints get a dual probe of `/api/version` and `/api/ps` so
that a daemon which is reachable but has a broken model-introspection
path (issue #83) is reported as `error` rather than `ok`.
OpenAI-compatible endpoints use a single `/models` probe.
"""
if is_openai_compatible(ep):
ok, payload = await _raw_probe(
ep, "/models", config.api_keys.get(ep), timeout=timeout,
)
if ok:
return {"status": "ok", "version": "latest"}
return {"status": "error", "detail": str(payload)}
(version_ok, version_payload), (ps_ok, ps_payload) = await asyncio.gather(
_raw_probe(ep, "/api/version", timeout=timeout),
_raw_probe(ep, "/api/ps", timeout=timeout),
)
version_value = (
version_payload.get("version")
if version_ok and isinstance(version_payload, dict)
else None
)
if version_ok and ps_ok:
return {"status": "ok", "version": version_value}
if not version_ok and not ps_ok:
return {"status": "error", "detail": str(version_payload)}
# Partial failure — daemon reachable but one probe failed. Report
# as "error" so callers can surface the issue; include `version` so
# the operator knows the daemon itself is alive.
if not ps_ok:
return {
"status": "error",
"version": version_value,
"detail": f"/api/ps: {ps_payload}",
}
return {
"status": "error",
"detail": f"/api/version: {version_payload}",
}
# -------------------------------------------------------------
# 20b. Proxy config route for monitoring and frontend usage
# -------------------------------------------------------------
@app.get("/api/config")
async def config_proxy(request: Request):
"""
Return a simple JSON object that contains the configured
Ollama endpoints and llama_server_endpoints. The frontend uses this to display
which endpoints are being proxied.
Ollama endpoints and llama_server_endpoints. The frontend uses this
to display which endpoints are being proxied and their health.
Status is "error" when either liveness (/api/version) or routing
health (/api/ps) fails see issue #83.
"""
async def check_endpoint(url: str):
client: aiohttp.ClientSession = get_session(url)
headers = None
if "/v1" in url:
headers = {"Authorization": "Bearer " + config.api_keys.get(url, "no-key")}
target_url = f"{url}/models"
else:
target_url = f"{url}/api/version"
async def check(url: str) -> dict:
return {"url": url, **(await _endpoint_health(url, timeout=5))}
try:
async with client.get(target_url, headers=headers, timeout=aiohttp.ClientTimeout(total=5)) as resp:
await _ensure_success(resp)
data = await resp.json()
if "/v1" in url:
return {"url": url, "status": "ok", "version": "latest"}
else:
return {"url": url, "status": "ok", "version": data.get("version")}
except Exception as e:
detail = _format_connection_issue(target_url, e)
return {"url": url, "status": "error", "detail": detail}
# Check Ollama endpoints
ollama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints])
# Check llama-server endpoints
ollama_results = await asyncio.gather(*[check(ep) for ep in config.endpoints])
llama_results = []
if config.llama_server_endpoints:
llama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.llama_server_endpoints])
llama_results = await asyncio.gather(
*[check(ep) for ep in config.llama_server_endpoints]
)
return {
"endpoints": ollama_results,
"llama_server_endpoints": llama_results,
@ -4003,44 +4096,30 @@ async def health_proxy(request: Request):
"""
Healthcheck endpoint for monitoring the proxy.
* Queries each configured endpoint for its `/api/version` response.
* Queries each configured endpoint for both liveness and routing health:
Ollama endpoints are probed at `/api/version` AND `/api/ps`,
OpenAI-compatible endpoints at `/models`.
* Returns a JSON object containing:
- `status`: "ok" if every endpoint replied, otherwise "error".
- `status`: "ok" if every endpoint replied to every probe, otherwise "error".
- `endpoints`: a mapping of endpoint URL `{status, version|detail}`.
* The HTTP status code is 200 when everything is healthy, 503 otherwise.
"""
# Run all health checks in parallel.
# Ollama endpoints expose /api/version; OpenAI-compatible endpoints (vLLM,
# llama-server, external) expose /models. Using /api/version against an
# OpenAI-compatible endpoint yields a 404 and noisy log output.
# Ollama endpoints expose /api/version (liveness) and /api/ps (routing
# health — required by `choose_endpoint`). OpenAI-compatible endpoints
# (vLLM, llama-server, external) expose /models, which serves both
# purposes. Probing /api/version alone would miss the case where the
# Ollama process is up but /api/ps is failing — see issue #83.
all_endpoints = list(config.endpoints)
llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints]
all_endpoints += llama_eps_extra
tasks = []
for ep in all_endpoints:
if is_openai_compatible(ep):
tasks.append(fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep), skip_error_cache=True))
else:
tasks.append(fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True))
probe_results = await asyncio.gather(
*(_endpoint_health(ep) for ep in all_endpoints),
)
results = await asyncio.gather(*tasks, return_exceptions=True)
health_summary = {}
overall_ok = True
for ep, result in zip(all_endpoints, results):
if isinstance(result, Exception):
# Endpoint did not respond / returned an error
health_summary[ep] = {"status": "error", "detail": str(result)}
overall_ok = False
else:
# Successful response report the reported version (Ollama) or
# indicate the endpoint is reachable (OpenAI-compatible).
if is_openai_compatible(ep):
health_summary[ep] = {"status": "ok"}
else:
health_summary[ep] = {"status": "ok", "version": result}
health_summary = dict(zip(all_endpoints, probe_results))
overall_ok = all(entry.get("status") == "ok" for entry in probe_results)
response_payload = {
"status": "ok" if overall_ok else "error",

View file

@ -192,6 +192,10 @@
color: #8b0000;
font-weight: bold;
}
.status-error[title] {
cursor: help;
text-decoration: underline dotted;
}
.copy-link,
.delete-link,
.show-link,
@ -736,6 +740,16 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
return await resp.json();
}
function escapeHtml(value) {
if (value === null || value === undefined) return "";
return String(value)
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#39;");
}
function toggleDarkMode() {
document.documentElement.classList.toggle("dark-mode");
}
@ -752,40 +766,24 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
// Build HTML for both endpoints and llama_server_endpoints
let html = "";
// Add Ollama endpoints
html += data.endpoints
.map((e) => {
const statusClass =
e.status === "ok"
? "status-ok"
: "status-error";
const version = e.version || "N/A";
return `
const renderRow = (e) => {
const statusClass =
e.status === "ok" ? "status-ok" : "status-error";
const version = e.version || "N/A";
const titleAttr = e.detail
? ` title="${escapeHtml(e.detail)}"`
: "";
return `
<tr>
<td class="endpoint">${e.url}</td>
<td class="status ${statusClass}">${e.status}</td>
<td class="version">${version}</td>
<td class="endpoint">${escapeHtml(e.url)}</td>
<td class="status ${statusClass}"${titleAttr}>${escapeHtml(e.status)}</td>
<td class="version">${escapeHtml(version)}</td>
</tr>`;
})
.join("");
// Add llama-server endpoints
};
html += data.endpoints.map(renderRow).join("");
if (data.llama_server_endpoints && data.llama_server_endpoints.length > 0) {
html += data.llama_server_endpoints
.map((e) => {
const statusClass =
e.status === "ok"
? "status-ok"
: "status-error";
const version = e.version || "N/A";
return `
<tr>
<td class="endpoint">${e.url}</td>
<td class="status ${statusClass}">${e.status}</td>
<td class="version">${version}</td>
</tr>`;
})
.join("");
html += data.llama_server_endpoints.map(renderRow).join("");
}
body.innerHTML = html;

View file

@ -1,4 +1,5 @@
"""Tests for choose_endpoint routing logic with mocked fetch calls."""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -25,10 +26,12 @@ def _make_cfg(endpoints, llama_eps=None, max_conn=2, endpoint_config=None, prior
@pytest.fixture(autouse=True)
def reset_usage():
"""Clear usage_counts between tests to prevent bleed."""
"""Clear usage_counts and error caches between tests to prevent bleed."""
router.usage_counts.clear()
router._loaded_error_cache.clear()
yield
router.usage_counts.clear()
router._loaded_error_cache.clear()
class TestChooseEndpointBasic:
@ -102,6 +105,57 @@ class TestChooseEndpointBasic:
# Least-busy is EP2
assert ep == EP2
async def test_excludes_endpoint_with_recent_loaded_error(self):
# Regression: issue #83 — when /api/ps fails for EP1 but EP1
# still advertises the model via /api/tags, routing must not
# fall back to EP1 just because it has a free slot.
cfg = _make_cfg([EP1, EP2])
async def available(ep, *_):
return {"llama3.2:latest"}
# EP1's /api/ps probe failed recently; EP2 is fine but the model
# is not loaded there. Without the health filter, EP1 would be
# picked by the free-slot fallback (step 4 in choose_endpoint).
router._loaded_error_cache[EP1] = time.time()
with (
patch.object(router, "config", cfg),
patch.object(router.fetch, "available_models", side_effect=available),
patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
):
ep, _ = await router.choose_endpoint("llama3.2:latest")
assert ep == EP2
async def test_stale_loaded_error_does_not_exclude(self):
# Errors older than the 300s window must not keep an endpoint
# excluded forever.
cfg = _make_cfg([EP1])
router._loaded_error_cache[EP1] = time.time() - 301
with (
patch.object(router, "config", cfg),
patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
patch.object(router.fetch, "loaded_models", AsyncMock(return_value={"m:latest"})),
):
ep, _ = await router.choose_endpoint("m:latest")
assert ep == EP1
async def test_all_unhealthy_still_routes(self):
# If every candidate has a fresh loaded-error we still try one
# (it may have recovered between the cache write and now) rather
# than refusing to route.
cfg = _make_cfg([EP1])
router._loaded_error_cache[EP1] = time.time()
with (
patch.object(router, "config", cfg),
patch.object(router.fetch, "available_models", AsyncMock(return_value={"m:latest"})),
patch.object(router.fetch, "loaded_models", AsyncMock(return_value=set())),
):
ep, _ = await router.choose_endpoint("m:latest")
assert ep == EP1
async def test_reserve_increments_usage(self):
cfg = _make_cfg([EP1])
with (

View file

@ -178,3 +178,33 @@ class TestFetchLoadedModels:
first = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
second = await router.fetch.loaded_models(MOCK_OLLAMA_EP)
assert first == second
async def test_records_error_in_loaded_error_cache_on_failure(self):
# Regression: issue #83 — /api/ps failures must be recorded so
# `choose_endpoint` can exclude unhealthy backends from routing.
cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
with patch.object(router, "config", cfg), aioresponses() as m:
m.get(f"{MOCK_OLLAMA_EP}/api/ps", status=502, payload={})
await router.fetch.loaded_models(MOCK_OLLAMA_EP)
assert MOCK_OLLAMA_EP in router._loaded_error_cache
async def test_records_error_for_llama_server_on_failure(self):
cfg = _make_cfg(ollama_eps=[], llama_eps=[MOCK_LLAMA_EP])
with patch.object(router, "config", cfg), aioresponses() as m:
m.get(f"{MOCK_LLAMA_EP}/models", status=502, payload={})
await router.fetch.loaded_models(MOCK_LLAMA_EP)
assert MOCK_LLAMA_EP in router._loaded_error_cache
async def test_clears_error_cache_on_subsequent_success(self):
cfg = _make_cfg(ollama_eps=[MOCK_OLLAMA_EP], llama_eps=[])
# Pre-seed an old error so loaded_models() falls through to the
# network probe instead of short-circuiting on the error cache.
async with router._loaded_error_cache_lock:
router._loaded_error_cache[MOCK_OLLAMA_EP] = time.time() - 301
with patch.object(router, "config", cfg), aioresponses() as m:
m.get(
f"{MOCK_OLLAMA_EP}/api/ps",
payload={"models": [{"name": "qwen:7b"}]},
)
await router.fetch.loaded_models(MOCK_OLLAMA_EP)
assert MOCK_OLLAMA_EP not in router._loaded_error_cache