From 57976157365908321b517ebb9c8b12a876a44cdc Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Wed, 22 Apr 2026 17:27:34 +0200 Subject: [PATCH] feat: enhance load balancing #23 --- config.yaml | 15 +++++++++++ router.py | 74 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/config.yaml b/config.yaml index 4d7a5e4..76fbbe1 100644 --- a/config.yaml +++ b/config.yaml @@ -9,8 +9,23 @@ llama_server_endpoints: - http://192.168.0.50:8889/v1 # Maximum concurrent connections *per endpoint‑model pair* (equals to OLLAMA_NUM_PARALLEL) +# This is the global default; individual endpoints can override it via endpoint_config below. max_concurrent_connections: 2 +# Per-endpoint overrides (optional). Any field not listed falls back to the global default. +# endpoint_config: +# "http://192.168.0.50:11434": +# max_concurrent_connections: 3 +# "http://192.168.0.51:11434": +# max_concurrent_connections: 1 + +# Priority / WRR routing (optional, default: false). +# When true, requests are routed by utilization ratio (usage/max_concurrent_connections) +# and the config order of endpoints acts as the tiebreaker — the first endpoint listed +# is preferred when two endpoints are equally loaded. +# When false (default), equally-idle endpoints are chosen at random. +# priority_routing: true + # Optional router-level API key that gates router/API/web UI access (leave empty to disable) nomyo-router-api-key: "" diff --git a/router.py b/router.py index 5b4a3ed..995eb78 100644 --- a/router.py +++ b/router.py @@ -218,6 +218,10 @@ class Config(BaseSettings): llama_server_endpoints: List[str] = Field(default_factory=list) # Max concurrent connections per endpoint‑model pair, see OLLAMA_NUM_PARALLEL max_concurrent_connections: int = 1 + # Per-endpoint overrides: {endpoint_url: {max_concurrent_connections: N}} + endpoint_config: Dict[str, Dict] = Field(default_factory=dict) + # When True, config order = priority; routes by utilization ratio + config index (WRR) + priority_routing: bool = Field(default=False) api_keys: Dict[str, str] = Field(default_factory=dict) # Optional router-level API key used to gate access to this service and dashboard @@ -1697,6 +1701,12 @@ async def get_usage_counts() -> Dict: # ------------------------------------------------------------- # 5. Endpoint selection logic (respecting the configurable limit) # ------------------------------------------------------------- +def get_max_connections(ep: str) -> int: + """Per-endpoint max_concurrent_connections, falling back to the global value.""" + return config.endpoint_config.get(ep, {}).get( + "max_concurrent_connections", config.max_concurrent_connections + ) + async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: """ Determine which endpoint to use for the given model while respecting @@ -1767,41 +1777,65 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: def tracking_usage(ep: str) -> int: return usage_counts.get(ep, {}).get(get_tracking_model(ep, model), 0) + def utilization_ratio(ep: str) -> float: + return tracking_usage(ep) / get_max_connections(ep) + + # Priority map: position in all_endpoints list (lower = higher priority) + ep_priority = {ep: i for i, ep in enumerate(all_endpoints)} + # 3️⃣ Endpoints that have the model loaded *and* a free slot loaded_and_free = [ ep for ep, models in zip(candidate_endpoints, loaded_sets) - if model in models and tracking_usage(ep) < config.max_concurrent_connections + if model in models and tracking_usage(ep) < get_max_connections(ep) ] if loaded_and_free: - # Sort ascending for load balancing — all endpoints here already have the - # model loaded, so there is no model-switching cost to optimise for. - loaded_and_free.sort(key=tracking_usage) - # When all candidates are equally idle, randomise to avoid always picking - # the first entry in a stable sort. - if all(tracking_usage(ep) == 0 for ep in loaded_and_free): - selected = random.choice(loaded_and_free) - else: + if config.priority_routing: + # WRR: sort by config order first (stable), then by utilization ratio. + # Stable sort preserves priority for equal-ratio endpoints. + loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999)) + loaded_and_free.sort(key=utilization_ratio) selected = loaded_and_free[0] + else: + # Sort ascending for load balancing — all endpoints here already have the + # model loaded, so there is no model-switching cost to optimise for. + loaded_and_free.sort(key=tracking_usage) + # When all candidates are equally idle, randomise to avoid always picking + # the first entry in a stable sort. + if all(tracking_usage(ep) == 0 for ep in loaded_and_free): + selected = random.choice(loaded_and_free) + else: + selected = loaded_and_free[0] else: # 4️⃣ Endpoints among the candidates that simply have a free slot endpoints_with_free_slot = [ ep for ep in candidate_endpoints - if tracking_usage(ep) < config.max_concurrent_connections + if tracking_usage(ep) < get_max_connections(ep) ] if endpoints_with_free_slot: - # Sort by total endpoint load (ascending) to prefer idle endpoints. - endpoints_with_free_slot.sort( - key=lambda ep: sum(usage_counts.get(ep, {}).values()) - ) - if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot): - selected = random.choice(endpoints_with_free_slot) - else: + if config.priority_routing: + endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999)) + endpoints_with_free_slot.sort(key=utilization_ratio) selected = endpoints_with_free_slot[0] + else: + # Sort by total endpoint load (ascending) to prefer idle endpoints. + endpoints_with_free_slot.sort( + key=lambda ep: sum(usage_counts.get(ep, {}).values()) + ) + if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot): + selected = random.choice(endpoints_with_free_slot) + else: + selected = endpoints_with_free_slot[0] else: # 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue) - selected = min(candidate_endpoints, key=tracking_usage) + if config.priority_routing: + selected = min( + candidate_endpoints, + key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)), + ) + else: + selected = min(candidate_endpoints, key=tracking_usage) tracking_model = get_tracking_model(selected, model) snapshot = None @@ -3902,7 +3936,9 @@ async def startup_event() -> None: f"Loaded configuration from {config_path}:\n" f" endpoints={config.endpoints},\n" f" llama_server_endpoints={config.llama_server_endpoints},\n" - f" max_concurrent_connections={config.max_concurrent_connections}" + f" max_concurrent_connections={config.max_concurrent_connections},\n" + f" endpoint_config={config.endpoint_config},\n" + f" priority_routing={config.priority_routing}" ) else: print(