From 3cd530586c2ba826be8374bc319b680acb4f8062 Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Sun, 7 Jun 2026 09:55:54 +0200 Subject: [PATCH] feat: cache backend clients per endpoint instead of building one (with a fresh SSL context) per request --- api/ollama.py | 20 ++++++++++---------- backends/sessions.py | 37 +++++++++++++++++++++++++++++++++---- router.py | 34 ++++++++++++++++++++++++++++++++++ state.py | 7 +++++++ test/test_stream_errors.py | 4 +++- 5 files changed, 87 insertions(+), 15 deletions(-) diff --git a/api/ollama.py b/api/ollama.py index 6ebed5f..afba243 100644 --- a/api/ollama.py +++ b/api/ollama.py @@ -44,7 +44,7 @@ from backends.normalize import ( _extract_llama_quant, ) from backends.probe import fetch -from backends.sessions import _make_openai_client, get_probe_session +from backends.sessions import _make_openai_client, get_ollama_client, get_probe_session from requests.chat import _make_moe_requests from requests.messages import ( transform_images_to_data_urls, @@ -187,7 +187,7 @@ async def proxy(request: Request): params.update({k: v for k, v in optional_params.items() if v is not None}) oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 4. 
Async generator body (error handling + cleanup handled by _guarded_stream) async def stream_generate_response(): @@ -364,7 +364,7 @@ async def chat_proxy(request: Request): params.update({k: v for k, v in optional_params.items() if v is not None}) oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # For OpenAI endpoints: make the API call in handler scope # (try/except inside async generators is unreliable with Starlette's streaming) start_ts = None @@ -598,7 +598,7 @@ async def _handle_embedding_request( model = model[0] client = _make_openai_client(endpoint, api_key=config.api_keys.get(endpoint, "no-key")) else: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 3. Async generator body (error handling + cleanup handled by _guarded_stream) async def stream_embedding_response(): @@ -688,7 +688,7 @@ async def create_proxy(request: Request): status_lists = [] for endpoint in config.endpoints: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) create = await client.create(model=model, quantize=quantize, from_=from_, files=files, adapters=adapters, template=template, license=license, system=system, parameters=parameters, messages=messages, stream=False) status_lists.append(create) @@ -724,7 +724,7 @@ async def show_proxy(request: Request, model: Optional[str] = None): # 2. Endpoint logic endpoint, _ = await choose_endpoint(model, reserve=False) - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 3. Proxy a simple show request show = await client.show(model=model) @@ -768,7 +768,7 @@ async def copy_proxy(request: Request, source: Optional[str] = None, destination for endpoint in config.endpoints: if "/v1" not in endpoint: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 4. 
Proxy a simple copy request copy = await client.copy(source=src, destination=dst) status_list.append(copy.status) @@ -804,7 +804,7 @@ async def delete_proxy(request: Request, model: Optional[str] = None): for endpoint in config.endpoints: if "/v1" not in endpoint: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 3. Proxy a simple copy request copy = await client.delete(model=model) status_list.append(copy.status) @@ -842,7 +842,7 @@ async def pull_proxy(request: Request, model: Optional[str] = None): for endpoint in config.endpoints: if "/v1" not in endpoint: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 3. Proxy a simple pull request pull = await client.pull(model=model, insecure=insecure, stream=False) status_list.append(pull) @@ -882,7 +882,7 @@ async def push_proxy(request: Request): status_list = [] for endpoint in config.endpoints: - client = ollama.AsyncClient(host=endpoint) + client = get_ollama_client(endpoint) # 3. Proxy a simple push request push = await client.push(model=model, insecure=insecure, stream=False) status_list.append(push) diff --git a/backends/sessions.py b/backends/sessions.py index c73ca00..d659846 100644 --- a/backends/sessions.py +++ b/backends/sessions.py @@ -8,6 +8,7 @@ populate them once and routes can reuse them. import os import aiohttp +import ollama import openai from state import app_state @@ -70,16 +71,42 @@ def get_probe_session(endpoint: str) -> aiohttp.ClientSession: return app_state.get("probe_session") or app_state["session"] +def get_ollama_client(endpoint: str) -> ollama.AsyncClient: + """Return a cached ``ollama.AsyncClient`` for the endpoint, creating it once. + + ``ollama.AsyncClient`` wraps an ``httpx.AsyncClient`` whose construction + builds an SSL context and reloads the OS trust store (~40 ms). 
It is safe to + reuse concurrently, so we keep one per endpoint instead of building a fresh + one on every request — otherwise that 40 ms of CPU runs on the event loop + per request and caps single-worker throughput at ~25 req/s. + """ + cache = app_state["ollama_clients"] + client = cache.get(endpoint) + if client is None: + client = ollama.AsyncClient(host=endpoint) + cache[endpoint] = client + return client + + def _make_openai_client( endpoint: str, default_headers: dict | None = None, api_key: str = "no-key", ) -> openai.AsyncOpenAI: - """Return an AsyncOpenAI client configured for the given endpoint. + """Return a cached AsyncOpenAI client configured for the given endpoint. - For Unix socket endpoints, injects a pre-created httpx UDS transport - so the OpenAI SDK connects via the socket instead of TCP. + Clients are cached per ``(endpoint, api_key)`` and reused across requests: + constructing one builds an SSL context and reloads the OS trust store + (~40 ms), which serializes the event loop if done per request. For Unix + socket endpoints, injects the pre-created httpx UDS transport so the OpenAI + SDK connects via the socket instead of TCP. ``default_headers`` is ignored on a cache hit. 
""" + cache = app_state["openai_clients"] + cache_key = (endpoint, api_key) + client = cache.get(cache_key) + if client is not None: + return client + base_url = ep2base(endpoint) kwargs: dict = {"api_key": api_key} if default_headers is not None: @@ -89,4 +116,6 @@ def _make_openai_client( if http_client is not None: kwargs["http_client"] = http_client base_url = "http://localhost/v1" - return openai.AsyncOpenAI(base_url=base_url, **kwargs) + client = openai.AsyncOpenAI(base_url=base_url, **kwargs) + cache[cache_key] = client + return client diff --git a/router.py b/router.py index e294555..676e42b 100644 --- a/router.py +++ b/router.py @@ -215,6 +215,7 @@ from backends.sessions import ( _get_socket_path, get_session, _make_openai_client, + get_ollama_client, ) from backends.health import ( _is_fresh, @@ -375,6 +376,25 @@ async def startup_event() -> None: app_state["httpx_clients"][ep] = httpx.AsyncClient(transport=transport, timeout=300.0) print(f"[startup] Unix socket session: {ep} -> {sock_path}") + # Pre-create long-lived backend clients so the expensive SSL-context / + # trust-store construction (~40 ms each) happens once here instead of on the + # request path. Ollama endpoints are reached via both the native ollama + # client (/api/chat, /api/generate) and the OpenAI client (/v1/* routes), + # so warm both; OpenAI-compatible endpoints only need the OpenAI client. 
+ _warm_endpoints = config.endpoints + [ + ep for ep in config.llama_server_endpoints if ep not in config.endpoints + ] + for ep in _warm_endpoints: + try: + if not is_openai_compatible(ep): + get_ollama_client(ep) + _make_openai_client( + ep, default_headers=default_headers, + api_key=config.api_keys.get(ep, "no-key"), + ) + except Exception as e: + print(f"[startup] Backend client pre-warm failed for {ep}: {e}") + token_worker_task = asyncio.create_task(token_worker()) flush_task = asyncio.create_task(flush_buffer()) await init_llm_cache(config) @@ -415,6 +435,20 @@ async def shutdown_event() -> None: except Exception as e: print(f"[shutdown] Error closing httpx client {ep}: {e}") + # Close cached backend clients (reused across requests; see startup pre-warm). + for key, client in list(app_state.get("ollama_clients", {}).items()): + try: + await client._client.aclose() + except Exception as e: + print(f"[shutdown] Error closing ollama client {key}: {e}") + app_state["ollama_clients"].clear() + for key, client in list(app_state.get("openai_clients", {}).items()): + try: + await client.close() + except Exception as e: + print(f"[shutdown] Error closing openai client {key}: {e}") + app_state["openai_clients"].clear() + # Close the aiosqlite connection last — its worker thread is non-daemon # and would otherwise keep the interpreter alive after lifespan completes. if db is not None: diff --git a/state.py b/state.py index d52c1b5..b267c48 100644 --- a/state.py +++ b/state.py @@ -69,6 +69,13 @@ app_state = { "probe_connector": None, # connection pool isolated from proxy traffic "socket_sessions": {}, # endpoint -> aiohttp.ClientSession(UnixConnector) for .sock endpoints "httpx_clients": {}, # endpoint -> httpx.AsyncClient(UDS transport) for .sock endpoints + # Long-lived backend clients, reused across requests. 
Constructing these is + # expensive (~40 ms each — every new client builds an SSL context and reloads + # the OS trust store via truststore), so building one per request serializes + # the event loop and caps throughput. Pre-warmed at startup or built on first use; closed on + # shutdown. See backends.sessions.get_ollama_client / _make_openai_client. + "ollama_clients": {}, # endpoint -> ollama.AsyncClient + "openai_clients": {}, # (endpoint, api_key) -> openai.AsyncOpenAI } # Default outbound HTTP headers attached to every backend request. diff --git a/test/test_stream_errors.py b/test/test_stream_errors.py index ffd2e1e..166209c 100644 --- a/test/test_stream_errors.py +++ b/test/test_stream_errors.py @@ -80,8 +80,10 @@ def _patches(exc, mark_unhealthy): stack.enter_context(patch("api.ollama.is_openai_compatible", lambda ep: False)) stack.enter_context(patch("api.ollama.decrement_usage", AsyncMock())) stack.enter_context(patch("api.ollama._mark_backend_unhealthy", mark_unhealthy)) + # The native path now fetches a cached client via get_ollama_client() rather + # than constructing ollama.AsyncClient inline, so patch that seam. stack.enter_context( - patch("api.ollama.ollama.AsyncClient", lambda *a, **k: _FakeAsyncClient(exc)) + patch("api.ollama.get_ollama_client", lambda *a, **k: _FakeAsyncClient(exc)) ) return stack