From 8ce7277884b254a46c2581ee7fdd17e6a591455b Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Sat, 30 Aug 2025 23:30:21 +0200
Subject: [PATCH 1/6] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 451e82e..4ae10ba 100644
--- a/README.md
+++ b/README.md
@@ -41,7 +41,7 @@ uvicorn router:app --host 127.0.0.1 --port 12434
 
 NOMYO Router accepts any Ollama request on the configured port for any Ollama endpoint from your frontend application. It then checks the available backends for the specific request. When the request is embed(dings), chat or generate the request will be forwarded to a single Ollama server, answered and send back to the router which forwards it back to the frontend.
 
-If now a another request for the same model config is made, NOMYO Router is aware which model runs on which Ollama server and routes the request to an Ollama server where this model is already deployed.
+Any other request for the same model config is made, NOMYO Router is aware which model runs on which Ollama server and routes the request to an Ollama server where this model is already deployed.
 
 If at the same time there are more than max concurrent connections than configured, NOMYO Router will route this request to another Ollama server for completion.
 

From ae0f6c14f75bba1537d64c947fe2733b011e5980 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Sat, 30 Aug 2025 23:32:07 +0200
Subject: [PATCH 2/6] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 4ae10ba..9d514eb 100644
--- a/README.md
+++ b/README.md
@@ -41,7 +41,7 @@ uvicorn router:app --host 127.0.0.1 --port 12434
 
 NOMYO Router accepts any Ollama request on the configured port for any Ollama endpoint from your frontend application. It then checks the available backends for the specific request. When the request is embed(dings), chat or generate the request will be forwarded to a single Ollama server, answered and send back to the router which forwards it back to the frontend.
 
-Any other request for the same model config is made, NOMYO Router is aware which model runs on which Ollama server and routes the request to an Ollama server where this model is already deployed.
+If another request for the same model config is made, NOMYO Router is aware which model runs on which Ollama server and routes the request to an Ollama server where this model is already deployed.
 
 If at the same time there are more than max concurrent connections than configured, NOMYO Router will route this request to another Ollama server for completion.
 
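The README paragraphs changed above describe the selection policy in prose: prefer a backend that already has the model loaded and still has a free slot; otherwise pick any backend below its connection limit; only queue when everything is saturated. A rough, self-contained sketch of that policy follows (illustrative only; the project's actual logic lives in choose_endpoint in router.py and is refined in the later patches, and every name below is made up for the example):

    from typing import Dict, List, Optional, Set

    def pick_endpoint(model: str,
                      endpoints: List[str],
                      loaded_on: Dict[str, Set[str]],      # endpoint -> models currently loaded
                      usage_counts: Dict[str, int],        # endpoint -> open connections
                      max_concurrent_connections: int) -> Optional[str]:
        # 1. Prefer a backend that already has the model loaded and a free slot.
        warm = [ep for ep in endpoints
                if model in loaded_on.get(ep, set())
                and usage_counts.get(ep, 0) < max_concurrent_connections]
        if warm:
            return warm[0]
        # 2. Otherwise any backend below its connection limit (the model is loaded there on demand).
        free = [ep for ep in endpoints
                if usage_counts.get(ep, 0) < max_concurrent_connections]
        if free:
            return free[0]
        # 3. Everything is saturated: the request has to queue on some backend.
        return endpoints[0] if endpoints else None
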
From 295ace0401db599126872c25a0580e0c3c1a6b27 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Sun, 31 Aug 2025 11:40:13 +0200
Subject: [PATCH 3/6] Add files via upload

fixing a result type bug in exception handler
---
 router.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/router.py b/router.py
index b3124ba..13fd123 100644
--- a/router.py
+++ b/router.py
@@ -120,9 +120,10 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str) -> List
             data = resp.json()
             detail = data.get(detail)
             return detail
-    except Exception:
-        # If anything goes wrong we cannot reply versions
-        return {detail: "N/A"}
+    except Exception as e:
+        # If anything goes wrong we cannot reply details
+        print(e)
+        return {detail: ["N/A"]}
 
 def ep2base(ep):
     if "/v1" in ep:

From 64549b4e173477051f2db1a4c321 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 1 Sep 2025 09:01:41 +0200
Subject: [PATCH 4/6] Add files via upload

Increasing timeout for heavy load situations
---
 router.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/router.py b/router.py
index 13fd123..5183e10 100644
--- a/router.py
+++ b/router.py
@@ -71,7 +71,7 @@ async def fetch_available_models(endpoint: str) -> Set[str]:
     set is returned.
     """
     try:
-        async with httpx.AsyncClient(timeout=1.0) as client:
+        async with httpx.AsyncClient(timeout=5.0) as client:
             if "/v1" in endpoint:
                 resp = await client.get(f"{endpoint}/models")
             else:
@@ -118,12 +118,12 @@ async def fetch_endpoint_details(endpoint: str, route: str, detail: str) -> List
             resp = await client.get(f"{endpoint}{route}")
             resp.raise_for_status()
             data = resp.json()
-            detail = data.get(detail)
+            detail = data.get(detail, [])
             return detail
     except Exception as e:
         # If anything goes wrong we cannot reply details
         print(e)
-        return {detail: ["N/A"]}
+        return {detail: []}
 
 def ep2base(ep):
     if "/v1" in ep:
@@ -795,7 +795,7 @@ async def ps_proxy(request: Request):
     for modellist in loaded_models:
         models['models'] += modellist
 
-    # 25. Return a JSONResponse with deduplicated currently deployed models
+    # 2. Return a JSONResponse with deduplicated currently deployed models
     return JSONResponse(
         content={"models": dedupe_on_keys(models['models'], ['digest'])},
         status_code=200,

From 9f19350f55d478239e213fb35fbe6cde93b5b0f9 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo
Date: Mon, 1 Sep 2025 09:30:23 +0200
Subject: [PATCH 5/6] Add files via upload

improving high load endpoint selection by moving from rr to lc algorithm for connections > max_concurrent_connections
---
 router.py | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/router.py b/router.py
index 5183e10..d356885 100644
--- a/router.py
+++ b/router.py
@@ -71,7 +71,7 @@ async def fetch_available_models(endpoint: str) -> Set[str]:
     set is returned.
""" try: - async with httpx.AsyncClient(timeout=5.0) as client: + async with httpx.AsyncClient(timeout=2.5) as client: if "/v1" in endpoint: resp = await client.get(f"{endpoint}/models") else: @@ -85,8 +85,9 @@ async def fetch_available_models(endpoint: str) -> Set[str]: else: models = {m.get("name") for m in data.get("models", []) if m.get("name")} return models - except Exception: + except Exception as e: # Treat any error as if the endpoint offers no models + print(e) return set() async def fetch_loaded_models(endpoint: str) -> Set[str]: @@ -206,14 +207,19 @@ async def choose_endpoint(model: str) -> str: loaded_sets = await asyncio.gather(*load_tasks) async with usage_lock: + # Helper: get current usage count for (endpoint, model) + def current_usage(ep: str) -> int: + return usage_counts.get(ep, {}).get(model, 0) + # 3️⃣ Endpoints that have the model loaded *and* a free slot loaded_and_free = [ ep for ep, models in zip(candidate_endpoints, loaded_sets) if model in models and usage_counts[ep].get(model, 0) < config.max_concurrent_connections ] - + if loaded_and_free: - return random.choice(loaded_and_free) + ep = min(loaded_and_free, key=current_usage) + return ep # 4️⃣ Endpoints among the candidates that simply have a free slot endpoints_with_free_slot = [ @@ -222,10 +228,12 @@ async def choose_endpoint(model: str) -> str: ] if endpoints_with_free_slot: - return random.choice(endpoints_with_free_slot) + ep = min(endpoints_with_free_slot, key=current_usage) + return ep # 5️⃣ All candidate endpoints are saturated – pick any (will queue) - return random.choice(candidate_endpoints) + ep = min(candidate_endpoints, key=current_usage) + return ep # ------------------------------------------------------------- # 6. API route – Generate From 0c6387f5afde488ff19070f85e34596d9fd3e9dd Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 1 Sep 2025 11:07:07 +0200 Subject: [PATCH 6/6] Add files via upload adding persistent connections for endpoints adding cache to available models routine --- requirements.txt | 1 + router.py | 65 ++++++++++++++++++++++++++++++------------------ 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/requirements.txt b/requirements.txt index e39b50c..4d43ce4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +aiocache==0.12.3 annotated-types==0.7.0 anyio==4.10.0 certifi==2025.8.3 diff --git a/router.py b/router.py index d356885..9173976 100644 --- a/router.py +++ b/router.py @@ -15,6 +15,7 @@ from starlette.responses import StreamingResponse, JSONResponse, Response, HTMLR from pydantic import Field from pydantic_settings import BaseSettings from collections import defaultdict +from aiocache import cached, Cache # ------------------------------------------------------------- # 1. Configuration loader @@ -60,6 +61,21 @@ usage_lock = asyncio.Lock() # protects access to usage_counts # ------------------------------------------------------------- # 4. Helperfunctions # ------------------------------------------------------------- +def get_httpx_client(endpoint: str) -> httpx.AsyncClient: + """ + Use persistent connections to request endpoint info for reliable results + in high load situations or saturated endpoints. 
+ """ + return httpx.AsyncClient( + base_url=endpoint, + timeout=httpx.Timeout(5.0, read=5.0, write=5.0, connect=5.0), + limits=httpx.Limits( + max_keepalive_connections=64, + max_connections=64 + ) + ) + +@cached(cache=Cache.MEMORY, ttl=300) async def fetch_available_models(endpoint: str) -> Set[str]: """ Query /api/tags and return a set of all model names that the @@ -70,41 +86,42 @@ async def fetch_available_models(endpoint: str) -> Set[str]: If the request fails (e.g. timeout, 5xx, or malformed response), an empty set is returned. """ + client = get_httpx_client(endpoint) try: - async with httpx.AsyncClient(timeout=2.5) as client: - if "/v1" in endpoint: - resp = await client.get(f"{endpoint}/models") - else: - resp = await client.get(f"{endpoint}/api/tags") - resp.raise_for_status() - data = resp.json() - # Expected format: - # {"models": [{"name": "model1"}, {"name": "model2"}]} - if "/v1" in endpoint: - models = {m.get("id") for m in data.get("data", []) if m.get("name")} - else: - models = {m.get("name") for m in data.get("models", []) if m.get("name")} - return models + if "/v1" in endpoint: + resp = await client.get(f"/models") + else: + resp = await client.get(f"/api/tags") + resp.raise_for_status() + data = resp.json() + # Expected format: + # {"models": [{"name": "model1"}, {"name": "model2"}]} + if "/v1" in endpoint: + models = {m.get("id") for m in data.get("data", []) if m.get("name")} + else: + models = {m.get("name") for m in data.get("models", []) if m.get("name")} + return models except Exception as e: # Treat any error as if the endpoint offers no models print(e) return set() + async def fetch_loaded_models(endpoint: str) -> Set[str]: """ Query /api/ps and return a set of model names that are currently loaded on that endpoint. If the request fails (e.g. timeout, 5xx), an empty set is returned. """ + client = get_httpx_client(endpoint) try: - async with httpx.AsyncClient(timeout=1.0) as client: - resp = await client.get(f"{endpoint}/api/ps") - resp.raise_for_status() - data = resp.json() - # The response format is: - # {"models": [{"name": "model1"}, {"name": "model2"}]} - models = {m.get("name") for m in data.get("models", []) if m.get("name")} - return models + resp = await client.get(f"/api/ps") + resp.raise_for_status() + data = resp.json() + # The response format is: + # {"models": [{"name": "model1"}, {"name": "model2"}]} + models = {m.get("name") for m in data.get("models", []) if m.get("name")} + return models except Exception: # If anything goes wrong we simply assume the endpoint has no models return set() @@ -193,7 +210,7 @@ async def choose_endpoint(model: str) -> str: ep for ep, models in zip(config.endpoints, advertised_sets) if model in models ] - + # 6️⃣ if not candidate_endpoints: raise RuntimeError( @@ -231,7 +248,7 @@ async def choose_endpoint(model: str) -> str: ep = min(endpoints_with_free_slot, key=current_usage) return ep - # 5️⃣ All candidate endpoints are saturated – pick any (will queue) + # 5️⃣ All candidate endpoints are saturated – pick one with lowest usages count (will queue) ep = min(candidate_endpoints, key=current_usage) return ep