refactor(cache): split error cache and add stale-while-revalidate
Refactor error tracking to use separate caches for 'available' and 'loaded' models, preventing transient errors in one path from contaminating the other. Implement a background refresh for available models so requests never block on revalidation, and use stale-while-revalidate: cache entries between 300s and 600s old are served immediately while the refresh runs.
parent 92cea1dead
commit 7deb088c6a

1 changed file with 40 additions and 15 deletions

router.py (+40 −15)
@@ -30,16 +30,18 @@ from PIL import Image
 # Successful results are cached for 300s
 _models_cache: dict[str, tuple[Set[str], float]] = {}
 _loaded_models_cache: dict[str, tuple[Set[str], float]] = {}
-# Transient errors are cached for 1s – the key stays until the
-# timeout expires, after which the endpoint will be queried again.
-_error_cache: dict[str, float] = {}
+# Transient errors are cached separately per concern so that a failure
+# in one path does not poison the other.
+_available_error_cache: dict[str, float] = {}
+_loaded_error_cache: dict[str, float] = {}
 
 # ------------------------------------------------------------------
 # Cache locks
 # ------------------------------------------------------------------
 _models_cache_lock = asyncio.Lock()
 _loaded_models_cache_lock = asyncio.Lock()
-_error_cache_lock = asyncio.Lock()
+_available_error_cache_lock = asyncio.Lock()
+_loaded_error_cache_lock = asyncio.Lock()
 
 # ------------------------------------------------------------------
 # In-flight request tracking (prevents cache stampede)
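Note: the hunks below lean on an `_is_fresh(timestamp, ttl)` helper that is not part of this diff. A minimal sketch of its assumed semantics, a plain timestamp-versus-TTL check:

    import time

    def _is_fresh(cached_at: float, ttl_seconds: float) -> bool:
        # Assumed behavior: True while the cached timestamp is
        # younger than the given TTL, in seconds.
        return (time.time() - cached_at) < ttl_seconds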
@@ -535,10 +537,21 @@ class fetch:
             message = _format_connection_issue(endpoint_url, e)
             print(f"[fetch.available_models] {message}")
             # Update error cache with lock protection
-            async with _error_cache_lock:
-                _error_cache[endpoint] = time.time()
+            async with _available_error_cache_lock:
+                _available_error_cache[endpoint] = time.time()
             return set()
 
+    async def _refresh_available_models(endpoint: str, api_key: Optional[str] = None) -> None:
+        """
+        Background task to refresh available models cache without blocking the caller.
+        Used for stale-while-revalidate pattern.
+        """
+        try:
+            await fetch._fetch_available_models_internal(endpoint, api_key)
+        except Exception as e:
+            # Silently fail - cache will remain stale but functional
+            print(f"[fetch._refresh_available_models] Background refresh failed for {endpoint}: {e}")
+
     async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]:
         """
         Query <endpoint>/api/tags and return a set of all model names that the
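`fetch._fetch_available_models_internal` is also outside this diff; judging from its call site and the docstring below, it presumably performs the actual HTTP query against <endpoint>/api/tags and repopulates `_models_cache` under its lock. A rough sketch under those assumptions (aiohttp client and Ollama-style response shape assumed; the real implementation may differ):

    import time
    from typing import Optional, Set

    import aiohttp

    async def _fetch_available_models_internal(endpoint: str, api_key: Optional[str] = None) -> Set[str]:
        # Sketch only: in the real code this is a method on the `fetch` class.
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{endpoint}/api/tags", headers=headers) as resp:
                resp.raise_for_status()
                data = await resp.json()
        # Assumes Ollama's /api/tags shape: {"models": [{"name": ...}, ...]}
        models = {m["name"] for m in data.get("models", [])}
        async with _models_cache_lock:
            _models_cache[endpoint] = (models, time.time())
        return models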
@@ -549,6 +562,10 @@ class fetch:
         Uses request coalescing to prevent cache stampede: if multiple requests
         arrive when cache is expired, only one actual HTTP request is made.
 
+        Uses stale-while-revalidate: when the cache is between 300-600s old,
+        the stale data is returned immediately while a background refresh runs.
+        This prevents model blackouts caused by transient timeouts.
+
         If the request fails (e.g. timeout, 5xx, or malformed response), an empty
         set is returned.
         """
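The three freshness tiers the docstring describes reduce to a small decision function; a sketch for clarity (names hypothetical, thresholds taken from the diff):

    from enum import Enum

    class CacheState(Enum):
        FRESH = "fresh"      # < 300s old: serve from cache
        STALE = "stale"      # 300-600s old: serve stale, refresh in background
        EXPIRED = "expired"  # > 600s old: evict and fetch synchronously

    def classify_cache_entry(age_seconds: float) -> CacheState:
        # Mirrors the nested _is_fresh(cached_at, 300/600) checks below.
        if age_seconds < 300:
            return CacheState.FRESH
        if age_seconds < 600:
            return CacheState.STALE
        return CacheState.EXPIRED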
@@ -556,19 +573,27 @@ class fetch:
         async with _models_cache_lock:
             if endpoint in _models_cache:
                 models, cached_at = _models_cache[endpoint]
+
+                # FRESH: < 300s old - return immediately
                 if _is_fresh(cached_at, 300):
                     return models
-                # Stale entry - remove it
+
+                # STALE: 300-600s old - return stale data and refresh in background
+                if _is_fresh(cached_at, 600):
+                    asyncio.create_task(fetch._refresh_available_models(endpoint, api_key))
+                    return models  # Return stale data immediately
+
+                # EXPIRED: > 600s old - too stale, must refresh synchronously
                 del _models_cache[endpoint]
 
         # Check error cache with lock protection
-        async with _error_cache_lock:
-            if endpoint in _error_cache:
-                if _is_fresh(_error_cache[endpoint], 10):
+        async with _available_error_cache_lock:
+            if endpoint in _available_error_cache:
+                if _is_fresh(_available_error_cache[endpoint], 10):
                     # Still within the short error TTL – pretend nothing is available
                     return set()
                 # Error expired – remove it
-                del _error_cache[endpoint]
+                del _available_error_cache[endpoint]
 
         # Request coalescing: check if another request is already fetching this endpoint
         async with _inflight_lock:
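One caveat with the fire-and-forget asyncio.create_task call above: the event loop keeps only weak references to tasks, so an otherwise-unreferenced refresh task can in principle be garbage-collected before it runs to completion. A common hardening, not part of this commit, keeps a strong reference until the task finishes (helper name hypothetical):

    import asyncio
    from typing import Optional

    _background_tasks: set[asyncio.Task] = set()

    def _spawn_refresh(endpoint: str, api_key: Optional[str] = None) -> None:
        # Hold a strong reference so the refresh task cannot be
        # collected mid-flight; drop it once the task completes.
        task = asyncio.create_task(fetch._refresh_available_models(endpoint, api_key))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)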
@@ -657,12 +682,12 @@ class fetch:
                 del _loaded_models_cache[endpoint]
 
         # Check error cache with lock protection
-        async with _error_cache_lock:
-            if endpoint in _error_cache:
-                if _is_fresh(_error_cache[endpoint], 10):
+        async with _loaded_error_cache_lock:
+            if endpoint in _loaded_error_cache:
+                if _is_fresh(_loaded_error_cache[endpoint], 10):
                     return set()
                 # Error expired - remove it
-                del _error_cache[endpoint]
+                del _loaded_error_cache[endpoint]
 
         # Request coalescing: check if another request is already fetching this endpoint
         async with _inflight_lock:
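The net effect of the split, illustrated (endpoint URL hypothetical): a loaded-models failure no longer suppresses available-models lookups, because each path now consults only its own error cache.

    import time

    endpoint = "http://node-a:11434"  # hypothetical endpoint

    # A loaded-models timeout is recorded in its own cache...
    _loaded_error_cache[endpoint] = time.time()

    # ...but the available-models path is unaffected: its error cache has
    # no entry for this endpoint, so available_models() will still query it.
    assert endpoint not in _available_error_cache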