nomyo-router/router.py

471 lines
17 KiB
Python
Raw Permalink Normal View History

2025-08-26 18:19:43 +02:00
"""
2026-03-05 11:09:20 +01:00
title: NOMYO Router - an (O)llama and OpenAI API v1 Proxy with Endpoint:Model aware routing
2025-08-26 18:19:43 +02:00
author: alpha-nerd-nomyo
author_url: https://github.com/nomyo-ai
2026-05-13 14:59:05 +02:00
version: 0.9
2025-08-26 18:19:43 +02:00
license: AGPL
"""
# -------------------------------------------------------------
import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx, hashlib
try:
import truststore; truststore.inject_into_ssl()
except ImportError:
pass
from datetime import datetime, timezone
2025-08-26 18:19:43 +02:00
from pathlib import Path
# Directory containing static files (relative to this script)
STATIC_DIR = Path(__file__).parent / "static"
2025-09-05 12:11:31 +02:00
from typing import Dict, Set, List, Optional
from urllib.parse import urlparse, parse_qsl, urlencode
2025-08-26 18:19:43 +02:00
from fastapi import FastAPI, Request, HTTPException
2025-09-05 12:11:31 +02:00
from fastapi_sse import sse_handler
from fastapi.staticfiles import StaticFiles
2025-09-11 09:46:19 +02:00
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, JSONResponse, Response, HTMLResponse, RedirectResponse
2025-08-26 18:19:43 +02:00
from pydantic import Field
from pydantic_settings import BaseSettings
from collections import defaultdict
from PIL import Image
2026-05-19 10:05:27 +02:00
from security import _mask_secrets
from context_window import (
_count_message_tokens,
_trim_messages_for_context,
_calibrated_trim_target,
_endpoint_nctx,
_CTX_TRIM_SMALL_LIMIT,
)
2026-05-19 11:18:06 +02:00
from state import (
_models_cache,
_loaded_models_cache,
_available_error_cache,
_loaded_error_cache,
_completion_error_cache,
_COMPLETION_ERROR_TTL,
_models_cache_lock,
_loaded_models_cache_lock,
_available_error_cache_lock,
_loaded_error_cache_lock,
_completion_error_cache_lock,
_inflight_available_models,
_inflight_loaded_models,
_inflight_lock,
_bg_refresh_available,
_bg_refresh_loaded,
_bg_refresh_lock,
_subscribers,
_subscribers_lock,
token_queue,
app_state,
token_buffer,
time_series_buffer,
buffer_lock,
FLUSH_INTERVAL,
)
2026-05-19 11:18:06 +02:00
# Rebound on startup — must stay in router.py module namespace.
token_worker_task: asyncio.Task | None = None
flush_task: asyncio.Task | None = None
2026-05-19 11:00:50 +02:00
from config import Config, _config_path_from_env
from ollama._types import TokenLogprob, Logprob
from db import TokenDatabase
2026-03-08 09:12:09 +01:00
from cache import init_llm_cache, get_llm_cache, openai_nonstream_to_sse
2026-05-19 12:05:51 +02:00
# Create the global config object it will be overwritten on startup.
# Submodules read it lazily via config.get_config().
config = Config.from_yaml(_config_path_from_env())
2025-08-26 18:19:43 +02:00
# -------------------------------------------------------------
# 2. FastAPI application
# -------------------------------------------------------------
app = FastAPI()
2025-09-05 12:11:31 +02:00
sse_handler.app = app
2025-09-11 09:46:19 +02:00
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
2026-05-19 12:05:51 +02:00
from state import default_headers
# -------------------------------------------------------------
# Router-level authentication (optional)
# -------------------------------------------------------------
def _extract_router_api_key(request: Request) -> Optional[str]:
"""
Extract the provided router API key from the Authorization header or `api_key`
query parameter. The middleware uses this to gate access to API routes when
a router_api_key is configured.
"""
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.lower().startswith("bearer "):
key = auth_header.split(" ", 1)[1].strip()
if key: # Ensure key is not empty
return key
query_key = request.query_params.get("api_key")
if query_key:
return query_key
return None
def _strip_api_key_from_scope(request: Request) -> None:
"""
Remove api_key from the ASGI scope query string to avoid leaking it in logs.
"""
scope = request.scope
raw_qs = scope.get("query_string", b"")
if not raw_qs:
return
params = parse_qsl(raw_qs.decode("utf-8"), keep_blank_values=True)
filtered = [(k, v) for (k, v) in params if k != "api_key"]
scope["query_string"] = urlencode(filtered).encode("utf-8")
@app.middleware("http")
async def enforce_router_api_key(request: Request, call_next):
"""
Enforce the optional NOMYO Router API key for all non-static requests.
When `config.router_api_key` is set, clients must supply the key either in
the Authorization header (`Bearer <key>`) or as `api_key` query parameter.
"""
expected_key = config.router_api_key
if not expected_key or request.method == "OPTIONS":
return await call_next(request)
path = request.url.path
# Allow static assets (CSS, JS, images, fonts) but NOT HTML pages,
# which would bypass auth by accessing /static/index.html directly.
_STATIC_ASSET_EXTS = {".css", ".js", ".ico", ".png", ".jpg", ".jpeg", ".svg", ".woff", ".woff2", ".ttf", ".map"}
is_static_asset = path.startswith("/static") and Path(path).suffix.lower() in _STATIC_ASSET_EXTS
if is_static_asset or path in {"/", "/favicon.ico"}:
return await call_next(request)
provided_key = _extract_router_api_key(request)
# Strip the api_key query param from scope so access logs do not leak it
_strip_api_key_from_scope(request)
if provided_key is None:
# No key provided but authentication is required - return 401
headers = {}
if "/api/" in path and path != "/api/usage-stream":
headers = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Authorization, Content-Type",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
}
return JSONResponse(
content={"detail": "Missing NOMYO Router API key"},
status_code=401,
headers=headers,
)
if not secrets.compare_digest(str(provided_key), str(expected_key)):
return JSONResponse(
content={"detail": "Invalid NOMYO Router API key"},
status_code=403,
)
response = await call_next(request)
# Add CORS headers for authenticated API requests
if "/api/" in path and path != "/api/usage-stream":
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Headers"] = "Authorization, Content-Type"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
return response
@app.exception_handler(openai.APIStatusError)
async def _openai_api_status_error_handler(request: Request, exc: openai.APIStatusError):
"""Forward upstream OpenAI-SDK status errors with their original status code and body
instead of letting them bubble up as 500s."""
body = exc.body if exc.body is not None else {"error": {"message": str(exc), "code": exc.status_code}}
return JSONResponse(status_code=exc.status_code, content=body)
2026-05-19 11:18:06 +02:00
from state import (
usage_counts,
token_usage_counts,
usage_lock,
token_usage_lock,
_affinity_map,
_affinity_lock,
_AFFINITY_MAX_ENTRIES,
)
2026-05-19 10:05:27 +02:00
from fingerprint import _conversation_fingerprint
# Database instance
db: "TokenDatabase" = None
2025-08-26 18:19:43 +02:00
# -------------------------------------------------------------
2026-05-19 12:05:51 +02:00
# 4. Helperfunctions
2025-08-26 18:19:43 +02:00
# -------------------------------------------------------------
2026-05-19 12:05:51 +02:00
from backends.normalize import (
_normalize_llama_model_name,
_extract_llama_quant,
ep2base,
dedupe_on_keys,
)
from backends.sessions import (
_is_unix_socket_endpoint,
_get_socket_path,
get_session,
_make_openai_client,
get_ollama_client,
2026-05-19 12:05:51 +02:00
)
from backends.health import (
_is_fresh,
_ensure_success,
_format_connection_issue,
_is_backend_connection_error,
_mark_backend_unhealthy,
_is_llama_model_loaded,
_is_llama_model_loaded_or_sleeping,
)
2026-05-19 12:05:51 +02:00
from backends.normalize import (
is_ext_openai_endpoint,
is_openai_compatible,
2026-06-14 16:34:31 +02:00
llama_endpoints,
2026-05-19 12:05:51 +02:00
get_tracking_model,
)
from tokens import token_worker, flush_buffer, flush_remaining_buffers
2026-05-19 12:05:51 +02:00
from backends.probe import fetch
2025-08-26 18:19:43 +02:00
from routing import increment_usage, decrement_usage
2025-09-05 12:11:31 +02:00
2026-05-19 14:09:52 +02:00
from requests.chat import _make_chat_request, _make_moe_requests
2026-05-19 10:05:27 +02:00
from images import iso8601_ns, is_base64, resize_image_if_needed
2026-05-19 14:09:52 +02:00
from requests.messages import (
_strip_assistant_prefill,
transform_tool_calls_to_openai,
transform_images_to_data_urls,
_strip_images_from_messages,
_accumulate_openai_tc_delta,
_build_ollama_tool_calls,
_convert_openai_logprobs,
get_last_user_content,
)
from requests.rechunk import rechunk
from sse import (
_capture_snapshot,
_distribute_snapshot,
close_all_sse_queues,
subscribe,
unsubscribe,
get_usage_counts,
)
2025-08-26 18:19:43 +02:00
# -------------------------------------------------------------
# 5. Endpoint selection logic (respecting the configurable limit)
# -------------------------------------------------------------
from routing import get_max_connections, choose_endpoint
2025-08-26 18:19:43 +02:00
2026-05-19 14:57:39 +02:00
# (Ollama /api/* routes — moved to api/ollama.py)
# -------------------------------------------------------------
# 18b. Conversation-affinity stats feeds the PS-table dot matrix
# -------------------------------------------------------------
2026-05-19 14:57:39 +02:00
# (affinity_stats, usage, config — moved to api/management.py)
# (v1/* routes — moved to api/openai.py)
# (cache routes — moved to api/management.py)
# -------------------------------------------------------------
# 26. Serve the static frontend
# -------------------------------------------------------------
app.mount("/static", StaticFiles(directory="static"), name="static")
2026-05-19 14:57:39 +02:00
from api.static import router as static_router
app.include_router(static_router)
from api.management import router as management_router
app.include_router(management_router)
from api.openai import router as openai_router
app.include_router(openai_router)
from api.responses import router as responses_router
app.include_router(responses_router)
2026-05-19 14:57:39 +02:00
from api.ollama import router as ollama_router
app.include_router(ollama_router)
2025-09-05 12:11:31 +02:00
2026-05-19 14:57:39 +02:00
# (health, hostname, usage-stream — moved to api/management.py)
2025-09-05 12:11:31 +02:00
# -------------------------------------------------------------
# 28. FastAPI startup/shutdown events
2025-08-26 18:19:43 +02:00
# -------------------------------------------------------------
@app.on_event("startup")
async def startup_event() -> None:
global config, db, token_worker_task, flush_task
2025-08-26 18:19:43 +02:00
# Load YAML config (or use defaults if not present)
config_path = _config_path_from_env()
config = Config.from_yaml(config_path)
if config_path.exists():
print(
f"Loaded configuration from {config_path}:\n"
f" endpoints={config.endpoints},\n"
f" llama_server_endpoints={config.llama_server_endpoints},\n"
2026-06-14 16:34:31 +02:00
f" llama_swap_endpoints={config.llama_swap_endpoints},\n"
2026-04-22 17:27:34 +02:00
f" max_concurrent_connections={config.max_concurrent_connections},\n"
f" endpoint_config={config.endpoint_config},\n"
f" priority_routing={config.priority_routing}"
)
else:
print(
f"No configuration file found at {config_path}. "
"Falling back to default settings."
)
# Initialize database
db = TokenDatabase(config.db_path)
await db.init_db()
# Reconcile Responses-API background tasks lost across a restart: their
# in-memory asyncio task is gone but the DB row may still read queued /
# in_progress, so mark those failed to give polling clients a terminal state.
_orphaned = await db.fail_orphaned_responses()
if _orphaned:
print(f"[startup] Marked {_orphaned} orphaned background response(s) as failed.")
# Load existing token counts from database
async for count_entry in db.load_token_counts():
endpoint = count_entry['endpoint']
model = count_entry['model']
input_tokens = count_entry['input_tokens']
output_tokens = count_entry['output_tokens']
total_tokens = count_entry['total_tokens']
token_usage_counts[endpoint][model] = total_tokens
ssl_context = ssl.create_default_context()
connector = aiohttp.TCPConnector(limit=0, limit_per_host=512, ssl=ssl_context)
timeout = aiohttp.ClientTimeout(total=60, connect=15, sock_read=120, sock_connect=15)
2026-05-08 12:15:51 +02:00
session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={"Referer": default_headers.get("HTTP-Referer", "https://nomyo.ai")},
)
app_state["connector"] = connector
app_state["session"] = session
# Dedicated pool for health/introspection probes, isolated from the proxy
# session above. Streaming completions can hold the proxy pool's per-host
# slots open for a long time; without a separate pool the lightweight
# probes queue behind them, hit their deadline, and mark healthy endpoints
# as unavailable under load.
probe_connector = aiohttp.TCPConnector(limit=0, limit_per_host=64, ssl=ssl_context)
probe_session = aiohttp.ClientSession(
connector=probe_connector,
timeout=timeout,
headers={"Referer": default_headers.get("HTTP-Referer", "https://nomyo.ai")},
)
app_state["probe_connector"] = probe_connector
app_state["probe_session"] = probe_session
2026-05-08 12:15:51 +02:00
# Create httpx clients for external OpenAI endpoints (Google, etc.)
# aiohttp strips Referer headers for cross-origin requests, so we use httpx
for ep in config.endpoints:
if is_ext_openai_endpoint(ep):
app_state["httpx_clients"][ep] = httpx.AsyncClient(timeout=30.0)
# Create per-endpoint Unix socket sessions for .sock endpoints
2026-06-14 16:34:31 +02:00
for ep in llama_endpoints(config):
if _is_unix_socket_endpoint(ep):
sock_path = _get_socket_path(ep)
sock_connector = aiohttp.UnixConnector(path=sock_path)
sock_timeout = aiohttp.ClientTimeout(total=300, connect=5, sock_read=300)
sock_session = aiohttp.ClientSession(connector=sock_connector, timeout=sock_timeout)
app_state["socket_sessions"][ep] = sock_session
transport = httpx.AsyncHTTPTransport(uds=sock_path)
app_state["httpx_clients"][ep] = httpx.AsyncClient(transport=transport, timeout=300.0)
print(f"[startup] Unix socket session: {ep} -> {sock_path}")
# Pre-create long-lived backend clients so the expensive SSL-context /
# trust-store construction (~40 ms each) happens once here instead of on the
# request path. Ollama endpoints are reached via both the native ollama
# client (/api/chat, /api/generate) and the OpenAI client (/v1/* routes),
# so warm both; OpenAI-compatible endpoints only need the OpenAI client.
_warm_endpoints = config.endpoints + [
2026-06-14 16:34:31 +02:00
ep for ep in llama_endpoints(config) if ep not in config.endpoints
]
for ep in _warm_endpoints:
try:
if not is_openai_compatible(ep):
get_ollama_client(ep)
_make_openai_client(
ep, default_headers=default_headers,
api_key=config.api_keys.get(ep, "no-key"),
)
except Exception as e:
print(f"[startup] Backend client pre-warm failed for {ep}: {e}")
token_worker_task = asyncio.create_task(token_worker())
flush_task = asyncio.create_task(flush_buffer())
2026-03-08 09:12:09 +01:00
await init_llm_cache(config)
@app.on_event("shutdown")
async def shutdown_event() -> None:
await close_all_sse_queues()
# Stop background tasks first so they stop touching the DB before we close it.
for t in (token_worker_task, flush_task):
if t is not None:
t.cancel()
try:
await t
except (asyncio.CancelledError, Exception):
pass
await flush_remaining_buffers()
await app_state["session"].close()
if app_state.get("probe_session") is not None:
await app_state["probe_session"].close()
app_state["probe_session"] = None
app_state["probe_connector"] = None
# Close Unix socket sessions
for ep, sess in list(app_state.get("socket_sessions", {}).items()):
try:
await sess.close()
print(f"[shutdown] Closed Unix socket session: {ep}")
except Exception as e:
print(f"[shutdown] Error closing Unix socket session {ep}: {e}")
# Close httpx Unix socket clients
for ep, client in list(app_state.get("httpx_clients", {}).items()):
try:
await client.aclose()
print(f"[shutdown] Closed httpx client: {ep}")
except Exception as e:
print(f"[shutdown] Error closing httpx client {ep}: {e}")
# Close cached backend clients (reused across requests; see startup pre-warm).
for key, client in list(app_state.get("ollama_clients", {}).items()):
try:
await client._client.aclose()
except Exception as e:
print(f"[shutdown] Error closing ollama client {key}: {e}")
app_state["ollama_clients"].clear()
for key, client in list(app_state.get("openai_clients", {}).items()):
try:
await client.close()
except Exception as e:
print(f"[shutdown] Error closing openai client {key}: {e}")
app_state["openai_clients"].clear()
# Close the aiosqlite connection last — its worker thread is non-daemon
# and would otherwise keep the interpreter alive after lifespan completes.
if db is not None:
try:
await db.close()
print("[shutdown] Closed token DB connection.")
except Exception as e:
print(f"[shutdown] Error closing DB: {e}")