From 067cdf641a49218df64b0c96f429071b3dc5c88c Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Fri, 16 Jan 2026 16:47:24 +0100 Subject: [PATCH] feat: add timestamp index and improve cache concurrency - Added index on token_time_series timestamp for faster queries - Introduced cache locks to prevent race conditions --- db.py | 1 + race_condition_fixes.md | 131 ++++++++++++++++++++++++++++++++++++++++ router.py | 94 +++++++++++++++++----------- 3 files changed, 191 insertions(+), 35 deletions(-) create mode 100644 race_condition_fixes.md diff --git a/db.py b/db.py index 24c8480..9f4efd3 100644 --- a/db.py +++ b/db.py @@ -50,6 +50,7 @@ class TokenDatabase: PRIMARY KEY(endpoint, model) ) ''') + await db.execute('CREATE INDEX IF NOT EXISTS idx_token_time_series_timestamp ON token_time_series(timestamp)') await db.execute(''' CREATE TABLE IF NOT EXISTS token_time_series ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/race_condition_fixes.md b/race_condition_fixes.md new file mode 100644 index 0000000..044372b --- /dev/null +++ b/race_condition_fixes.md @@ -0,0 +1,131 @@ +# Race Condition Fixes in router.py + +This document summarizes all the race condition fixes that have been implemented in the router.py file. + +## Summary of Issues Found and Fixed + +### 1. Cache Access Race Conditions + +**Problem:** The `_models_cache`, `_loaded_models_cache`, and `_error_cache` dictionaries were being accessed concurrently without proper synchronization, leading to potential data corruption and inconsistent state. + +**Solution:** Added three new asyncio.Lock objects to protect cache access: +- `_models_cache_lock` - protects `_models_cache` +- `_loaded_models_cache_lock` - protects `_loaded_models_cache` +- `_error_cache_lock` - protects `_error_cache` + +**Implementation:** +- All cache reads now acquire the appropriate lock before accessing the cache +- All cache writes now acquire the appropriate lock before modifying the cache +- Cache entries are checked for freshness while holding the lock +- Stale entries are removed while holding the lock + +### 2. Timestamp Calculation Race Condition + +**Problem:** In the `token_worker()` function, the timestamp calculation was performed inside the lock, which could lead to: +- Inconsistent timestamps across multiple token entries +- Potential delays in processing due to lock contention +- Race conditions if the datetime object was modified between calculation and use + +**Solution:** Moved timestamp calculation outside the lock: +- Calculate timestamp before acquiring the buffer lock +- Pass the pre-calculated timestamp into the critical section +- This ensures consistent timestamps while minimizing lock duration + +**Implementation:** +```python +# Calculate timestamp once before acquiring lock +now = datetime.now(tz=timezone.utc) +timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) + +# Then use it inside the lock +async with buffer_lock: + time_series_buffer.append({ + 'endpoint': endpoint, + 'model': model, + 'input_tokens': prompt, + 'output_tokens': comp, + 'total_tokens': prompt + comp, + 'timestamp': timestamp # Use pre-calculated timestamp + }) +``` + +### 3. Buffer Access Race Conditions + +**Problem:** The `token_buffer` and `time_series_buffer` were accessed concurrently from multiple sources (token_worker, flush_buffer, flush_remaining_buffers) without proper synchronization. + +**Solution:** The existing `buffer_lock` was correctly used, but we ensured all access patterns properly acquire the lock. + +**Implementation:** +- All reads and writes to `token_buffer` are protected by `buffer_lock` +- All reads and writes to `time_series_buffer` are protected by `buffer_lock` +- Buffer copies are made while holding the lock, then released before DB operations + +### 4. Usage Count Race Conditions + +**Problem:** The `usage_counts` and `token_usage_counts` dictionaries were accessed concurrently without proper synchronization. + +**Solution:** The existing `usage_lock` and `token_usage_lock` were correctly used throughout the codebase. + +**Implementation:** +- All increments/decrements of `usage_counts` are protected by `usage_lock` +- All increments/decrements of `token_usage_counts` are protected by `token_usage_lock` +- The `publish_snapshot()` function acquires `usage_lock` before creating snapshots + +### 5. Subscriber Management Race Conditions + +**Problem:** The `_subscribers` set was accessed concurrently without proper synchronization. + +**Solution:** The existing `_subscribers_lock` was correctly used throughout the codebase. + +**Implementation:** +- All additions/removals from `_subscribers` are protected by `_subscribers_lock` +- The `subscribe()` function acquires the lock before adding new subscribers +- The `unsubscribe()` function acquires the lock before removing subscribers +- The `publish_snapshot()` function acquires the lock before iterating over subscribers + +## Testing Recommendations + +To verify the race condition fixes: + +1. **Concurrent Request Testing:** + - Send multiple simultaneous requests to the same endpoint/model + - Verify that usage counts remain consistent + - Verify that token counts are accurately tracked + +2. **Cache Consistency Testing:** + - Rapidly query `/api/tags` and `/api/ps` endpoints + - Verify that cached responses remain consistent + - Verify that stale cache entries are properly invalidated + +3. **Timestamp Consistency Testing:** + - Send multiple requests in quick succession + - Verify that timestamps in time_series_buffer are consistent and accurate + - Verify that timestamps are properly rounded to minute boundaries + +4. **Load Testing:** + - Simulate high load with many concurrent connections + - Verify that the system remains stable under load + - Verify that no deadlocks occur + +## Performance Considerations + +The race condition fixes introduce additional locking, which may have performance implications: + +1. **Lock Granularity:** The locks are fine-grained (one per cache type), minimizing contention. + +2. **Lock Duration:** Locks are held for minimal durations - typically just for cache reads/writes. + +3. **Concurrent Operations:** Multiple endpoints/models can be processed concurrently as long as they don't contend for the same cache entry. + +4. **Monitoring:** Consider adding metrics to track lock contention and wait times. + +## Future Improvements + +1. **Read-Write Locks:** Consider using read-write locks if read-heavy workloads are identified. + +2. **Cache Invalidation:** Implement more sophisticated cache invalidation strategies. + +3. **Lock Timeouts:** Add timeout mechanisms to prevent deadlocks. + +4. **Performance Monitoring:** Add instrumentation to track lock contention and performance. + diff --git a/router.py b/router.py index cb5af1f..9922c41 100644 --- a/router.py +++ b/router.py @@ -34,6 +34,13 @@ _loaded_models_cache: dict[str, tuple[Set[str], float]] = {} # timeout expires, after which the endpoint will be queried again. _error_cache: dict[str, float] = {} +# ------------------------------------------------------------------ +# Cache locks +# ------------------------------------------------------------------ +_models_cache_lock = asyncio.Lock() +_loaded_models_cache_lock = asyncio.Lock() +_error_cache_lock = asyncio.Lock() + # ------------------------------------------------------------------ # Queues # ------------------------------------------------------------------ @@ -226,6 +233,10 @@ async def token_worker() -> None: try: while True: endpoint, model, prompt, comp = await token_queue.get() + # Calculate timestamp once before acquiring lock + now = datetime.now(tz=timezone.utc) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) + # Accumulate counts in memory buffer (protected by lock) async with buffer_lock: token_buffer[endpoint][model] = ( @@ -234,8 +245,6 @@ async def token_worker() -> None: ) # Add to time series buffer with timestamp (UTC) - now = datetime.now(tz=timezone.utc) - timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) time_series_buffer.append({ 'endpoint': endpoint, 'model': model, @@ -256,13 +265,15 @@ async def token_worker() -> None: while not token_queue.empty(): try: endpoint, model, prompt, comp = token_queue.get_nowait() + # Calculate timestamp once before acquiring lock + now = datetime.now(tz=timezone.utc) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) + async with buffer_lock: token_buffer[endpoint][model] = ( token_buffer[endpoint].get(model, (0, 0))[0] + prompt, token_buffer[endpoint].get(model, (0, 0))[1] + comp ) - now = datetime.now(tz=timezone.utc) - timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) time_series_buffer.append({ 'endpoint': endpoint, 'model': model, @@ -378,19 +389,21 @@ class fetch: if api_key is not None: headers = {"Authorization": "Bearer " + api_key} - if endpoint in _models_cache: - models, cached_at = _models_cache[endpoint] - if _is_fresh(cached_at, 300): - return models - else: - # stale entry – drop it + # Check models cache with lock protection + async with _models_cache_lock: + if endpoint in _models_cache: + models, cached_at = _models_cache[endpoint] + if _is_fresh(cached_at, 300): + return models + # Stale entry - remove it del _models_cache[endpoint] - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): - # Still within the short error TTL – pretend nothing is available - return set() - else: + # Check error cache with lock protection + async with _error_cache_lock: + if endpoint in _error_cache: + if _is_fresh(_error_cache[endpoint], 10): + # Still within the short error TTL – pretend nothing is available + return set() # Error expired – remove it del _error_cache[endpoint] @@ -408,19 +421,22 @@ class fetch: items = data.get(key, []) models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} - - if models: - _models_cache[endpoint] = (models, time.time()) - return models - else: - # Empty list – treat as “no models”, but still cache for 300s - _models_cache[endpoint] = (models, time.time()) - return models + + # Update cache with lock protection + async with _models_cache_lock: + if models: + _models_cache[endpoint] = (models, time.time()) + else: + # Empty list – treat as "no models", but still cache for 300s + _models_cache[endpoint] = (models, time.time()) + return models except Exception as e: # Treat any error as if the endpoint offers no models message = _format_connection_issue(endpoint_url, e) print(f"[fetch.available_models] {message}") - _error_cache[endpoint] = time.time() + # Update error cache with lock protection + async with _error_cache_lock: + _error_cache[endpoint] = time.time() return set() @@ -432,19 +448,24 @@ class fetch: """ if is_ext_openai_endpoint(endpoint): return set() - if endpoint in _loaded_models_cache: - models, cached_at = _loaded_models_cache[endpoint] - if _is_fresh(cached_at, 30): - return models - else: - # stale entry – drop it + + # Check loaded models cache with lock protection + async with _loaded_models_cache_lock: + if endpoint in _loaded_models_cache: + models, cached_at = _loaded_models_cache[endpoint] + if _is_fresh(cached_at, 30): + return models + # Stale entry - remove it del _loaded_models_cache[endpoint] - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): - return set() - else: + # Check error cache with lock protection + async with _error_cache_lock: + if endpoint in _error_cache: + if _is_fresh(_error_cache[endpoint], 10): + return set() + # Error expired - remove it del _error_cache[endpoint] + client: aiohttp.ClientSession = app_state["session"] try: async with client.get(f"{endpoint}/api/ps") as resp: @@ -453,7 +474,10 @@ class fetch: # The response format is: # {"models": [{"name": "model1"}, {"name": "model2"}]} models = {m.get("name") for m in data.get("models", []) if m.get("name")} - _loaded_models_cache[endpoint] = (models, time.time()) + + # Update cache with lock protection + async with _loaded_models_cache_lock: + _loaded_models_cache[endpoint] = (models, time.time()) return models except Exception as e: # If anything goes wrong we simply assume the endpoint has no models