feat: enhance load balancing #23
This commit is contained in:
parent
3a0ccbef7b
commit
5797615736
2 changed files with 70 additions and 19 deletions
15
config.yaml
15
config.yaml
|
|
@ -9,8 +9,23 @@ llama_server_endpoints:
|
|||
- http://192.168.0.50:8889/v1
|
||||
|
||||
# Maximum concurrent connections *per endpoint-model pair* (equivalent to OLLAMA_NUM_PARALLEL)
|
||||
# This is the global default; individual endpoints can override it via endpoint_config below.
|
||||
max_concurrent_connections: 2
|
||||
|
||||
# Per-endpoint overrides (optional). Any field not listed falls back to the global default.
|
||||
# endpoint_config:
|
||||
# "http://192.168.0.50:11434":
|
||||
# max_concurrent_connections: 3
|
||||
# "http://192.168.0.51:11434":
|
||||
# max_concurrent_connections: 1
|
||||
|
||||
# Priority / WRR routing (optional, default: false).
|
||||
# When true, requests are routed by utilization ratio (usage/max_concurrent_connections)
|
||||
# and the config order of endpoints acts as the tiebreaker — the first endpoint listed
|
||||
# is preferred when two endpoints are equally loaded.
|
||||
# When false (default), equally-idle endpoints are chosen at random.
|
||||
# priority_routing: true
|
||||
|
||||
# Optional router-level API key that gates router/API/web UI access (leave empty to disable)
|
||||
nomyo-router-api-key: ""
|
||||
|
||||
|
|
|
|||
74
router.py
74
router.py
|
|
@ -218,6 +218,10 @@ class Config(BaseSettings):
|
|||
llama_server_endpoints: List[str] = Field(default_factory=list)
|
||||
# Max concurrent connections per endpoint-model pair, see OLLAMA_NUM_PARALLEL
|
||||
max_concurrent_connections: int = 1
|
||||
# Per-endpoint overrides: {endpoint_url: {max_concurrent_connections: N}}
|
||||
endpoint_config: Dict[str, Dict] = Field(default_factory=dict)
|
||||
# When True, config order = priority; routes by utilization ratio + config index (WRR)
|
||||
priority_routing: bool = Field(default=False)
|
||||
|
||||
api_keys: Dict[str, str] = Field(default_factory=dict)
|
||||
# Optional router-level API key used to gate access to this service and dashboard
|
||||
|
|
@ -1697,6 +1701,12 @@ async def get_usage_counts() -> Dict:
|
|||
# -------------------------------------------------------------
|
||||
# 5. Endpoint selection logic (respecting the configurable limit)
|
||||
# -------------------------------------------------------------
|
||||
def get_max_connections(ep: str) -> int:
    """Return the connection cap for *ep*.

    Looks up the optional per-endpoint override in ``config.endpoint_config``
    and falls back to the global ``config.max_concurrent_connections`` when
    the endpoint has no override entry (or no override for this key).
    """
    overrides = config.endpoint_config.get(ep, {})
    return overrides.get("max_concurrent_connections", config.max_concurrent_connections)
