using global aiohttp session pool for improved performance
This commit is contained in:
parent
d3e4555c8c
commit
2813ecb044
2 changed files with 54 additions and 33 deletions
81
router.py
81
router.py
|
|
@ -2,11 +2,11 @@
|
|||
title: NOMYO Router - an Ollama Proxy with Endpoint:Model aware routing
|
||||
author: alpha-nerd-nomyo
|
||||
author_url: https://github.com/nomyo-ai
|
||||
version: 0.2.2
|
||||
version: 0.3
|
||||
license: AGPL
|
||||
"""
|
||||
# -------------------------------------------------------------
|
||||
import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp
|
||||
import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl
|
||||
from pathlib import Path
|
||||
from typing import Dict, Set, List, Optional
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
|
|
@ -32,6 +32,14 @@ _error_cache: dict[str, float] = {}
|
|||
_subscribers: Set[asyncio.Queue] = set()
|
||||
_subscribers_lock = asyncio.Lock()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# aiohttp Global Sessions
|
||||
# ------------------------------------------------------------------
|
||||
app_state = {
|
||||
"session": None,
|
||||
"connector": None,
|
||||
}
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 1. Configuration loader
|
||||
# -------------------------------------------------------------
|
||||
|
|
@ -140,11 +148,11 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -
|
|||
else:
|
||||
endpoint_url = f"{endpoint}/api/tags"
|
||||
key = "models"
|
||||
client: aiohttp.ClientSession = app_state["session"]
|
||||
try:
|
||||
async with aiohttp.ClientSession(timeout=aiotimeout) as client:
|
||||
async with client.get(endpoint_url, headers=headers) as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
async with client.get(endpoint_url, headers=headers) as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
|
||||
items = data.get(key, [])
|
||||
models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")}
|
||||
|
|
@ -169,11 +177,11 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]:
|
|||
loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty
|
||||
set is returned.
|
||||
"""
|
||||
client: aiohttp.ClientSession = app_state["session"]
|
||||
try:
|
||||
async with aiohttp.ClientSession(timeout=aiotimeout) as client:
|
||||
async with client.get(f"/api/ps") as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
async with client.get(f"/api/ps") as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
# The response format is:
|
||||
# {"models": [{"name": "model1"}, {"name": "model2"}]}
|
||||
models = {m.get("name") for m in data.get("models", []) if m.get("name")}
|
||||
|
|
@ -187,15 +195,15 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key
|
|||
Query <endpoint>/<route> to fetch <detail> and return a List of dicts with details
|
||||
for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail.
|
||||
"""
|
||||
client: aiohttp.ClientSession = app_state["session"]
|
||||
headers = None
|
||||
if api_key is not None:
|
||||
headers = {"Authorization": "Bearer " + api_key}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession(timeout=aiotimeout) as client:
|
||||
async with client.get(f"{endpoint}{route}", headers=headers) as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
async with client.get(f"{endpoint}{route}", headers=headers) as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
detail = data.get(detail, [])
|
||||
return detail
|
||||
except Exception as e:
|
||||
|
|
@ -945,20 +953,20 @@ async def config_proxy(request: Request):
|
|||
"""
|
||||
async def check_endpoint(url: str):
|
||||
try:
|
||||
async with aiohttp.ClientSession(timeout=aiotimeout) as client:
|
||||
if "/v1" in url:
|
||||
headers = {"Authorization": "Bearer " + config.api_keys[url]}
|
||||
async with client.get(f"{url}/models", headers=headers) as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
else:
|
||||
async with client.get(f"{url}/api/version") as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
if "/v1" in url:
|
||||
return {"url": url, "status": "ok", "version": "latest"}
|
||||
else:
|
||||
return {"url": url, "status": "ok", "version": data.get("version")}
|
||||
client: aiohttp.ClientSession = app_state["session"]
|
||||
if "/v1" in url:
|
||||
headers = {"Authorization": "Bearer " + config.api_keys[url]}
|
||||
async with client.get(f"{url}/models", headers=headers) as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
else:
|
||||
async with client.get(f"{url}/api/version") as resp:
|
||||
await _ensure_success(resp)
|
||||
data = await resp.json()
|
||||
if "/v1" in url:
|
||||
return {"url": url, "status": "ok", "version": "latest"}
|
||||
else:
|
||||
return {"url": url, "status": "ok", "version": data.get("version")}
|
||||
except Exception as exc:
|
||||
return {"url": url, "status": "error", "detail": str(exc)}
|
||||
|
||||
|
|
@ -1327,7 +1335,7 @@ async def usage_stream(request: Request):
|
|||
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 28. FastAPI startup event – load configuration
|
||||
# 28. FastAPI startup/shutdown events
|
||||
# -------------------------------------------------------------
|
||||
@app.on_event("startup")
|
||||
async def startup_event() -> None:
|
||||
|
|
@ -1335,4 +1343,17 @@ async def startup_event() -> None:
|
|||
# Load YAML config (or use defaults if not present)
|
||||
config = Config.from_yaml(Path("config.yaml"))
|
||||
print(f"Loaded configuration:\n endpoints={config.endpoints},\n "
|
||||
f"max_concurrent_connections={config.max_concurrent_connections}")
|
||||
f"max_concurrent_connections={config.max_concurrent_connections}")
|
||||
|
||||
ssl_context = ssl.create_default_context()
|
||||
connector = aiohttp.TCPConnector(limit=0, limit_per_host=512, ssl=ssl_context)
|
||||
timeout = aiohttp.ClientTimeout(total=5, connect=5, sock_read=120, sock_connect=5)
|
||||
session = aiohttp.ClientSession(connector=connector, timeout=timeout)
|
||||
|
||||
app_state["connector"] = connector
|
||||
app_state["session"] = session
|
||||
|
||||
|
||||
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Close the shared aiohttp session when the application shuts down.

    ``app_state["session"]`` starts out as ``None`` and is only populated by
    the startup handler, so it may still be ``None`` here (for example when
    startup never completed). In that case there is nothing to close.
    """
    session = app_state.get("session")
    # Skip the close when no session was ever created or it is already
    # closed; calling ``.close()`` on ``None`` would raise AttributeError.
    if session is not None and not session.closed:
        # NOTE(review): the session was created with its own connector, so
        # closing it is expected to release the connector too - confirm
        # against aiohttp's ``connector_owner`` default.
        await session.close()
|
||||
|
|
@ -21,9 +21,9 @@
|
|||
top: 1rem; /* distance from top edge */
|
||||
right: 1rem; /* distance from right edge */
|
||||
cursor: pointer;
|
||||
min-width: 2.5rem;
|
||||
min-height: 2.5rem;
|
||||
font-size: 1.5rem;
|
||||
min-width: 1rem;
|
||||
min-height: 1rem;
|
||||
font-size: 1rem;
|
||||
}
|
||||
.tables-wrapper {
|
||||
display: flex;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue