refactor: improve snapshot safety and usage tracking

Create atomic snapshots by copying the usage data structures (top-level `dict(...)` copies taken while holding the usage lock) to prevent race conditions.
Protect concurrent reads of usage counts with explicit locking in endpoint selection.
Replace README screenshot with a video link.
This commit is contained in:
Alpha Nerd 2026-01-26 17:18:57 +01:00
parent 3e3f0dd383
commit d4b2558116
3 changed files with 13 additions and 138 deletions

View file

@ -868,10 +868,14 @@ class rechunk:
# SSE Helpers
# ------------------------------------------------------------------
async def publish_snapshot():
# Take a consistent snapshot while holding the lock
async with usage_lock:
snapshot = orjson.dumps({"usage_counts": usage_counts,
"token_usage_counts": token_usage_counts,
}, option=orjson.OPT_SORT_KEYS).decode("utf-8")
snapshot = orjson.dumps({
"usage_counts": dict(usage_counts), # Create a copy
"token_usage_counts": dict(token_usage_counts)
}, option=orjson.OPT_SORT_KEYS).decode("utf-8")
# Distribute the snapshot (usage_lock is not needed here: the snapshot is already a serialized string)
async with _subscribers_lock:
for q in _subscribers:
# If the queue is full, drop the message to avoid backpressure.
@ -967,18 +971,19 @@ async def choose_endpoint(model: str) -> str:
# (concurrently, but only for the filtered list)
load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints]
loaded_sets = await asyncio.gather(*load_tasks)
# Protect all reads of usage_counts with the lock
async with usage_lock:
# Helper: get current usage count for (endpoint, model)
def current_usage(ep: str) -> int:
return usage_counts.get(ep, {}).get(model, 0)
# 3⃣ Endpoints that have the model loaded *and* a free slot
loaded_and_free = [
ep for ep, models in zip(candidate_endpoints, loaded_sets)
if model in models and usage_counts.get(ep, {}).get(model, 0) < config.max_concurrent_connections
]
if loaded_and_free:
# Sort by per-model usage in DESCENDING order to ensure model affinity
# Endpoints with higher usage (already handling this model) should be preferred