Add files via upload
BREAKING CHANGE: - new config.yaml config block - new dependency: httpx-aiohttp for faster endpoint queries in bigger installations - new dynamic dashboard
This commit is contained in:
parent
20790d95ed
commit
b3b67fdbf2
3 changed files with 191 additions and 203 deletions
37
router.py
37
router.py
|
|
@ -7,6 +7,7 @@ license: AGPL
|
|||
"""
|
||||
# -------------------------------------------------------------
|
||||
import json, time, asyncio, yaml, httpx, ollama, openai, os, re
|
||||
from httpx_aiohttp import AiohttpTransport
|
||||
from pathlib import Path
|
||||
from typing import Dict, Set, List, Optional
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
|
|
@ -96,11 +97,12 @@ def get_httpx_client(endpoint: str) -> httpx.AsyncClient:
|
|||
"""
|
||||
return httpx.AsyncClient(
|
||||
base_url=endpoint,
|
||||
timeout=httpx.Timeout(5.0, read=5.0, write=5.0, connect=5.0),
|
||||
limits=httpx.Limits(
|
||||
max_keepalive_connections=64,
|
||||
max_connections=64
|
||||
)
|
||||
timeout=httpx.Timeout(5.0, read=5.0, write=None, connect=5.0),
|
||||
#limits=httpx.Limits(
|
||||
# max_keepalive_connections=64,
|
||||
# max_connections=64
|
||||
#),
|
||||
transport=AiohttpTransport()
|
||||
)
|
||||
|
||||
async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]:
|
||||
|
|
@ -133,8 +135,8 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -
|
|||
# Error expired – remove it
|
||||
del _error_cache[endpoint]
|
||||
|
||||
client = get_httpx_client(endpoint)
|
||||
try:
|
||||
client = get_httpx_client(endpoint)
|
||||
if "/v1" in endpoint:
|
||||
resp = await client.get(f"/models", headers=headers)
|
||||
else:
|
||||
|
|
@ -147,7 +149,7 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -
|
|||
models = {m.get("id") for m in data.get("data", []) if m.get("id")}
|
||||
else:
|
||||
models = {m.get("name") for m in data.get("models", []) if m.get("name")}
|
||||
|
||||
|
||||
if models:
|
||||
_models_cache[endpoint] = (models, time.time())
|
||||
return models
|
||||
|
|
@ -160,6 +162,8 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -
|
|||
print(f"[fetch_available_models] {endpoint} error: {e}")
|
||||
_error_cache[endpoint] = time.time()
|
||||
return set()
|
||||
finally:
|
||||
await client.aclose()
|
||||
|
||||
|
||||
async def fetch_loaded_models(endpoint: str) -> Set[str]:
|
||||
|
|
@ -168,8 +172,8 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]:
|
|||
loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty
|
||||
set is returned.
|
||||
"""
|
||||
client = get_httpx_client(endpoint)
|
||||
try:
|
||||
client = get_httpx_client(endpoint)
|
||||
resp = await client.get(f"/api/ps")
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
|
@ -180,6 +184,8 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]:
|
|||
except Exception:
|
||||
# If anything goes wrong we simply assume the endpoint has no models
|
||||
return set()
|
||||
finally:
|
||||
await client.aclose()
|
||||
|
||||
async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]:
|
||||
"""
|
||||
|
|
@ -189,8 +195,9 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key
|
|||
headers = None
|
||||
if api_key is not None:
|
||||
headers = {"Authorization": "Bearer " + api_key}
|
||||
client = get_httpx_client(endpoint)
|
||||
|
||||
try:
|
||||
client = get_httpx_client(endpoint)
|
||||
resp = await client.get(f"{route}", headers=headers)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
|
@ -200,6 +207,8 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key
|
|||
# If anything goes wrong we cannot reply details
|
||||
print(e)
|
||||
return []
|
||||
finally:
|
||||
await client.aclose()
|
||||
|
||||
def ep2base(ep):
|
||||
if "/v1" in ep:
|
||||
|
|
@ -235,8 +244,8 @@ async def decrement_usage(endpoint: str, model: str) -> None:
|
|||
# Optionally, clean up zero entries
|
||||
if usage_counts[endpoint].get(model, 0) == 0:
|
||||
usage_counts[endpoint].pop(model, None)
|
||||
if not usage_counts[endpoint]:
|
||||
usage_counts.pop(endpoint, None)
|
||||
#if not usage_counts[endpoint]:
|
||||
# usage_counts.pop(endpoint, None)
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 5. Endpoint selection logic (respecting the configurable limit)
|
||||
|
|
@ -640,7 +649,7 @@ async def show_proxy(request: Request, model: Optional[str] = None):
|
|||
|
||||
# 2. Endpoint logic
|
||||
endpoint = await choose_endpoint(model)
|
||||
await increment_usage(endpoint, model)
|
||||
#await increment_usage(endpoint, model)
|
||||
client = ollama.AsyncClient(host=endpoint)
|
||||
|
||||
# 3. Proxy a simple show request
|
||||
|
|
@ -907,7 +916,7 @@ async def config_proxy(request: Request):
|
|||
"""
|
||||
async def check_endpoint(url: str):
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=1) as client:
|
||||
async with httpx.AsyncClient(timeout=1, transport=AiohttpTransport()) as client:
|
||||
if "/v1" in url:
|
||||
headers = {"Authorization": "Bearer " + config.api_keys[url]}
|
||||
r = await client.get(f"{url}/models", headers=headers)
|
||||
|
|
@ -921,6 +930,8 @@ async def config_proxy(request: Request):
|
|||
return {"url": url, "status": "ok", "version": data.get("version")}
|
||||
except Exception as exc:
|
||||
return {"url": url, "status": "error", "detail": str(exc)}
|
||||
finally:
|
||||
await client.aclose()
|
||||
|
||||
results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints])
|
||||
return {"endpoints": results}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue