feat: cache backend clients per endpoint instead of building one (with a fresh SSL context) per request
All checks were successful
Build and Publish Docker Image (Semantic Cache) / build (amd64, linux/amd64, docker-amd64) (push) Successful in 3m59s
Build and Publish Docker Image / build (amd64, linux/amd64, docker-amd64) (push) Successful in 1m25s
Build and Publish Docker Image / build (arm64, linux/arm64, docker-arm64) (push) Successful in 12m46s
Build and Publish Docker Image / merge (push) Successful in 33s
Build and Publish Docker Image (Semantic Cache) / build (arm64, linux/arm64, docker-arm64) (push) Successful in 19m56s
Build and Publish Docker Image (Semantic Cache) / merge (push) Successful in 33s
All checks were successful
Build and Publish Docker Image (Semantic Cache) / build (amd64, linux/amd64, docker-amd64) (push) Successful in 3m59s
Build and Publish Docker Image / build (amd64, linux/amd64, docker-amd64) (push) Successful in 1m25s
Build and Publish Docker Image / build (arm64, linux/arm64, docker-arm64) (push) Successful in 12m46s
Build and Publish Docker Image / merge (push) Successful in 33s
Build and Publish Docker Image (Semantic Cache) / build (arm64, linux/arm64, docker-arm64) (push) Successful in 19m56s
Build and Publish Docker Image (Semantic Cache) / merge (push) Successful in 33s
This commit is contained in:
parent
1ce792c48b
commit
3cd530586c
5 changed files with 87 additions and 15 deletions
|
|
@ -44,7 +44,7 @@ from backends.normalize import (
|
||||||
_extract_llama_quant,
|
_extract_llama_quant,
|
||||||
)
|
)
|
||||||
from backends.probe import fetch
|
from backends.probe import fetch
|
||||||
from backends.sessions import _make_openai_client, get_probe_session
|
from backends.sessions import _make_openai_client, get_ollama_client, get_probe_session
|
||||||
from requests.chat import _make_moe_requests
|
from requests.chat import _make_moe_requests
|
||||||
from requests.messages import (
|
from requests.messages import (
|
||||||
transform_images_to_data_urls,
|
transform_images_to_data_urls,
|
||||||
|
|
@ -187,7 +187,7 @@ async def proxy(request: Request):
|
||||||
params.update({k: v for k, v in optional_params.items() if v is not None})
|
params.update({k: v for k, v in optional_params.items() if v is not None})
|
||||||
oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
|
oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
|
||||||
else:
|
else:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
|
|
||||||
# 4. Async generator body (error handling + cleanup handled by _guarded_stream)
|
# 4. Async generator body (error handling + cleanup handled by _guarded_stream)
|
||||||
async def stream_generate_response():
|
async def stream_generate_response():
|
||||||
|
|
@ -364,7 +364,7 @@ async def chat_proxy(request: Request):
|
||||||
params.update({k: v for k, v in optional_params.items() if v is not None})
|
params.update({k: v for k, v in optional_params.items() if v is not None})
|
||||||
oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
|
oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
|
||||||
else:
|
else:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
# For OpenAI endpoints: make the API call in handler scope
|
# For OpenAI endpoints: make the API call in handler scope
|
||||||
# (try/except inside async generators is unreliable with Starlette's streaming)
|
# (try/except inside async generators is unreliable with Starlette's streaming)
|
||||||
start_ts = None
|
start_ts = None
|
||||||
|
|
@ -598,7 +598,7 @@ async def _handle_embedding_request(
|
||||||
model = model[0]
|
model = model[0]
|
||||||
client = _make_openai_client(endpoint, api_key=config.api_keys.get(endpoint, "no-key"))
|
client = _make_openai_client(endpoint, api_key=config.api_keys.get(endpoint, "no-key"))
|
||||||
else:
|
else:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
|
|
||||||
# 3. Async generator body (error handling + cleanup handled by _guarded_stream)
|
# 3. Async generator body (error handling + cleanup handled by _guarded_stream)
|
||||||
async def stream_embedding_response():
|
async def stream_embedding_response():
|
||||||
|
|
@ -688,7 +688,7 @@ async def create_proxy(request: Request):
|
||||||
status_lists = []
|
status_lists = []
|
||||||
|
|
||||||
for endpoint in config.endpoints:
|
for endpoint in config.endpoints:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
create = await client.create(model=model, quantize=quantize, from_=from_, files=files, adapters=adapters, template=template, license=license, system=system, parameters=parameters, messages=messages, stream=False)
|
create = await client.create(model=model, quantize=quantize, from_=from_, files=files, adapters=adapters, template=template, license=license, system=system, parameters=parameters, messages=messages, stream=False)
|
||||||
status_lists.append(create)
|
status_lists.append(create)
|
||||||
|
|
||||||
|
|
@ -724,7 +724,7 @@ async def show_proxy(request: Request, model: Optional[str] = None):
|
||||||
# 2. Endpoint logic
|
# 2. Endpoint logic
|
||||||
endpoint, _ = await choose_endpoint(model, reserve=False)
|
endpoint, _ = await choose_endpoint(model, reserve=False)
|
||||||
|
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
|
|
||||||
# 3. Proxy a simple show request
|
# 3. Proxy a simple show request
|
||||||
show = await client.show(model=model)
|
show = await client.show(model=model)
|
||||||
|
|
@ -768,7 +768,7 @@ async def copy_proxy(request: Request, source: Optional[str] = None, destination
|
||||||
|
|
||||||
for endpoint in config.endpoints:
|
for endpoint in config.endpoints:
|
||||||
if "/v1" not in endpoint:
|
if "/v1" not in endpoint:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
# 4. Proxy a simple copy request
|
# 4. Proxy a simple copy request
|
||||||
copy = await client.copy(source=src, destination=dst)
|
copy = await client.copy(source=src, destination=dst)
|
||||||
status_list.append(copy.status)
|
status_list.append(copy.status)
|
||||||
|
|
@ -804,7 +804,7 @@ async def delete_proxy(request: Request, model: Optional[str] = None):
|
||||||
|
|
||||||
for endpoint in config.endpoints:
|
for endpoint in config.endpoints:
|
||||||
if "/v1" not in endpoint:
|
if "/v1" not in endpoint:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
# 3. Proxy a simple copy request
|
# 3. Proxy a simple copy request
|
||||||
copy = await client.delete(model=model)
|
copy = await client.delete(model=model)
|
||||||
status_list.append(copy.status)
|
status_list.append(copy.status)
|
||||||
|
|
@ -842,7 +842,7 @@ async def pull_proxy(request: Request, model: Optional[str] = None):
|
||||||
|
|
||||||
for endpoint in config.endpoints:
|
for endpoint in config.endpoints:
|
||||||
if "/v1" not in endpoint:
|
if "/v1" not in endpoint:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
# 3. Proxy a simple pull request
|
# 3. Proxy a simple pull request
|
||||||
pull = await client.pull(model=model, insecure=insecure, stream=False)
|
pull = await client.pull(model=model, insecure=insecure, stream=False)
|
||||||
status_list.append(pull)
|
status_list.append(pull)
|
||||||
|
|
@ -882,7 +882,7 @@ async def push_proxy(request: Request):
|
||||||
status_list = []
|
status_list = []
|
||||||
|
|
||||||
for endpoint in config.endpoints:
|
for endpoint in config.endpoints:
|
||||||
client = ollama.AsyncClient(host=endpoint)
|
client = get_ollama_client(endpoint)
|
||||||
# 3. Proxy a simple push request
|
# 3. Proxy a simple push request
|
||||||
push = await client.push(model=model, insecure=insecure, stream=False)
|
push = await client.push(model=model, insecure=insecure, stream=False)
|
||||||
status_list.append(push)
|
status_list.append(push)
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ populate them once and routes can reuse them.
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
import ollama
|
||||||
import openai
|
import openai
|
||||||
|
|
||||||
from state import app_state
|
from state import app_state
|
||||||
|
|
@ -70,16 +71,42 @@ def get_probe_session(endpoint: str) -> aiohttp.ClientSession:
|
||||||
return app_state.get("probe_session") or app_state["session"]
|
return app_state.get("probe_session") or app_state["session"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_ollama_client(endpoint: str) -> ollama.AsyncClient:
|
||||||
|
"""Return a cached ``ollama.AsyncClient`` for the endpoint, creating it once.
|
||||||
|
|
||||||
|
``ollama.AsyncClient`` wraps an ``httpx.AsyncClient`` whose construction
|
||||||
|
builds an SSL context and reloads the OS trust store (~40 ms). It is safe to
|
||||||
|
reuse concurrently, so we keep one per endpoint instead of building a fresh
|
||||||
|
one on every request — otherwise that 40 ms of CPU runs on the event loop
|
||||||
|
per request and caps single-worker throughput at ~25 req/s.
|
||||||
|
"""
|
||||||
|
cache = app_state["ollama_clients"]
|
||||||
|
client = cache.get(endpoint)
|
||||||
|
if client is None:
|
||||||
|
client = ollama.AsyncClient(host=endpoint)
|
||||||
|
cache[endpoint] = client
|
||||||
|
return client
|
||||||
|
|
||||||
|
|
||||||
def _make_openai_client(
|
def _make_openai_client(
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
default_headers: dict | None = None,
|
default_headers: dict | None = None,
|
||||||
api_key: str = "no-key",
|
api_key: str = "no-key",
|
||||||
) -> openai.AsyncOpenAI:
|
) -> openai.AsyncOpenAI:
|
||||||
"""Return an AsyncOpenAI client configured for the given endpoint.
|
"""Return a cached AsyncOpenAI client configured for the given endpoint.
|
||||||
|
|
||||||
For Unix socket endpoints, injects a pre-created httpx UDS transport
|
Clients are cached per ``(endpoint, api_key)`` and reused across requests:
|
||||||
so the OpenAI SDK connects via the socket instead of TCP.
|
constructing one builds an SSL context and reloads the OS trust store
|
||||||
|
(~40 ms), which serializes the event loop if done per request. For Unix
|
||||||
|
socket endpoints, injects the pre-created httpx UDS transport so the OpenAI
|
||||||
|
SDK connects via the socket instead of TCP.
|
||||||
"""
|
"""
|
||||||
|
cache = app_state["openai_clients"]
|
||||||
|
cache_key = (endpoint, api_key)
|
||||||
|
client = cache.get(cache_key)
|
||||||
|
if client is not None:
|
||||||
|
return client
|
||||||
|
|
||||||
base_url = ep2base(endpoint)
|
base_url = ep2base(endpoint)
|
||||||
kwargs: dict = {"api_key": api_key}
|
kwargs: dict = {"api_key": api_key}
|
||||||
if default_headers is not None:
|
if default_headers is not None:
|
||||||
|
|
@ -89,4 +116,6 @@ def _make_openai_client(
|
||||||
if http_client is not None:
|
if http_client is not None:
|
||||||
kwargs["http_client"] = http_client
|
kwargs["http_client"] = http_client
|
||||||
base_url = "http://localhost/v1"
|
base_url = "http://localhost/v1"
|
||||||
return openai.AsyncOpenAI(base_url=base_url, **kwargs)
|
client = openai.AsyncOpenAI(base_url=base_url, **kwargs)
|
||||||
|
cache[cache_key] = client
|
||||||
|
return client
|
||||||
|
|
|
||||||
34
router.py
34
router.py
|
|
@ -215,6 +215,7 @@ from backends.sessions import (
|
||||||
_get_socket_path,
|
_get_socket_path,
|
||||||
get_session,
|
get_session,
|
||||||
_make_openai_client,
|
_make_openai_client,
|
||||||
|
get_ollama_client,
|
||||||
)
|
)
|
||||||
from backends.health import (
|
from backends.health import (
|
||||||
_is_fresh,
|
_is_fresh,
|
||||||
|
|
@ -375,6 +376,25 @@ async def startup_event() -> None:
|
||||||
app_state["httpx_clients"][ep] = httpx.AsyncClient(transport=transport, timeout=300.0)
|
app_state["httpx_clients"][ep] = httpx.AsyncClient(transport=transport, timeout=300.0)
|
||||||
print(f"[startup] Unix socket session: {ep} -> {sock_path}")
|
print(f"[startup] Unix socket session: {ep} -> {sock_path}")
|
||||||
|
|
||||||
|
# Pre-create long-lived backend clients so the expensive SSL-context /
|
||||||
|
# trust-store construction (~40 ms each) happens once here instead of on the
|
||||||
|
# request path. Ollama endpoints are reached via both the native ollama
|
||||||
|
# client (/api/chat, /api/generate) and the OpenAI client (/v1/* routes),
|
||||||
|
# so warm both; OpenAI-compatible endpoints only need the OpenAI client.
|
||||||
|
_warm_endpoints = config.endpoints + [
|
||||||
|
ep for ep in config.llama_server_endpoints if ep not in config.endpoints
|
||||||
|
]
|
||||||
|
for ep in _warm_endpoints:
|
||||||
|
try:
|
||||||
|
if not is_openai_compatible(ep):
|
||||||
|
get_ollama_client(ep)
|
||||||
|
_make_openai_client(
|
||||||
|
ep, default_headers=default_headers,
|
||||||
|
api_key=config.api_keys.get(ep, "no-key"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[startup] Backend client pre-warm failed for {ep}: {e}")
|
||||||
|
|
||||||
token_worker_task = asyncio.create_task(token_worker())
|
token_worker_task = asyncio.create_task(token_worker())
|
||||||
flush_task = asyncio.create_task(flush_buffer())
|
flush_task = asyncio.create_task(flush_buffer())
|
||||||
await init_llm_cache(config)
|
await init_llm_cache(config)
|
||||||
|
|
@ -415,6 +435,20 @@ async def shutdown_event() -> None:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[shutdown] Error closing httpx client {ep}: {e}")
|
print(f"[shutdown] Error closing httpx client {ep}: {e}")
|
||||||
|
|
||||||
|
# Close cached backend clients (reused across requests; see startup pre-warm).
|
||||||
|
for key, client in list(app_state.get("ollama_clients", {}).items()):
|
||||||
|
try:
|
||||||
|
await client._client.aclose()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[shutdown] Error closing ollama client {key}: {e}")
|
||||||
|
app_state["ollama_clients"].clear()
|
||||||
|
for key, client in list(app_state.get("openai_clients", {}).items()):
|
||||||
|
try:
|
||||||
|
await client.close()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[shutdown] Error closing openai client {key}: {e}")
|
||||||
|
app_state["openai_clients"].clear()
|
||||||
|
|
||||||
# Close the aiosqlite connection last — its worker thread is non-daemon
|
# Close the aiosqlite connection last — its worker thread is non-daemon
|
||||||
# and would otherwise keep the interpreter alive after lifespan completes.
|
# and would otherwise keep the interpreter alive after lifespan completes.
|
||||||
if db is not None:
|
if db is not None:
|
||||||
|
|
|
||||||
7
state.py
7
state.py
|
|
@ -69,6 +69,13 @@ app_state = {
|
||||||
"probe_connector": None, # connection pool isolated from proxy traffic
|
"probe_connector": None, # connection pool isolated from proxy traffic
|
||||||
"socket_sessions": {}, # endpoint -> aiohttp.ClientSession(UnixConnector) for .sock endpoints
|
"socket_sessions": {}, # endpoint -> aiohttp.ClientSession(UnixConnector) for .sock endpoints
|
||||||
"httpx_clients": {}, # endpoint -> httpx.AsyncClient(UDS transport) for .sock endpoints
|
"httpx_clients": {}, # endpoint -> httpx.AsyncClient(UDS transport) for .sock endpoints
|
||||||
|
# Long-lived backend clients, reused across requests. Constructing these is
|
||||||
|
# expensive (~40 ms each — every new client builds an SSL context and reloads
|
||||||
|
# the OS trust store via truststore), so building one per request serializes
|
||||||
|
# the event loop and caps throughput. Created once at startup, closed on
|
||||||
|
# shutdown. See backends.sessions.get_ollama_client / _make_openai_client.
|
||||||
|
"ollama_clients": {}, # endpoint -> ollama.AsyncClient
|
||||||
|
"openai_clients": {}, # (endpoint, api_key) -> openai.AsyncOpenAI
|
||||||
}
|
}
|
||||||
|
|
||||||
# Default outbound HTTP headers attached to every backend request.
|
# Default outbound HTTP headers attached to every backend request.
|
||||||
|
|
|
||||||
|
|
@ -80,8 +80,10 @@ def _patches(exc, mark_unhealthy):
|
||||||
stack.enter_context(patch("api.ollama.is_openai_compatible", lambda ep: False))
|
stack.enter_context(patch("api.ollama.is_openai_compatible", lambda ep: False))
|
||||||
stack.enter_context(patch("api.ollama.decrement_usage", AsyncMock()))
|
stack.enter_context(patch("api.ollama.decrement_usage", AsyncMock()))
|
||||||
stack.enter_context(patch("api.ollama._mark_backend_unhealthy", mark_unhealthy))
|
stack.enter_context(patch("api.ollama._mark_backend_unhealthy", mark_unhealthy))
|
||||||
|
# The native path now fetches a cached client via get_ollama_client() rather
|
||||||
|
# than constructing ollama.AsyncClient inline, so patch that seam.
|
||||||
stack.enter_context(
|
stack.enter_context(
|
||||||
patch("api.ollama.ollama.AsyncClient", lambda *a, **k: _FakeAsyncClient(exc))
|
patch("api.ollama.get_ollama_client", lambda *a, **k: _FakeAsyncClient(exc))
|
||||||
)
|
)
|
||||||
return stack
|
return stack
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue