From d3e4555c8c515c9f5d63e8bf70a70ebfe6ad2a35 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Tue, 9 Sep 2025 17:08:00 +0200 Subject: [PATCH 01/27] moving from httpx to aiohttp --- requirements.txt | 2 - router.py | 101 ++++++++++++++++++++--------------------------- 2 files changed, 43 insertions(+), 60 deletions(-) diff --git a/requirements.txt b/requirements.txt index 4ffd391..df8c11a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,8 +14,6 @@ fastapi-sse==1.1.1 frozenlist==1.7.0 h11==0.16.0 httpcore==1.0.9 -httpx==0.28.1 -httpx-aiohttp==0.1.8 idna==3.10 jiter==0.10.0 multidict==6.6.4 diff --git a/router.py b/router.py index 51cf51d..aae47af 100644 --- a/router.py +++ b/router.py @@ -6,8 +6,7 @@ version: 0.2.2 license: AGPL """ # ------------------------------------------------------------- -import json, time, asyncio, yaml, httpx, ollama, openai, os, re -from httpx_aiohttp import AiohttpTransport +import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp from pathlib import Path from typing import Dict, Set, List, Optional from fastapi import FastAPI, Request, HTTPException @@ -95,23 +94,15 @@ usage_lock = asyncio.Lock() # protects access to usage_counts # ------------------------------------------------------------- # 4. Helperfunctions # ------------------------------------------------------------- +aiotimeout = aiohttp.ClientTimeout(total=5) + def _is_fresh(cached_at: float, ttl: int) -> bool: return (time.time() - cached_at) < ttl -def get_httpx_client(endpoint: str) -> httpx.AsyncClient: - """ - Use persistent connections to request endpoint info for reliable results - in high load situations or saturated endpoints. - """ - return httpx.AsyncClient( - base_url=endpoint, - timeout=httpx.Timeout(5.0, read=5.0, write=None, connect=5.0), - #limits=httpx.Limits( - # max_keepalive_connections=64, - # max_connections=64 - #), - transport=AiohttpTransport() - ) +async def _ensure_success(resp: aiohttp.ClientResponse) -> None: + if resp.status >= 400: + text = await resp.text() + raise HTTPException(status_code=resp.status, detail=text) async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: """ @@ -143,35 +134,33 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) - # Error expired – remove it del _error_cache[endpoint] + if "/v1" in endpoint: + endpoint_url = f"{endpoint}/models" + key = "data" + else: + endpoint_url = f"{endpoint}/api/tags" + key = "models" try: - client = get_httpx_client(endpoint) - if "/v1" in endpoint: - resp = await client.get(f"/models", headers=headers) - else: - resp = await client.get(f"/api/tags") - resp.raise_for_status() - data = resp.json() - # Expected format: - # {"models": [{"name": "model1"}, {"name": "model2"}]} - if "/v1" in endpoint: - models = {m.get("id") for m in data.get("data", []) if m.get("id")} - else: - models = {m.get("name") for m in data.get("models", []) if m.get("name")} - - if models: - _models_cache[endpoint] = (models, time.time()) - return models - else: - # Empty list – treat as “no models”, but still cache for 300s - _models_cache[endpoint] = (models, time.time()) - return models + async with aiohttp.ClientSession(timeout=aiotimeout) as client: + async with client.get(endpoint_url, headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() + + items = data.get(key, []) + models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} + + if models: + _models_cache[endpoint] = 
(models, time.time()) + return models + else: + # Empty list – treat as “no models”, but still cache for 300s + _models_cache[endpoint] = (models, time.time()) + return models except Exception as e: # Treat any error as if the endpoint offers no models print(f"[fetch_available_models] {endpoint} error: {e}") _error_cache[endpoint] = time.time() return set() - finally: - await client.aclose() async def fetch_loaded_models(endpoint: str) -> Set[str]: @@ -181,10 +170,10 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]: set is returned. """ try: - client = get_httpx_client(endpoint) - resp = await client.get(f"/api/ps") - resp.raise_for_status() - data = resp.json() + async with aiohttp.ClientSession(timeout=aiotimeout) as client: + async with client.get(f"/api/ps") as resp: + await _ensure_success(resp) + data = await resp.json() # The response format is: # {"models": [{"name": "model1"}, {"name": "model2"}]} models = {m.get("name") for m in data.get("models", []) if m.get("name")} @@ -192,8 +181,6 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]: except Exception: # If anything goes wrong we simply assume the endpoint has no models return set() - finally: - await client.aclose() async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: """ @@ -205,18 +192,16 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key headers = {"Authorization": "Bearer " + api_key} try: - client = get_httpx_client(endpoint) - resp = await client.get(f"{route}", headers=headers) - resp.raise_for_status() - data = resp.json() + async with aiohttp.ClientSession(timeout=aiotimeout) as client: + async with client.get(f"{endpoint}{route}", headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() detail = data.get(detail, []) return detail except Exception as e: # If anything goes wrong we cannot reply details print(e) return [] - finally: - await client.aclose() def ep2base(ep): if "/v1" in ep: @@ -960,22 +945,22 @@ async def config_proxy(request: Request): """ async def check_endpoint(url: str): try: - async with httpx.AsyncClient(timeout=1, transport=AiohttpTransport()) as client: + async with aiohttp.ClientSession(timeout=aiotimeout) as client: if "/v1" in url: headers = {"Authorization": "Bearer " + config.api_keys[url]} - r = await client.get(f"{url}/models", headers=headers) + async with client.get(f"{url}/models", headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() else: - r = await client.get(f"{url}/api/version") - r.raise_for_status() - data = r.json() + async with client.get(f"{url}/api/version") as resp: + await _ensure_success(resp) + data = await resp.json() if "/v1" in url: return {"url": url, "status": "ok", "version": "latest"} else: return {"url": url, "status": "ok", "version": data.get("version")} except Exception as exc: return {"url": url, "status": "error", "detail": str(exc)} - finally: - await client.aclose() results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) return {"endpoints": results} From 2813ecb04496f1537ede9d61bb72b320105dc659 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Wed, 10 Sep 2025 10:21:49 +0200 Subject: [PATCH 02/27] using global aiohttp sessionpool for improved performance --- router.py | 81 +++++++++++++++++++++++++++++------------------ static/index.html | 6 ++-- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/router.py b/router.py index aae47af..f01ef92 
100644 --- a/router.py +++ b/router.py @@ -2,11 +2,11 @@ title: NOMYO Router - an Ollama Proxy with Endpoint:Model aware routing author: alpha-nerd-nomyo author_url: https://github.com/nomyo-ai -version: 0.2.2 +version: 0.3 license: AGPL """ # ------------------------------------------------------------- -import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp +import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl from pathlib import Path from typing import Dict, Set, List, Optional from fastapi import FastAPI, Request, HTTPException @@ -32,6 +32,14 @@ _error_cache: dict[str, float] = {} _subscribers: Set[asyncio.Queue] = set() _subscribers_lock = asyncio.Lock() +# ------------------------------------------------------------------ +# aiohttp Global Sessions +# ------------------------------------------------------------------ +app_state = { + "session": None, + "connector": None, +} + # ------------------------------------------------------------- # 1. Configuration loader # ------------------------------------------------------------- @@ -140,11 +148,11 @@ async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) - else: endpoint_url = f"{endpoint}/api/tags" key = "models" + client: aiohttp.ClientSession = app_state["session"] try: - async with aiohttp.ClientSession(timeout=aiotimeout) as client: - async with client.get(endpoint_url, headers=headers) as resp: - await _ensure_success(resp) - data = await resp.json() + async with client.get(endpoint_url, headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() items = data.get(key, []) models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} @@ -169,11 +177,11 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]: loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty set is returned. """ + client: aiohttp.ClientSession = app_state["session"] try: - async with aiohttp.ClientSession(timeout=aiotimeout) as client: - async with client.get(f"/api/ps") as resp: - await _ensure_success(resp) - data = await resp.json() + async with client.get(f"/api/ps") as resp: + await _ensure_success(resp) + data = await resp.json() # The response format is: # {"models": [{"name": "model1"}, {"name": "model2"}]} models = {m.get("name") for m in data.get("models", []) if m.get("name")} @@ -187,15 +195,15 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key Query / to fetch and return a List of dicts with details for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail. 
""" + client: aiohttp.ClientSession = app_state["session"] headers = None if api_key is not None: headers = {"Authorization": "Bearer " + api_key} try: - async with aiohttp.ClientSession(timeout=aiotimeout) as client: - async with client.get(f"{endpoint}{route}", headers=headers) as resp: - await _ensure_success(resp) - data = await resp.json() + async with client.get(f"{endpoint}{route}", headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() detail = data.get(detail, []) return detail except Exception as e: @@ -945,20 +953,20 @@ async def config_proxy(request: Request): """ async def check_endpoint(url: str): try: - async with aiohttp.ClientSession(timeout=aiotimeout) as client: - if "/v1" in url: - headers = {"Authorization": "Bearer " + config.api_keys[url]} - async with client.get(f"{url}/models", headers=headers) as resp: - await _ensure_success(resp) - data = await resp.json() - else: - async with client.get(f"{url}/api/version") as resp: - await _ensure_success(resp) - data = await resp.json() - if "/v1" in url: - return {"url": url, "status": "ok", "version": "latest"} - else: - return {"url": url, "status": "ok", "version": data.get("version")} + client: aiohttp.ClientSession = app_state["session"] + if "/v1" in url: + headers = {"Authorization": "Bearer " + config.api_keys[url]} + async with client.get(f"{url}/models", headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() + else: + async with client.get(f"{url}/api/version") as resp: + await _ensure_success(resp) + data = await resp.json() + if "/v1" in url: + return {"url": url, "status": "ok", "version": "latest"} + else: + return {"url": url, "status": "ok", "version": data.get("version")} except Exception as exc: return {"url": url, "status": "error", "detail": str(exc)} @@ -1327,7 +1335,7 @@ async def usage_stream(request: Request): return StreamingResponse(event_generator(), media_type="text/event-stream") # ------------------------------------------------------------- -# 28. FastAPI startup event – load configuration +# 28. 
FastAPI startup/shutdown events # ------------------------------------------------------------- @app.on_event("startup") async def startup_event() -> None: @@ -1335,4 +1343,17 @@ async def startup_event() -> None: # Load YAML config (or use defaults if not present) config = Config.from_yaml(Path("config.yaml")) print(f"Loaded configuration:\n endpoints={config.endpoints},\n " - f"max_concurrent_connections={config.max_concurrent_connections}") \ No newline at end of file + f"max_concurrent_connections={config.max_concurrent_connections}") + + ssl_context = ssl.create_default_context() + connector = aiohttp.TCPConnector(limit=0, limit_per_host=512, ssl=ssl_context) + timeout = aiohttp.ClientTimeout(total=5, connect=5, sock_read=120, sock_connect=5) + session = aiohttp.ClientSession(connector=connector, timeout=timeout) + + app_state["connector"] = connector + app_state["session"] = session + + +@app.on_event("shutdown") +async def shutdown_event() -> None: + await app_state["session"].close() \ No newline at end of file diff --git a/static/index.html b/static/index.html index 01d902a..0bd5399 100644 --- a/static/index.html +++ b/static/index.html @@ -21,9 +21,9 @@ top: 1rem; /* distance from top edge */ right: 1rem; /* distance from right edge */ cursor: pointer; - min-width: 2.5rem; - min-height: 2.5rem; - font-size: 1.5rem; + min-width: 1rem; + min-height: 1rem; + font-size: 1rem; } .tables-wrapper { display: flex; From ddd3eb9e84154af87751af1ef76ed0c7516e111c Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Wed, 10 Sep 2025 15:25:25 +0200 Subject: [PATCH 03/27] params handling for googleapis --- config.yaml | 2 ++ router.py | 36 ++++++++++++++++++++++++------------ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/config.yaml b/config.yaml index 93ae117..81f04f5 100644 --- a/config.yaml +++ b/config.yaml @@ -5,6 +5,7 @@ endpoints: - http://192.168.0.52:11434 #- https://openrouter.ai/api/v1 #- https://api.openai.com/v1 + #- https://generativelanguage.googleapis.com/v1beta/openai # Maximum concurrent connections *per endpoint‑model pair* (equals to OLLAMA_NUM_PARALLEL) max_concurrent_connections: 2 @@ -18,3 +19,4 @@ api_keys: "http://192.168.0.52:11434": "ollama" #"https://openrouter.ai/api/v1": "${OPENROUTER_KEY}" #"https://api.openai.com/v1": "${OPENAI_KEY}" + #"https://generativelanguage.googleapis.com/v1beta/openai": "${GEMINI_KEY}" \ No newline at end of file diff --git a/router.py b/router.py index f01ef92..3b8e330 100644 --- a/router.py +++ b/router.py @@ -1051,8 +1051,6 @@ async def openai_chat_completions_proxy(request: Request): params = { "messages": messages, "model": model, - "stop": stop, - "stream": stream, } if tools is not None: @@ -1075,6 +1073,11 @@ async def openai_chat_completions_proxy(request: Request): params["presence_penalty"] = presence_penalty if frequency_penalty is not None: params["frequency_penalty"] = frequency_penalty + if stop is not None: + params["stop"] = stop + if stream is not None: + params["stream"] = stream + if not model: raise HTTPException( @@ -1157,20 +1160,29 @@ async def openai_completions_proxy(request: Request): params = { "prompt": prompt, "model": model, - "frequency_penalty": frequency_penalty, - "presence_penalty": presence_penalty, - "seed": seed, - "stop": stop, - "stream": stream, - "temperature": temperature, - "top_p": top_p, - "max_tokens": max_tokens, - "suffix": suffix } if stream_options is not None: params["stream_options"] = stream_options - + if frequency_penalty is not None: + params["frequency_penalty"] 
= frequency_penalty + if presence_penalty is not None: + params["presence_penalty"] = presence_penalty + if seed is not None: + params["seed"] = seed + if stop is not None: + params["stop"] = stop + if stream is not None: + params["stream"] = stream + if temperature is not None: + params["temperature"] = temperature + if top_p is not None: + params["top_p"] = top_p + if max_tokens is not None: + params["max_tokens"] = max_tokens + if suffix is not None: + params["suffix"] = suffix + if not model: raise HTTPException( status_code=400, detail="Missing required field 'model'" From ee0d9c6cca693fb3ce01372d04d81d1862c35058 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Wed, 10 Sep 2025 18:42:24 +0200 Subject: [PATCH 04/27] fixing fetch_loaded_models and safer usage_counts calls --- router.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/router.py b/router.py index 3b8e330..9e7daeb 100644 --- a/router.py +++ b/router.py @@ -179,7 +179,7 @@ async def fetch_loaded_models(endpoint: str) -> Set[str]: """ client: aiohttp.ClientSession = app_state["session"] try: - async with client.get(f"/api/ps") as resp: + async with client.get(f"{endpoint}/api/ps") as resp: await _ensure_success(resp) data = await resp.json() # The response format is: @@ -327,7 +327,7 @@ async def choose_endpoint(model: str) -> str: # (concurrently, but only for the filtered list) load_tasks = [fetch_loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) - + async with usage_lock: # Helper: get current usage count for (endpoint, model) def current_usage(ep: str) -> int: @@ -336,7 +336,7 @@ async def choose_endpoint(model: str) -> str: # 3️⃣ Endpoints that have the model loaded *and* a free slot loaded_and_free = [ ep for ep, models in zip(candidate_endpoints, loaded_sets) - if model in models and usage_counts[ep].get(model, 0) < config.max_concurrent_connections + if model in models and usage_counts.get(ep, {}).get(model, 0) < config.max_concurrent_connections ] if loaded_and_free: @@ -346,7 +346,7 @@ async def choose_endpoint(model: str) -> str: # 4️⃣ Endpoints among the candidates that simply have a free slot endpoints_with_free_slot = [ ep for ep in candidate_endpoints - if usage_counts[ep].get(model, 0) < config.max_concurrent_connections + if usage_counts.get(ep, {}).get(model, 0) < config.max_concurrent_connections ] if endpoints_with_free_slot: From af24cf5c87aeea35efc8206c4956d38775cd299e Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 11 Sep 2025 09:46:19 +0200 Subject: [PATCH 05/27] adding CORS middleware --- router.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/router.py b/router.py index 9e7daeb..34e0252 100644 --- a/router.py +++ b/router.py @@ -12,6 +12,7 @@ from typing import Dict, Set, List, Optional from fastapi import FastAPI, Request, HTTPException from fastapi_sse import sse_handler from fastapi.staticfiles import StaticFiles +from fastapi.middleware.cors import CORSMiddleware from starlette.responses import StreamingResponse, JSONResponse, Response, HTMLResponse, RedirectResponse from pydantic import Field from pydantic_settings import BaseSettings @@ -92,7 +93,13 @@ config = Config() # ------------------------------------------------------------- app = FastAPI() sse_handler.app = app - +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["GET", "POST", "DELETE"], + allow_headers=["Authorization", "Content-Type"], +) # 
-------------------------------------------------------------
 # 3. Global state: per‑endpoint per‑model active connection counters
 # -------------------------------------------------------------

From 0c353939d9424e94eb268a3b9a354ab5d84e5bcf Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Thu, 11 Sep 2025 13:56:51 +0200
Subject: [PATCH 06/27] better v1 endpoint parameter handling

---
 router.py | 82 +++++++++++++++++++++++--------------------------------
 1 file changed, 34 insertions(+), 48 deletions(-)

diff --git a/router.py b/router.py
index 34e0252..a2b7ac3 100644
--- a/router.py
+++ b/router.py
@@ -995,14 +995,14 @@ async def openai_embedding_proxy(request: Request):
         payload = json.loads(body_bytes.decode("utf-8"))
 
         model = payload.get("model")
-        input = payload.get("input")
+        doc = payload.get("input")
 
         if not model:
             raise HTTPException(
                 status_code=400, detail="Missing required field 'model'"
             )
-        if not input:
+        if not doc:
             raise HTTPException(
                 status_code=400, detail="Missing required field 'input'"
             )
@@ -1019,7 +1019,7 @@ async def openai_embedding_proxy(request: Request):
     oclient = openai.AsyncOpenAI(base_url=endpoint+"/v1", api_key=api_key)
 
     # 3. Async generator that streams embedding data and decrements the counter
-    async_gen = await oclient.embeddings.create(input=[input], model=model)
+    async_gen = await oclient.embeddings.create(input=[doc], model=model)
 
     await decrement_usage(endpoint, model)
 
@@ -1060,31 +1060,22 @@ async def openai_chat_completions_proxy(request: Request):
         "model": model,
     }
 
-    if tools is not None:
-        params["tools"] = tools
-    if response_format is not None:
-        params["response_format"] = response_format
-    if stream_options is not None:
-        params["stream_options"] = stream_options
-    if max_completion_tokens is not None:
-        params["max_completion_tokens"] = max_completion_tokens
-    if max_tokens is not None:
-        params["max_tokens"] = max_tokens
-    if temperature is not None:
-        params["temperature"] = temperature
-    if top_p is not None:
-        params["top_p"] = top_p
-    if seed is not None:
-        params["seed"] = seed
-    if presence_penalty is not None:
-        params["presence_penalty"] = presence_penalty
-    if frequency_penalty is not None:
-        params["frequency_penalty"] = frequency_penalty
-    if stop is not None:
-        params["stop"] = stop
-    if stream is not None:
-        params["stream"] = stream
-
+    optional_params = {
+        "tools": tools,
+        "response_format": response_format,
+        "stream_options": stream_options,
+        "max_completion_tokens": max_completion_tokens,
+        "max_tokens": max_tokens,
+        "temperature": temperature,
+        "top_p": top_p,
+        "seed": seed,
+        "presence_penalty": presence_penalty,
+        "frequency_penalty": frequency_penalty,
+        "stop": stop,
+        "stream": stream,
+    }
+
+    params.update({k: v for k, v in optional_params.items() if v is not None})
 
     if not model:
         raise HTTPException(
@@ -1169,26 +1160,21 @@ async def openai_completions_proxy(request: Request):
         "model": model,
     }
 
-    if stream_options is not None:
-        params["stream_options"] = stream_options
-    if frequency_penalty is not None:
-        params["frequency_penalty"] = frequency_penalty
-    if presence_penalty is not None:
-        params["presence_penalty"] = presence_penalty
-    if seed is not None:
-        params["seed"] = seed
-    if stop is not None:
-        params["stop"] = stop
-    if stream is not None:
-        params["stream"] = stream
-    if temperature is not None:
-        params["temperature"] = temperature
-    if top_p is not None:
-        params["top_p"] = top_p
-    if max_tokens is not None:
-        params["max_tokens"] = max_tokens
-    if suffix is not None:
-        params["suffix"] = suffix
+ optional_params = { + "frequency_penalty": frequency_penalty, + "presence_penalty": presence_penalty, + "seed": seed, + "stop": stop, + "stream": stream, + "stream_options": stream_options, + "temperature": temperature, + "top_p": top_p, + "max_tokens": max_tokens, + "max_completion_tokens": max_completion_tokens, + "suffix": suffix + } + + params.update({k: v for k, v in optional_params.items() if v is not None}) if not model: raise HTTPException( From 175f035d860404254f085e2013ff67be5656a24b Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 11 Sep 2025 18:53:23 +0200 Subject: [PATCH 07/27] removing reserved words var names --- router.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/router.py b/router.py index a2b7ac3..3e18f20 100644 --- a/router.py +++ b/router.py @@ -386,7 +386,7 @@ async def proxy(request: Request): stream = payload.get("stream") think = payload.get("think") raw = payload.get("raw") - format = payload.get("format") + _format = payload.get("format") images = payload.get("images") options = payload.get("options") keep_alive = payload.get("keep_alive") @@ -414,7 +414,7 @@ async def proxy(request: Request): # 4. Async generator that streams data and decrements the counter async def stream_generate_response(): try: - async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=format, images=images, options=options, keep_alive=keep_alive) + async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=_format, images=images, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: if hasattr(chunk, "model_dump_json"): @@ -974,8 +974,8 @@ async def config_proxy(request: Request): return {"url": url, "status": "ok", "version": "latest"} else: return {"url": url, "status": "ok", "version": data.get("version")} - except Exception as exc: - return {"url": url, "status": "error", "detail": str(exc)} + except Exception as e: + return {"url": url, "status": "error", "detail": str(e)} results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) return {"endpoints": results} From 25b287eba6db4cccf367f3616d70fa5bee89fec6 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Fri, 12 Sep 2025 09:44:56 +0200 Subject: [PATCH 08/27] improved SSE queue handling on shutdown --- router.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/router.py b/router.py index 3e18f20..19043fe 100644 --- a/router.py +++ b/router.py @@ -269,6 +269,11 @@ async def publish_snapshot(): continue await q.put(snapshot) +async def close_all_sse_queues(): + for q in list(_subscribers): + # sentinel value that the generator will recognise + await q.put(None) + # ------------------------------------------------------------------ # Subscriber helpers # ------------------------------------------------------------------ @@ -1331,6 +1336,8 @@ async def usage_stream(request: Request): if await request.is_disconnected(): break data = await queue.get() + if data is None: + break # Send the data as a single SSE message yield f"data: {data}\n\n" finally: @@ -1361,4 +1368,5 @@ async def startup_event() -> None: @app.on_event("shutdown") async def shutdown_event() -> None: + await close_all_sse_queues() await app_state["session"].close() \ No newline at end of file From 6381dd09c3becf99a0653b8ce32514238fd75510 
Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 13 Sep 2025 11:24:28 +0200 Subject: [PATCH 09/27] starting an openai2ollama client translation layer with rechunking class --- router.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/router.py b/router.py index 19043fe..4c568d2 100644 --- a/router.py +++ b/router.py @@ -6,7 +6,7 @@ version: 0.3 license: AGPL """ # ------------------------------------------------------------- -import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl +import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, datetime from pathlib import Path from typing import Dict, Set, List, Optional from fastapi import FastAPI, Request, HTTPException @@ -257,6 +257,31 @@ async def decrement_usage(endpoint: str, model: str) -> None: # usage_counts.pop(endpoint, None) await publish_snapshot() +def iso8601_ns(): + ns_since_epoch = time.time_ns() + dt = datetime.datetime.fromtimestamp( + ns_since_epoch / 1_000_000_000, # seconds + tz=datetime.timezone.utc + ) + iso8601_with_ns = ( + dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ns_since_epoch % 1_000_000_000:09d}Z" + ) + return iso8601_with_ns + +class rechunk: + def openai_chat_completion2ollama(chunk): + chunk = { "model": chunk.model, + "created_at": iso8601_ns() , + "done_reason": chunk.choices[0].finish_reason, + "load_duration": None, + "prompt_eval_count": None, + "prompt_eval_duration": None, + "eval_count": None, + "eval_duration": None, + "message": {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, + } + return chunk + # ------------------------------------------------------------------ # SSE Helpser # ------------------------------------------------------------------ @@ -473,7 +498,7 @@ async def chat_proxy(request: Request): ) if not isinstance(messages, list): raise HTTPException( - status_code=400, detail="Missing or invalid 'message' field (must be a list)" + status_code=400, detail="Missing or invalid 'messages' field (must be a list)" ) except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") from e @@ -481,20 +506,41 @@ async def chat_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) - client = ollama.AsyncClient(host=endpoint) + if "/v1" in endpoint: + params = { + "messages": messages, + "model": model, + } + + optional_params = { + "tools": tools, + "stream": stream, + } + + params.update({k: v for k, v in optional_params.items() if v is not None}) + oclient = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + else: + client = ollama.AsyncClient(host=endpoint) # 3. 
Async generator that streams chat data and decrements the counter async def stream_chat_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=format, options=options, keep_alive=keep_alive) + if "/v1" in endpoint: + async_gen = await oclient.chat.completions.create(**params) + else: + async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=format, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: + if "/v1" in endpoint: + print(chunk) + chunk = rechunk.openai_chat_completion2ollama(chunk) # `chunk` can be a dict or a pydantic model – dump to JSON safely if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() else: json_line = json.dumps(chunk) + print(json_line) yield json_line.encode("utf-8") + b"\n" else: json_line = ( From fd49a09c8b2f799fa7c9c4b889580105d98e5e40 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 13 Sep 2025 12:10:40 +0200 Subject: [PATCH 10/27] fix: openai endpoint if loop and extending chunk vars --- router.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/router.py b/router.py index 4c568d2..dfef0ca 100644 --- a/router.py +++ b/router.py @@ -269,7 +269,7 @@ def iso8601_ns(): return iso8601_with_ns class rechunk: - def openai_chat_completion2ollama(chunk): + def openai_chat_completion2ollama(chunk: dict, start_ts: float): chunk = { "model": chunk.model, "created_at": iso8601_ns() , "done_reason": chunk.choices[0].finish_reason, @@ -279,6 +279,10 @@ class rechunk: "eval_count": None, "eval_duration": None, "message": {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, + "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), + "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), + "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), + "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) } return chunk @@ -523,18 +527,20 @@ async def chat_proxy(request: Request): client = ollama.AsyncClient(host=endpoint) # 3. 
Async generator that streams chat data and decrements the counter + is_openai_endpoint = "/v1" in endpoint async def stream_chat_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if "/v1" in endpoint: + if is_openai_endpoint: + start_ts = time.perf_counter() async_gen = await oclient.chat.completions.create(**params) else: async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=format, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: - if "/v1" in endpoint: + if is_openai_endpoint: print(chunk) - chunk = rechunk.openai_chat_completion2ollama(chunk) + chunk = rechunk.openai_chat_completion2ollama(chunk, start_ts) # `chunk` can be a dict or a pydantic model – dump to JSON safely if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() From b7b39672962d334067d8325f0a53a48751594fc0 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 13 Sep 2025 12:28:42 +0200 Subject: [PATCH 11/27] adding stream == False options to ollama 2 openai translation in /api/chat --- router.py | 56 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/router.py b/router.py index dfef0ca..9c8204a 100644 --- a/router.py +++ b/router.py @@ -269,21 +269,37 @@ def iso8601_ns(): return iso8601_with_ns class rechunk: - def openai_chat_completion2ollama(chunk: dict, start_ts: float): - chunk = { "model": chunk.model, - "created_at": iso8601_ns() , - "done_reason": chunk.choices[0].finish_reason, - "load_duration": None, - "prompt_eval_count": None, - "prompt_eval_duration": None, - "eval_count": None, - "eval_duration": None, - "message": {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, - "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), - "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), - "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), - "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) - } + def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float): + if stream == True: + chunk = { "model": chunk.model, + "created_at": iso8601_ns() , + "done_reason": chunk.choices[0].finish_reason, + "load_duration": None, + "prompt_eval_count": None, + "prompt_eval_duration": None, + "eval_count": None, + "eval_duration": None, + "message": {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, + "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), + "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), + "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), + "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) + } + else: + chunk = { "model": chunk.model, + "created_at": iso8601_ns() , + "done_reason": chunk.choices[0].finish_reason, + "load_duration": None, + "prompt_eval_count": None, + "prompt_eval_duration": None, + "eval_count": None, + "eval_duration": None, + "message": {"role": 
chunk.choices[0].message.role, "content": chunk.choices[0].message.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, + "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), + "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), + "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), + "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) + } return chunk # ------------------------------------------------------------------ @@ -540,7 +556,7 @@ async def chat_proxy(request: Request): async for chunk in async_gen: if is_openai_endpoint: print(chunk) - chunk = rechunk.openai_chat_completion2ollama(chunk, start_ts) + chunk = rechunk.openai_chat_completion2ollama(chunk, stream, start_ts) # `chunk` can be a dict or a pydantic model – dump to JSON safely if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() @@ -549,11 +565,17 @@ async def chat_proxy(request: Request): print(json_line) yield json_line.encode("utf-8") + b"\n" else: + if is_openai_endpoint: + response = rechunk.openai_chat_completion2ollama(async_gen, stream, start_ts) + response = json.dumps(response) + else: + repsonse = async_gen.model_dump_json() json_line = ( - async_gen.model_dump_json() + response if hasattr(async_gen, "model_dump_json") else json.dumps(async_gen) ) + print(json_line) yield json_line.encode("utf-8") + b"\n" finally: From 0a7fd8ca5235288d126017710c5ac66d17132d43 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 13 Sep 2025 12:38:13 +0200 Subject: [PATCH 12/27] simplification in rechunk --- router.py | 49 +++++++++++++++++-------------------------------- 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/router.py b/router.py index 9c8204a..e49152e 100644 --- a/router.py +++ b/router.py @@ -270,37 +270,24 @@ def iso8601_ns(): class rechunk: def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float): + rechunk = { "model": chunk.model, + "created_at": iso8601_ns() , + "done_reason": chunk.choices[0].finish_reason, + "load_duration": None, + "prompt_eval_count": None, + "prompt_eval_duration": None, + "eval_count": None, + "eval_duration": None, + "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), + "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), + "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), + "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) + } if stream == True: - chunk = { "model": chunk.model, - "created_at": iso8601_ns() , - "done_reason": chunk.choices[0].finish_reason, - "load_duration": None, - "prompt_eval_count": None, - "prompt_eval_duration": None, - "eval_count": None, - "eval_duration": None, - "message": {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, - "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), - "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), - "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), - "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 
2) if chunk.usage is not None else None) - } + rechunk["message"] = {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} else: - chunk = { "model": chunk.model, - "created_at": iso8601_ns() , - "done_reason": chunk.choices[0].finish_reason, - "load_duration": None, - "prompt_eval_count": None, - "prompt_eval_duration": None, - "eval_count": None, - "eval_duration": None, - "message": {"role": chunk.choices[0].message.role, "content": chunk.choices[0].message.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None}, - "eval_count": (chunk.usage.completion_tokens if chunk.usage is not None else None), - "prompt_eval_count": (chunk.usage.prompt_tokens if chunk.usage is not None else None), - "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), - "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) - } - return chunk + rechunk["message"] = {"role": chunk.choices[0].message.role, "content": chunk.choices[0].message.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} + return rechunk # ------------------------------------------------------------------ # SSE Helpser @@ -555,21 +542,19 @@ async def chat_proxy(request: Request): if stream == True: async for chunk in async_gen: if is_openai_endpoint: - print(chunk) chunk = rechunk.openai_chat_completion2ollama(chunk, stream, start_ts) # `chunk` can be a dict or a pydantic model – dump to JSON safely if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() else: json_line = json.dumps(chunk) - print(json_line) yield json_line.encode("utf-8") + b"\n" else: if is_openai_endpoint: response = rechunk.openai_chat_completion2ollama(async_gen, stream, start_ts) response = json.dumps(response) else: - repsonse = async_gen.model_dump_json() + response = async_gen.model_dump_json() json_line = ( response if hasattr(async_gen, "model_dump_json") From 9ea852f154a17091acc90c2959b76905277491dd Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 13 Sep 2025 16:57:09 +0200 Subject: [PATCH 13/27] adding fetch class and ollama client completions on openai endpoints --- router.py | 271 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 151 insertions(+), 120 deletions(-) diff --git a/router.py b/router.py index e49152e..eb2a90a 100644 --- a/router.py +++ b/router.py @@ -119,111 +119,112 @@ async def _ensure_success(resp: aiohttp.ClientResponse) -> None: text = await resp.text() raise HTTPException(status_code=resp.status, detail=text) -async def fetch_available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: - """ - Query /api/tags and return a set of all model names that the - endpoint *advertises* (i.e. is capable of serving). This endpoint lists - every model that is installed on the Ollama instance, regardless of - whether the model is currently loaded into memory. +class fetch: + async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: + """ + Query /api/tags and return a set of all model names that the + endpoint *advertises* (i.e. is capable of serving). This endpoint lists + every model that is installed on the Ollama instance, regardless of + whether the model is currently loaded into memory. - If the request fails (e.g. timeout, 5xx, or malformed response), an empty - set is returned. 
- """ - headers = None - if api_key is not None: - headers = {"Authorization": "Bearer " + api_key} + If the request fails (e.g. timeout, 5xx, or malformed response), an empty + set is returned. + """ + headers = None + if api_key is not None: + headers = {"Authorization": "Bearer " + api_key} - if endpoint in _models_cache: - models, cached_at = _models_cache[endpoint] - if _is_fresh(cached_at, 300): - return models - else: - # stale entry – drop it - del _models_cache[endpoint] - - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 1): - # Still within the short error TTL – pretend nothing is available - return set() - else: - # Error expired – remove it - del _error_cache[endpoint] - - if "/v1" in endpoint: - endpoint_url = f"{endpoint}/models" - key = "data" - else: - endpoint_url = f"{endpoint}/api/tags" - key = "models" - client: aiohttp.ClientSession = app_state["session"] - try: - async with client.get(endpoint_url, headers=headers) as resp: - await _ensure_success(resp) - data = await resp.json() - - items = data.get(key, []) - models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} - - if models: - _models_cache[endpoint] = (models, time.time()) + if endpoint in _models_cache: + models, cached_at = _models_cache[endpoint] + if _is_fresh(cached_at, 300): return models else: - # Empty list – treat as “no models”, but still cache for 300s - _models_cache[endpoint] = (models, time.time()) - return models - except Exception as e: - # Treat any error as if the endpoint offers no models - print(f"[fetch_available_models] {endpoint} error: {e}") - _error_cache[endpoint] = time.time() - return set() + # stale entry – drop it + del _models_cache[endpoint] + + if endpoint in _error_cache: + if _is_fresh(_error_cache[endpoint], 1): + # Still within the short error TTL – pretend nothing is available + return set() + else: + # Error expired – remove it + del _error_cache[endpoint] + + if "/v1" in endpoint: + endpoint_url = f"{endpoint}/models" + key = "data" + else: + endpoint_url = f"{endpoint}/api/tags" + key = "models" + client: aiohttp.ClientSession = app_state["session"] + try: + async with client.get(endpoint_url, headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() + + items = data.get(key, []) + models = {item.get("id") or item.get("name") for item in items if item.get("id") or item.get("name")} + + if models: + _models_cache[endpoint] = (models, time.time()) + return models + else: + # Empty list – treat as “no models”, but still cache for 300s + _models_cache[endpoint] = (models, time.time()) + return models + except Exception as e: + # Treat any error as if the endpoint offers no models + print(f"[fetch.available_models] {endpoint} error: {e}") + _error_cache[endpoint] = time.time() + return set() -async def fetch_loaded_models(endpoint: str) -> Set[str]: - """ - Query /api/ps and return a set of model names that are currently - loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty - set is returned. 
- """ - client: aiohttp.ClientSession = app_state["session"] - try: - async with client.get(f"{endpoint}/api/ps") as resp: - await _ensure_success(resp) - data = await resp.json() - # The response format is: - # {"models": [{"name": "model1"}, {"name": "model2"}]} - models = {m.get("name") for m in data.get("models", []) if m.get("name")} - return models - except Exception: - # If anything goes wrong we simply assume the endpoint has no models - return set() + async def loaded_models(endpoint: str) -> Set[str]: + """ + Query /api/ps and return a set of model names that are currently + loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty + set is returned. + """ + client: aiohttp.ClientSession = app_state["session"] + try: + async with client.get(f"{endpoint}/api/ps") as resp: + await _ensure_success(resp) + data = await resp.json() + # The response format is: + # {"models": [{"name": "model1"}, {"name": "model2"}]} + models = {m.get("name") for m in data.get("models", []) if m.get("name")} + return models + except Exception: + # If anything goes wrong we simply assume the endpoint has no models + return set() -async def fetch_endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: - """ - Query / to fetch and return a List of dicts with details - for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail. - """ - client: aiohttp.ClientSession = app_state["session"] - headers = None - if api_key is not None: - headers = {"Authorization": "Bearer " + api_key} - - try: - async with client.get(f"{endpoint}{route}", headers=headers) as resp: - await _ensure_success(resp) - data = await resp.json() - detail = data.get(detail, []) - return detail - except Exception as e: - # If anything goes wrong we cannot reply details - print(e) - return [] + async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: + """ + Query / to fetch and return a List of dicts with details + for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail. 
+ """ + client: aiohttp.ClientSession = app_state["session"] + headers = None + if api_key is not None: + headers = {"Authorization": "Bearer " + api_key} + + try: + async with client.get(f"{endpoint}{route}", headers=headers) as resp: + await _ensure_success(resp) + data = await resp.json() + detail = data.get(detail, []) + return detail + except Exception as e: + # If anything goes wrong we cannot reply details + print(e) + return [] -def ep2base(ep): - if "/v1" in ep: - base_url = ep - else: - base_url = ep+"/v1" - return base_url + def ep2base(ep): + if "/v1" in ep: + base_url = ep + else: + base_url = ep+"/v1" + return base_url def dedupe_on_keys(dicts, key_fields): """ @@ -288,6 +289,19 @@ class rechunk: else: rechunk["message"] = {"role": chunk.choices[0].message.role, "content": chunk.choices[0].message.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} return rechunk + + def openai_completion2ollama(chunk: dict, stream: bool, start_ts: float): + rechunk = { "model": chunk.model, + "created_at": iso8601_ns(), + "load_duration": None, + "done_reason": chunk.choices[0].finish_reason, + "total_duration": None, + "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), + "thinking": None, + "context": None, + "response": chunk.choices[0].text + } + return rechunk # ------------------------------------------------------------------ # SSE Helpser @@ -350,8 +364,8 @@ async def choose_endpoint(model: str) -> str: 6️⃣ If no endpoint advertises the model at all, raise an error. """ # 1️⃣ Gather advertised‑model sets for all endpoints concurrently - tag_tasks = [fetch_available_models(ep) for ep in config.endpoints if "/v1" not in ep] - tag_tasks += [fetch_available_models(ep, config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if "/v1" not in ep] + tag_tasks += [fetch.available_models(ep, config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] advertised_sets = await asyncio.gather(*tag_tasks) # 2️⃣ Filter endpoints that advertise the requested model @@ -369,7 +383,7 @@ async def choose_endpoint(model: str) -> str: # 3️⃣ Among the candidates, find those that have the model *loaded* # (concurrently, but only for the filtered list) - load_tasks = [fetch_loaded_models(ep) for ep in candidate_endpoints] + load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) async with usage_lock: @@ -409,7 +423,6 @@ async def proxy(request: Request): """ Proxy a generate request to Ollama and stream the response back to the client. """ - # 1. Parse and validate request try: body_bytes = await request.body() payload = json.loads(body_bytes.decode("utf-8")) @@ -439,29 +452,50 @@ async def proxy(request: Request): except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") from e - # 2. Decide which endpoint to use + endpoint = await choose_endpoint(model) - - # Increment usage counter for this endpoint‑model pair await increment_usage(endpoint, model) + is_openai_endpoint = "/v1" in endpoint + if is_openai_endpoint: + params = { + "prompt": prompt, + "model": model, + } - # 3. 
Create Ollama client instance - client = ollama.AsyncClient(host=endpoint) + optional_params = { + "stream": stream, + } + + params.update({k: v for k, v in optional_params.items() if v is not None}) + oclient = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + else: + client = ollama.AsyncClient(host=endpoint) # 4. Async generator that streams data and decrements the counter async def stream_generate_response(): try: - async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=_format, images=images, options=options, keep_alive=keep_alive) + if is_openai_endpoint: + start_ts = time.perf_counter() + async_gen = await oclient.completions.create(**params) + else: + async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=_format, images=images, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: + if is_openai_endpoint: + chunk = rechunk.openai_completion2ollama(chunk, stream, start_ts) if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() else: json_line = json.dumps(chunk) yield json_line.encode("utf-8") + b"\n" else: + if is_openai_endpoint: + response = rechunk.openai_completion2ollama(async_gen, stream, start_ts) + response = json.dumps(response) + else: + response = async_gen.model_dump_json() json_line = ( - async_gen.model_dump_json() + response if hasattr(async_gen, "model_dump_json") else json.dumps(async_gen) ) @@ -513,7 +547,8 @@ async def chat_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) - if "/v1" in endpoint: + is_openai_endpoint = "/v1" in endpoint + if is_openai_endpoint: params = { "messages": messages, "model": model, @@ -530,7 +565,6 @@ async def chat_proxy(request: Request): client = ollama.AsyncClient(host=endpoint) # 3. Async generator that streams chat data and decrements the counter - is_openai_endpoint = "/v1" in endpoint async def stream_chat_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) @@ -560,7 +594,6 @@ async def chat_proxy(request: Request): if hasattr(async_gen, "model_dump_json") else json.dumps(async_gen) ) - print(json_line) yield json_line.encode("utf-8") + b"\n" finally: @@ -941,7 +974,7 @@ async def version_proxy(request: Request): """ # 1. Query all endpoints for version - tasks = [fetch_endpoint_details(ep, "/api/version", "version") for ep in config.endpoints if "/v1" not in ep] + tasks = [fetch.endpoint_details(ep, "/api/version", "version") for ep in config.endpoints if "/v1" not in ep] all_versions = await asyncio.gather(*tasks) def version_key(v): @@ -964,8 +997,8 @@ async def tags_proxy(request: Request): """ # 1. 
Query all endpoints for models - tasks = [fetch_endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] - tasks += [fetch_endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] + tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] all_models = await asyncio.gather(*tasks) models = {'models': []} @@ -988,7 +1021,7 @@ async def ps_proxy(request: Request): """ # 1. Query all endpoints for running models - tasks = [fetch_endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep] + tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep] loaded_models = await asyncio.gather(*tasks) models = {'models': []} @@ -1300,8 +1333,8 @@ async def openai_models_proxy(request: Request): """ # 1. Query all endpoints for models - tasks = [fetch_endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] - tasks += [fetch_endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] + tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] all_models = await asyncio.gather(*tasks) models = {'data': []} @@ -1351,9 +1384,7 @@ async def health_proxy(request: Request): * The HTTP status code is 200 when everything is healthy, 503 otherwise. """ # Run all health checks in parallel - tasks = [ - fetch_endpoint_details(ep, "/api/version", "version") for ep in config.endpoints - ] + tasks = [fetch.endpoint_details(ep, "/api/version", "version") for ep in config.endpoints] results = await asyncio.gather(*tasks, return_exceptions=True) From 49b1ea16d07cd44c4532b37bb73c5fe944f3a418 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 13 Sep 2025 18:11:05 +0200 Subject: [PATCH 14/27] hotfix ep2base --- router.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/router.py b/router.py index eb2a90a..689877a 100644 --- a/router.py +++ b/router.py @@ -219,12 +219,12 @@ class fetch: print(e) return [] - def ep2base(ep): - if "/v1" in ep: - base_url = ep - else: - base_url = ep+"/v1" - return base_url +def ep2base(ep): + if "/v1" in ep: + base_url = ep + else: + base_url = ep+"/v1" + return base_url def dedupe_on_keys(dicts, key_fields): """ From bd21906687abf71af1ec2958f952ec07db7ba509 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 15 Sep 2025 09:04:38 +0200 Subject: [PATCH 15/27] fixing /v1/embeddings --- router.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/router.py b/router.py index 689877a..d7a4cbd 100644 --- a/router.py +++ b/router.py @@ -1113,7 +1113,8 @@ async def openai_embedding_proxy(request: Request): api_key = config.api_keys[endpoint] else: api_key = "ollama" - oclient = openai.AsyncOpenAI(base_url=endpoint+"/v1", api_key=api_key) + base_url = ep2base(endpoint) + oclient = openai.AsyncOpenAI(base_url=base_url, api_key=api_key) # 3. 
Async generator that streams embedding data and decrements the counter async_gen = await oclient.embeddings.create(input=[doc], model=model) From 6c9ffad8340f59f0ba4c2c47e694f3bd52cded16 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 15 Sep 2025 11:47:55 +0200 Subject: [PATCH 16/27] adding ollama embeddings conversion calls to openai endpoint --- router.py | 46 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/router.py b/router.py index d7a4cbd..51e0137 100644 --- a/router.py +++ b/router.py @@ -302,7 +302,25 @@ class rechunk: "response": chunk.choices[0].text } return rechunk + + def openai_embeddings2ollama(chunk: dict): + rechunk = {"embedding": chunk.data[0].embedding} + return rechunk + def openai_embed2ollama(chunk: dict, model: str): + rechunk = { "model": model, + "created_at": iso8601_ns(), + "done": None, + "done_reason": None, + "total_duration": None, + "load_duration": None, + "prompt_eval_count": None, + "prompt_eval_duration": None, + "eval_count": None, + "eval_duration": None, + "embeddings": [chunk.data[0].embedding] + } + return rechunk # ------------------------------------------------------------------ # SSE Helpser # ------------------------------------------------------------------ @@ -639,13 +657,21 @@ async def embedding_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) - client = ollama.AsyncClient(host=endpoint) + is_openai_endpoint = "/v1" in endpoint + if is_openai_endpoint: + client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + else: + client = ollama.AsyncClient(host=endpoint) # 3. Async generator that streams embedding data and decrements the counter async def stream_embedding_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - async_gen = await client.embeddings(model=model, prompt=prompt, options=options, keep_alive=keep_alive) + if is_openai_endpoint: + async_gen = await client.embeddings.create(input=[prompt], model=model) + async_gen = rechunk.openai_embeddings2ollama(async_gen) + else: + async_gen = await client.embeddings(model=model, prompt=prompt, options=options, keep_alive=keep_alive) if hasattr(async_gen, "model_dump_json"): json_line = async_gen.model_dump_json() else: @@ -676,7 +702,7 @@ async def embed_proxy(request: Request): payload = json.loads(body_bytes.decode("utf-8")) model = payload.get("model") - input = payload.get("input") + _input = payload.get("input") truncate = payload.get("truncate") options = payload.get("options") keep_alive = payload.get("keep_alive") @@ -685,7 +711,7 @@ async def embed_proxy(request: Request): raise HTTPException( status_code=400, detail="Missing required field 'model'" ) - if not input: + if not _input: raise HTTPException( status_code=400, detail="Missing required field 'input'" ) @@ -695,13 +721,21 @@ async def embed_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) - client = ollama.AsyncClient(host=endpoint) + is_openai_endpoint = "/v1" in endpoint + if is_openai_endpoint: + client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + else: + client = ollama.AsyncClient(host=endpoint) # 3. 
Async generator that streams embed data and decrements the counter
     async def stream_embedding_response():
         try:
             # The chat method returns a generator of dicts (or GenerateResponse)
-            async_gen = await client.embed(model=model, input=input, truncate=truncate, options=options, keep_alive=keep_alive)
+            if is_openai_endpoint:
+                async_gen = await client.embeddings.create(input=[_input], model=model)
+                async_gen = rechunk.openai_embed2ollama(async_gen, model)
+            else:
+                async_gen = await client.embed(model=model, input=_input, truncate=truncate, options=options, keep_alive=keep_alive)
             if hasattr(async_gen, "model_dump_json"):
                 json_line = async_gen.model_dump_json()
             else:

From ed84be27605b97a8f7a73ba09599133011337d62 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 15 Sep 2025 11:57:00 +0200
Subject: [PATCH 17/27] relabeling openai models with ollama compatible tags

---
 router.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/router.py b/router.py
index 51e0137..987ab6c 100644
--- a/router.py
+++ b/router.py
@@ -1037,6 +1037,11 @@ async def tags_proxy(request: Request):
 
     models = {'models': []}
     for modellist in all_models:
+        for model in modellist:
+            if not "model" in model.keys(): # Relabel OpenAI models with Ollama Model.model from Model.id
+                model['model'] = model['id']
+            else:
+                model['id'] = model['model']
         models['models'] += modellist
 
     # 2. Return a JSONResponse with a deduplicated list of unique models for inference

From beb5395e247a518e464fa85c6c762402792dac32 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 15 Sep 2025 12:06:42 +0200
Subject: [PATCH 18/27] Update README.md

---
 README.md | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/README.md b/README.md
index 6ca4cd0..f6cc58f 100644
--- a/README.md
+++ b/README.md
@@ -18,9 +18,19 @@ endpoints:
   - http://ollama0:11434
   - http://ollama1:11434
   - http://ollama2:11434
+  - https://api.openai.com/v1
 
 # Maximum concurrent connections *per endpoint‑model pair*
 max_concurrent_connections: 2
+
+# API keys for remote endpoints
+# Set an environment variable like OPENAI_KEY
+# Confirm endpoints are exactly as in endpoints block
+api_keys:
+  "http://192.168.0.50:11434": "ollama"
+  "http://192.168.0.51:11434": "ollama"
+  "http://192.168.0.52:11434": "ollama"
+  "https://api.openai.com/v1": "${OPENAI_KEY}"
 ```
 
 Run the NOMYO Router in a dedicated virtual environment, install the requirements and run with uvicorn:
@@ -30,6 +40,13 @@ python3 -m venv .venv/router
 source .venv/router/bin/activate
 pip3 install -r requirements.txt
 ```
+
+On the shell, export your API key:
+
+```
+export OPENAI_KEY=YOUR_SECRET_API_KEY
+```
+
 finally you can
 
 ```

From da8b165f4a1e075c174774791b1d1a76aecf5307 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 15 Sep 2025 17:00:53 +0200
Subject: [PATCH 19/27] fixing openai models relabeling for ollama client libs

---
 router.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/router.py b/router.py
index 987ab6c..d8a318a 100644
--- a/router.py
+++ b/router.py
@@ -1042,6 +1042,10 @@ async def tags_proxy(request: Request):
                 model['model'] = model['id']
             else:
                 model['id'] = model['model']
+            if not "name" in model.keys(): # Relabel OpenAI models with Ollama Model.name from Model.model to have model,name keys
+                model['name'] = model['model']
+            else:
+                model['id'] = model['model']
         models['models'] += modellist
 
     # 2. 
Return a JSONResponse with a deduplicated list of unique models for inference

From 4b5834d7dfe4965ee938744d7283387fa2a9777d Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 15 Sep 2025 17:39:15 +0200
Subject: [PATCH 20/27] compliance with ollama naming conventions and openai model['id']

---
 router.py | 28 +++++++++++++++++++++-------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/router.py b/router.py
index d8a318a..5780045 100644
--- a/router.py
+++ b/router.py
@@ -394,10 +394,19 @@ async def choose_endpoint(model: str) -> str:
 
     # 6️⃣
     if not candidate_endpoints:
-        raise RuntimeError(
-            f"None of the configured endpoints ({', '.join(config.endpoints)}) "
-            f"advertise the model '{model}'."
-        )
+        if ":latest" in model: #ollama naming convention not applicable to openai
+            model = model.split(":")
+            model = model[0]
+            print(model)
+            candidate_endpoints = [
+                ep for ep, models in zip(config.endpoints, advertised_sets)
+                if model in models
+            ]
+        if not candidate_endpoints:
+            raise RuntimeError(
+                f"None of the configured endpoints ({', '.join(config.endpoints)}) "
+                f"advertise the model '{model}'."
+            )
 
     # 3️⃣ Among the candidates, find those that have the model *loaded*
     #     (concurrently, but only for the filtered list)
@@ -472,9 +481,11 @@ async def proxy(request: Request):
 
     endpoint = await choose_endpoint(model)
 
-    await increment_usage(endpoint, model)
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
+        if ":latest" in model:
+            model = model.split(":")
+            model = model[0]
         params = {
             "prompt": prompt,
             "model": model,
@@ -488,6 +499,7 @@ async def proxy(request: Request):
         oclient = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
     else:
         client = ollama.AsyncClient(host=endpoint)
+    await increment_usage(endpoint, model)
 
     # 4. Async generator that streams data and decrements the counter
     async def stream_generate_response():
@@ -564,9 +576,11 @@ async def chat_proxy(request: Request):
 
     # 2. Endpoint logic
     endpoint = await choose_endpoint(model)
-    await increment_usage(endpoint, model)
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
+        if ":latest" in model:
+            model = model.split(":")
+            model = model[0]
         params = {
             "messages": messages,
             "model": model,
@@ -581,7 +595,7 @@ async def chat_proxy(request: Request):
         oclient = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
     else:
         client = ollama.AsyncClient(host=endpoint)
-
+    await increment_usage(endpoint, model)
     # 3. Async generator that streams chat data and decrements the counter
     async def stream_chat_response():
         try:

From 16dba93c0dbd932e9f992471e95df543e98a7bb9 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 15 Sep 2025 17:48:17 +0200
Subject: [PATCH 21/27] compliance for ollama embeddings endpoints using openai models

---
 router.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/router.py b/router.py
index 5780045..10b3a5e 100644
--- a/router.py
+++ b/router.py
@@ -397,7 +397,6 @@ async def choose_endpoint(model: str) -> str:
         if ":latest" in model: #ollama naming convention not applicable to openai
             model = model.split(":")
             model = model[0]
-            print(model)
             candidate_endpoints = [
                 ep for ep, models in zip(config.endpoints, advertised_sets)
                 if model in models
@@ -669,13 +669,15 @@ async def embedding_proxy(request: Request):
 
     # 2. 
Endpoint logic
     endpoint = await choose_endpoint(model)
-    await increment_usage(endpoint, model)
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
+        if ":latest" in model:
+            model = model.split(":")
+            model = model[0]
         client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
     else:
         client = ollama.AsyncClient(host=endpoint)
-
+    await increment_usage(endpoint, model)
     # 3. Async generator that streams embedding data and decrements the counter
     async def stream_embedding_response():
         try:
@@ -734,13 +735,15 @@ async def embed_proxy(request: Request):
 
     # 2. Endpoint logic
     endpoint = await choose_endpoint(model)
-    await increment_usage(endpoint, model)
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
+        if ":latest" in model:
+            model = model.split(":")
+            model = model[0]
         client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
     else:
         client = ollama.AsyncClient(host=endpoint)
-
+    await increment_usage(endpoint, model)
     # 3. Async generator that streams embed data and decrements the counter
     async def stream_embedding_response():
         try:

From 795873b4c946f027c9334771d94477c0815c6832 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 15 Sep 2025 19:12:00 +0200
Subject: [PATCH 22/27] finalizing compliance tasks

---
 router.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/router.py b/router.py
index 10b3a5e..316c188 100644
--- a/router.py
+++ b/router.py
@@ -395,7 +395,7 @@ async def choose_endpoint(model: str) -> str:
     # 6️⃣
     if not candidate_endpoints:
         if ":latest" in model: #ollama naming convention not applicable to openai
-            model = model.split(":")
+            model = model.split(":latest")
             model = model[0]
             candidate_endpoints = [
@@ -483,7 +483,7 @@ async def proxy(request: Request):
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
         if ":latest" in model:
-            model = model.split(":")
+            model = model.split(":latest")
             model = model[0]
         params = {
             "prompt": prompt,
@@ -578,7 +578,7 @@ async def chat_proxy(request: Request):
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
         if ":latest" in model:
-            model = model.split(":")
+            model = model.split(":latest")
             model = model[0]
         params = {
             "messages": messages,
@@ -672,7 +672,7 @@ async def embedding_proxy(request: Request):
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
         if ":latest" in model:
-            model = model.split(":")
+            model = model.split(":latest")
             model = model[0]
         client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
     else:
@@ -738,7 +738,7 @@ async def embed_proxy(request: Request):
     is_openai_endpoint = "/v1" in endpoint
     if is_openai_endpoint:
         if ":latest" in model:
-            model = model.split(":")
+            model = model.split(":latest")
             model = model[0]
         client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
     else:
@@ -1056,7 +1056,7 @@ async def tags_proxy(request: Request):
     for modellist in all_models:
         for model in modellist:
             if not "model" in model.keys(): # Relabel OpenAI models with Ollama Model.model from Model.id
-                model['model'] = model['id']
+                model['model'] = model['id'] + ":latest"
             else:
                 model['id'] = model['model']

From f4678018bf981b06ee88d98f8cfada13771ccdc2 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Tue, 16 Sep 2025 17:51:51 +0200
Subject: [PATCH 23/27] adding thinking to rechunk
class --- router.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/router.py b/router.py index 316c188..1c00373 100644 --- a/router.py +++ b/router.py @@ -285,9 +285,9 @@ class rechunk: "response_token/s": (round(chunk.usage.total_tokens / (time.perf_counter() - start_ts), 2) if chunk.usage is not None else None) } if stream == True: - rechunk["message"] = {"role": chunk.choices[0].delta.role, "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} + rechunk["message"] = {"role": chunk.choices[0].delta.role or "assistant", "content": chunk.choices[0].delta.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} else: - rechunk["message"] = {"role": chunk.choices[0].message.role, "content": chunk.choices[0].message.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} + rechunk["message"] = {"role": chunk.choices[0].message.role or "assistant", "content": chunk.choices[0].message.content, "thinking": None, "images": None, "tool_name": None, "tool_calls": None} return rechunk def openai_completion2ollama(chunk: dict, stream: bool, start_ts: float): @@ -297,7 +297,7 @@ class rechunk: "done_reason": chunk.choices[0].finish_reason, "total_duration": None, "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), - "thinking": None, + "thinking": chunk.choices[0].reasoning or None, "context": None, "response": chunk.choices[0].text } From d85d120cc81d0c1f344bc65f022af7a732259a19 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Wed, 17 Sep 2025 11:39:51 +0200 Subject: [PATCH 24/27] fixing thinking mode in rechunk and model renaming in /v1 endpoints --- config.yaml | 22 ---------------------- 1 file changed, 22 deletions(-) delete mode 100644 config.yaml diff --git a/config.yaml b/config.yaml deleted file mode 100644 index 81f04f5..0000000 --- a/config.yaml +++ /dev/null @@ -1,22 +0,0 @@ -# config.yaml -endpoints: - - http://192.168.0.50:11434 - - http://192.168.0.51:11434 - - http://192.168.0.52:11434 - #- https://openrouter.ai/api/v1 - #- https://api.openai.com/v1 - #- https://generativelanguage.googleapis.com/v1beta/openai - -# Maximum concurrent connections *per endpoint‑model pair* (equals to OLLAMA_NUM_PARALLEL) -max_concurrent_connections: 2 - -# API keys for remote endpoints -# Set an environment variable like OPENAI_KEY -# Confirm endpoints are exactly as in endpoints block -api_keys: - "http://192.168.0.50:11434": "ollama" - "http://192.168.0.51:11434": "ollama" - "http://192.168.0.52:11434": "ollama" - #"https://openrouter.ai/api/v1": "${OPENROUTER_KEY}" - #"https://api.openai.com/v1": "${OPENAI_KEY}" - #"https://generativelanguage.googleapis.com/v1beta/openai": "${GEMINI_KEY}" \ No newline at end of file From deca8e37adc72e27a8993718b2f508cf124e1d7f Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Wed, 17 Sep 2025 11:40:48 +0200 Subject: [PATCH 25/27] fixing model re-naming in /v1 endpoints and thinking in rechunk --- .gitignore | 3 +++ router.py | 12 +++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 5dac518..4bb65cd 100644 --- a/.gitignore +++ b/.gitignore @@ -63,3 +63,6 @@ cython_debug/ # Logfile(s) *.log *.sqlite3 + +# Config +config.yaml \ No newline at end of file diff --git a/router.py b/router.py index 1c00373..2aa0886 100644 --- a/router.py +++ b/router.py @@ -291,13 +291,15 @@ class rechunk: return rechunk def 
openai_completion2ollama(chunk: dict, stream: bool, start_ts: float): + with_thinking = chunk.choices[0] if chunk.choices[0] else None + thinking = getattr(with_thinking, "reasoning", None) if with_thinking else None rechunk = { "model": chunk.model, "created_at": iso8601_ns(), "load_duration": None, "done_reason": chunk.choices[0].finish_reason, "total_duration": None, "eval_duration": (int((time.perf_counter() - start_ts) * 1000) if chunk.usage is not None else None), - "thinking": chunk.choices[0].reasoning or None, + "thinking": thinking, "context": None, "response": chunk.choices[0].text } @@ -1213,6 +1215,10 @@ async def openai_chat_completions_proxy(request: Request): max_completion_tokens = payload.get("max_completion_tokens") tools = payload.get("tools") + if ":latest" in model: + model = model.split(":latest") + model = model[0] + params = { "messages": messages, "model": model, @@ -1313,6 +1319,10 @@ async def openai_completions_proxy(request: Request): max_completion_tokens = payload.get("max_completion_tokens") suffix = payload.get("suffix") + if ":latest" in model: + model = model.split(":latest") + model = model[0] + params = { "prompt": prompt, "model": model, From 96995cd37f607019a112a6809bf70bd9c7fb9da7 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Wed, 17 Sep 2025 11:43:12 +0200 Subject: [PATCH 26/27] Create config.yaml --- config.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 config.yaml diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..bb8e8f5 --- /dev/null +++ b/config.yaml @@ -0,0 +1,18 @@ +# config.yaml +endpoints: + - http://192.168.0.50:11434 + - http://192.168.0.51:11434 + - http://192.168.0.52:11434 + - https://api.openai.com/v1 + +# Maximum concurrent connections *per endpoint‑model pair* (equals to OLLAMA_NUM_PARALLEL) +max_concurrent_connections: 2 + +# API keys for remote endpoints +# Set an environment variable like OPENAI_KEY +# Confirm endpoints are exactly as in endpoints block +api_keys: + "http://192.168.0.50:11434": "ollama" + "http://192.168.0.51:11434": "ollama" + "http://192.168.0.52:11434": "ollama" + "https://api.openai.com/v1": "${OPENAI_KEY}" From 8fe3880af78645576fb5ac882403a901f6ed240a Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 18 Sep 2025 18:49:11 +0200 Subject: [PATCH 27/27] randomize endpoint selection for bootstrapping ollamas --- router.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/router.py b/router.py index 2aa0886..374ccbc 100644 --- a/router.py +++ b/router.py @@ -6,7 +6,7 @@ version: 0.3 license: AGPL """ # ------------------------------------------------------------- -import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, datetime +import json, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, datetime, random from pathlib import Path from typing import Dict, Set, List, Optional from fastapi import FastAPI, Request, HTTPException @@ -376,11 +376,12 @@ async def choose_endpoint(model: str) -> str: 1️⃣ Query every endpoint for its advertised models (`/api/tags`). 2️⃣ Build a list of endpoints that contain the requested model. 3️⃣ For those endpoints, find those that have the model loaded - (`/api/ps`) *and* still have a free slot. + (`/api/ps`) *and* still have a free slot. 4️⃣ If none are both loaded and free, fall back to any endpoint - from the filtered list that simply has a free slot. + from the filtered list that simply has a free slot and randomly + select one. 
5️⃣ If all are saturated, pick any endpoint from the filtered list - (the request will queue on that endpoint). + (the request will queue on that endpoint). 6️⃣ If no endpoint advertises the model at all, raise an error. """ # 1️⃣ Gather advertised‑model sets for all endpoints concurrently @@ -436,8 +437,7 @@ async def choose_endpoint(model: str) -> str: ] if endpoints_with_free_slot: - ep = min(endpoints_with_free_slot, key=current_usage) - return ep + return random.choice(endpoints_with_free_slot) # 5️⃣ All candidate endpoints are saturated – pick one with lowest usages count (will queue) ep = min(candidate_endpoints, key=current_usage)
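
The net effect of PATCH 16's conversion helpers is a small mapping from the openai-python embeddings response onto the two Ollama response shapes. A minimal standalone sketch, assuming the usual `CreateEmbeddingResponse` object with `.data[i].embedding`; the helper names are illustrative, and the patch itself only ever sends a single input:

```python
from typing import Any

def to_api_embeddings(resp: Any) -> dict:
    # Legacy Ollama /api/embeddings shape: a single vector under "embedding".
    return {"embedding": resp.data[0].embedding}

def to_api_embed(resp: Any, model: str) -> dict:
    # Newer Ollama /api/embed shape: model name plus a list of vectors under
    # "embeddings". The timing/eval fields the patch carries are omitted
    # here, since it sets them all to None anyway.
    return {"model": model, "embeddings": [d.embedding for d in resp.data]}
```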
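
PATCHes 17, 19 and 22 together make `tags_proxy` present OpenAI models to Ollama clients and vice versa. Reduced to a hypothetical standalone function, where `setdefault` stands in for PATCH 19's second if/else, whose else branch only repeats the id assignment:

```python
def relabel_for_ollama(entry: dict) -> dict:
    if "model" not in entry:
        # OpenAI /v1/models item: only carries "id"; derive the Ollama
        # "model" key and apply the ":latest" tag convention (PATCH 22).
        entry["model"] = entry["id"] + ":latest"
    else:
        # Ollama item: mirror "model" into "id" for OpenAI-style clients.
        entry["id"] = entry["model"]
    # Ollama clients also expect a "name" key (PATCH 19).
    entry.setdefault("name", entry["model"])
    return entry
```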
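
The `:latest` handling that patches 20-22 inline at every /v1 call site boils down to one normalization. As a sketch (the helper name is illustrative, not part of the patch series):

```python
def strip_latest(model: str) -> str:
    # Drop the ":latest" tag that tags_proxy appends to OpenAI ids, so the
    # name sent upstream matches the id the endpoint advertises. Splitting
    # on the full ":latest" suffix (PATCH 22) instead of on ":" (PATCH 20)
    # preserves ids that contain their own colon, e.g. a hypothetical
    # "vendor/model:beta" relabeled to "vendor/model:beta:latest".
    if ":latest" in model:
        model = model.split(":latest")[0]
    return model
```

For example, `strip_latest("gpt-4o:latest")` yields `"gpt-4o"`, while a plain Ollama tag such as `"llama3:8b"` passes through untouched.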
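
The `thinking` field settles in PATCH 25: PATCH 23's direct `chunk.choices[0].reasoning` raises `AttributeError` on backends whose choices carry no such field, since `reasoning` is a non-standard extension that only some OpenAI-compatible servers return. The guard, isolated:

```python
from typing import Any, Optional

def extract_thinking(choice: Any) -> Optional[str]:
    # getattr tolerates choices without a "reasoning" attribute, so the
    # Ollama conversion keeps working on plain chat/completion responses.
    return getattr(choice, "reasoning", None) if choice else None
```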
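
PATCH 27's scheduling change in `choose_endpoint` comes down to two rules: choose randomly among endpoints with a free slot, and fall back to the least-used endpoint when all are saturated. The previous `min(..., key=current_usage)` broke ties deterministically, so with several freshly started (bootstrapping) zero-usage Ollamas the first in list order absorbed every new request. A compact sketch, with `usage` standing in for the router's `current_usage` counter and `candidates` assumed non-empty (the real function raises earlier otherwise):

```python
import random
from typing import Callable, Sequence

def pick_endpoint(free: Sequence[str], candidates: Sequence[str],
                  usage: Callable[[str], int]) -> str:
    if free:
        # Random choice spreads requests across equally idle endpoints
        # instead of herding them onto the first zero-usage entry.
        return random.choice(list(free))
    # Everything saturated: queue on the least-used candidate.
    return min(candidates, key=usage)
```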