diff --git a/router.py b/router.py index 08225fb..c465ebc 100644 --- a/router.py +++ b/router.py @@ -1000,7 +1000,7 @@ class fetch: async with client.get(f"{endpoint}/models") as resp: await _ensure_success(resp) data = await resp.json() - + # Filter for loaded models only items = data.get("data", []) models = { @@ -1012,11 +1012,19 @@ class fetch: # Update cache with lock protection async with _loaded_models_cache_lock: _loaded_models_cache[endpoint] = (models, time.time()) + # Probe succeeded — clear any stale error so the endpoint + # becomes routable again. + async with _loaded_error_cache_lock: + _loaded_error_cache.pop(endpoint, None) return models except Exception as e: # If anything goes wrong we simply assume the endpoint has no models message = _format_connection_issue(f"{endpoint}/models", e) print(f"[fetch.loaded_models] {message}") + # Record the failure so `choose_endpoint` can avoid routing + # to an unhealthy backend and repeated probes short-circuit. + async with _loaded_error_cache_lock: + _loaded_error_cache[endpoint] = time.time() return set() else: # Original Ollama /api/ps logic @@ -1031,11 +1039,15 @@ class fetch: # Update cache with lock protection async with _loaded_models_cache_lock: _loaded_models_cache[endpoint] = (models, time.time()) + async with _loaded_error_cache_lock: + _loaded_error_cache.pop(endpoint, None) return models except Exception as e: # If anything goes wrong we simply assume the endpoint has no models message = _format_connection_issue(f"{endpoint}/api/ps", e) print(f"[fetch.loaded_models] {message}") + async with _loaded_error_cache_lock: + _loaded_error_cache[endpoint] = time.time() return set() async def _refresh_loaded_models(endpoint: str) -> None: @@ -1853,6 +1865,28 @@ async def choose_endpoint(model: str, reserve: bool = True, load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) + # 3️⃣.5 Exclude endpoints whose loaded-model probe has been failing + 
# recently. Without this filter, an endpoint where `/api/ps` returns 5xx + # would appear with an empty loaded set but pass through to the + # free-slot fallback (step 4) — sending completion calls to an + # unhealthy backend. See issue #83. + async with _loaded_error_cache_lock: + unhealthy = { + ep for ep, ts in _loaded_error_cache.items() + if _is_fresh(ts, 300) + } + if unhealthy: + filtered = [ + (ep, models) for ep, models in zip(candidate_endpoints, loaded_sets) + if ep not in unhealthy + ] + if filtered: + candidate_endpoints = [ep for ep, _ in filtered] + loaded_sets = [models for _, models in filtered] + # If *every* candidate is unhealthy we still fall through with the + # original list — refusing to route is worse than retrying a + # possibly-recovered backend. + # Look up a possible affinity hint *before* taking usage_lock. The two # locks are never held together to avoid lock-ordering issues. affine_ep: Optional[str] = None @@ -3154,44 +3188,103 @@ async def usage_proxy(request: Request): "token_usage_counts": token_usage_counts} # ------------------------------------------------------------- -# 20. Proxy config route – for monitoring and frontent usage +# 20. Endpoint health probes (shared by /api/config and /health) +# ------------------------------------------------------------- +async def _raw_probe( + ep: str, + route: str, + api_key: Optional[str] = None, + timeout: Optional[float] = None, +) -> tuple[bool, object]: + """Direct HTTP probe that distinguishes success from failure + (unlike `fetch.endpoint_details`, which returns [] on either). + Returns `(ok, payload_or_error_message)`. 
+ """ + headers = {"Referer": default_headers.get("HTTP-Referer", "https://nomyo.ai")} + if api_key is not None: + headers["Authorization"] = "Bearer " + api_key + url = f"{ep.rstrip('/')}/{route.lstrip('/')}" + req_kwargs = {} + if timeout is not None: + req_kwargs["timeout"] = aiohttp.ClientTimeout(total=timeout) + try: + client: aiohttp.ClientSession = get_session(ep) + async with client.get(url, headers=headers, **req_kwargs) as resp: + await _ensure_success(resp) + data = await resp.json() + return True, data + except Exception as exc: + return False, _format_connection_issue(url, exc) + + +async def _endpoint_health(ep: str, *, timeout: Optional[float] = None) -> dict: + """Probe an endpoint and return `{status, version?, detail?}`. + + Ollama endpoints get a dual probe of `/api/version` and `/api/ps` so + that a daemon which is reachable but has a broken model-introspection + path (issue #83) is reported as `error` rather than `ok`. + OpenAI-compatible endpoints use a single `/models` probe. + """ + if is_openai_compatible(ep): + ok, payload = await _raw_probe( + ep, "/models", config.api_keys.get(ep), timeout=timeout, + ) + if ok: + return {"status": "ok", "version": "latest"} + return {"status": "error", "detail": str(payload)} + + (version_ok, version_payload), (ps_ok, ps_payload) = await asyncio.gather( + _raw_probe(ep, "/api/version", timeout=timeout), + _raw_probe(ep, "/api/ps", timeout=timeout), + ) + + version_value = ( + version_payload.get("version") + if version_ok and isinstance(version_payload, dict) + else None + ) + + if version_ok and ps_ok: + return {"status": "ok", "version": version_value} + if not version_ok and not ps_ok: + return {"status": "error", "detail": str(version_payload)} + # Partial failure — daemon reachable but one probe failed. Report + # as "error" so callers can surface the issue; include `version` so + # the operator knows the daemon itself is alive. 
+ if not ps_ok: + return { + "status": "error", + "version": version_value, + "detail": f"/api/ps: {ps_payload}", + } + return { + "status": "error", + "detail": f"/api/version: {version_payload}", + } + + +# ------------------------------------------------------------- +# 20b. Proxy config route – for monitoring and frontend usage # ------------------------------------------------------------- @app.get("/api/config") async def config_proxy(request: Request): """ Return a simple JSON object that contains the configured - Ollama endpoints and llama_server_endpoints. The front‑end uses this to display - which endpoints are being proxied. + Ollama endpoints and llama_server_endpoints. The front‑end uses this + to display which endpoints are being proxied and their health. + Status is "error" when either liveness (/api/version) or routing + health (/api/ps) fails — see issue #83. """ - async def check_endpoint(url: str): - client: aiohttp.ClientSession = get_session(url) - headers = None - if "/v1" in url: - headers = {"Authorization": "Bearer " + config.api_keys.get(url, "no-key")} - target_url = f"{url}/models" - else: - target_url = f"{url}/api/version" + async def check(url: str) -> dict: + return {"url": url, **(await _endpoint_health(url, timeout=5))} - try: - async with client.get(target_url, headers=headers, timeout=aiohttp.ClientTimeout(total=5)) as resp: - await _ensure_success(resp) - data = await resp.json() - if "/v1" in url: - return {"url": url, "status": "ok", "version": "latest"} - else: - return {"url": url, "status": "ok", "version": data.get("version")} - except Exception as e: - detail = _format_connection_issue(target_url, e) - return {"url": url, "status": "error", "detail": detail} - - # Check Ollama endpoints - ollama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) - - # Check llama-server endpoints + ollama_results = await asyncio.gather(*[check(ep) for ep in config.endpoints]) llama_results = [] if 
config.llama_server_endpoints: - llama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.llama_server_endpoints]) - + llama_results = await asyncio.gather( + *[check(ep) for ep in config.llama_server_endpoints] + ) + return { "endpoints": ollama_results, "llama_server_endpoints": llama_results, @@ -4003,44 +4096,30 @@ async def health_proxy(request: Request): """ Health‑check endpoint for monitoring the proxy. - * Queries each configured endpoint for its `/api/version` response. + * Queries each configured endpoint for both liveness and routing health: + Ollama endpoints are probed at `/api/version` AND `/api/ps`, + OpenAI-compatible endpoints at `/models`. * Returns a JSON object containing: - - `status`: "ok" if every endpoint replied, otherwise "error". + - `status`: "ok" if every endpoint replied to every probe, otherwise "error". - `endpoints`: a mapping of endpoint URL → `{status, version|detail}`. * The HTTP status code is 200 when everything is healthy, 503 otherwise. """ # Run all health checks in parallel. - # Ollama endpoints expose /api/version; OpenAI-compatible endpoints (vLLM, - # llama-server, external) expose /models. Using /api/version against an - # OpenAI-compatible endpoint yields a 404 and noisy log output. + # Ollama endpoints expose /api/version (liveness) and /api/ps (routing + # health — required by `choose_endpoint`). OpenAI-compatible endpoints + # (vLLM, llama-server, external) expose /models, which serves both + # purposes. Probing /api/version alone would miss the case where the + # Ollama process is up but /api/ps is failing — see issue #83. 
all_endpoints = list(config.endpoints) llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints] all_endpoints += llama_eps_extra - tasks = [] - for ep in all_endpoints: - if is_openai_compatible(ep): - tasks.append(fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep), skip_error_cache=True)) - else: - tasks.append(fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True)) + probe_results = await asyncio.gather( + *(_endpoint_health(ep) for ep in all_endpoints), + ) - results = await asyncio.gather(*tasks, return_exceptions=True) - - health_summary = {} - overall_ok = True - - for ep, result in zip(all_endpoints, results): - if isinstance(result, Exception): - # Endpoint did not respond / returned an error - health_summary[ep] = {"status": "error", "detail": str(result)} - overall_ok = False - else: - # Successful response – report the reported version (Ollama) or - # indicate the endpoint is reachable (OpenAI-compatible). 
- if is_openai_compatible(ep): - health_summary[ep] = {"status": "ok"} - else: - health_summary[ep] = {"status": "ok", "version": result} + health_summary = dict(zip(all_endpoints, probe_results)) + overall_ok = all(entry.get("status") == "ok" for entry in probe_results) response_payload = { "status": "ok" if overall_ok else "error", diff --git a/static/index.html b/static/index.html index b29f22b..8c0b16c 100644 --- a/static/index.html +++ b/static/index.html @@ -192,6 +192,10 @@ color: #8b0000; font-weight: bold; } + .status-error[title] { + cursor: help; + text-decoration: underline dotted; + } .copy-link, .delete-link, .show-link, @@ -736,6 +740,16 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { return await resp.json(); } + function escapeHtml(value) { + if (value === null || value === undefined) return ""; + return String(value) + .replace(/&/g, "&amp;") + .replace(/</g, "&lt;") + .replace(/>/g, "&gt;") + .replace(/"/g, "&quot;") + .replace(/'/g, "&#39;"); + } + function toggleDarkMode() { document.documentElement.classList.toggle("dark-mode"); } @@ -752,40 +766,24 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { // Build HTML for both endpoints and llama_server_endpoints let html = ""; - // Add Ollama endpoints - html += data.endpoints - .map((e) => { - const statusClass = - e.status === "ok" - ? "status-ok" - : "status-error"; - const version = e.version || "N/A"; - return ` + const renderRow = (e) => { + const statusClass = + e.status === "ok" ? "status-ok" : "status-error"; + const version = e.version || "N/A"; + const titleAttr = e.detail + ? ` title="${escapeHtml(e.detail)}"` + : ""; + return `