From 079b677e2362d984ebd371a799547ff1d898e0ba Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Mon, 18 May 2026 18:14:28 +0200 Subject: [PATCH] feat: completion errors on an endpoint:model key a caught, cached and rerouted (openai compatible endpoints) --- router.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/router.py b/router.py index c465ebc..eb9f1e1 100644 --- a/router.py +++ b/router.py @@ -38,6 +38,14 @@ _loaded_models_cache: dict[str, tuple[Set[str], float]] = {} # in one path does not poison the other. _available_error_cache: dict[str, float] = {} _loaded_error_cache: dict[str, float] = {} +# Per-(endpoint, model) completion-path failures. A llama-server in router +# mode can keep returning /v1/models 200 OK after its delegated worker for +# a specific model dies — the probe-level caches above will not catch this. +# We record signals observed during actual completion attempts so +# choose_endpoint can avoid the affected (endpoint, model) pair without +# poisoning unrelated models on the same backend. +_completion_error_cache: dict[tuple[str, str], float] = {} +_COMPLETION_ERROR_TTL = 300 # ------------------------------------------------------------------ # Cache locks @@ -46,6 +54,7 @@ _models_cache_lock = asyncio.Lock() _loaded_models_cache_lock = asyncio.Lock() _available_error_cache_lock = asyncio.Lock() _loaded_error_cache_lock = asyncio.Lock() +_completion_error_cache_lock = asyncio.Lock() # ------------------------------------------------------------------ # In-flight request tracking (prevents cache stampede) @@ -618,6 +627,41 @@ def _make_openai_client( return openai.AsyncOpenAI(base_url=base_url, **kwargs) +def _is_backend_connection_error(exc: Exception) -> bool: + """True for upstream connection-class failures observed via the OpenAI client. + + Targets the case where a llama-server in router mode keeps answering + /v1/models but its delegated worker for a specific model is dead, so + chat/completions calls return 5xx with 'proxy error: Could not establish + connection' (or the SDK raises APIConnectionError outright). + + Excludes BadRequestError with exceed_context_size_error by design — those + must stay on the reactive-trim path. + """ + if isinstance(exc, openai.APIConnectionError): + return True + if isinstance(exc, openai.InternalServerError): + msg = str(exc).lower() + return ( + "proxy error" in msg + or "could not establish connection" in msg + or "connection refused" in msg + ) + return False + + +async def _mark_backend_unhealthy(endpoint: str, model: str, reason: str = "") -> None: + """Record (endpoint, model) as broken so choose_endpoint avoids it. + + Cleared only by TTL — the dead-worker failure mode is invisible to the + /v1/models / /api/ps probes that clear _loaded_error_cache, so we cannot + rely on a successful probe as a recovery signal. + """ + async with _completion_error_cache_lock: + _completion_error_cache[(endpoint, model)] = time.time() + print(f"[health] marked unhealthy ep={endpoint} model={model} reason={reason[:120]}", flush=True) + + def _is_llama_model_loaded(item: dict) -> bool: """Return True if a llama-server /v1/models item has status 'loaded'. Handles both dict format ({"value": "loaded"}) and plain string ("loaded"). @@ -1887,6 +1931,27 @@ async def choose_endpoint(model: str, reserve: bool = True, # original list — refusing to route is worse than retrying a # possibly-recovered backend. + # 3️⃣.6 Exclude (endpoint, model) pairs whose completion path has recently + # failed with a backend connection error (e.g. llama-server in router mode + # whose delegated worker for *this* model died). /v1/models keeps reporting + # OK in that case, so the probe-level filter above cannot catch it. + async with _completion_error_cache_lock: + completion_broken = { + ep for (ep, m), ts in _completion_error_cache.items() + if m == model and _is_fresh(ts, _COMPLETION_ERROR_TTL) + } + if completion_broken: + filtered = [ + (ep, models) for ep, models in zip(candidate_endpoints, loaded_sets) + if ep not in completion_broken + ] + if filtered: + candidate_endpoints = [ep for ep, _ in filtered] + loaded_sets = [models for _, models in filtered] + # Same fallback: if every candidate is broken for this model, fall + # through and let the upstream retry — possibly the operator restarted + # the dead worker. + # Look up a possible affinity hint *before* taking usage_lock. The two # locks are never held together to avoid lock-ordering issues. affine_ep: Optional[str] = None @@ -2316,6 +2381,11 @@ async def chat_proxy(request: Request): else: await decrement_usage(endpoint, tracking_model) raise + elif _is_backend_connection_error(e): + print(f"[chat_proxy] backend connection error → marking ({endpoint}, {model}) unhealthy", flush=True) + await _mark_backend_unhealthy(endpoint, model, _e_str) + await decrement_usage(endpoint, tracking_model) + raise elif "image input is not supported" in _e_str: print(f"[chat_proxy] Model {model} doesn't support images, retrying with text-only messages") try: @@ -3573,6 +3643,14 @@ async def openai_chat_completions_proxy(request: Request): else: await decrement_usage(endpoint, tracking_model) raise + elif _is_backend_connection_error(e): + # Upstream connection failed (e.g. llama-server in router mode + # whose delegated worker died). Mark (endpoint, model) so the + # next request reroutes; the client will retry this one. + print(f"[ochat] backend connection error → marking ({endpoint}, {model}) unhealthy", flush=True) + await _mark_backend_unhealthy(endpoint, model, _e_str) + await decrement_usage(endpoint, tracking_model) + raise elif "image input is not supported" in _e_str: # Model doesn't support images — strip and retry print(f"[openai_chat_completions_proxy] Model {model} doesn't support images, retrying with text-only messages") @@ -3771,7 +3849,10 @@ async def openai_completions_proxy(request: Request): # Make the API call in handler scope (try/except inside async generators is unreliable) try: async_gen = await oclient.completions.create(**params) - except Exception: + except Exception as e: + if _is_backend_connection_error(e): + print(f"[ocompl] backend connection error → marking ({endpoint}, {model}) unhealthy", flush=True) + await _mark_backend_unhealthy(endpoint, model, str(e)) await decrement_usage(endpoint, tracking_model) raise