From 067cdf641a49218df64b0c96f429071b3dc5c88c Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Fri, 16 Jan 2026 16:47:24 +0100 Subject: [PATCH 1/9] feat: add timestamp index and improve cache concurrency - Added index on token_time_series timestamp for faster queries - Introduced cache locks to prevent race conditions --- db.py | 1 + race_condition_fixes.md | 131 ++++++++++++++++++++++++++++++++++++++++ router.py | 94 +++++++++++++++++----------- 3 files changed, 191 insertions(+), 35 deletions(-) create mode 100644 race_condition_fixes.md diff --git a/db.py b/db.py index 24c8480..9f4efd3 100644 --- a/db.py +++ b/db.py @@ -50,6 +50,7 @@ class TokenDatabase: PRIMARY KEY(endpoint, model) ) ''') + await db.execute('CREATE INDEX IF NOT EXISTS idx_token_time_series_timestamp ON token_time_series(timestamp)') await db.execute(''' CREATE TABLE IF NOT EXISTS token_time_series ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/race_condition_fixes.md b/race_condition_fixes.md new file mode 100644 index 0000000..044372b --- /dev/null +++ b/race_condition_fixes.md @@ -0,0 +1,131 @@ +# Race Condition Fixes in router.py + +This document summarizes all the race condition fixes that have been implemented in the router.py file. + +## Summary of Issues Found and Fixed + +### 1. Cache Access Race Conditions + +**Problem:** The `_models_cache`, `_loaded_models_cache`, and `_error_cache` dictionaries were being accessed concurrently without proper synchronization, leading to potential data corruption and inconsistent state. + +**Solution:** Added three new asyncio.Lock objects to protect cache access: +- `_models_cache_lock` - protects `_models_cache` +- `_loaded_models_cache_lock` - protects `_loaded_models_cache` +- `_error_cache_lock` - protects `_error_cache` + +**Implementation:** +- All cache reads now acquire the appropriate lock before accessing the cache +- All cache writes now acquire the appropriate lock before modifying the cache +- Cache entries are checked for freshness while holding the lock +- Stale entries are removed while holding the lock + +### 2. Timestamp Calculation Race Condition + +**Problem:** In the `token_worker()` function, the timestamp calculation was performed inside the lock, which could lead to: +- Inconsistent timestamps across multiple token entries +- Potential delays in processing due to lock contention +- Race conditions if the datetime object was modified between calculation and use + +**Solution:** Moved timestamp calculation outside the lock: +- Calculate timestamp before acquiring the buffer lock +- Pass the pre-calculated timestamp into the critical section +- This ensures consistent timestamps while minimizing lock duration + +**Implementation:** +```python +# Calculate timestamp once before acquiring lock +now = datetime.now(tz=timezone.utc) +timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) + +# Then use it inside the lock +async with buffer_lock: + time_series_buffer.append({ + 'endpoint': endpoint, + 'model': model, + 'input_tokens': prompt, + 'output_tokens': comp, + 'total_tokens': prompt + comp, + 'timestamp': timestamp # Use pre-calculated timestamp + }) +``` + +### 3. Buffer Access Race Conditions + +**Problem:** The `token_buffer` and `time_series_buffer` were accessed concurrently from multiple sources (token_worker, flush_buffer, flush_remaining_buffers) without proper synchronization. + +**Solution:** The existing `buffer_lock` was correctly used, but we ensured all access patterns properly acquire the lock. + +**Implementation:** +- All reads and writes to `token_buffer` are protected by `buffer_lock` +- All reads and writes to `time_series_buffer` are protected by `buffer_lock` +- Buffer copies are made while holding the lock, then released before DB operations + +### 4. Usage Count Race Conditions + +**Problem:** The `usage_counts` and `token_usage_counts` dictionaries were accessed concurrently without proper synchronization. + +**Solution:** The existing `usage_lock` and `token_usage_lock` were correctly used throughout the codebase. + +**Implementation:** +- All increments/decrements of `usage_counts` are protected by `usage_lock` +- All increments/decrements of `token_usage_counts` are protected by `token_usage_lock` +- The `publish_snapshot()` function acquires `usage_lock` before creating snapshots + +### 5. Subscriber Management Race Conditions + +**Problem:** The `_subscribers` set was accessed concurrently without proper synchronization. + +**Solution:** The existing `_subscribers_lock` was correctly used throughout the codebase. + +**Implementation:** +- All additions/removals from `_subscribers` are protected by `_subscribers_lock` +- The `subscribe()` function acquires the lock before adding new subscribers +- The `unsubscribe()` function acquires the lock before removing subscribers +- The `publish_snapshot()` function acquires the lock before iterating over subscribers + +## Testing Recommendations + +To verify the race condition fixes: + +1. **Concurrent Request Testing:** + - Send multiple simultaneous requests to the same endpoint/model + - Verify that usage counts remain consistent + - Verify that token counts are accurately tracked + +2. **Cache Consistency Testing:** + - Rapidly query `/api/tags` and `/api/ps` endpoints + - Verify that cached responses remain consistent + - Verify that stale cache entries are properly invalidated + +3. **Timestamp Consistency Testing:** + - Send multiple requests in quick succession + - Verify that timestamps in time_series_buffer are consistent and accurate + - Verify that timestamps are properly rounded to minute boundaries + +4. **Load Testing:** + - Simulate high load with many concurrent connections + - Verify that the system remains stable under load + - Verify that no deadlocks occur + +## Performance Considerations + +The race condition fixes introduce additional locking, which may have performance implications: + +1. **Lock Granularity:** The locks are fine-grained (one per cache type), minimizing contention. + +2. **Lock Duration:** Locks are held for minimal durations - typically just for cache reads/writes. + +3. **Concurrent Operations:** Multiple endpoints/models can be processed concurrently as long as they don't contend for the same cache entry. + +4. **Monitoring:** Consider adding metrics to track lock contention and wait times. + +## Future Improvements + +1. **Read-Write Locks:** Consider using read-write locks if read-heavy workloads are identified. + +2. **Cache Invalidation:** Implement more sophisticated cache invalidation strategies. + +3. **Lock Timeouts:** Add timeout mechanisms to prevent deadlocks. + +4. **Performance Monitoring:** Add instrumentation to track lock contention and performance. + diff --git a/router.py b/router.py index cb5af1f..9922c41 100644 --- a/router.py +++ b/router.py @@ -34,6 +34,13 @@ _loaded_models_cache: dict[str, tuple[Set[str], float]] = {} # timeout expires, after which the endpoint will be queried again. _error_cache: dict[str, float] = {} +# ------------------------------------------------------------------ +# Cache locks +# ------------------------------------------------------------------ +_models_cache_lock = asyncio.Lock() +_loaded_models_cache_lock = asyncio.Lock() +_error_cache_lock = asyncio.Lock() + # ------------------------------------------------------------------ # Queues # ------------------------------------------------------------------ @@ -226,6 +233,10 @@ async def token_worker() -> None: try: while True: endpoint, model, prompt, comp = await token_queue.get() + # Calculate timestamp once before acquiring lock + now = datetime.now(tz=timezone.utc) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) + # Accumulate counts in memory buffer (protected by lock) async with buffer_lock: token_buffer[endpoint][model] = ( @@ -234,8 +245,6 @@ async def token_worker() -> None: ) # Add to time series buffer with timestamp (UTC) - now = datetime.now(tz=timezone.utc) - timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) time_series_buffer.append({ 'endpoint': endpoint, 'model': model, @@ -256,13 +265,15 @@ async def token_worker() -> None: while not token_queue.empty(): try: endpoint, model, prompt, comp = token_queue.get_nowait() + # Calculate timestamp once before acquiring lock + now = datetime.now(tz=timezone.utc) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) + async with buffer_lock: token_buffer[endpoint][model] = ( token_buffer[endpoint].get(model, (0, 0))[0] + prompt, token_buffer[endpoint].get(model, (0, 0))[1] + comp ) - now = datetime.now(tz=timezone.utc) - timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) time_series_buffer.append({ 'endpoint': endpoint, 'model': model, @@ -378,19 +389,21 @@ class fetch: if api_key is not None: headers = {"Authorization": "Bearer " + api_key} - if endpoint in _models_cache: - models, cached_at = _models_cache[endpoint] - if _is_fresh(cached_at, 300): - return models - else: - # stale entry – drop it + # Check models cache with lock protection + async with _models_cache_lock: + if endpoint in _models_cache: + models, cached_at = _models_cache[endpoint] + if _is_fresh(cached_at, 300): + return models + # Stale entry - remove it del _models_cache[endpoint] - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): - # Still within the short error TTL – pretend nothing is available - return set() - else: + # Check error cache with lock protection + async with _error_cache_lock: + if endpoint in _error_cache: + if _is_fresh(_error_cache[endpoint], 10): + # Still within the short error TTL – pretend nothing is available + return set() # Error expired – remove it del _error_cache[endpoint] @@ -408,19 +421,22 @@ class fetch: items = data.get(key, []) models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} - - if models: - _models_cache[endpoint] = (models, time.time()) - return models - else: - # Empty list – treat as “no models”, but still cache for 300s - _models_cache[endpoint] = (models, time.time()) - return models + + # Update cache with lock protection + async with _models_cache_lock: + if models: + _models_cache[endpoint] = (models, time.time()) + else: + # Empty list – treat as "no models", but still cache for 300s + _models_cache[endpoint] = (models, time.time()) + return models except Exception as e: # Treat any error as if the endpoint offers no models message = _format_connection_issue(endpoint_url, e) print(f"[fetch.available_models] {message}") - _error_cache[endpoint] = time.time() + # Update error cache with lock protection + async with _error_cache_lock: + _error_cache[endpoint] = time.time() return set() @@ -432,19 +448,24 @@ class fetch: """ if is_ext_openai_endpoint(endpoint): return set() - if endpoint in _loaded_models_cache: - models, cached_at = _loaded_models_cache[endpoint] - if _is_fresh(cached_at, 30): - return models - else: - # stale entry – drop it + + # Check loaded models cache with lock protection + async with _loaded_models_cache_lock: + if endpoint in _loaded_models_cache: + models, cached_at = _loaded_models_cache[endpoint] + if _is_fresh(cached_at, 30): + return models + # Stale entry - remove it del _loaded_models_cache[endpoint] - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): - return set() - else: + # Check error cache with lock protection + async with _error_cache_lock: + if endpoint in _error_cache: + if _is_fresh(_error_cache[endpoint], 10): + return set() + # Error expired - remove it del _error_cache[endpoint] + client: aiohttp.ClientSession = app_state["session"] try: async with client.get(f"{endpoint}/api/ps") as resp: @@ -453,7 +474,10 @@ class fetch: # The response format is: # {"models": [{"name": "model1"}, {"name": "model2"}]} models = {m.get("name") for m in data.get("models", []) if m.get("name")} - _loaded_models_cache[endpoint] = (models, time.time()) + + # Update cache with lock protection + async with _loaded_models_cache_lock: + _loaded_models_cache[endpoint] = (models, time.time()) return models except Exception as e: # If anything goes wrong we simply assume the endpoint has no models From 5ad5bfe66ef25095a21274978a7a142cd1e319c4 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sun, 18 Jan 2026 09:31:53 +0100 Subject: [PATCH 2/9] feat: endpoint selection more consistent and understandable --- router.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/router.py b/router.py index 9922c41..0a2ba57 100644 --- a/router.py +++ b/router.py @@ -980,8 +980,15 @@ async def choose_endpoint(model: str) -> str: ] if loaded_and_free: - ep = min(loaded_and_free, key=current_usage) - return ep + # Sort by total endpoint usage first (prefer idle endpoints) + # Then by per-model usage (balance load for this specific model) + loaded_and_free.sort( + key=lambda ep: ( + sum(usage_counts.get(ep, {}).values()), # Primary: total endpoint usage + usage_counts.get(ep, {}).get(model, 0) # Secondary: per-model usage + ) + ) + return loaded_and_free[0] # 4️⃣ Endpoints among the candidates that simply have a free slot endpoints_with_free_slot = [ @@ -990,8 +997,14 @@ async def choose_endpoint(model: str) -> str: ] if endpoints_with_free_slot: - #return random.choice(endpoints_with_free_slot) - endpoints_with_free_slot.sort(key=lambda ep: sum(usage_counts.get(ep, {}).values())) + # Sort by total endpoint usage first (prefer idle endpoints) + # Then by per-model usage (balance load for this specific model) + endpoints_with_free_slot.sort( + key=lambda ep: ( + sum(usage_counts.get(ep, {}).values()), # Primary: total endpoint usage + usage_counts.get(ep, {}).get(model, 0) # Secondary: per-model usage + ) + ) return endpoints_with_free_slot[0] # 5️⃣ All candidate endpoints are saturated – pick one with lowest usages count (will queue) From 3e3f0dd383b4e6273fbaf7387565521f8f300cb8 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 19 Jan 2026 14:21:08 +0100 Subject: [PATCH 3/9] fix: endpoint selection logic --- router.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/router.py b/router.py index 0a2ba57..5bb1eaa 100644 --- a/router.py +++ b/router.py @@ -980,13 +980,11 @@ async def choose_endpoint(model: str) -> str: ] if loaded_and_free: - # Sort by total endpoint usage first (prefer idle endpoints) - # Then by per-model usage (balance load for this specific model) + # Sort by per-model usage in DESCENDING order to ensure model affinity + # Endpoints with higher usage (already handling this model) should be preferred + # until they reach max_concurrent_connections loaded_and_free.sort( - key=lambda ep: ( - sum(usage_counts.get(ep, {}).values()), # Primary: total endpoint usage - usage_counts.get(ep, {}).get(model, 0) # Secondary: per-model usage - ) + key=lambda ep: -usage_counts.get(ep, {}).get(model, 0) # Negative for descending order ) return loaded_and_free[0] @@ -997,12 +995,14 @@ async def choose_endpoint(model: str) -> str: ] if endpoints_with_free_slot: - # Sort by total endpoint usage first (prefer idle endpoints) - # Then by per-model usage (balance load for this specific model) + # Sort by per-model usage (descending) first to ensure model affinity + # Even if the model isn't showing as "loaded" in /api/ps yet (e.g., during initial loading), + # we want to send subsequent requests to the endpoint that already has connections for this model + # Then by total endpoint usage (ascending) to balance idle endpoints endpoints_with_free_slot.sort( key=lambda ep: ( - sum(usage_counts.get(ep, {}).values()), # Primary: total endpoint usage - usage_counts.get(ep, {}).get(model, 0) # Secondary: per-model usage + -usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections) + sum(usage_counts.get(ep, {}).values()) # Secondary: total endpoint usage (ascending - prefer idle endpoints) ) ) return endpoints_with_free_slot[0] From d4b255811603634460b33badf625ed25d9bb33ff Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 26 Jan 2026 17:18:57 +0100 Subject: [PATCH 4/9] refactor: improve snapshot safety and usage tracking Create atomic snapshots by deep copying usage data structures to prevent race conditions. Protect concurrent reads of usage counts with explicit locking in endpoint selection. Replace README screenshot with a video link. --- README.md | 3 +- race_condition_fixes.md | 131 ---------------------------------------- router.py | 17 ++++-- 3 files changed, 13 insertions(+), 138 deletions(-) delete mode 100644 race_condition_fixes.md diff --git a/README.md b/README.md index 6c3f402..f892803 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ is a transparent proxy for [Ollama](https://github.com/ollama/ollama) with model deployment aware routing. -Screenshot_NOMYO_Router_0-2-2_Dashboard
+https://eu1.nomyo.ai/assets/dash.mp4 It runs between your frontend application and Ollama backend and is transparent for both, the front- and backend. @@ -73,6 +73,7 @@ docker run -d \ ``` Notes: + - `-e CONFIG_PATH` sets the `NOMYO_ROUTER_CONFIG_PATH` environment variable under the hood; you can export it directly instead if you prefer. - To override the bind address or port, export `UVICORN_HOST` or `UVICORN_PORT`, or pass the corresponding uvicorn flags after `--`, e.g. `nomyo-router --config-path /config/config.yaml -- --port 9000`. - Use `docker logs nomyo-router` to confirm the loaded endpoints and concurrency settings at startup. diff --git a/race_condition_fixes.md b/race_condition_fixes.md deleted file mode 100644 index 044372b..0000000 --- a/race_condition_fixes.md +++ /dev/null @@ -1,131 +0,0 @@ -# Race Condition Fixes in router.py - -This document summarizes all the race condition fixes that have been implemented in the router.py file. - -## Summary of Issues Found and Fixed - -### 1. Cache Access Race Conditions - -**Problem:** The `_models_cache`, `_loaded_models_cache`, and `_error_cache` dictionaries were being accessed concurrently without proper synchronization, leading to potential data corruption and inconsistent state. - -**Solution:** Added three new asyncio.Lock objects to protect cache access: -- `_models_cache_lock` - protects `_models_cache` -- `_loaded_models_cache_lock` - protects `_loaded_models_cache` -- `_error_cache_lock` - protects `_error_cache` - -**Implementation:** -- All cache reads now acquire the appropriate lock before accessing the cache -- All cache writes now acquire the appropriate lock before modifying the cache -- Cache entries are checked for freshness while holding the lock -- Stale entries are removed while holding the lock - -### 2. Timestamp Calculation Race Condition - -**Problem:** In the `token_worker()` function, the timestamp calculation was performed inside the lock, which could lead to: -- Inconsistent timestamps across multiple token entries -- Potential delays in processing due to lock contention -- Race conditions if the datetime object was modified between calculation and use - -**Solution:** Moved timestamp calculation outside the lock: -- Calculate timestamp before acquiring the buffer lock -- Pass the pre-calculated timestamp into the critical section -- This ensures consistent timestamps while minimizing lock duration - -**Implementation:** -```python -# Calculate timestamp once before acquiring lock -now = datetime.now(tz=timezone.utc) -timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) - -# Then use it inside the lock -async with buffer_lock: - time_series_buffer.append({ - 'endpoint': endpoint, - 'model': model, - 'input_tokens': prompt, - 'output_tokens': comp, - 'total_tokens': prompt + comp, - 'timestamp': timestamp # Use pre-calculated timestamp - }) -``` - -### 3. Buffer Access Race Conditions - -**Problem:** The `token_buffer` and `time_series_buffer` were accessed concurrently from multiple sources (token_worker, flush_buffer, flush_remaining_buffers) without proper synchronization. - -**Solution:** The existing `buffer_lock` was correctly used, but we ensured all access patterns properly acquire the lock. - -**Implementation:** -- All reads and writes to `token_buffer` are protected by `buffer_lock` -- All reads and writes to `time_series_buffer` are protected by `buffer_lock` -- Buffer copies are made while holding the lock, then released before DB operations - -### 4. Usage Count Race Conditions - -**Problem:** The `usage_counts` and `token_usage_counts` dictionaries were accessed concurrently without proper synchronization. - -**Solution:** The existing `usage_lock` and `token_usage_lock` were correctly used throughout the codebase. - -**Implementation:** -- All increments/decrements of `usage_counts` are protected by `usage_lock` -- All increments/decrements of `token_usage_counts` are protected by `token_usage_lock` -- The `publish_snapshot()` function acquires `usage_lock` before creating snapshots - -### 5. Subscriber Management Race Conditions - -**Problem:** The `_subscribers` set was accessed concurrently without proper synchronization. - -**Solution:** The existing `_subscribers_lock` was correctly used throughout the codebase. - -**Implementation:** -- All additions/removals from `_subscribers` are protected by `_subscribers_lock` -- The `subscribe()` function acquires the lock before adding new subscribers -- The `unsubscribe()` function acquires the lock before removing subscribers -- The `publish_snapshot()` function acquires the lock before iterating over subscribers - -## Testing Recommendations - -To verify the race condition fixes: - -1. **Concurrent Request Testing:** - - Send multiple simultaneous requests to the same endpoint/model - - Verify that usage counts remain consistent - - Verify that token counts are accurately tracked - -2. **Cache Consistency Testing:** - - Rapidly query `/api/tags` and `/api/ps` endpoints - - Verify that cached responses remain consistent - - Verify that stale cache entries are properly invalidated - -3. **Timestamp Consistency Testing:** - - Send multiple requests in quick succession - - Verify that timestamps in time_series_buffer are consistent and accurate - - Verify that timestamps are properly rounded to minute boundaries - -4. **Load Testing:** - - Simulate high load with many concurrent connections - - Verify that the system remains stable under load - - Verify that no deadlocks occur - -## Performance Considerations - -The race condition fixes introduce additional locking, which may have performance implications: - -1. **Lock Granularity:** The locks are fine-grained (one per cache type), minimizing contention. - -2. **Lock Duration:** Locks are held for minimal durations - typically just for cache reads/writes. - -3. **Concurrent Operations:** Multiple endpoints/models can be processed concurrently as long as they don't contend for the same cache entry. - -4. **Monitoring:** Consider adding metrics to track lock contention and wait times. - -## Future Improvements - -1. **Read-Write Locks:** Consider using read-write locks if read-heavy workloads are identified. - -2. **Cache Invalidation:** Implement more sophisticated cache invalidation strategies. - -3. **Lock Timeouts:** Add timeout mechanisms to prevent deadlocks. - -4. **Performance Monitoring:** Add instrumentation to track lock contention and performance. - diff --git a/router.py b/router.py index 5bb1eaa..6250d11 100644 --- a/router.py +++ b/router.py @@ -868,10 +868,14 @@ class rechunk: # SSE Helpser # ------------------------------------------------------------------ async def publish_snapshot(): + # Take a consistent snapshot while holding the lock async with usage_lock: - snapshot = orjson.dumps({"usage_counts": usage_counts, - "token_usage_counts": token_usage_counts, - }, option=orjson.OPT_SORT_KEYS).decode("utf-8") + snapshot = orjson.dumps({ + "usage_counts": dict(usage_counts), # Create a copy + "token_usage_counts": dict(token_usage_counts) + }, option=orjson.OPT_SORT_KEYS).decode("utf-8") + + # Distribute the snapshot (no lock needed here since we have a copy) async with _subscribers_lock: for q in _subscribers: # If the queue is full, drop the message to avoid back‑pressure. @@ -967,18 +971,19 @@ async def choose_endpoint(model: str) -> str: # (concurrently, but only for the filtered list) load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) - + + # Protect all reads of usage_counts with the lock async with usage_lock: # Helper: get current usage count for (endpoint, model) def current_usage(ep: str) -> int: return usage_counts.get(ep, {}).get(model, 0) - + # 3️⃣ Endpoints that have the model loaded *and* a free slot loaded_and_free = [ ep for ep, models in zip(candidate_endpoints, loaded_sets) if model in models and usage_counts.get(ep, {}).get(model, 0) < config.max_concurrent_connections ] - + if loaded_and_free: # Sort by per-model usage in DESCENDING order to ensure model affinity # Endpoints with higher usage (already handling this model) should be preferred From ff402ba0bb629170b35f100d7e62028528bd9c70 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 26 Jan 2026 18:34:30 +0100 Subject: [PATCH 5/9] Update video link to clickable thumbnail Replace static video link with a clickable thumbnail. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f892803..8723d89 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ is a transparent proxy for [Ollama](https://github.com/ollama/ollama) with model deployment aware routing. -https://eu1.nomyo.ai/assets/dash.mp4 +[![Click for video](https://github.com/user-attachments/assets/ddacdf88-e3f3-41dd-8be6-f165b22d9879)](https://eu1.nomyo.ai/assets/dash.mp4) It runs between your frontend application and Ollama backend and is transparent for both, the front- and backend. From a1276e3de874ac86a6a5493305cbf410dde9d5ce Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 29 Jan 2026 10:32:59 +0100 Subject: [PATCH 6/9] fix: correct indentation for publish_snapshot calls in usage functions This fix ensures that the snapshot publishing happens within the usage lock context, maintaining proper synchronization of usage counts. --- router.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/router.py b/router.py index 6250d11..a990cb7 100644 --- a/router.py +++ b/router.py @@ -532,7 +532,7 @@ def dedupe_on_keys(dicts, key_fields): async def increment_usage(endpoint: str, model: str) -> None: async with usage_lock: usage_counts[endpoint][model] += 1 - await publish_snapshot() + await publish_snapshot() async def decrement_usage(endpoint: str, model: str) -> None: async with usage_lock: @@ -545,7 +545,7 @@ async def decrement_usage(endpoint: str, model: str) -> None: usage_counts[endpoint].pop(model, None) #if not usage_counts[endpoint]: # usage_counts.pop(endpoint, None) - await publish_snapshot() + await publish_snapshot() async def _make_chat_request(endpoint: str, model: str, messages: list, tools=None, stream: bool = False, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: """ From 4ca1a5667e6589f5d4c8924ee9d833a7a3422b92 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 29 Jan 2026 18:00:33 +0100 Subject: [PATCH 7/9] feat(router): implement in-flight request tracking to prevent cache stampede in high concurrency scenarios Added in-flight request tracking mechanism to prevent cache stampede when multiple concurrent requests arrive for the same endpoint. This introduces new dictionaries to track ongoing requests and a lock to coordinate access. The available_models method was refactored to use an internal helper function and includes request coalescing logic to ensure only one HTTP request is made per endpoint when cache entries expire. The loaded_models method was also updated to use the new caching and coalescing pattern. --- router.py | 231 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 166 insertions(+), 65 deletions(-) diff --git a/router.py b/router.py index a990cb7..829b16a 100644 --- a/router.py +++ b/router.py @@ -41,6 +41,13 @@ _models_cache_lock = asyncio.Lock() _loaded_models_cache_lock = asyncio.Lock() _error_cache_lock = asyncio.Lock() +# ------------------------------------------------------------------ +# In-flight request tracking (prevents cache stampede) +# ------------------------------------------------------------------ +_inflight_available_models: dict[str, asyncio.Task] = {} +_inflight_loaded_models: dict[str, asyncio.Task] = {} +_inflight_lock = asyncio.Lock() + # ------------------------------------------------------------------ # Queues # ------------------------------------------------------------------ @@ -375,6 +382,44 @@ async def flush_remaining_buffers() -> None: print(f"[shutdown] Error flushing remaining buffers: {e}") class fetch: + async def _fetch_available_models_internal(endpoint: str, api_key: Optional[str] = None) -> Set[str]: + """ + Internal function that performs the actual HTTP request to fetch available models. + This is called by available_models() after checking caches and in-flight requests. + """ + headers = None + if api_key is not None: + headers = {"Authorization": "Bearer " + api_key} + + if "/v1" in endpoint: + endpoint_url = f"{endpoint}/models" + key = "data" + else: + endpoint_url = f"{endpoint}/api/tags" + key = "models" + + client: aiohttp.ClientSession = app_state["session"] + try: + async with client.get(endpoint_url, headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() + + items = data.get(key, []) + models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} + + # Update cache with lock protection + async with _models_cache_lock: + _models_cache[endpoint] = (models, time.time()) + return models + except Exception as e: + # Treat any error as if the endpoint offers no models + message = _format_connection_issue(endpoint_url, e) + print(f"[fetch.available_models] {message}") + # Update error cache with lock protection + async with _error_cache_lock: + _error_cache[endpoint] = time.time() + return set() + async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: """ Query /api/tags and return a set of all model names that the @@ -382,13 +427,12 @@ class fetch: every model that is installed on the Ollama instance, regardless of whether the model is currently loaded into memory. + Uses request coalescing to prevent cache stampede: if multiple requests + arrive when cache is expired, only one actual HTTP request is made. + If the request fails (e.g. timeout, 5xx, or malformed response), an empty set is returned. """ - headers = None - if api_key is not None: - headers = {"Authorization": "Bearer " + api_key} - # Check models cache with lock protection async with _models_cache_lock: if endpoint in _models_cache: @@ -407,65 +451,32 @@ class fetch: # Error expired – remove it del _error_cache[endpoint] - if "/v1" in endpoint: - endpoint_url = f"{endpoint}/models" - key = "data" - else: - endpoint_url = f"{endpoint}/api/tags" - key = "models" - client: aiohttp.ClientSession = app_state["session"] + # Request coalescing: check if another request is already fetching this endpoint + async with _inflight_lock: + if endpoint in _inflight_available_models: + # Another request is already fetching - wait for it + task = _inflight_available_models[endpoint] + else: + # Create new fetch task + task = asyncio.create_task(fetch._fetch_available_models_internal(endpoint, api_key)) + _inflight_available_models[endpoint] = task + try: - async with client.get(endpoint_url, headers=headers) as resp: - await _ensure_success(resp) - data = await resp.json() - - items = data.get(key, []) - models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} - - # Update cache with lock protection - async with _models_cache_lock: - if models: - _models_cache[endpoint] = (models, time.time()) - else: - # Empty list – treat as "no models", but still cache for 300s - _models_cache[endpoint] = (models, time.time()) - return models - except Exception as e: - # Treat any error as if the endpoint offers no models - message = _format_connection_issue(endpoint_url, e) - print(f"[fetch.available_models] {message}") - # Update error cache with lock protection - async with _error_cache_lock: - _error_cache[endpoint] = time.time() - return set() + # Wait for the fetch to complete (either ours or another request's) + result = await task + return result + finally: + # Clean up in-flight tracking (only if we created it) + async with _inflight_lock: + if _inflight_available_models.get(endpoint) == task: + _inflight_available_models.pop(endpoint, None) - async def loaded_models(endpoint: str) -> Set[str]: + async def _fetch_loaded_models_internal(endpoint: str) -> Set[str]: """ - Query /api/ps and return a set of model names that are currently - loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty - set is returned. + Internal function that performs the actual HTTP request to fetch loaded models. + This is called by loaded_models() after checking caches and in-flight requests. """ - if is_ext_openai_endpoint(endpoint): - return set() - - # Check loaded models cache with lock protection - async with _loaded_models_cache_lock: - if endpoint in _loaded_models_cache: - models, cached_at = _loaded_models_cache[endpoint] - if _is_fresh(cached_at, 30): - return models - # Stale entry - remove it - del _loaded_models_cache[endpoint] - - # Check error cache with lock protection - async with _error_cache_lock: - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): - return set() - # Error expired - remove it - del _error_cache[endpoint] - client: aiohttp.ClientSession = app_state["session"] try: async with client.get(f"{endpoint}/api/ps") as resp: @@ -485,6 +496,75 @@ class fetch: print(f"[fetch.loaded_models] {message}") return set() + async def _refresh_loaded_models(endpoint: str) -> None: + """ + Background task to refresh loaded models cache without blocking the caller. + Used for stale-while-revalidate pattern. + """ + try: + await fetch._fetch_loaded_models_internal(endpoint) + except Exception as e: + # Silently fail - cache will remain stale but functional + print(f"[fetch._refresh_loaded_models] Background refresh failed for {endpoint}: {e}") + + async def loaded_models(endpoint: str) -> Set[str]: + """ + Query /api/ps and return a set of model names that are currently + loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty + set is returned. + + Uses request coalescing to prevent cache stampede and stale-while-revalidate + to serve requests immediately even when cache is stale (refreshing in background). + """ + if is_ext_openai_endpoint(endpoint): + return set() + + # Check loaded models cache with lock protection + async with _loaded_models_cache_lock: + if endpoint in _loaded_models_cache: + models, cached_at = _loaded_models_cache[endpoint] + + # FRESH: < 10s old - return immediately + if _is_fresh(cached_at, 10): + return models + + # STALE: 10-60s old - return stale data and refresh in background + if _is_fresh(cached_at, 60): + # Kick off background refresh (fire-and-forget) + asyncio.create_task(fetch._refresh_loaded_models(endpoint)) + return models # Return stale data immediately + + # EXPIRED: > 60s old - too stale, must refresh synchronously + del _loaded_models_cache[endpoint] + + # Check error cache with lock protection + async with _error_cache_lock: + if endpoint in _error_cache: + if _is_fresh(_error_cache[endpoint], 10): + return set() + # Error expired - remove it + del _error_cache[endpoint] + + # Request coalescing: check if another request is already fetching this endpoint + async with _inflight_lock: + if endpoint in _inflight_loaded_models: + # Another request is already fetching - wait for it + task = _inflight_loaded_models[endpoint] + else: + # Create new fetch task + task = asyncio.create_task(fetch._fetch_loaded_models_internal(endpoint)) + _inflight_loaded_models[endpoint] = task + + try: + # Wait for the fetch to complete (either ours or another request's) + result = await task + return result + finally: + # Clean up in-flight tracking (only if we created it) + async with _inflight_lock: + if _inflight_loaded_models.get(endpoint) == task: + _inflight_loaded_models.pop(endpoint, None) + async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: """ Query / to fetch and return a List of dicts with details @@ -868,12 +948,12 @@ class rechunk: # SSE Helpser # ------------------------------------------------------------------ async def publish_snapshot(): - # Take a consistent snapshot while holding the lock - async with usage_lock: - snapshot = orjson.dumps({ - "usage_counts": dict(usage_counts), # Create a copy - "token_usage_counts": dict(token_usage_counts) - }, option=orjson.OPT_SORT_KEYS).decode("utf-8") + # NOTE: This function assumes usage_lock OR token_usage_lock is already held by the caller + # Create a snapshot without acquiring the lock (caller must hold it) + snapshot = orjson.dumps({ + "usage_counts": dict(usage_counts), # Create a copy + "token_usage_counts": dict(token_usage_counts) + }, option=orjson.OPT_SORT_KEYS).decode("utf-8") # Distribute the snapshot (no lock needed here since we have a copy) async with _subscribers_lock: @@ -991,6 +1071,12 @@ async def choose_endpoint(model: str) -> str: loaded_and_free.sort( key=lambda ep: -usage_counts.get(ep, {}).get(model, 0) # Negative for descending order ) + + # If all endpoints have zero usage for this model, randomize to distribute + # different models across different endpoints for better resource utilization + if all(usage_counts.get(ep, {}).get(model, 0) == 0 for ep in loaded_and_free): + return random.choice(loaded_and_free) + return loaded_and_free[0] # 4️⃣ Endpoints among the candidates that simply have a free slot @@ -1010,6 +1096,12 @@ async def choose_endpoint(model: str) -> str: sum(usage_counts.get(ep, {}).values()) # Secondary: total endpoint usage (ascending - prefer idle endpoints) ) ) + + # If all endpoints have zero usage for this specific model, randomize to distribute + # different models across different endpoints for better resource utilization + if all(usage_counts.get(ep, {}).get(model, 0) == 0 for ep in endpoints_with_free_slot): + return random.choice(endpoints_with_free_slot) + return endpoints_with_free_slot[0] # 5️⃣ All candidate endpoints are saturated – pick one with lowest usages count (will queue) @@ -1974,7 +2066,16 @@ async def openai_chat_completions_proxy(request: Request): async def stream_ochat_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - async_gen = await oclient.chat.completions.create(**params) + try: + async_gen = await oclient.chat.completions.create(**params) + except openai.BadRequestError as e: + # If tools are not supported by the model, retry without tools + if "does not support tools" in str(e): + print(f"[openai_chat_completions_proxy] Model {model} doesn't support tools, retrying without tools") + params_without_tools = {k: v for k, v in params.items() if k != "tools"} + async_gen = await oclient.chat.completions.create(**params_without_tools) + else: + raise if stream == True: async for chunk in async_gen: data = ( From 0ebfa7c5194c881f335d25510024b73a3d4eb306 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 29 Jan 2026 18:12:05 +0100 Subject: [PATCH 8/9] security: bump orjson to >=3.11.5 preventing a recursive DOS attack --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a0c7ab6..da9c9e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,7 +21,7 @@ jiter==0.10.0 multidict==6.6.4 ollama==0.6.0 openai==1.102.0 -orjson==3.11.4 +orjson>=3.11.5 pillow==11.3.0 propcache==0.3.2 pydantic==2.11.7 From d80b29e4f2a0120b8a2827f24a5d6a59df4560a4 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 29 Jan 2026 19:59:08 +0100 Subject: [PATCH 9/9] feat: enhance code quality and documentation - Renamed Feedback class to follow PascalCase convention - Fixed candidate enumeration start index from 0 to 1 - Simplified candidate content access by removing .message.content - Updated CONFIG_PATH environment variable name to CONFIG_PATH_ARG - Bumped version from 0.5 to 0.6 - Removed unnecessary return statement and trailing newline --- enhance.py | 11 +++++------ entrypoint.sh | 2 +- router.py | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/enhance.py b/enhance.py index 9be1fef..f3200c7 100644 --- a/enhance.py +++ b/enhance.py @@ -1,6 +1,6 @@ from pydantic import BaseModel -class feedback(BaseModel): +class Feedback(BaseModel): query_id: int content: str @@ -25,13 +25,13 @@ def moe(query: str, query_id: int, response: str) -> str: def moe_select_candidate(query: str, candidates: list[str]) -> str: if not candidates: - raise ValueError("No candidates supplied") + raise ValueError("No candidates supplied") candidate_sections = "" - for i, cand in enumerate(candidates[:3], start=0): + for i, cand in enumerate(candidates[:3], start=1): candidate_sections += f""" - {cand.message.content} + {cand} """ @@ -45,5 +45,4 @@ def moe_select_candidate(query: str, candidates: list[str]) -> str: **Do NOT** mention candidate numbers, strengths, weaknesses, or any other commentary. Just give the final answer—nothing else. """ - return select_prompt.strip() - + return select_prompt.strip() \ No newline at end of file diff --git a/entrypoint.sh b/entrypoint.sh index 6682851..e9ccbc3 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -50,7 +50,7 @@ Options: Any arguments that remain after the options above are passed directly to uvicorn. Environment variables: - CONFIG_PATH Alternative way to specify the config path. + CONFIG_PATH_ARG Alternative way to specify the config path. NOMYO_ROUTER_CONFIG_PATH Overrides the config path (same as --config-path). UVICORN_HOST Host interface to bind to (default: 0.0.0.0). UVICORN_PORT Port to listen on (default: 12434). diff --git a/router.py b/router.py index 829b16a..2215a4e 100644 --- a/router.py +++ b/router.py @@ -2,7 +2,7 @@ title: NOMYO Router - an Ollama Proxy with Endpoint:Model aware routing author: alpha-nerd-nomyo author_url: https://github.com/nomyo-ai -version: 0.5 +version: 0.6 license: AGPL """ # -------------------------------------------------------------