diff --git a/README.md b/README.md
index 6c3f402..f892803 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 is a transparent proxy for [Ollama](https://github.com/ollama/ollama) with model deployment aware routing.
 
-Screenshot_NOMYO_Router_0-2-2_Dashboard
+https://eu1.nomyo.ai/assets/dash.mp4
 
 It runs between your frontend application and Ollama backend and is transparent to both the front- and backend.
@@ -73,6 +73,7 @@ docker run -d \
 ```
 
 Notes:
+- `-e CONFIG_PATH` sets the `NOMYO_ROUTER_CONFIG_PATH` environment variable under the hood; you can export it directly instead if you prefer.
 - To override the bind address or port, export `UVICORN_HOST` or `UVICORN_PORT`, or pass the corresponding uvicorn flags after `--`, e.g. `nomyo-router --config-path /config/config.yaml -- --port 9000`.
 - Use `docker logs nomyo-router` to confirm the loaded endpoints and concurrency settings at startup.
diff --git a/race_condition_fixes.md b/race_condition_fixes.md
deleted file mode 100644
index 044372b..0000000
--- a/race_condition_fixes.md
+++ /dev/null
@@ -1,131 +0,0 @@
-# Race Condition Fixes in router.py
-
-This document summarizes all the race condition fixes that have been implemented in the router.py file.
-
-## Summary of Issues Found and Fixed
-
-### 1. Cache Access Race Conditions
-
-**Problem:** The `_models_cache`, `_loaded_models_cache`, and `_error_cache` dictionaries were being accessed concurrently without proper synchronization, leading to potential data corruption and inconsistent state.
-
-**Solution:** Added three new asyncio.Lock objects to protect cache access:
-- `_models_cache_lock` - protects `_models_cache`
-- `_loaded_models_cache_lock` - protects `_loaded_models_cache`
-- `_error_cache_lock` - protects `_error_cache`
-
-**Implementation:**
-- All cache reads now acquire the appropriate lock before accessing the cache
-- All cache writes now acquire the appropriate lock before modifying the cache
-- Cache entries are checked for freshness while holding the lock
-- Stale entries are removed while holding the lock
-
-### 2. Timestamp Calculation Race Condition
-
-**Problem:** In the `token_worker()` function, the timestamp calculation was performed inside the lock, which could lead to:
-- Inconsistent timestamps across multiple token entries
-- Potential delays in processing due to lock contention
-- Race conditions if the datetime object was modified between calculation and use
-
-**Solution:** Moved timestamp calculation outside the lock:
-- Calculate timestamp before acquiring the buffer lock
-- Pass the pre-calculated timestamp into the critical section
-- This ensures consistent timestamps while minimizing lock duration
-
-**Implementation:**
-```python
-# Calculate timestamp once before acquiring lock
-now = datetime.now(tz=timezone.utc)
-timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp())
-
-# Then use it inside the lock
-async with buffer_lock:
-    time_series_buffer.append({
-        'endpoint': endpoint,
-        'model': model,
-        'input_tokens': prompt,
-        'output_tokens': comp,
-        'total_tokens': prompt + comp,
-        'timestamp': timestamp  # Use pre-calculated timestamp
-    })
-```
-
-### 3. Buffer Access Race Conditions
-
-**Problem:** The `token_buffer` and `time_series_buffer` were accessed concurrently from multiple sources (token_worker, flush_buffer, flush_remaining_buffers) without proper synchronization.
-
-**Solution:** The existing `buffer_lock` was correctly used, but we ensured all access patterns properly acquire the lock.
-
-**Implementation:**
-- All reads and writes to `token_buffer` are protected by `buffer_lock`
-- All reads and writes to `time_series_buffer` are protected by `buffer_lock`
-- Buffer copies are made while holding the lock, then released before DB operations
-
-### 4. Usage Count Race Conditions
-
-**Problem:** The `usage_counts` and `token_usage_counts` dictionaries were accessed concurrently without proper synchronization.
-
-**Solution:** The existing `usage_lock` and `token_usage_lock` were correctly used throughout the codebase.
-
-**Implementation:**
-- All increments/decrements of `usage_counts` are protected by `usage_lock`
-- All increments/decrements of `token_usage_counts` are protected by `token_usage_lock`
-- The `publish_snapshot()` function acquires `usage_lock` before creating snapshots
-
-### 5. Subscriber Management Race Conditions
-
-**Problem:** The `_subscribers` set was accessed concurrently without proper synchronization.
-
-**Solution:** The existing `_subscribers_lock` was correctly used throughout the codebase.
-
-**Implementation:**
-- All additions/removals from `_subscribers` are protected by `_subscribers_lock`
-- The `subscribe()` function acquires the lock before adding new subscribers
-- The `unsubscribe()` function acquires the lock before removing subscribers
-- The `publish_snapshot()` function acquires the lock before iterating over subscribers
-
-## Testing Recommendations
-
-To verify the race condition fixes:
-
-1. **Concurrent Request Testing:**
-   - Send multiple simultaneous requests to the same endpoint/model
-   - Verify that usage counts remain consistent
-   - Verify that token counts are accurately tracked
-
-2. **Cache Consistency Testing:**
-   - Rapidly query `/api/tags` and `/api/ps` endpoints
-   - Verify that cached responses remain consistent
-   - Verify that stale cache entries are properly invalidated
-
-3. **Timestamp Consistency Testing:**
-   - Send multiple requests in quick succession
-   - Verify that timestamps in time_series_buffer are consistent and accurate
-   - Verify that timestamps are properly rounded to minute boundaries
-
-4. **Load Testing:**
-   - Simulate high load with many concurrent connections
-   - Verify that the system remains stable under load
-   - Verify that no deadlocks occur
-
-## Performance Considerations
-
-The race condition fixes introduce additional locking, which may have performance implications:
-
-1. **Lock Granularity:** The locks are fine-grained (one per cache type), minimizing contention.
-
-2. **Lock Duration:** Locks are held for minimal durations - typically just for cache reads/writes.
-
-3. **Concurrent Operations:** Multiple endpoints/models can be processed concurrently as long as they don't contend for the same cache entry.
-
-4. **Monitoring:** Consider adding metrics to track lock contention and wait times.
-
-## Future Improvements
-
-1. **Read-Write Locks:** Consider using read-write locks if read-heavy workloads are identified.
-
-2. **Cache Invalidation:** Implement more sophisticated cache invalidation strategies.
-
-3. **Lock Timeouts:** Add timeout mechanisms to prevent deadlocks.
-
-4. **Performance Monitoring:** Add instrumentation to track lock contention and performance.
-
diff --git a/router.py b/router.py
index 5bb1eaa..6250d11 100644
--- a/router.py
+++ b/router.py
@@ -868,10 +868,14 @@ class rechunk:
 # SSE Helpers
 # ------------------------------------------------------------------
 async def publish_snapshot():
+    # Take a consistent snapshot while holding the lock
     async with usage_lock:
-        snapshot = orjson.dumps({"usage_counts": usage_counts,
-                                 "token_usage_counts": token_usage_counts,
-                                 }, option=orjson.OPT_SORT_KEYS).decode("utf-8")
+        snapshot = orjson.dumps({
+            "usage_counts": dict(usage_counts),  # Create a copy
+            "token_usage_counts": dict(token_usage_counts)
+        }, option=orjson.OPT_SORT_KEYS).decode("utf-8")
+
+    # Distribute the snapshot (no lock needed here since we have a copy)
     async with _subscribers_lock:
         for q in _subscribers:
             # If the queue is full, drop the message to avoid back‑pressure.
@@ -967,18 +971,19 @@ async def choose_endpoint(model: str) -> str:
     # (concurrently, but only for the filtered list)
     load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints]
     loaded_sets = await asyncio.gather(*load_tasks)
-
+
+    # Protect all reads of usage_counts with the lock
    async with usage_lock:
        # Helper: get current usage count for (endpoint, model)
        def current_usage(ep: str) -> int:
            return usage_counts.get(ep, {}).get(model, 0)
-
+
        # 3️⃣ Endpoints that have the model loaded *and* a free slot
        loaded_and_free = [
            ep for ep, models in zip(candidate_endpoints, loaded_sets)
            if model in models and usage_counts.get(ep, {}).get(model, 0) < config.max_concurrent_connections
        ]
-
+
        if loaded_and_free:
            # Sort by per-model usage in DESCENDING order to ensure model affinity
            # Endpoints with higher usage (already handling this model) should be preferred
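
The copy-under-lock pattern introduced in the `publish_snapshot()` hunk above can be illustrated standalone. This is a minimal sketch, not the router's actual code: it substitutes stdlib `json` for `orjson`, trims the shared state to `usage_counts` only, and the endpoint/model names are invented:

```python
import asyncio
import json

# Hypothetical stand-ins for the router's module-level state; the names
# follow the diff, everything else is simplified for illustration.
usage_counts: dict = {}
usage_lock = asyncio.Lock()
_subscribers: set = set()
_subscribers_lock = asyncio.Lock()


async def publish_snapshot() -> str:
    # Serialize a *copy* of the shared dict while holding the lock, so the
    # snapshot stays internally consistent even if a writer mutates
    # usage_counts immediately after the lock is released.
    async with usage_lock:
        snapshot = json.dumps({"usage_counts": dict(usage_counts)}, sort_keys=True)

    # Fan the snapshot out without holding usage_lock: subscribers only see
    # the immutable string, so no further synchronization on the counts is
    # needed, and slow consumers cannot block writers of usage_counts.
    async with _subscribers_lock:
        for q in _subscribers:
            try:
                q.put_nowait(snapshot)
            except asyncio.QueueFull:
                pass  # drop rather than create back-pressure
    return snapshot


async def main() -> str:
    usage_counts["http://ollama-1:11434"] = {"llama3": 2}
    q: asyncio.Queue = asyncio.Queue(maxsize=1)
    async with _subscribers_lock:
        _subscribers.add(q)
    await publish_snapshot()
    return await q.get()


print(asyncio.run(main()))
# → {"usage_counts": {"http://ollama-1:11434": {"llama3": 2}}}
```

The design point mirrors the diff: the lock covers only the serialization of a copied dict, and distribution happens afterwards, which keeps the critical section short and decouples subscriber queues from the counters.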