From d4b255811603634460b33badf625ed25d9bb33ff Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 26 Jan 2026 17:18:57 +0100 Subject: [PATCH] refactor: improve snapshot safety and usage tracking Create atomic snapshots by deep copying usage data structures to prevent race conditions. Protect concurrent reads of usage counts with explicit locking in endpoint selection. Replace README screenshot with a video link. --- README.md | 3 +- race_condition_fixes.md | 131 ---------------------------------------- router.py | 17 ++++-- 3 files changed, 13 insertions(+), 138 deletions(-) delete mode 100644 race_condition_fixes.md diff --git a/README.md b/README.md index 6c3f402..f892803 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ is a transparent proxy for [Ollama](https://github.com/ollama/ollama) with model deployment aware routing. -Screenshot_NOMYO_Router_0-2-2_Dashboard
+https://eu1.nomyo.ai/assets/dash.mp4 It runs between your frontend application and Ollama backend and is transparent for both, the front- and backend. @@ -73,6 +73,7 @@ docker run -d \ ``` Notes: + - `-e CONFIG_PATH` sets the `NOMYO_ROUTER_CONFIG_PATH` environment variable under the hood; you can export it directly instead if you prefer. - To override the bind address or port, export `UVICORN_HOST` or `UVICORN_PORT`, or pass the corresponding uvicorn flags after `--`, e.g. `nomyo-router --config-path /config/config.yaml -- --port 9000`. - Use `docker logs nomyo-router` to confirm the loaded endpoints and concurrency settings at startup. diff --git a/race_condition_fixes.md b/race_condition_fixes.md deleted file mode 100644 index 044372b..0000000 --- a/race_condition_fixes.md +++ /dev/null @@ -1,131 +0,0 @@ -# Race Condition Fixes in router.py - -This document summarizes all the race condition fixes that have been implemented in the router.py file. - -## Summary of Issues Found and Fixed - -### 1. Cache Access Race Conditions - -**Problem:** The `_models_cache`, `_loaded_models_cache`, and `_error_cache` dictionaries were being accessed concurrently without proper synchronization, leading to potential data corruption and inconsistent state. - -**Solution:** Added three new asyncio.Lock objects to protect cache access: -- `_models_cache_lock` - protects `_models_cache` -- `_loaded_models_cache_lock` - protects `_loaded_models_cache` -- `_error_cache_lock` - protects `_error_cache` - -**Implementation:** -- All cache reads now acquire the appropriate lock before accessing the cache -- All cache writes now acquire the appropriate lock before modifying the cache -- Cache entries are checked for freshness while holding the lock -- Stale entries are removed while holding the lock - -### 2. Timestamp Calculation Race Condition - -**Problem:** In the `token_worker()` function, the timestamp calculation was performed inside the lock, which could lead to: -- Inconsistent timestamps across multiple token entries -- Potential delays in processing due to lock contention -- Race conditions if the datetime object was modified between calculation and use - -**Solution:** Moved timestamp calculation outside the lock: -- Calculate timestamp before acquiring the buffer lock -- Pass the pre-calculated timestamp into the critical section -- This ensures consistent timestamps while minimizing lock duration - -**Implementation:** -```python -# Calculate timestamp once before acquiring lock -now = datetime.now(tz=timezone.utc) -timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) - -# Then use it inside the lock -async with buffer_lock: - time_series_buffer.append({ - 'endpoint': endpoint, - 'model': model, - 'input_tokens': prompt, - 'output_tokens': comp, - 'total_tokens': prompt + comp, - 'timestamp': timestamp # Use pre-calculated timestamp - }) -``` - -### 3. Buffer Access Race Conditions - -**Problem:** The `token_buffer` and `time_series_buffer` were accessed concurrently from multiple sources (token_worker, flush_buffer, flush_remaining_buffers) without proper synchronization. - -**Solution:** The existing `buffer_lock` was correctly used, but we ensured all access patterns properly acquire the lock. - -**Implementation:** -- All reads and writes to `token_buffer` are protected by `buffer_lock` -- All reads and writes to `time_series_buffer` are protected by `buffer_lock` -- Buffer copies are made while holding the lock, then released before DB operations - -### 4. Usage Count Race Conditions - -**Problem:** The `usage_counts` and `token_usage_counts` dictionaries were accessed concurrently without proper synchronization. - -**Solution:** The existing `usage_lock` and `token_usage_lock` were correctly used throughout the codebase. - -**Implementation:** -- All increments/decrements of `usage_counts` are protected by `usage_lock` -- All increments/decrements of `token_usage_counts` are protected by `token_usage_lock` -- The `publish_snapshot()` function acquires `usage_lock` before creating snapshots - -### 5. Subscriber Management Race Conditions - -**Problem:** The `_subscribers` set was accessed concurrently without proper synchronization. - -**Solution:** The existing `_subscribers_lock` was correctly used throughout the codebase. - -**Implementation:** -- All additions/removals from `_subscribers` are protected by `_subscribers_lock` -- The `subscribe()` function acquires the lock before adding new subscribers -- The `unsubscribe()` function acquires the lock before removing subscribers -- The `publish_snapshot()` function acquires the lock before iterating over subscribers - -## Testing Recommendations - -To verify the race condition fixes: - -1. **Concurrent Request Testing:** - - Send multiple simultaneous requests to the same endpoint/model - - Verify that usage counts remain consistent - - Verify that token counts are accurately tracked - -2. **Cache Consistency Testing:** - - Rapidly query `/api/tags` and `/api/ps` endpoints - - Verify that cached responses remain consistent - - Verify that stale cache entries are properly invalidated - -3. **Timestamp Consistency Testing:** - - Send multiple requests in quick succession - - Verify that timestamps in time_series_buffer are consistent and accurate - - Verify that timestamps are properly rounded to minute boundaries - -4. **Load Testing:** - - Simulate high load with many concurrent connections - - Verify that the system remains stable under load - - Verify that no deadlocks occur - -## Performance Considerations - -The race condition fixes introduce additional locking, which may have performance implications: - -1. **Lock Granularity:** The locks are fine-grained (one per cache type), minimizing contention. - -2. **Lock Duration:** Locks are held for minimal durations - typically just for cache reads/writes. - -3. **Concurrent Operations:** Multiple endpoints/models can be processed concurrently as long as they don't contend for the same cache entry. - -4. **Monitoring:** Consider adding metrics to track lock contention and wait times. - -## Future Improvements - -1. **Read-Write Locks:** Consider using read-write locks if read-heavy workloads are identified. - -2. **Cache Invalidation:** Implement more sophisticated cache invalidation strategies. - -3. **Lock Timeouts:** Add timeout mechanisms to prevent deadlocks. - -4. **Performance Monitoring:** Add instrumentation to track lock contention and performance. - diff --git a/router.py b/router.py index 5bb1eaa..6250d11 100644 --- a/router.py +++ b/router.py @@ -868,10 +868,14 @@ class rechunk: # SSE Helpser # ------------------------------------------------------------------ async def publish_snapshot(): + # Take a consistent snapshot while holding the lock async with usage_lock: - snapshot = orjson.dumps({"usage_counts": usage_counts, - "token_usage_counts": token_usage_counts, - }, option=orjson.OPT_SORT_KEYS).decode("utf-8") + snapshot = orjson.dumps({ + "usage_counts": dict(usage_counts), # Create a copy + "token_usage_counts": dict(token_usage_counts) + }, option=orjson.OPT_SORT_KEYS).decode("utf-8") + + # Distribute the snapshot (no lock needed here since we have a copy) async with _subscribers_lock: for q in _subscribers: # If the queue is full, drop the message to avoid back‑pressure. @@ -967,18 +971,19 @@ async def choose_endpoint(model: str) -> str: # (concurrently, but only for the filtered list) load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) - + + # Protect all reads of usage_counts with the lock async with usage_lock: # Helper: get current usage count for (endpoint, model) def current_usage(ep: str) -> int: return usage_counts.get(ep, {}).get(model, 0) - + # 3️⃣ Endpoints that have the model loaded *and* a free slot loaded_and_free = [ ep for ep, models in zip(candidate_endpoints, loaded_sets) if model in models and usage_counts.get(ep, {}).get(model, 0) < config.max_concurrent_connections ] - + if loaded_and_free: # Sort by per-model usage in DESCENDING order to ensure model affinity # Endpoints with higher usage (already handling this model) should be preferred