moving from httpx to aiohttp

This commit is contained in:
Alpha Nerd 2025-09-09 17:08:00 +02:00
parent b6a9aa82cb
commit d3e4555c8c
2 changed files with 43 additions and 60 deletions

View file

@ -14,8 +14,6 @@ fastapi-sse==1.1.1
frozenlist==1.7.0 frozenlist==1.7.0
h11==0.16.0 h11==0.16.0
httpcore==1.0.9 httpcore==1.0.9
httpx==0.28.1
httpx-aiohttp==0.1.8
idna==3.10 idna==3.10
jiter==0.10.0 jiter==0.10.0
multidict==6.6.4 multidict==6.6.4

View file

@ -6,8 +6,7 @@ version: 0.2.2
license: AGPL license: AGPL
""" """
# ------------------------------------------------------------- # -------------------------------------------------------------
import json, time, asyncio, yaml, httpx, ollama, openai, os, re import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp
from httpx_aiohttp import AiohttpTransport
from pathlib import Path from pathlib import Path
from typing import Dict, Set, List, Optional from typing import Dict, Set, List, Optional
from fastapi import FastAPI, Request, HTTPException from fastapi import FastAPI, Request, HTTPException
@ -95,23 +94,15 @@ usage_lock = asyncio.Lock() # protects access to usage_counts
# ------------------------------------------------------------- # -------------------------------------------------------------
# 4. Helperfunctions # 4. Helperfunctions
# ------------------------------------------------------------- # -------------------------------------------------------------
aiotimeout = aiohttp.ClientTimeout(total=5)
def _is_fresh(cached_at: float, ttl: int) -> bool: def _is_fresh(cached_at: float, ttl: int) -> bool:
return (time.time() - cached_at) < ttl return (time.time() - cached_at) < ttl
def get_httpx_client(endpoint: str) -> httpx.AsyncClient: async def _ensure_success(resp: aiohttp.ClientResponse) -> None:
""" if resp.status >= 400:
Use persistent connections to request endpoint info for reliable results text = await resp.text()
in high load situations or saturated endpoints. raise HTTPException(status_code=resp.status, detail=text)
"""
return httpx.AsyncClient(
base_url=endpoint,
timeout=httpx.Timeout(5.0, read=5.0, write=None, connect=5.0),
#limits=httpx.Limits(
# max_keepalive_connections=64,
# max_connections=64
#),
transport=AiohttpTransport()
)
async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]:
""" """
@ -143,20 +134,20 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -
# Error expired remove it # Error expired remove it
del _error_cache[endpoint] del _error_cache[endpoint]
if "/v1" in endpoint:
endpoint_url = f"{endpoint}/models"
key = "data"
else:
endpoint_url = f"{endpoint}/api/tags"
key = "models"
try: try:
client = get_httpx_client(endpoint) async with aiohttp.ClientSession(timeout=aiotimeout) as client:
if "/v1" in endpoint: async with client.get(endpoint_url, headers=headers) as resp:
resp = await client.get(f"/models", headers=headers) await _ensure_success(resp)
else: data = await resp.json()
resp = await client.get(f"/api/tags")
resp.raise_for_status() items = data.get(key, [])
data = resp.json() models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")}
# Expected format:
# {"models": [{"name": "model1"}, {"name": "model2"}]}
if "/v1" in endpoint:
models = {m.get("id") for m in data.get("data", []) if m.get("id")}
else:
models = {m.get("name") for m in data.get("models", []) if m.get("name")}
if models: if models:
_models_cache[endpoint] = (models, time.time()) _models_cache[endpoint] = (models, time.time())
@ -170,8 +161,6 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -
print(f"[fetch_available_models] {endpoint} error: {e}") print(f"[fetch_available_models] {endpoint} error: {e}")
_error_cache[endpoint] = time.time() _error_cache[endpoint] = time.time()
return set() return set()
finally:
await client.aclose()
async def fetch_loaded_models(endpoint: str) -> Set[str]: async def fetch_loaded_models(endpoint: str) -> Set[str]:
@ -181,10 +170,10 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]:
set is returned. set is returned.
""" """
try: try:
client = get_httpx_client(endpoint) async with aiohttp.ClientSession(timeout=aiotimeout) as client:
resp = await client.get(f"/api/ps") async with client.get(f"/api/ps") as resp:
resp.raise_for_status() await _ensure_success(resp)
data = resp.json() data = await resp.json()
# The response format is: # The response format is:
# {"models": [{"name": "model1"}, {"name": "model2"}]} # {"models": [{"name": "model1"}, {"name": "model2"}]}
models = {m.get("name") for m in data.get("models", []) if m.get("name")} models = {m.get("name") for m in data.get("models", []) if m.get("name")}
@ -192,8 +181,6 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]:
except Exception: except Exception:
# If anything goes wrong we simply assume the endpoint has no models # If anything goes wrong we simply assume the endpoint has no models
return set() return set()
finally:
await client.aclose()
async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]:
""" """
@ -205,18 +192,16 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key
headers = {"Authorization": "Bearer " + api_key} headers = {"Authorization": "Bearer " + api_key}
try: try:
client = get_httpx_client(endpoint) async with aiohttp.ClientSession(timeout=aiotimeout) as client:
resp = await client.get(f"{route}", headers=headers) async with client.get(f"{endpoint}{route}", headers=headers) as resp:
resp.raise_for_status() await _ensure_success(resp)
data = resp.json() data = await resp.json()
detail = data.get(detail, []) detail = data.get(detail, [])
return detail return detail
except Exception as e: except Exception as e:
# If anything goes wrong we cannot reply details # If anything goes wrong we cannot reply details
print(e) print(e)
return [] return []
finally:
await client.aclose()
def ep2base(ep): def ep2base(ep):
if "/v1" in ep: if "/v1" in ep:
@ -960,22 +945,22 @@ async def config_proxy(request: Request):
""" """
async def check_endpoint(url: str): async def check_endpoint(url: str):
try: try:
async with httpx.AsyncClient(timeout=1, transport=AiohttpTransport()) as client: async with aiohttp.ClientSession(timeout=aiotimeout) as client:
if "/v1" in url: if "/v1" in url:
headers = {"Authorization": "Bearer " + config.api_keys[url]} headers = {"Authorization": "Bearer " + config.api_keys[url]}
r = await client.get(f"{url}/models", headers=headers) async with client.get(f"{url}/models", headers=headers) as resp:
await _ensure_success(resp)
data = await resp.json()
else: else:
r = await client.get(f"{url}/api/version") async with client.get(f"{url}/api/version") as resp:
r.raise_for_status() await _ensure_success(resp)
data = r.json() data = await resp.json()
if "/v1" in url: if "/v1" in url:
return {"url": url, "status": "ok", "version": "latest"} return {"url": url, "status": "ok", "version": "latest"}
else: else:
return {"url": url, "status": "ok", "version": data.get("version")} return {"url": url, "status": "ok", "version": data.get("version")}
except Exception as exc: except Exception as exc:
return {"url": url, "status": "error", "detail": str(exc)} return {"url": url, "status": "error", "detail": str(exc)}
finally:
await client.aclose()
results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints])
return {"endpoints": results} return {"endpoints": results}