nomyo-router/state.py
alpha nerd 3cd530586c
feat: cache backend clients per endpoint instead of building one (with a fresh SSL context) per request
2026-06-07 09:55:54 +02:00

"""Shared mutable router state.

All process-wide caches, locks, in-flight task maps, queues, counters and
buffers used by the router live here. These names are only ever *mutated*
(dict/set updates, lock acquisitions, queue put/get) and never rebound, so
importing them via ``from state import …`` is safe from every module.

Rebound singletons (``config``, ``db``, ``token_worker_task``,
``flush_task``) intentionally stay in router.py so their reassignment on
startup is visible to all callers.
"""
import asyncio
from collections import defaultdict
from typing import Dict, Set
# ------------------------------------------------------------------
# In-memory caches
# ------------------------------------------------------------------
# Successful results are cached for 300s
_models_cache: dict[str, tuple[Set[str], float]] = {}
_loaded_models_cache: dict[str, tuple[Set[str], float]] = {}
# Transient errors are cached separately per concern so that a failure
# in one path does not poison the other.
_available_error_cache: dict[str, float] = {}
_loaded_error_cache: dict[str, float] = {}
# Per-(endpoint, model) completion-path failures. A llama-server in router
# mode can keep returning /v1/models 200 OK after its delegated worker for
# a specific model dies — the probe-level caches above will not catch this.
# We record signals observed during actual completion attempts so
# choose_endpoint can avoid the affected (endpoint, model) pair without
# poisoning unrelated models on the same backend.
_completion_error_cache: dict[tuple[str, str], float] = {}
_COMPLETION_ERROR_TTL = 300
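# Illustrative sketch, not part of the router: one way a caller could decide
# whether a recorded completion failure is still fresh. The helper name
# _example_completion_error_is_fresh and its parameters are hypothetical. It
# assumes the cached float is the time.monotonic() reading taken when the
# failure was observed; the real choose_endpoint may use a different clock
# or expiry policy.
import time as _example_time


def _example_completion_error_is_fresh(cache, endpoint, model, ttl=300, now=None):
    """Return True if (endpoint, model) failed less than `ttl` seconds ago."""
    if now is None:
        now = _example_time.monotonic()
    failed_at = cache.get((endpoint, model))
    if failed_at is None:
        return False
    if now - failed_at >= ttl:
        # Expired entry: drop it so the pair becomes eligible again.
        cache.pop((endpoint, model), None)
        return False
    return True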
# ------------------------------------------------------------------
# Cache locks
# ------------------------------------------------------------------
_models_cache_lock = asyncio.Lock()
_loaded_models_cache_lock = asyncio.Lock()
_available_error_cache_lock = asyncio.Lock()
_loaded_error_cache_lock = asyncio.Lock()
_completion_error_cache_lock = asyncio.Lock()
# ------------------------------------------------------------------
# In-flight request tracking (prevents cache stampede)
# ------------------------------------------------------------------
_inflight_available_models: dict[str, asyncio.Task] = {}
_inflight_loaded_models: dict[str, asyncio.Task] = {}
_inflight_lock = asyncio.Lock()
_bg_refresh_available: dict[str, asyncio.Task] = {}
_bg_refresh_loaded: dict[str, asyncio.Task] = {}
_bg_refresh_lock = asyncio.Lock()
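# Illustrative sketch, not part of the router: how an in-flight task map can
# prevent a cache stampede. Concurrent callers asking for the same key share
# one task instead of each issuing its own backend probe. The helper name
# _example_single_flight and the `fetch` coroutine function are hypothetical;
# the router's actual probe code may differ.
import asyncio


async def _example_single_flight(inflight, lock, key, fetch):
    """Await the shared task for `key`, creating it if none is in flight."""
    async with lock:
        task = inflight.get(key)
        if task is None:
            task = asyncio.create_task(fetch(key))
            inflight[key] = task
            # Remove the entry once the task finishes so later calls refetch.
            task.add_done_callback(lambda _t: inflight.pop(key, None))
    # Await outside the lock so other keys are not blocked by a slow fetch.
    return await task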
# ------------------------------------------------------------------
# Queues
# ------------------------------------------------------------------
_subscribers: Set[asyncio.Queue] = set()
_subscribers_lock = asyncio.Lock()
token_queue: asyncio.Queue[tuple[str, str, int, int]] = asyncio.Queue()
# ------------------------------------------------------------------
# HTTP client / connector cache
# ------------------------------------------------------------------
app_state = {
    "session": None,
    "connector": None,
    "probe_session": None,  # dedicated session for health/introspection probes
    "probe_connector": None,  # connection pool isolated from proxy traffic
    "socket_sessions": {},  # endpoint -> aiohttp.ClientSession(UnixConnector) for .sock endpoints
    "httpx_clients": {},  # endpoint -> httpx.AsyncClient(UDS transport) for .sock endpoints
    # Long-lived backend clients, reused across requests. Constructing these is
    # expensive (~40 ms each: every new client builds an SSL context and reloads
    # the OS trust store via truststore), so building one per request serializes
    # the event loop and caps throughput. Created once at startup, closed on
    # shutdown. See backends.sessions.get_ollama_client / _make_openai_client.
    "ollama_clients": {},  # endpoint -> ollama.AsyncClient
    "openai_clients": {},  # (endpoint, api_key) -> openai.AsyncOpenAI
}
# Default outbound HTTP headers attached to every backend request.
default_headers = {
    "HTTP-Referer": "https://nomyo.ai",
    "Referer": "https://nomyo.ai",
    "X-Title": "NOMYO Router",
}
# ------------------------------------------------------------------
# Token Count Buffer (for write-behind pattern)
# ------------------------------------------------------------------
# Structure: {endpoint: {model: (input_tokens, output_tokens)}}
token_buffer: dict[str, dict[str, tuple[int, int]]] = defaultdict(lambda: defaultdict(lambda: (0, 0)))
# Time series buffer with timestamp
time_series_buffer: list[dict[str, int | str]] = []
# Lock to protect buffer access from race conditions
buffer_lock = asyncio.Lock()
# Configuration for periodic flushing
FLUSH_INTERVAL = 10 # seconds
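# Illustrative sketch, not part of the router: the write-behind pattern for
# the token buffer. Requests add counts in memory under the lock, and a
# periodic task drains the buffer and persists the snapshot. The helper names
# _example_add_tokens and _example_drain are hypothetical, and the persistence
# step is left out because the database API is not shown in this module.
import asyncio
from collections import defaultdict


async def _example_add_tokens(buffer, lock, endpoint, model, input_tokens, output_tokens):
    """Add token counts for (endpoint, model) to the in-memory buffer."""
    async with lock:
        prev_in, prev_out = buffer[endpoint][model]
        buffer[endpoint][model] = (prev_in + input_tokens, prev_out + output_tokens)


async def _example_drain(buffer, lock):
    """Copy the buffered counts and empty the buffer in place."""
    async with lock:
        snapshot = {ep: dict(models) for ep, models in buffer.items()}
        # clear() mutates the shared object, so importers keep a valid reference.
        buffer.clear()
    return snapshot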
# ------------------------------------------------------------------
# Per-endpoint per-model active connection counters
# ------------------------------------------------------------------
usage_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
token_usage_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
usage_lock = asyncio.Lock() # protects access to usage_counts
token_usage_lock = asyncio.Lock()
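# Illustrative sketch, not part of the router: adjusting an active connection
# counter under its lock. The helper name _example_adjust_usage is
# hypothetical, and pruning entries that reach zero is an assumption made
# here to keep the nested dict small; the router may keep zero entries.
import asyncio
from collections import defaultdict


async def _example_adjust_usage(counts, lock, endpoint, model, delta):
    """Apply `delta` to counts[endpoint][model] and return the new value."""
    async with lock:
        new_value = counts[endpoint][model] + delta
        if new_value > 0:
            counts[endpoint][model] = new_value
        else:
            # Drop exhausted entries, then the endpoint if nothing is left.
            counts[endpoint].pop(model, None)
            if not counts[endpoint]:
                counts.pop(endpoint, None)
            new_value = 0
        return new_value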
# Conversation affinity map: fingerprint -> (endpoint, model, expires_at_monotonic).
# Keeps the same conversation pinned to the endpoint that already has its
# KV-cache prefix warm. Model is stored so the dashboard can aggregate live
# entries per (endpoint, model) without recomputing fingerprints.
# _affinity_lock is never held together with usage_lock.
_affinity_map: Dict[str, tuple[str, str, float]] = {}
_affinity_lock = asyncio.Lock()
_AFFINITY_MAX_ENTRIES = 10000
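# Illustrative sketch, not part of the router: reading and writing an
# affinity map of the shape fingerprint -> (endpoint, model, expires_at).
# The helper names, the `ttl` parameter and the evict-oldest policy are all
# hypothetical; the router's real eviction rule is not shown in this module.
# Callers are assumed to hold the affinity lock around these calls.
import time


def _example_affinity_get(affinity, fingerprint, now=None):
    """Return (endpoint, model) for a live entry, or None if absent or expired."""
    now = time.monotonic() if now is None else now
    entry = affinity.get(fingerprint)
    if entry is None:
        return None
    endpoint, model, expires_at = entry
    if expires_at <= now:
        affinity.pop(fingerprint, None)
        return None
    return endpoint, model


def _example_affinity_put(affinity, fingerprint, endpoint, model, ttl,
                          max_entries=10000, now=None):
    """Pin a conversation to an endpoint, evicting the oldest entry when full."""
    now = time.monotonic() if now is None else now
    if fingerprint not in affinity and len(affinity) >= max_entries:
        # Dicts preserve insertion order, so the first key is the oldest.
        affinity.pop(next(iter(affinity)))
    affinity[fingerprint] = (endpoint, model, now + ttl)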