All checks were successful
Build and Publish Docker Image (Semantic Cache) / build (amd64, linux/amd64, docker-amd64) (push) Successful in 3m59s
Build and Publish Docker Image / build (amd64, linux/amd64, docker-amd64) (push) Successful in 1m25s
Build and Publish Docker Image / build (arm64, linux/arm64, docker-arm64) (push) Successful in 12m46s
Build and Publish Docker Image / merge (push) Successful in 33s
Build and Publish Docker Image (Semantic Cache) / build (arm64, linux/arm64, docker-arm64) (push) Successful in 19m56s
Build and Publish Docker Image (Semantic Cache) / merge (push) Successful in 33s
121 lines
4.5 KiB
Python
121 lines
4.5 KiB
Python
"""aiohttp / OpenAI client factories aware of Unix-socket endpoints.
|
|
|
|
Unix socket endpoints follow the ``.sock`` hostname convention (e.g.
|
|
``http://192.168.0.52.sock/v1``) and resolve to ``/run/user/<uid>/<host>``.
|
|
Their sessions/clients live in ``state.app_state`` so that startup can
|
|
populate them once and routes can reuse them.
|
|
"""
|
|
import os
|
|
|
|
import aiohttp
|
|
import ollama
|
|
import openai
|
|
|
|
from state import app_state
|
|
from backends.normalize import ep2base
|
|
|
|
|
|
def _is_unix_socket_endpoint(endpoint: str) -> bool:
    """Tell whether *endpoint* follows the ``.sock`` hostname convention.

    The host is the text between the ``//`` scheme separator and the next
    ``/``, with any ``:port`` suffix dropped. For example
    ``http://192.168.0.52.sock/v1`` has the host ``192.168.0.52.sock`` and is
    therefore treated as a Unix-domain-socket endpoint rather than a TCP one.

    Returns ``False`` when the string contains no ``//`` at all.
    """
    _scheme, separator, remainder = endpoint.partition("//")
    if not separator:
        # Without a scheme separator there is no host portion to inspect.
        return False
    authority = remainder.split("/", 1)[0]
    hostname = authority.split(":", 1)[0]
    return hostname.endswith(".sock")
|
|
|
|
|
|
def _get_socket_path(endpoint: str) -> str:
    """Map a ``.sock`` endpoint URL to the socket file under ``/run/user``.

    The host part of the URL (port and path stripped) is used verbatim as
    the file name inside the current user's runtime directory::

        http://192.168.0.52.sock/v1 -> /run/user/<uid>/192.168.0.52.sock

    Raises:
        IndexError: if *endpoint* contains no ``//`` scheme separator.
    """
    after_scheme = endpoint.split("//", 1)[1]
    authority = after_scheme.split("/")[0]
    socket_name = authority.split(":")[0]
    return "/run/user/" + str(os.getuid()) + "/" + socket_name
|
|
|
|
|
|
def get_session(endpoint: str) -> aiohttp.ClientSession:
    """Pick the aiohttp session that should carry traffic for *endpoint*.

    A ``.sock`` endpoint uses the session registered for it in
    ``app_state["socket_sessions"]``. Every other endpoint, and a ``.sock``
    endpoint with no registered session, uses the shared
    ``app_state["session"]``.
    """
    dedicated = (
        app_state["socket_sessions"].get(endpoint)
        if _is_unix_socket_endpoint(endpoint)
        else None
    )
    # The shared session is looked up only when actually needed.
    return dedicated if dedicated is not None else app_state["session"]
|
|
|
|
|
|
def get_probe_session(endpoint: str) -> aiohttp.ClientSession:
    """Pick the session for lightweight health/introspection probes.

    Probes (available/loaded models, endpoint health) are kept off the
    proxy/streaming connection pool. Otherwise a burst of long-lived
    completion requests could leave a probe queued for a connection until
    its deadline passes, and a healthy endpoint would be marked unavailable
    under load.

    Resolution order:

    1. A ``.sock`` endpoint uses its dedicated per-endpoint session from
       ``app_state["socket_sessions"]`` when one is registered.
    2. Otherwise the shared ``app_state["probe_session"]`` is used.
    3. If the probe pool has not been initialised (e.g. in tests), the main
       ``app_state["session"]`` is the fallback.
    """
    socket_session = None
    if _is_unix_socket_endpoint(endpoint):
        socket_session = app_state["socket_sessions"].get(endpoint)
    if socket_session is None:
        # A missing or falsy probe session falls through to the main one.
        return app_state.get("probe_session") or app_state["session"]
    return socket_session
|
|
|
|
|
|
def get_ollama_client(endpoint: str) -> ollama.AsyncClient:
    """Fetch the shared ``ollama.AsyncClient`` for *endpoint*, building it on first use.

    Building an ``ollama.AsyncClient`` constructs the underlying
    ``httpx.AsyncClient``, which sets up an SSL context and reloads the OS
    trust store (~40 ms). Doing that per request would spend that CPU time on
    the event loop each time and cap single-worker throughput at ~25 req/s.
    The client is safe to reuse concurrently, so one instance per endpoint is
    kept in ``app_state["ollama_clients"]``.
    """
    clients = app_state["ollama_clients"]
    if (cached := clients.get(endpoint)) is not None:
        return cached
    fresh = ollama.AsyncClient(host=endpoint)
    clients[endpoint] = fresh
    return fresh
|
|
|
|
|
|
def _make_openai_client(
    endpoint: str,
    default_headers: dict | None = None,
    api_key: str = "no-key",
) -> openai.AsyncOpenAI:
    """Return a cached AsyncOpenAI client configured for the given endpoint.

    Clients are cached in ``app_state["openai_clients"]`` and reused across
    requests: constructing one builds an SSL context and reloads the OS trust
    store (~40 ms), which serializes the event loop if done per request.

    The cache key is ``(endpoint, api_key)`` when no headers are supplied and
    ``(endpoint, api_key, <frozen headers>)`` otherwise. Headers are baked
    into the client at construction time, so two calls that differ only in
    ``default_headers`` must not share a client; otherwise the second caller
    would silently send the first caller's headers.

    For Unix socket endpoints, the pre-created httpx client from
    ``app_state["httpx_clients"]`` is injected (when present) so the OpenAI
    SDK connects via the socket instead of TCP.

    Args:
        endpoint: Backend endpoint URL; may use the ``.sock`` host convention.
        default_headers: Optional extra headers sent with every request made
            by the returned client. Keys and values must be hashable.
        api_key: API key handed to the SDK.

    Returns:
        The cached or newly created ``openai.AsyncOpenAI`` instance.
    """
    cache = app_state["openai_clients"]
    if default_headers:
        # Sort by header name so the key does not depend on dict ordering.
        frozen_headers = tuple(
            sorted(default_headers.items(), key=lambda item: item[0])
        )
        cache_key = (endpoint, api_key, frozen_headers)
    else:
        # Keep the original two-tuple key when there are no headers, so any
        # code that addresses cache entries by (endpoint, api_key) still works.
        cache_key = (endpoint, api_key)

    client = cache.get(cache_key)
    if client is not None:
        return client

    base_url = ep2base(endpoint)
    kwargs: dict = {"api_key": api_key}
    if default_headers is not None:
        # Copy: the client outlives this call, so later mutation of the
        # caller's dict must not change what the cached client sends.
        kwargs["default_headers"] = dict(default_headers)
    if _is_unix_socket_endpoint(endpoint):
        http_client = app_state["httpx_clients"].get(endpoint)
        if http_client is not None:
            kwargs["http_client"] = http_client
            # NOTE(review): presumably the host here is a placeholder because
            # the injected client routes over the socket - confirm.
            base_url = "http://localhost/v1"
        # NOTE(review): with no pre-created httpx client, the SDK is built
        # against the ``.sock`` URL as-is and that client is cached - confirm
        # this fallback is intended.

    client = openai.AsyncOpenAI(base_url=base_url, **kwargs)
    cache[cache_key] = client
    return client
|