|
||||
|
||||
async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
|
||||
"""
|
||||
Determine which endpoint to use for the given model while respecting
|
||||
|
|
@ -1767,41 +1777,65 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
|
|||
def tracking_usage(ep: str) -> int:
|
||||
return usage_counts.get(ep, {}).get(get_tracking_model(ep, model), 0)
|
||||
|
||||
def utilization_ratio(ep: str) -> float:
|
||||
return tracking_usage(ep) / get_max_connections(ep)
|
||||
|
||||
# Priority map: position in all_endpoints list (lower = higher priority)
|
||||
ep_priority = {ep: i for i, ep in enumerate(all_endpoints)}
|
||||
|
||||
# 3️⃣ Endpoints that have the model loaded *and* a free slot
|
||||
loaded_and_free = [
|
||||
ep for ep, models in zip(candidate_endpoints, loaded_sets)
|
||||
if model in models and tracking_usage(ep) < config.max_concurrent_connections
|
||||
if model in models and tracking_usage(ep) < get_max_connections(ep)
|
||||
]
|
||||
|
||||
if loaded_and_free:
|
||||
# Sort ascending for load balancing — all endpoints here already have the
|
||||
# model loaded, so there is no model-switching cost to optimise for.
|
||||
loaded_and_free.sort(key=tracking_usage)
|
||||
# When all candidates are equally idle, randomise to avoid always picking
|
||||
# the first entry in a stable sort.
|
||||
if all(tracking_usage(ep) == 0 for ep in loaded_and_free):
|
||||
selected = random.choice(loaded_and_free)
|
||||
else:
|
||||
if config.priority_routing:
|
||||
# WRR: sort by config order first (stable), then by utilization ratio.
|
||||
# Stable sort preserves priority for equal-ratio endpoints.
|
||||
loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999))
|
||||
loaded_and_free.sort(key=utilization_ratio)
|
||||
selected = loaded_and_free[0]
|
||||
else:
|
||||
# Sort ascending for load balancing — all endpoints here already have the
|
||||
# model loaded, so there is no model-switching cost to optimise for.
|
||||
loaded_and_free.sort(key=tracking_usage)
|
||||
# When all candidates are equally idle, randomise to avoid always picking
|
||||
# the first entry in a stable sort.
|
||||
if all(tracking_usage(ep) == 0 for ep in loaded_and_free):
|
||||
selected = random.choice(loaded_and_free)
|
||||
else:
|
||||
selected = loaded_and_free[0]
|
||||
else:
|
||||
# 4️⃣ Endpoints among the candidates that simply have a free slot
|
||||
endpoints_with_free_slot = [
|
||||
ep for ep in candidate_endpoints
|
||||
if tracking_usage(ep) < config.max_concurrent_connections
|
||||
if tracking_usage(ep) < get_max_connections(ep)
|
||||
]
|
||||
|
||||
if endpoints_with_free_slot:
|
||||
# Sort by total endpoint load (ascending) to prefer idle endpoints.
|
||||
endpoints_with_free_slot.sort(
|
||||
key=lambda ep: sum(usage_counts.get(ep, {}).values())
|
||||
)
|
||||
if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot):
|
||||
selected = random.choice(endpoints_with_free_slot)
|
||||
else:
|
||||
if config.priority_routing:
|
||||
endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999))
|
||||
endpoints_with_free_slot.sort(key=utilization_ratio)
|
||||
selected = endpoints_with_free_slot[0]
|
||||
else:
|
||||
# Sort by total endpoint load (ascending) to prefer idle endpoints.
|
||||
endpoints_with_free_slot.sort(
|
||||
key=lambda ep: sum(usage_counts.get(ep, {}).values())
|
||||
)
|
||||
if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot):
|
||||
selected = random.choice(endpoints_with_free_slot)
|
||||
else:
|
||||
selected = endpoints_with_free_slot[0]
|
||||
else:
|
||||
# 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue)
|
||||
selected = min(candidate_endpoints, key=tracking_usage)
|
||||
if config.priority_routing:
|
||||
selected = min(
|
||||
candidate_endpoints,
|
||||
key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)),
|
||||
)
|
||||
else:
|
||||
selected = min(candidate_endpoints, key=tracking_usage)
|
||||
|
||||
tracking_model = get_tracking_model(selected, model)
|
||||
snapshot = None
|
||||
|
|
@ -3902,7 +3936,9 @@ async def startup_event() -> None:
|
|||
f"Loaded configuration from {config_path}:\n"
|
||||
f" endpoints={config.endpoints},\n"
|
||||
f" llama_server_endpoints={config.llama_server_endpoints},\n"
|
||||
f" max_concurrent_connections={config.max_concurrent_connections}"
|
||||
f" max_concurrent_connections={config.max_concurrent_connections},\n"
|
||||
f" endpoint_config={config.endpoint_config},\n"
|
||||
f" priority_routing={config.priority_routing}"
|
||||
)
|
||||
else:
|
||||
print(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue