All checks were successful
Build and Publish Docker Image (Semantic Cache) / build (amd64, linux/amd64, docker-amd64) (push) Successful in 3m59s
Build and Publish Docker Image / build (amd64, linux/amd64, docker-amd64) (push) Successful in 1m25s
Build and Publish Docker Image / build (arm64, linux/arm64, docker-arm64) (push) Successful in 12m46s
Build and Publish Docker Image / merge (push) Successful in 33s
Build and Publish Docker Image (Semantic Cache) / build (arm64, linux/arm64, docker-arm64) (push) Successful in 19m56s
Build and Publish Docker Image (Semantic Cache) / merge (push) Successful in 33s
116 lines
5.4 KiB
Python
116 lines
5.4 KiB
Python
"""Shared mutable router state.
|
||
|
||
All process-wide caches, locks, in-flight task maps, queues, counters and
|
||
buffers used by the router live here. These names are only ever *mutated*
|
||
(dict/set updates, lock acquisitions, queue put/get) — never rebound — so
|
||
importing them via ``from state import …`` is safe from every module.
|
||
|
||
Rebound singletons (``config``, ``db``, ``token_worker_task``,
|
||
``flush_task``) intentionally stay in router.py so their reassignment on
|
||
startup is visible to all callers.
|
||
"""
|
||
import asyncio
from collections import defaultdict
from typing import Any, Dict, Set


# ------------------------------------------------------------------
# In-memory caches
# ------------------------------------------------------------------
# Successful results are cached for 300s.
# Each value is a (set of model names, float timestamp) tuple.
_models_cache: dict[str, tuple[set[str], float]] = {}
_loaded_models_cache: dict[str, tuple[set[str], float]] = {}
# Transient errors are cached separately per concern so that a failure
# in one path does not poison the other.
_available_error_cache: dict[str, float] = {}
_loaded_error_cache: dict[str, float] = {}
# Per-(endpoint, model) completion-path failures. A llama-server in router
# mode can keep returning /v1/models 200 OK after its delegated worker for
# a specific model dies — the probe-level caches above will not catch this.
# We record signals observed during actual completion attempts so
# choose_endpoint can avoid the affected (endpoint, model) pair without
# poisoning unrelated models on the same backend.
_completion_error_cache: dict[tuple[str, str], float] = {}
_COMPLETION_ERROR_TTL = 300

# ------------------------------------------------------------------
# Cache locks
# ------------------------------------------------------------------
# One asyncio.Lock per cache dict, named after the cache it accompanies.
_models_cache_lock = asyncio.Lock()
_loaded_models_cache_lock = asyncio.Lock()
_available_error_cache_lock = asyncio.Lock()
_loaded_error_cache_lock = asyncio.Lock()
_completion_error_cache_lock = asyncio.Lock()

# ------------------------------------------------------------------
# In-flight request tracking (prevents cache stampede)
# ------------------------------------------------------------------
# Maps from a string key to the asyncio.Task currently doing the work.
_inflight_available_models: dict[str, asyncio.Task] = {}
_inflight_loaded_models: dict[str, asyncio.Task] = {}
_inflight_lock = asyncio.Lock()
_bg_refresh_available: dict[str, asyncio.Task] = {}
_bg_refresh_loaded: dict[str, asyncio.Task] = {}
_bg_refresh_lock = asyncio.Lock()

# ------------------------------------------------------------------
# Queues
# ------------------------------------------------------------------
# Set of subscriber queues, guarded by _subscribers_lock.
_subscribers: set[asyncio.Queue] = set()
_subscribers_lock = asyncio.Lock()
# Unbounded queue of (str, str, int, int) tuples.
token_queue: asyncio.Queue[tuple[str, str, int, int]] = asyncio.Queue()

# ------------------------------------------------------------------
# HTTP client / connector cache
# ------------------------------------------------------------------
# Slots start out empty (None / {}); they are filled in by mutating this
# dict, never by rebinding the name.
app_state: dict[str, Any] = {
    "session": None,
    "connector": None,
    "probe_session": None,  # dedicated session for health/introspection probes
    "probe_connector": None,  # connection pool isolated from proxy traffic
    "socket_sessions": {},  # endpoint -> aiohttp.ClientSession(UnixConnector) for .sock endpoints
    "httpx_clients": {},  # endpoint -> httpx.AsyncClient(UDS transport) for .sock endpoints
    # Long-lived backend clients, reused across requests. Constructing these is
    # expensive (~40 ms each — every new client builds an SSL context and reloads
    # the OS trust store via truststore), so building one per request serializes
    # the event loop and caps throughput. Created once at startup, closed on
    # shutdown. See backends.sessions.get_ollama_client / _make_openai_client.
    "ollama_clients": {},  # endpoint -> ollama.AsyncClient
    "openai_clients": {},  # (endpoint, api_key) -> openai.AsyncOpenAI
}

# Default outbound HTTP headers attached to every backend request.
default_headers: dict[str, str] = {
    "HTTP-Referer": "https://nomyo.ai",
    "Referer": "https://nomyo.ai",
    "X-Title": "NOMYO Router",
}

# ------------------------------------------------------------------
|
||
# Token Count Buffer (for write-behind pattern)
|
||
# ------------------------------------------------------------------
|
||
# Structure: {endpoint: {model: (input_tokens, output_tokens)}}
|
||
token_buffer: dict[str, dict[str, tuple[int, int]]] = defaultdict(lambda: defaultdict(lambda: (0, 0)))
|
||
# Time series buffer with timestamp
|
||
time_series_buffer: list[dict[str, int | str]] = []
|
||
# Lock to protect buffer access from race conditions
|
||
buffer_lock = asyncio.Lock()
|
||
|
||
# Configuration for periodic flushing
|
||
FLUSH_INTERVAL = 10 # seconds
|
||
|
||
# ------------------------------------------------------------------
# Per-endpoint per-model active connection counters
# ------------------------------------------------------------------
# Nested defaultdicts: a missing endpoint or model key reads as 0.
usage_counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
token_usage_counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
usage_lock = asyncio.Lock()  # protects access to usage_counts
token_usage_lock = asyncio.Lock()

# Conversation affinity map: fingerprint -> (endpoint, model, expires_at_monotonic).
# Keeps the same conversation pinned to the endpoint that already has its
# KV-cache prefix warm. Model is stored so the dashboard can aggregate live
# entries per (endpoint, model) without recomputing fingerprints.
# Never held together with usage_lock.
_affinity_map: dict[str, tuple[str, str, float]] = {}
_affinity_lock = asyncio.Lock()
_AFFINITY_MAX_ENTRIES = 10_000
|