295 lines
13 KiB
Python
295 lines
13 KiB
Python
|
|
"""Endpoint selection (load-balancing + conversation affinity).
|
|||
|
|
|
|||
|
|
``choose_endpoint`` is the heart of routing — it picks an endpoint that
|
|||
|
|
advertises the model, prefers ones with the model already loaded and a free
|
|||
|
|
slot, applies the conversation-affinity hint when available, and honors
|
|||
|
|
config-order priority routing when ``priority_routing`` is set.
|
|||
|
|
|
|||
|
|
``increment_usage`` / ``decrement_usage`` keep the per-(endpoint, model)
|
|||
|
|
counter that drives utilization-based selection; they fan out an SSE
|
|||
|
|
snapshot on every change.
|
|||
|
|
"""
|
|||
|
|
import asyncio
|
|||
|
|
import random
|
|||
|
|
import time
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
from config import get_config
|
|||
|
|
from state import (
|
|||
|
|
usage_counts,
|
|||
|
|
usage_lock,
|
|||
|
|
_loaded_error_cache,
|
|||
|
|
_loaded_error_cache_lock,
|
|||
|
|
_completion_error_cache,
|
|||
|
|
_completion_error_cache_lock,
|
|||
|
|
_COMPLETION_ERROR_TTL,
|
|||
|
|
_affinity_map,
|
|||
|
|
_affinity_lock,
|
|||
|
|
_AFFINITY_MAX_ENTRIES,
|
|||
|
|
)
|
|||
|
|
from sse import _capture_snapshot, _distribute_snapshot
|
|||
|
|
from backends.health import _is_fresh
|
|||
|
|
from backends.normalize import (
|
|||
|
|
is_ext_openai_endpoint,
|
|||
|
|
is_openai_compatible,
|
|||
|
|
get_tracking_model,
|
|||
|
|
)
|
|||
|
|
from backends.probe import fetch
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def increment_usage(endpoint: str, model: str) -> None:
|
|||
|
|
async with usage_lock:
|
|||
|
|
usage_counts[endpoint][model] += 1
|
|||
|
|
snapshot = _capture_snapshot()
|
|||
|
|
await _distribute_snapshot(snapshot)
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def decrement_usage(endpoint: str, model: str) -> None:
|
|||
|
|
async with usage_lock:
|
|||
|
|
# Avoid negative counts
|
|||
|
|
current = usage_counts[endpoint].get(model, 0)
|
|||
|
|
if current > 0:
|
|||
|
|
usage_counts[endpoint][model] = current - 1
|
|||
|
|
# Optionally, clean up zero entries
|
|||
|
|
if usage_counts[endpoint].get(model, 0) == 0:
|
|||
|
|
usage_counts[endpoint].pop(model, None)
|
|||
|
|
#if not usage_counts[endpoint]:
|
|||
|
|
# usage_counts.pop(endpoint, None)
|
|||
|
|
snapshot = _capture_snapshot()
|
|||
|
|
await _distribute_snapshot(snapshot)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_max_connections(ep: str) -> int:
|
|||
|
|
"""Per-endpoint max_concurrent_connections, falling back to the global value."""
|
|||
|
|
cfg = get_config()
|
|||
|
|
return cfg.endpoint_config.get(ep, {}).get(
|
|||
|
|
"max_concurrent_connections", cfg.max_concurrent_connections
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def choose_endpoint(model: str, reserve: bool = True,
|
|||
|
|
affinity_key: Optional[str] = None) -> tuple[str, str]:
|
|||
|
|
"""
|
|||
|
|
Determine which endpoint to use for the given model while respecting
|
|||
|
|
the `max_concurrent_connections` per endpoint‑model pair **and**
|
|||
|
|
ensuring that the chosen endpoint actually *advertises* the model.
|
|||
|
|
|
|||
|
|
The selection algorithm:
|
|||
|
|
|
|||
|
|
1️⃣ Query every endpoint for its advertised models (`/api/tags`).
|
|||
|
|
2️⃣ Build a list of endpoints that contain the requested model.
|
|||
|
|
2️⃣.5 If conversation affinity is enabled and the caller passes
|
|||
|
|
``affinity_key``, prefer the endpoint that previously served the
|
|||
|
|
same conversation — but only when it still has the model loaded
|
|||
|
|
and a free slot. Otherwise fall through to the standard logic.
|
|||
|
|
3️⃣ For those endpoints, find those that have the model loaded
|
|||
|
|
(`/api/ps`) *and* still have a free slot.
|
|||
|
|
4️⃣ If none are both loaded and free, fall back to any endpoint
|
|||
|
|
from the filtered list that simply has a free slot and randomly
|
|||
|
|
select one.
|
|||
|
|
5️⃣ If all are saturated, pick any endpoint from the filtered list
|
|||
|
|
(the request will queue on that endpoint).
|
|||
|
|
6️⃣ If no endpoint advertises the model at all, raise an error.
|
|||
|
|
"""
|
|||
|
|
config = get_config()
|
|||
|
|
# 1️⃣ Gather advertised‑model sets for all endpoints concurrently
|
|||
|
|
# Include both config.endpoints and config.llama_server_endpoints
|
|||
|
|
llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints]
|
|||
|
|
all_endpoints = config.endpoints + llama_eps_extra
|
|||
|
|
|
|||
|
|
tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if not is_openai_compatible(ep)]
|
|||
|
|
tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in config.endpoints if is_openai_compatible(ep)]
|
|||
|
|
tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in llama_eps_extra]
|
|||
|
|
advertised_sets = await asyncio.gather(*tag_tasks)
|
|||
|
|
|
|||
|
|
# 2️⃣ Filter endpoints that advertise the requested model
|
|||
|
|
candidate_endpoints = [
|
|||
|
|
ep for ep, models in zip(all_endpoints, advertised_sets)
|
|||
|
|
if model in models
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 6️⃣
|
|||
|
|
if not candidate_endpoints:
|
|||
|
|
if ":latest" in model: #ollama naming convention not applicable to openai/llama-server
|
|||
|
|
model_without_latest = model.split(":latest")[0]
|
|||
|
|
candidate_endpoints = [
|
|||
|
|
ep for ep, models in zip(all_endpoints, advertised_sets)
|
|||
|
|
if model_without_latest in models and (is_ext_openai_endpoint(ep) or ep in config.llama_server_endpoints)
|
|||
|
|
]
|
|||
|
|
if not candidate_endpoints:
|
|||
|
|
# Only add :latest suffix if model doesn't already have a version suffix
|
|||
|
|
if ":" not in model:
|
|||
|
|
model = model + ":latest"
|
|||
|
|
candidate_endpoints = [
|
|||
|
|
ep for ep, models in zip(all_endpoints, advertised_sets)
|
|||
|
|
if model in models
|
|||
|
|
]
|
|||
|
|
if not candidate_endpoints:
|
|||
|
|
raise RuntimeError(
|
|||
|
|
f"None of the configured endpoints ({', '.join(all_endpoints)}) "
|
|||
|
|
f"advertise the model '{model}'."
|
|||
|
|
)
|
|||
|
|
# 3️⃣ Among the candidates, find those that have the model *loaded*
|
|||
|
|
# (concurrently, but only for the filtered list)
|
|||
|
|
load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints]
|
|||
|
|
loaded_sets = await asyncio.gather(*load_tasks)
|
|||
|
|
|
|||
|
|
# 3️⃣.5 Exclude endpoints whose loaded-model probe has been failing
|
|||
|
|
# recently. Without this filter, an endpoint where `/api/ps` returns 5xx
|
|||
|
|
# would appear with an empty loaded set but pass through to the
|
|||
|
|
# free-slot fallback (step 4) — sending completion calls to an
|
|||
|
|
# unhealthy backend. See issue #83.
|
|||
|
|
async with _loaded_error_cache_lock:
|
|||
|
|
unhealthy = {
|
|||
|
|
ep for ep, ts in _loaded_error_cache.items()
|
|||
|
|
if _is_fresh(ts, 300)
|
|||
|
|
}
|
|||
|
|
if unhealthy:
|
|||
|
|
filtered = [
|
|||
|
|
(ep, models) for ep, models in zip(candidate_endpoints, loaded_sets)
|
|||
|
|
if ep not in unhealthy
|
|||
|
|
]
|
|||
|
|
if filtered:
|
|||
|
|
candidate_endpoints = [ep for ep, _ in filtered]
|
|||
|
|
loaded_sets = [models for _, models in filtered]
|
|||
|
|
# If *every* candidate is unhealthy we still fall through with the
|
|||
|
|
# original list — refusing to route is worse than retrying a
|
|||
|
|
# possibly-recovered backend.
|
|||
|
|
|
|||
|
|
# 3️⃣.6 Exclude (endpoint, model) pairs whose completion path has recently
|
|||
|
|
# failed with a backend connection error (e.g. llama-server in router mode
|
|||
|
|
# whose delegated worker for *this* model died). /v1/models keeps reporting
|
|||
|
|
# OK in that case, so the probe-level filter above cannot catch it.
|
|||
|
|
async with _completion_error_cache_lock:
|
|||
|
|
completion_broken = {
|
|||
|
|
ep for (ep, m), ts in _completion_error_cache.items()
|
|||
|
|
if m == model and _is_fresh(ts, _COMPLETION_ERROR_TTL)
|
|||
|
|
}
|
|||
|
|
if completion_broken:
|
|||
|
|
filtered = [
|
|||
|
|
(ep, models) for ep, models in zip(candidate_endpoints, loaded_sets)
|
|||
|
|
if ep not in completion_broken
|
|||
|
|
]
|
|||
|
|
if filtered:
|
|||
|
|
candidate_endpoints = [ep for ep, _ in filtered]
|
|||
|
|
loaded_sets = [models for _, models in filtered]
|
|||
|
|
# Same fallback: if every candidate is broken for this model, fall
|
|||
|
|
# through and let the upstream retry — possibly the operator restarted
|
|||
|
|
# the dead worker.
|
|||
|
|
|
|||
|
|
# Look up a possible affinity hint *before* taking usage_lock. The two
|
|||
|
|
# locks are never held together to avoid lock-ordering issues.
|
|||
|
|
affine_ep: Optional[str] = None
|
|||
|
|
if config.conversation_affinity and affinity_key:
|
|||
|
|
async with _affinity_lock:
|
|||
|
|
entry = _affinity_map.get(affinity_key)
|
|||
|
|
if entry is not None:
|
|||
|
|
ep, _stored_model, expires_at = entry
|
|||
|
|
if expires_at < time.monotonic():
|
|||
|
|
_affinity_map.pop(affinity_key, None)
|
|||
|
|
else:
|
|||
|
|
affine_ep = ep
|
|||
|
|
|
|||
|
|
# Protect all reads/writes of usage_counts with the lock so that selection
|
|||
|
|
# and reservation are atomic — concurrent callers see each other's pending load.
|
|||
|
|
async with usage_lock:
|
|||
|
|
# Helper: current usage for (endpoint, model) using the same normalized key
|
|||
|
|
# that increment_usage/decrement_usage store — raw model names differ from
|
|||
|
|
# tracking names for llama-server (HF prefix / quant suffix stripped).
|
|||
|
|
def tracking_usage(ep: str) -> int:
|
|||
|
|
return usage_counts.get(ep, {}).get(get_tracking_model(ep, model), 0)
|
|||
|
|
|
|||
|
|
def utilization_ratio(ep: str) -> float:
|
|||
|
|
return tracking_usage(ep) / get_max_connections(ep)
|
|||
|
|
|
|||
|
|
# Priority map: position in all_endpoints list (lower = higher priority)
|
|||
|
|
ep_priority = {ep: i for i, ep in enumerate(all_endpoints)}
|
|||
|
|
|
|||
|
|
selected: Optional[str] = None
|
|||
|
|
|
|||
|
|
# 2️⃣.5 Conversation affinity preference — only honour the hint when
|
|||
|
|
# the affine endpoint still advertises the model loaded *and* has a
|
|||
|
|
# free slot. Otherwise fall back to the standard algorithm.
|
|||
|
|
if affine_ep:
|
|||
|
|
ep_loaded = {
|
|||
|
|
ep: set(models)
|
|||
|
|
for ep, models in zip(candidate_endpoints, loaded_sets)
|
|||
|
|
}
|
|||
|
|
if (affine_ep in candidate_endpoints
|
|||
|
|
and model in ep_loaded.get(affine_ep, set())
|
|||
|
|
and tracking_usage(affine_ep) < get_max_connections(affine_ep)):
|
|||
|
|
selected = affine_ep
|
|||
|
|
|
|||
|
|
if selected is None:
|
|||
|
|
# 3️⃣ Endpoints that have the model loaded *and* a free slot
|
|||
|
|
loaded_and_free = [
|
|||
|
|
ep for ep, models in zip(candidate_endpoints, loaded_sets)
|
|||
|
|
if model in models and tracking_usage(ep) < get_max_connections(ep)
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
if loaded_and_free:
|
|||
|
|
if config.priority_routing:
|
|||
|
|
# WRR: sort by config order first (stable), then by utilization ratio.
|
|||
|
|
# Stable sort preserves priority for equal-ratio endpoints.
|
|||
|
|
loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999))
|
|||
|
|
loaded_and_free.sort(key=utilization_ratio)
|
|||
|
|
selected = loaded_and_free[0]
|
|||
|
|
else:
|
|||
|
|
# Sort ascending for load balancing — all endpoints here already have the
|
|||
|
|
# model loaded, so there is no model-switching cost to optimise for.
|
|||
|
|
loaded_and_free.sort(key=tracking_usage)
|
|||
|
|
# When all candidates are equally idle, randomise to avoid always picking
|
|||
|
|
# the first entry in a stable sort.
|
|||
|
|
if all(tracking_usage(ep) == 0 for ep in loaded_and_free):
|
|||
|
|
selected = random.choice(loaded_and_free)
|
|||
|
|
else:
|
|||
|
|
selected = loaded_and_free[0]
|
|||
|
|
else:
|
|||
|
|
# 4️⃣ Endpoints among the candidates that simply have a free slot
|
|||
|
|
endpoints_with_free_slot = [
|
|||
|
|
ep for ep in candidate_endpoints
|
|||
|
|
if tracking_usage(ep) < get_max_connections(ep)
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
if endpoints_with_free_slot:
|
|||
|
|
if config.priority_routing:
|
|||
|
|
endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999))
|
|||
|
|
endpoints_with_free_slot.sort(key=utilization_ratio)
|
|||
|
|
selected = endpoints_with_free_slot[0]
|
|||
|
|
else:
|
|||
|
|
# Sort by total endpoint load (ascending) to prefer idle endpoints.
|
|||
|
|
endpoints_with_free_slot.sort(
|
|||
|
|
key=lambda ep: sum(usage_counts.get(ep, {}).values())
|
|||
|
|
)
|
|||
|
|
if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot):
|
|||
|
|
selected = random.choice(endpoints_with_free_slot)
|
|||
|
|
else:
|
|||
|
|
selected = endpoints_with_free_slot[0]
|
|||
|
|
else:
|
|||
|
|
# 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue)
|
|||
|
|
if config.priority_routing:
|
|||
|
|
selected = min(
|
|||
|
|
candidate_endpoints,
|
|||
|
|
key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)),
|
|||
|
|
)
|
|||
|
|
else:
|
|||
|
|
selected = min(candidate_endpoints, key=tracking_usage)
|
|||
|
|
|
|||
|
|
tracking_model = get_tracking_model(selected, model)
|
|||
|
|
snapshot = None
|
|||
|
|
if reserve:
|
|||
|
|
usage_counts[selected][tracking_model] += 1
|
|||
|
|
snapshot = _capture_snapshot()
|
|||
|
|
if snapshot is not None:
|
|||
|
|
await _distribute_snapshot(snapshot)
|
|||
|
|
# Record / refresh affinity *after* releasing usage_lock.
|
|||
|
|
if reserve and config.conversation_affinity and affinity_key:
|
|||
|
|
expires_at = time.monotonic() + config.conversation_affinity_ttl
|
|||
|
|
async with _affinity_lock:
|
|||
|
|
_affinity_map[affinity_key] = (selected, model, expires_at)
|
|||
|
|
if len(_affinity_map) > _AFFINITY_MAX_ENTRIES:
|
|||
|
|
now = time.monotonic()
|
|||
|
|
for k in [k for k, v in _affinity_map.items() if v[2] < now]:
|
|||
|
|
_affinity_map.pop(k, None)
|
|||
|
|
return selected, tracking_model
|