diff --git a/api/ollama.py b/api/ollama.py index e673663..d62d6b4 100644 --- a/api/ollama.py +++ b/api/ollama.py @@ -44,7 +44,7 @@ from backends.normalize import ( _extract_llama_quant, ) from backends.probe import fetch -from backends.sessions import _make_openai_client, get_session +from backends.sessions import _make_openai_client, get_probe_session from requests.chat import _make_moe_requests from requests.messages import ( transform_images_to_data_urls, @@ -1055,7 +1055,7 @@ async def ps_details_proxy(request: Request): # Fetch /props for each llama-server model to get context length (n_ctx) # and unload sleeping models automatically async def _fetch_llama_props(endpoint: str, model_id: str) -> tuple[int | None, bool, bool]: - client: aiohttp.ClientSession = get_session(endpoint) + client: aiohttp.ClientSession = get_probe_session(endpoint) base_url = endpoint.rstrip("/").removesuffix("/v1") props_url = f"{base_url}/props?model={model_id}" headers = None diff --git a/backends/probe.py b/backends/probe.py index 02c414e..3ce089f 100644 --- a/backends/probe.py +++ b/backends/probe.py @@ -39,7 +39,7 @@ from state import ( _bg_refresh_lock, default_headers, ) -from backends.sessions import get_session +from backends.sessions import get_probe_session from backends.health import ( _is_fresh, _ensure_success, @@ -71,7 +71,7 @@ class fetch: endpoint_url = f"{ep_base}/api/tags" key = "models" - client: aiohttp.ClientSession = get_session(endpoint) + client: aiohttp.ClientSession = get_probe_session(endpoint) try: async with client.get(endpoint_url, headers=headers) as resp: await _ensure_success(resp) @@ -191,7 +191,7 @@ class fetch: For Ollama endpoints: queries /api/ps and returns model names For llama-server endpoints: queries /v1/models and filters for status.value == "loaded" """ - client: aiohttp.ClientSession = get_session(endpoint) + client: aiohttp.ClientSession = get_probe_session(endpoint) cfg = get_config() # Check if this is a llama-server endpoint @@ 
def get_probe_session(endpoint: str) -> aiohttp.ClientSession:
    """Pick the aiohttp session that health/introspection probes should use.

    Probe traffic (model listings, endpoint health checks) is routed through
    its own connection pool rather than the proxy/streaming one.  That way a
    wave of long-running completion requests cannot occupy every connection
    and leave a probe queued until its deadline expires, which would flag a
    healthy endpoint as unavailable.

    Resolution order:

    1. Unix socket endpoint with a registered per-endpoint session in
       ``app_state["socket_sessions"]`` -> that session.
    2. ``app_state["probe_session"]`` when it is set.
    3. ``app_state["session"]`` as the fallback when no probe session has
       been initialised (e.g. in tests).
    """
    # Unix socket endpoints keep using their dedicated per-endpoint session.
    if _is_unix_socket_endpoint(endpoint):
        socket_session = app_state["socket_sessions"].get(endpoint)
        if socket_session is not None:
            return socket_session

    # TCP endpoints (and socket endpoints without a registered session)
    # prefer the shared probe pool when one exists.
    probe_session = app_state.get("probe_session")
    if probe_session:
        return probe_session

    # No probe pool available: reuse the main proxy session.
    return app_state["session"]
# aiohttp strips Referer headers for cross-origin requests, so we use httpx for ep in config.endpoints: @@ -380,6 +394,8 @@ async def shutdown_event() -> None: await flush_remaining_buffers() await app_state["session"].close() + if app_state.get("probe_session") is not None: + await app_state["probe_session"].close() # Close Unix socket sessions for ep, sess in list(app_state.get("socket_sessions", {}).items()): diff --git a/state.py b/state.py index 301cc26..d52c1b5 100644 --- a/state.py +++ b/state.py @@ -65,6 +65,8 @@ token_queue: asyncio.Queue[tuple[str, str, int, int]] = asyncio.Queue() app_state = { "session": None, "connector": None, + "probe_session": None, # dedicated session for health/introspection probes + "probe_connector": None, # connection pool isolated from proxy traffic "socket_sessions": {}, # endpoint -> aiohttp.ClientSession(UnixConnector) for .sock endpoints "httpx_clients": {}, # endpoint -> httpx.AsyncClient(UDS transport) for .sock endpoints }