Compare commits
2 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 078855ba9a | |||
| 079b677e23 |
2 changed files with 83 additions and 2 deletions
|
|
@ -16,7 +16,7 @@ h11==0.16.0
|
||||||
httpcore==1.0.9
|
httpcore==1.0.9
|
||||||
httpx==0.28.1
|
httpx==0.28.1
|
||||||
idna==3.15
|
idna==3.15
|
||||||
jiter==0.15.0
|
jiter==0.14.0
|
||||||
multidict==6.7.1
|
multidict==6.7.1
|
||||||
ollama==0.6.2
|
ollama==0.6.2
|
||||||
openai==2.37.0
|
openai==2.37.0
|
||||||
|
|
|
||||||
83
router.py
83
router.py
|
|
@ -38,6 +38,14 @@ _loaded_models_cache: dict[str, tuple[Set[str], float]] = {}
|
||||||
# in one path does not poison the other.
|
# in one path does not poison the other.
|
||||||
_available_error_cache: dict[str, float] = {}
|
_available_error_cache: dict[str, float] = {}
|
||||||
_loaded_error_cache: dict[str, float] = {}
|
_loaded_error_cache: dict[str, float] = {}
|
||||||
|
# Per-(endpoint, model) completion-path failures. A llama-server in router
|
||||||
|
# mode can keep returning /v1/models 200 OK after its delegated worker for
|
||||||
|
# a specific model dies — the probe-level caches above will not catch this.
|
||||||
|
# We record signals observed during actual completion attempts so
|
||||||
|
# choose_endpoint can avoid the affected (endpoint, model) pair without
|
||||||
|
# poisoning unrelated models on the same backend.
|
||||||
|
_completion_error_cache: dict[tuple[str, str], float] = {}
|
||||||
|
_COMPLETION_ERROR_TTL = 300
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# Cache locks
|
# Cache locks
|
||||||
|
|
@ -46,6 +54,7 @@ _models_cache_lock = asyncio.Lock()
|
||||||
_loaded_models_cache_lock = asyncio.Lock()
|
_loaded_models_cache_lock = asyncio.Lock()
|
||||||
_available_error_cache_lock = asyncio.Lock()
|
_available_error_cache_lock = asyncio.Lock()
|
||||||
_loaded_error_cache_lock = asyncio.Lock()
|
_loaded_error_cache_lock = asyncio.Lock()
|
||||||
|
_completion_error_cache_lock = asyncio.Lock()
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# In-flight request tracking (prevents cache stampede)
|
# In-flight request tracking (prevents cache stampede)
|
||||||
|
|
@ -618,6 +627,41 @@ def _make_openai_client(
|
||||||
return openai.AsyncOpenAI(base_url=base_url, **kwargs)
|
return openai.AsyncOpenAI(base_url=base_url, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_backend_connection_error(exc: Exception) -> bool:
    """Report whether *exc* is an upstream connection-class failure.

    The motivating case is a llama-server in router mode that still answers
    /v1/models while its delegated worker for one model is dead: completion
    calls then fail with a server error mentioning 'proxy error: Could not
    establish connection', or the OpenAI SDK raises APIConnectionError
    directly.

    BadRequestError carrying exceed_context_size_error is intentionally not
    matched here — those failures must remain on the reactive-trim path.

    Args:
        exc: The exception raised by the OpenAI client call.

    Returns:
        True for openai.APIConnectionError, or for
        openai.InternalServerError whose message contains one of the known
        connection markers (case-insensitive); False otherwise.
    """
    # The SDK itself reported a connection-level failure.
    if isinstance(exc, openai.APIConnectionError):
        return True

    # Anything that is not an internal server error is out of scope.
    if not isinstance(exc, openai.InternalServerError):
        return False

    # A server error only counts when its text names a connection problem.
    text = str(exc).lower()
    markers = (
        "proxy error",
        "could not establish connection",
        "connection refused",
    )
    return any(marker in text for marker in markers)
|
||||||
|
|
||||||
|
|
||||||
|
async def _mark_backend_unhealthy(endpoint: str, model: str, reason: str = "") -> None:
    """Flag the (endpoint, model) pair as broken for endpoint selection.

    The entry is a timestamp and is only ever aged out by its TTL. The
    dead-worker failure mode does not show up in the /v1/models or /api/ps
    probes that clear _loaded_error_cache, so a successful probe cannot be
    used as a recovery signal for this cache.

    Args:
        endpoint: Backend endpoint on which the completion attempt failed.
        model: Model name whose completion path failed on that endpoint.
        reason: Optional failure text; only its first 120 characters are
            included in the log line.
    """
    pair = (endpoint, model)

    # Stamp the failure time under the cache lock.
    async with _completion_error_cache_lock:
        _completion_error_cache[pair] = time.time()

    # Emit the health log line; flush so it is written immediately.
    message = f"[health] marked unhealthy ep={endpoint} model={model} reason={reason[:120]}"
    print(message, flush=True)
|
||||||
|
|
||||||
|
|
||||||
def _is_llama_model_loaded(item: dict) -> bool:
|
def _is_llama_model_loaded(item: dict) -> bool:
|
||||||
"""Return True if a llama-server /v1/models item has status 'loaded'.
|
"""Return True if a llama-server /v1/models item has status 'loaded'.
|
||||||
Handles both dict format ({"value": "loaded"}) and plain string ("loaded").
|
Handles both dict format ({"value": "loaded"}) and plain string ("loaded").
|
||||||
|
|
@ -1887,6 +1931,27 @@ async def choose_endpoint(model: str, reserve: bool = True,
|
||||||
# original list — refusing to route is worse than retrying a
|
# original list — refusing to route is worse than retrying a
|
||||||
# possibly-recovered backend.
|
# possibly-recovered backend.
|
||||||
|
|
||||||
|
# 3️⃣.6 Exclude (endpoint, model) pairs whose completion path has recently
|
||||||
|
# failed with a backend connection error (e.g. llama-server in router mode
|
||||||
|
# whose delegated worker for *this* model died). /v1/models keeps reporting
|
||||||
|
# OK in that case, so the probe-level filter above cannot catch it.
|
||||||
|
async with _completion_error_cache_lock:
|
||||||
|
completion_broken = {
|
||||||
|
ep for (ep, m), ts in _completion_error_cache.items()
|
||||||
|
if m == model and _is_fresh(ts, _COMPLETION_ERROR_TTL)
|
||||||
|
}
|
||||||
|
if completion_broken:
|
||||||
|
filtered = [
|
||||||
|
(ep, models) for ep, models in zip(candidate_endpoints, loaded_sets)
|
||||||
|
if ep not in completion_broken
|
||||||
|
]
|
||||||
|
if filtered:
|
||||||
|
candidate_endpoints = [ep for ep, _ in filtered]
|
||||||
|
loaded_sets = [models for _, models in filtered]
|
||||||
|
# Same fallback: if every candidate is broken for this model, fall
|
||||||
|
# through and let the upstream retry — possibly the operator restarted
|
||||||
|
# the dead worker.
|
||||||
|
|
||||||
# Look up a possible affinity hint *before* taking usage_lock. The two
|
# Look up a possible affinity hint *before* taking usage_lock. The two
|
||||||
# locks are never held together to avoid lock-ordering issues.
|
# locks are never held together to avoid lock-ordering issues.
|
||||||
affine_ep: Optional[str] = None
|
affine_ep: Optional[str] = None
|
||||||
|
|
@ -2316,6 +2381,11 @@ async def chat_proxy(request: Request):
|
||||||
else:
|
else:
|
||||||
await decrement_usage(endpoint, tracking_model)
|
await decrement_usage(endpoint, tracking_model)
|
||||||
raise
|
raise
|
||||||
|
elif _is_backend_connection_error(e):
|
||||||
|
print(f"[chat_proxy] backend connection error → marking ({endpoint}, {model}) unhealthy", flush=True)
|
||||||
|
await _mark_backend_unhealthy(endpoint, model, _e_str)
|
||||||
|
await decrement_usage(endpoint, tracking_model)
|
||||||
|
raise
|
||||||
elif "image input is not supported" in _e_str:
|
elif "image input is not supported" in _e_str:
|
||||||
print(f"[chat_proxy] Model {model} doesn't support images, retrying with text-only messages")
|
print(f"[chat_proxy] Model {model} doesn't support images, retrying with text-only messages")
|
||||||
try:
|
try:
|
||||||
|
|
@ -3573,6 +3643,14 @@ async def openai_chat_completions_proxy(request: Request):
|
||||||
else:
|
else:
|
||||||
await decrement_usage(endpoint, tracking_model)
|
await decrement_usage(endpoint, tracking_model)
|
||||||
raise
|
raise
|
||||||
|
elif _is_backend_connection_error(e):
|
||||||
|
# Upstream connection failed (e.g. llama-server in router mode
|
||||||
|
# whose delegated worker died). Mark (endpoint, model) so the
|
||||||
|
# next request reroutes; the client will retry this one.
|
||||||
|
print(f"[ochat] backend connection error → marking ({endpoint}, {model}) unhealthy", flush=True)
|
||||||
|
await _mark_backend_unhealthy(endpoint, model, _e_str)
|
||||||
|
await decrement_usage(endpoint, tracking_model)
|
||||||
|
raise
|
||||||
elif "image input is not supported" in _e_str:
|
elif "image input is not supported" in _e_str:
|
||||||
# Model doesn't support images — strip and retry
|
# Model doesn't support images — strip and retry
|
||||||
print(f"[openai_chat_completions_proxy] Model {model} doesn't support images, retrying with text-only messages")
|
print(f"[openai_chat_completions_proxy] Model {model} doesn't support images, retrying with text-only messages")
|
||||||
|
|
@ -3771,7 +3849,10 @@ async def openai_completions_proxy(request: Request):
|
||||||
# Make the API call in handler scope (try/except inside async generators is unreliable)
|
# Make the API call in handler scope (try/except inside async generators is unreliable)
|
||||||
try:
|
try:
|
||||||
async_gen = await oclient.completions.create(**params)
|
async_gen = await oclient.completions.create(**params)
|
||||||
except Exception:
|
except Exception as e:
|
||||||
|
if _is_backend_connection_error(e):
|
||||||
|
print(f"[ocompl] backend connection error → marking ({endpoint}, {model}) unhealthy", flush=True)
|
||||||
|
await _mark_backend_unhealthy(endpoint, model, str(e))
|
||||||
await decrement_usage(endpoint, tracking_model)
|
await decrement_usage(endpoint, tracking_model)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue