From bd0d210b2a349d02657a5b6abb0a63069648a6c0 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sun, 1 Feb 2026 10:05:46 +0100 Subject: [PATCH 1/8] feat: enforce api key authentication and update table header - Added proper API key validation in router.py with 401 response when key is missing - Implemented CORS headers for authentication requests - Updated table header from "Until" to "Unload" in static/index.html - Improved security by preventing API key leakage in access logs --- router.py | 18 ++++++++++++------ static/index.html | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/router.py b/router.py index 908b8c9..e15efcc 100644 --- a/router.py +++ b/router.py @@ -250,13 +250,19 @@ async def enforce_router_api_key(request: Request, call_next): # Strip the api_key query param from scope so access logs do not leak it _strip_api_key_from_scope(request) if provided_key is None: - response = await call_next(request) - # Add CORS headers for API key authentication requests + # No key provided but authentication is required - return 401 + headers = {} if "/api/" in path and path != "/api/usage-stream": - response.headers["Access-Control-Allow-Origin"] = "*" - response.headers["Access-Control-Allow-Headers"] = "Authorization, Content-Type" - response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS" - return response + headers = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Headers": "Authorization, Content-Type", + "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS", + } + return JSONResponse( + content={"detail": "Missing NOMYO Router API key"}, + status_code=401, + headers=headers, + ) if not secrets.compare_digest(str(provided_key), str(expected_key)): return JSONResponse( diff --git a/static/index.html b/static/index.html index 09cc6fe..b8b1328 100644 --- a/static/index.html +++ b/static/index.html @@ -379,7 +379,7 @@ Quant Ctx Size - Until + Unload Digest Tokens From 92cea1deadc6271a23b02060081ce250cf3e4627 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sun, 8 Feb 2026 11:29:47 +0100 Subject: [PATCH 2/8] feat: update reasoning handling Updated reasoning content handling in router.py to check for both "reasoning_content" and "reasoning" attributes. --- router.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/router.py b/router.py index e15efcc..990974f 100644 --- a/router.py +++ b/router.py @@ -997,11 +997,11 @@ class rechunk: ) with_thinking = chunk.choices[0] if chunk.choices[0] else None if stream == True: - thinking = getattr(with_thinking.delta, "reasoning", None) if with_thinking else None + thinking = (getattr(with_thinking.delta, "reasoning_content", None) or getattr(with_thinking.delta, "reasoning", None)) if with_thinking else None role = chunk.choices[0].delta.role or "assistant" content = chunk.choices[0].delta.content or '' else: - thinking = getattr(with_thinking, "reasoning", None) if with_thinking else None + thinking = (getattr(with_thinking.message, "reasoning_content", None) or getattr(with_thinking.message, "reasoning", None)) if with_thinking else None role = chunk.choices[0].message.role or "assistant" content = chunk.choices[0].message.content or '' assistant_msg = ollama.Message( @@ -1211,7 +1211,7 @@ async def choose_endpoint(model: str) -> str: # Then by total endpoint usage (ascending) to balance idle endpoints endpoints_with_free_slot.sort( key=lambda ep: ( - -usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections) + #-usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections) sum(usage_counts.get(ep, {}).values()) # Secondary: total endpoint usage (ascending - prefer idle endpoints) ) ) From 7deb088c6a28449ff4d2772bfebe51080828975f Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sun, 8 Feb 2026 16:46:40 +0100 Subject: [PATCH 3/8] refactor(cache): split error cache and add stale-while-revalidate Refactor error tracking to use separate caches for 'available' and 'loaded' models, preventing cross-contamination of transient errors. Implement background refresh for available models to prevent blocking requests, and use stale-while-revalidate (300-600s) to serve stale data immediately when the cache is between 300s and 600s old. --- router.py | 55 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/router.py b/router.py index 990974f..fe1fd36 100644 --- a/router.py +++ b/router.py @@ -30,16 +30,18 @@ from PIL import Image # Successful results are cached for 300s _models_cache: dict[str, tuple[Set[str], float]] = {} _loaded_models_cache: dict[str, tuple[Set[str], float]] = {} -# Transient errors are cached for 1s – the key stays until the -# timeout expires, after which the endpoint will be queried again. -_error_cache: dict[str, float] = {} +# Transient errors are cached separately per concern so that a failure +# in one path does not poison the other. +_available_error_cache: dict[str, float] = {} +_loaded_error_cache: dict[str, float] = {} # ------------------------------------------------------------------ # Cache locks # ------------------------------------------------------------------ _models_cache_lock = asyncio.Lock() _loaded_models_cache_lock = asyncio.Lock() -_error_cache_lock = asyncio.Lock() +_available_error_cache_lock = asyncio.Lock() +_loaded_error_cache_lock = asyncio.Lock() # ------------------------------------------------------------------ # In-flight request tracking (prevents cache stampede) @@ -535,10 +537,21 @@ class fetch: message = _format_connection_issue(endpoint_url, e) print(f"[fetch.available_models] {message}") # Update error cache with lock protection - async with _error_cache_lock: - _error_cache[endpoint] = time.time() + async with _available_error_cache_lock: + _available_error_cache[endpoint] = time.time() return set() + async def _refresh_available_models(endpoint: str, api_key: Optional[str] = None) -> None: + """ + Background task to refresh available models cache without blocking the caller. + Used for stale-while-revalidate pattern. + """ + try: + await fetch._fetch_available_models_internal(endpoint, api_key) + except Exception as e: + # Silently fail - cache will remain stale but functional + print(f"[fetch._refresh_available_models] Background refresh failed for {endpoint}: {e}") + async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: """ Query /api/tags and return a set of all model names that the @@ -549,6 +562,10 @@ class fetch: Uses request coalescing to prevent cache stampede: if multiple requests arrive when cache is expired, only one actual HTTP request is made. + Uses stale-while-revalidate: when the cache is between 300-600s old, + the stale data is returned immediately while a background refresh runs. + This prevents model blackouts caused by transient timeouts. + If the request fails (e.g. timeout, 5xx, or malformed response), an empty set is returned. """ @@ -556,19 +573,27 @@ class fetch: async with _models_cache_lock: if endpoint in _models_cache: models, cached_at = _models_cache[endpoint] + + # FRESH: < 300s old - return immediately if _is_fresh(cached_at, 300): return models - # Stale entry - remove it + + # STALE: 300-600s old - return stale data and refresh in background + if _is_fresh(cached_at, 600): + asyncio.create_task(fetch._refresh_available_models(endpoint, api_key)) + return models # Return stale data immediately + + # EXPIRED: > 600s old - too stale, must refresh synchronously del _models_cache[endpoint] # Check error cache with lock protection - async with _error_cache_lock: - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): + async with _available_error_cache_lock: + if endpoint in _available_error_cache: + if _is_fresh(_available_error_cache[endpoint], 10): # Still within the short error TTL – pretend nothing is available return set() # Error expired – remove it - del _error_cache[endpoint] + del _available_error_cache[endpoint] # Request coalescing: check if another request is already fetching this endpoint async with _inflight_lock: @@ -657,12 +682,12 @@ class fetch: del _loaded_models_cache[endpoint] # Check error cache with lock protection - async with _error_cache_lock: - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): + async with _loaded_error_cache_lock: + if endpoint in _loaded_error_cache: + if _is_fresh(_loaded_error_cache[endpoint], 10): return set() # Error expired - remove it - del _error_cache[endpoint] + del _loaded_error_cache[endpoint] # Request coalescing: check if another request is already fetching this endpoint async with _inflight_lock: From 1f81e69ce18a9615b2d46a28c15ae52811821ac8 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 9 Feb 2026 11:04:14 +0100 Subject: [PATCH 4/8] refactor(router.py): correctly implement OpenAI tool_calls to Ollama format conversion --- router.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/router.py b/router.py index fe1fd36..89f27e8 100644 --- a/router.py +++ b/router.py @@ -1029,13 +1029,32 @@ class rechunk: thinking = (getattr(with_thinking.message, "reasoning_content", None) or getattr(with_thinking.message, "reasoning", None)) if with_thinking else None role = chunk.choices[0].message.role or "assistant" content = chunk.choices[0].message.content or '' + # Convert OpenAI tool_calls to Ollama format + ollama_tool_calls = None + if stream: + raw_tool_calls = getattr(with_thinking.delta, "tool_calls", None) if with_thinking else None + else: + raw_tool_calls = getattr(with_thinking.message, "tool_calls", None) if with_thinking else None + if raw_tool_calls: + ollama_tool_calls = [] + for tc in raw_tool_calls: + try: + args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {}) + except (orjson.JSONDecodeError, TypeError): + args = {} + ollama_tool_calls.append({ + "function": { + "name": tc.function.name, + "arguments": args, + } + }) assistant_msg = ollama.Message( role=role, content=content, thinking=thinking, images=None, tool_name=None, - tool_calls=None) + tool_calls=ollama_tool_calls) rechunk = ollama.ChatResponse( model=chunk.model, created_at=iso8601_ns(), From 4892998abcfed6faf18cb4923655bdb67d4598f9 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Tue, 10 Feb 2026 16:46:51 +0100 Subject: [PATCH 5/8] feat(router): Add llama-server endpoints support and model parsing Add `llama_server_endpoints` configuration field to support llama_server OpenAI-compatible endpoints for status checks. Implement helper functions to parse model names and quantization levels from llama-server responses (best effort). Update `is_ext_openai_endpoint` to properly distinguish these endpoints from external OpenAI services. Update sample configuration documentation. --- doc/examples/sample-config.yaml | 56 ++--- router.py | 378 ++++++++++++++++++++++++-------- static/index.html | 31 ++- 3 files changed, 342 insertions(+), 123 deletions(-) diff --git a/doc/examples/sample-config.yaml b/doc/examples/sample-config.yaml index 9dbce09..49ad389 100644 --- a/doc/examples/sample-config.yaml +++ b/doc/examples/sample-config.yaml @@ -1,40 +1,32 @@ -# Sample NOMYO Router Configuration - -# Basic single endpoint configuration +# config.yaml +# Ollama endpoints endpoints: - - http://localhost:11434 + - http://192.168.0.50:11434 + - http://192.168.0.51:11434 + - http://192.168.0.52:11434 + # External OpenAI-compatible endpoints (will NOT be queried for /api/ps /api/ps_details) + - https://api.openai.com/v1 +# llama-server endpoints (OpenAI-compatible with /v1/models status info) +# These endpoints will be queried for /api/tags, /api/ps, /api/ps_details +# and included in the model selection pool for inference routing +llama_server_endpoints: + - http://localhost:8080/v1 + - http://localhost:8081/v1 + +# Maximum concurrent connections *per endpoint‑model pair* (equals to OLLAMA_NUM_PARALLEL) max_concurrent_connections: 2 -# Optional router-level API key to secure the router and dashboard (leave blank to disable) +# Optional router-level API key that gates router/API/web UI access (leave empty to disable) nomyo-router-api-key: "" -# Multi-endpoint configuration with local Ollama instances -# endpoints: -# - http://ollama-worker1:11434 -# - http://ollama-worker2:11434 -# - http://ollama-worker3:11434 - -# Mixed configuration with Ollama and OpenAI endpoints -# endpoints: -# - http://localhost:11434 -# - https://api.openai.com/v1 - - # API keys for remote endpoints -# Use ${VAR_NAME} syntax to reference environment variables +# Set an environment variable like OPENAI_KEY +# Confirm endpoints are exactly as in endpoints block api_keys: - # Local Ollama instances typically don't require authentication - "http://localhost:11434": "ollama" - - # Remote Ollama instances - # "http://remote-ollama:11434": "ollama" - - # OpenAI API - # "https://api.openai.com/v1": "${OPENAI_KEY}" - - # Anthropic API - # "https://api.anthropic.com/v1": "${ANTHROPIC_KEY}" - - # Other OpenAI-compatible endpoints - # "https://api.mistral.ai/v1": "${MISTRAL_KEY}" + "http://192.168.0.50:11434": "ollama" + "http://192.168.0.51:11434": "ollama" + "http://192.168.0.52:11434": "ollama" + "https://api.openai.com/v1": "${OPENAI_KEY}" + "http://localhost:8080/v1": "llama-server" # Optional API key for llama-server + "http://localhost:8081/v1": "llama-server" \ No newline at end of file diff --git a/router.py b/router.py index 89f27e8..6d56684 100644 --- a/router.py +++ b/router.py @@ -104,6 +104,8 @@ class Config(BaseSettings): "http://localhost:11434", ] ) + # List of llama-server endpoints (OpenAI-compatible with /v1/models status info) + llama_server_endpoints: List[str] = Field(default_factory=list) # Max concurrent connections per endpoint‑model pair, see OLLAMA_NUM_PARALLEL max_concurrent_connections: int = 1 @@ -343,7 +345,50 @@ def _format_connection_issue(url: str, error: Exception) -> str: return f"Error while contacting {url}: {error}" +def _normalize_llama_model_name(name: str) -> str: + """Extract the model name from a huggingface-style identifier. + e.g. 'unsloth/gpt-oss-20b-GGUF:F16' -> 'gpt-oss-20b-GGUF' + """ + if "/" in name: + name = name.rsplit("/", 1)[1] + if ":" in name: + name = name.split(":")[0] + return name + +def _extract_llama_quant(name: str) -> str: + """Extract the quantization level from a huggingface-style identifier. + e.g. 'unsloth/gpt-oss-20b-GGUF:Q8_0' -> 'Q8_0' + Returns empty string if no quant suffix is present. + """ + if ":" in name: + return name.rsplit(":", 1)[1] + return "" + +def _is_llama_model_loaded(item: dict) -> bool: + """Return True if a llama-server /v1/models item has status 'loaded'. + Handles both dict format ({"value": "loaded"}) and plain string ("loaded").""" + status = item.get("status") + if isinstance(status, dict): + return status.get("value") == "loaded" + if isinstance(status, str): + return status == "loaded" + return False + def is_ext_openai_endpoint(endpoint: str) -> bool: + """ + Determine if an endpoint is an external OpenAI-compatible endpoint (not Ollama or llama-server). + + Returns True for: + - External services like OpenAI.com, Groq, etc. + + Returns False for: + - Ollama endpoints (without /v1, or with /v1 but default port 11434) + - llama-server endpoints (explicitly configured in llama_server_endpoints) + """ + # Check if it's a llama-server endpoint (has /v1 and is in the configured list) + if endpoint in config.llama_server_endpoints: + return False + if "/v1" not in endpoint: return False @@ -357,6 +402,13 @@ def is_ext_openai_endpoint(endpoint: str) -> bool: return True # It's an external OpenAI endpoint +def is_openai_compatible(endpoint: str) -> bool: + """ + Return True if the endpoint speaks the OpenAI API (not native Ollama). + This includes external OpenAI endpoints AND llama-server endpoints. + """ + return "/v1" in endpoint or endpoint in config.llama_server_endpoints + async def token_worker() -> None: try: while True: @@ -512,7 +564,10 @@ class fetch: if api_key is not None: headers = {"Authorization": "Bearer " + api_key} - if "/v1" in endpoint: + if endpoint in config.llama_server_endpoints and "/v1" not in endpoint: + endpoint_url = f"{endpoint}/v1/models" + key = "data" + elif "/v1" in endpoint or endpoint in config.llama_server_endpoints: endpoint_url = f"{endpoint}/models" key = "data" else: @@ -620,25 +675,56 @@ class fetch: """ Internal function that performs the actual HTTP request to fetch loaded models. This is called by loaded_models() after checking caches and in-flight requests. + + For Ollama endpoints: queries /api/ps and returns model names + For llama-server endpoints: queries /v1/models and filters for status.value == "loaded" """ client: aiohttp.ClientSession = app_state["session"] - try: - async with client.get(f"{endpoint}/api/ps") as resp: - await _ensure_success(resp) - data = await resp.json() - # The response format is: - # {"models": [{"name": "model1"}, {"name": "model2"}]} - models = {m.get("name") for m in data.get("models", []) if m.get("name")} + + # Check if this is a llama-server endpoint + if endpoint in config.llama_server_endpoints: + # Query /v1/models for llama-server + try: + async with client.get(f"{endpoint}/models") as resp: + await _ensure_success(resp) + data = await resp.json() + + # Filter for loaded models only + items = data.get("data", []) + models = { + item.get("id") + for item in items + if item.get("id") and _is_llama_model_loaded(item) + } - # Update cache with lock protection - async with _loaded_models_cache_lock: - _loaded_models_cache[endpoint] = (models, time.time()) - return models - except Exception as e: - # If anything goes wrong we simply assume the endpoint has no models - message = _format_connection_issue(f"{endpoint}/api/ps", e) - print(f"[fetch.loaded_models] {message}") - return set() + # Update cache with lock protection + async with _loaded_models_cache_lock: + _loaded_models_cache[endpoint] = (models, time.time()) + return models + except Exception as e: + # If anything goes wrong we simply assume the endpoint has no models + message = _format_connection_issue(f"{endpoint}/models", e) + print(f"[fetch.loaded_models] {message}") + return set() + else: + # Original Ollama /api/ps logic + try: + async with client.get(f"{endpoint}/api/ps") as resp: + await _ensure_success(resp) + data = await resp.json() + # The response format is: + # {"models": [{"name": "model1"}, {"name": "model2"}]} + models = {m.get("name") for m in data.get("models", []) if m.get("name")} + + # Update cache with lock protection + async with _loaded_models_cache_lock: + _loaded_models_cache[endpoint] = (models, time.time()) + return models + except Exception as e: + # If anything goes wrong we simply assume the endpoint has no models + message = _format_connection_issue(f"{endpoint}/api/ps", e) + print(f"[fetch.loaded_models] {message}") + return set() async def _refresh_loaded_models(endpoint: str) -> None: """ @@ -776,8 +862,8 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No Helper function to make a chat request to a specific endpoint. Handles endpoint selection, client creation, usage tracking, and request execution. """ - is_openai_endpoint = "/v1" in endpoint - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest")[0] if messages: @@ -800,14 +886,14 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No "response_format": {"type": "json_schema", "json_schema": format} if format is not None else None } params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) try: - if is_openai_endpoint: + if use_openai: start_ts = time.perf_counter() response = await oclient.chat.completions.create(**params) if stream: @@ -1179,35 +1265,40 @@ async def choose_endpoint(model: str) -> str: 6️⃣ If no endpoint advertises the model at all, raise an error. """ # 1️⃣ Gather advertised‑model sets for all endpoints concurrently - tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if "/v1" not in ep] - tag_tasks += [fetch.available_models(ep, config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + # Include both config.endpoints and config.llama_server_endpoints + llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints] + all_endpoints = config.endpoints + llama_eps_extra + + tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if not is_openai_compatible(ep)] + tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in config.endpoints if is_openai_compatible(ep)] + tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in llama_eps_extra] advertised_sets = await asyncio.gather(*tag_tasks) # 2️⃣ Filter endpoints that advertise the requested model candidate_endpoints = [ - ep for ep, models in zip(config.endpoints, advertised_sets) + ep for ep, models in zip(all_endpoints, advertised_sets) if model in models ] - + # 6️⃣ if not candidate_endpoints: - if ":latest" in model: #ollama naming convention not applicable to openai + if ":latest" in model: #ollama naming convention not applicable to openai/llama-server model_without_latest = model.split(":latest")[0] candidate_endpoints = [ - ep for ep, models in zip(config.endpoints, advertised_sets) - if model_without_latest in models and is_ext_openai_endpoint(ep) + ep for ep, models in zip(all_endpoints, advertised_sets) + if model_without_latest in models and (is_ext_openai_endpoint(ep) or ep in config.llama_server_endpoints) ] if not candidate_endpoints: # Only add :latest suffix if model doesn't already have a version suffix if ":" not in model: model = model + ":latest" candidate_endpoints = [ - ep for ep, models in zip(config.endpoints, advertised_sets) + ep for ep, models in zip(all_endpoints, advertised_sets) if model in models ] if not candidate_endpoints: raise RuntimeError( - f"None of the configured endpoints ({', '.join(config.endpoints)}) " + f"None of the configured endpoints ({', '.join(all_endpoints)}) " f"advertise the model '{model}'." ) # 3️⃣ Among the candidates, find those that have the model *loaded* @@ -1311,13 +1402,13 @@ async def proxy(request: Request): endpoint = await choose_endpoint(model) - is_openai_endpoint = "/v1" in endpoint - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] params = { - "prompt": prompt, + "prompt": prompt, "model": model, } @@ -1333,7 +1424,7 @@ async def proxy(request: Request): "suffix": suffix, } params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1341,14 +1432,14 @@ async def proxy(request: Request): # 4. Async generator that streams data and decrements the counter async def stream_generate_response(): try: - if is_openai_endpoint: + if use_openai: start_ts = time.perf_counter() async_gen = await oclient.completions.create(**params) else: async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=_format, images=images, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: - if is_openai_endpoint: + if use_openai: chunk = rechunk.openai_completion2ollama(chunk, stream, start_ts) prompt_tok = chunk.prompt_eval_count or 0 comp_tok = chunk.eval_count or 0 @@ -1360,7 +1451,7 @@ async def proxy(request: Request): json_line = orjson.dumps(chunk) yield json_line.encode("utf-8") + b"\n" else: - if is_openai_endpoint: + if use_openai: response = rechunk.openai_completion2ollama(async_gen, stream, start_ts) response = response.model_dump_json() else: @@ -1430,15 +1521,15 @@ async def chat_proxy(request: Request): else: opt = False endpoint = await choose_endpoint(model) - is_openai_endpoint = "/v1" in endpoint - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] if messages: messages = transform_images_to_data_urls(messages) params = { - "messages": messages, + "messages": messages, "model": model, } optional_params = { @@ -1455,7 +1546,7 @@ async def chat_proxy(request: Request): "response_format": {"type": "json_schema", "json_schema": _format} if _format is not None else None } params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1463,7 +1554,7 @@ async def chat_proxy(request: Request): async def stream_chat_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if is_openai_endpoint: + if use_openai: start_ts = time.perf_counter() async_gen = await oclient.chat.completions.create(**params) else: @@ -1474,7 +1565,7 @@ async def chat_proxy(request: Request): async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=_format, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: - if is_openai_endpoint: + if use_openai: chunk = rechunk.openai_chat_completion2ollama(chunk, stream, start_ts) # `chunk` can be a dict or a pydantic model – dump to JSON safely prompt_tok = chunk.prompt_eval_count or 0 @@ -1487,7 +1578,7 @@ async def chat_proxy(request: Request): json_line = orjson.dumps(chunk) yield json_line.encode("utf-8") + b"\n" else: - if is_openai_endpoint: + if use_openai: response = rechunk.openai_chat_completion2ollama(async_gen, stream, start_ts) response = response.model_dump_json() else: @@ -1546,12 +1637,12 @@ async def embedding_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) - is_openai_endpoint = "/v1" in endpoint - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] - client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + client = openai.AsyncOpenAI(base_url=ep2base(endpoint), api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1559,7 +1650,7 @@ async def embedding_proxy(request: Request): async def stream_embedding_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if is_openai_endpoint: + if use_openai: async_gen = await client.embeddings.create(input=prompt, model=model) async_gen = rechunk.openai_embeddings2ollama(async_gen) else: @@ -1612,12 +1703,12 @@ async def embed_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) - is_openai_endpoint = is_ext_openai_endpoint(endpoint) #"/v1" in endpoint - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] - client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + client = openai.AsyncOpenAI(base_url=ep2base(endpoint), api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1625,7 +1716,7 @@ async def embed_proxy(request: Request): async def stream_embedding_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if is_openai_endpoint: + if use_openai: async_gen = await client.embeddings.create(input=_input, model=model) async_gen = rechunk.openai_embed2ollama(async_gen, model) else: @@ -2018,8 +2109,11 @@ async def tags_proxy(request: Request): # 1. Query all endpoints for models tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + # Also query llama-server endpoints not already covered by config.endpoints + llama_eps_for_tags = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints] + tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) for ep in llama_eps_for_tags] all_models = await asyncio.gather(*tasks) - + models = {'models': []} for modellist in all_models: for model in modellist: @@ -2045,18 +2139,47 @@ async def tags_proxy(request: Request): @app.get("/api/ps") async def ps_proxy(request: Request): """ - Proxy a ps request to all Ollama endpoints and reply a unique list of all running models. + Proxy a ps request to all Ollama and llama-server endpoints and reply a unique list of all running models. + For Ollama endpoints: queries /api/ps + For llama-server endpoints: queries /v1/models with status.value == "loaded" """ - # 1. Query all endpoints for running models - tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep] - loaded_models = await asyncio.gather(*tasks) + # 1. Query Ollama endpoints for running models via /api/ps + ollama_tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep] + # 2. Query llama-server endpoints for loaded models via /v1/models + # Also query endpoints from llama_server_endpoints that may not be in config.endpoints + all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints) + llama_tasks = [ + fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) + for ep in all_llama_endpoints + ] + + ollama_loaded = await asyncio.gather(*ollama_tasks) if ollama_tasks else [] + llama_loaded = await asyncio.gather(*llama_tasks) if llama_tasks else [] models = {'models': []} - for modellist in loaded_models: - models['models'] += modellist + # Add Ollama models (if any) + if ollama_loaded: + for modellist in ollama_loaded: + models['models'] += modellist + # Add llama-server models (filter for loaded only, if any) + if llama_loaded: + for modellist in llama_loaded: + loaded_models = [item for item in modellist if _is_llama_model_loaded(item)] + # Convert llama-server format to Ollama-like format for consistency + for item in loaded_models: + raw_id = item.get("id", "") + normalized = _normalize_llama_model_name(raw_id) + quant = _extract_llama_quant(raw_id) + models['models'].append({ + "name": normalized, + "id": normalized, + "digest": "", + "status": item.get("status"), + "details": {"quantization_level": quant} if quant else {} + }) - # 2. Return a JSONResponse with deduplicated currently deployed models + # 3. Return a JSONResponse with deduplicated currently deployed models return JSONResponse( content={"models": dedupe_on_keys(models['models'], ['digest'])}, status_code=200, @@ -2068,19 +2191,63 @@ async def ps_proxy(request: Request): @app.get("/api/ps_details") async def ps_details_proxy(request: Request): """ - Proxy a ps request to all Ollama endpoints and reply with per-endpoint instances. + Proxy a ps request to all Ollama and llama-server endpoints and reply with per-endpoint instances. This keeps /api/ps backward compatible while providing richer data. + + For Ollama endpoints: queries /api/ps + For llama-server endpoints: queries /v1/models with status info """ - tasks = [(ep, fetch.endpoint_details(ep, "/api/ps", "models")) for ep in config.endpoints if "/v1" not in ep] - loaded_models = await asyncio.gather(*[task for _, task in tasks]) + # 1. Query Ollama endpoints via /api/ps + ollama_tasks = [(ep, fetch.endpoint_details(ep, "/api/ps", "models")) for ep in config.endpoints if "/v1" not in ep] + # 2. Query llama-server endpoints via /v1/models + # Also query endpoints from llama_server_endpoints that may not be in config.endpoints + all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints) + llama_tasks = [ + (ep, fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep))) + for ep in all_llama_endpoints + ] + + ollama_loaded = await asyncio.gather(*[task for _, task in ollama_tasks]) if ollama_tasks else [] + llama_loaded = await asyncio.gather(*[task for _, task in llama_tasks]) if llama_tasks else [] models: list[dict] = [] - for (endpoint, modellist) in zip([ep for ep, _ in tasks], loaded_models): - for model in modellist: - if isinstance(model, dict): - model_with_endpoint = dict(model) - model_with_endpoint["endpoint"] = endpoint - models.append(model_with_endpoint) + + # Add Ollama models with endpoint info (if any) + if ollama_loaded: + for (endpoint, modellist) in zip([ep for ep, _ in ollama_tasks], ollama_loaded): + for model in modellist: + if isinstance(model, dict): + model_with_endpoint = dict(model) + model_with_endpoint["endpoint"] = endpoint + models.append(model_with_endpoint) + + # Add llama-server models with endpoint info and full status metadata (if any) + if llama_loaded: + for (endpoint, modellist) in zip([ep for ep, _ in llama_tasks], llama_loaded): + # Filter for loaded models only + loaded_models = [item for item in modellist if _is_llama_model_loaded(item)] + for item in loaded_models: + if isinstance(item, dict) and item.get("id"): + raw_id = item["id"] + normalized = _normalize_llama_model_name(raw_id) + quant = _extract_llama_quant(raw_id) + model_with_endpoint = { + "name": normalized, + "id": normalized, + "original_name": raw_id, + "digest": "", + "details": {"quantization_level": quant} if quant else {}, + "endpoint": endpoint, + "status": item.get("status"), + "created": item.get("created"), + "owned_by": item.get("owned_by") + } + # Include full llama-server status details (args, preset) + status_info = item.get("status", {}) + if isinstance(status_info, dict): + model_with_endpoint["llama_status_args"] = status_info.get("args") + model_with_endpoint["llama_status_preset"] = status_info.get("preset") + models.append(model_with_endpoint) return JSONResponse(content={"models": models}, status_code=200) @@ -2103,7 +2270,7 @@ async def usage_proxy(request: Request): async def config_proxy(request: Request): """ Return a simple JSON object that contains the configured - Ollama endpoints. The front‑end uses this to display + Ollama endpoints and llama_server_endpoints. The front‑end uses this to display which endpoints are being proxied. """ async def check_endpoint(url: str): @@ -2127,9 +2294,17 @@ async def config_proxy(request: Request): detail = _format_connection_issue(target_url, e) return {"url": url, "status": "error", "detail": detail} - results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) + # Check Ollama endpoints + ollama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) + + # Check llama-server endpoints + llama_results = [] + if config.llama_server_endpoints: + llama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.llama_server_endpoints]) + return { - "endpoints": results, + "endpoints": ollama_results, + "llama_server_endpoints": llama_results, "require_router_api_key": bool(config.router_api_key), } @@ -2164,8 +2339,8 @@ async def openai_embedding_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) - if "/v1" in endpoint: # and is_ext_openai_endpoint(endpoint): - api_key = config.api_keys[endpoint] + if is_openai_compatible(endpoint): + api_key = config.api_keys.get(endpoint, "no-key") else: api_key = "ollama" base_url = ep2base(endpoint) @@ -2249,7 +2424,7 @@ async def openai_chat_completions_proxy(request: Request): endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) base_url = ep2base(endpoint) - oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Async generator that streams completions data and decrements the counter async def stream_ochat_response(): try: @@ -2374,7 +2549,7 @@ async def openai_completions_proxy(request: Request): endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) base_url = ep2base(endpoint) - oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Async generator that streams completions data and decrements the counter async def stream_ocompletions_response(model=model): @@ -2430,22 +2605,46 @@ async def openai_completions_proxy(request: Request): @app.get("/v1/models") async def openai_models_proxy(request: Request): """ - Proxy an OpenAI API models request to Ollama endpoints and reply with a unique list of all models. - + Proxy an OpenAI API models request to Ollama and llama-server endpoints and reply with a unique list of models. + + For Ollama endpoints: queries /api/tags (all models) + For llama-server endpoints: queries /v1/models and filters for status.value == "loaded" """ - # 1. Query all endpoints for models - tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] - tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] - all_models = await asyncio.gather(*tasks) + # 1. Query Ollama endpoints for all models via /api/tags + ollama_tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] + # 2. Query llama-server endpoints for loaded models via /v1/models + # Also query endpoints from llama_server_endpoints that may not be in config.endpoints + all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints) + llama_tasks = [ + fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) + for ep in all_llama_endpoints + ] + + ollama_models = await asyncio.gather(*ollama_tasks) if ollama_tasks else [] + llama_models = await asyncio.gather(*llama_tasks) if llama_tasks else [] models = {'data': []} - for modellist in all_models: - for model in modellist: - if not "id" in model.keys(): # Relable Ollama models with OpenAI Model.id from Model.name - model['id'] = model['name'] - else: - model['name'] = model['id'] - models['data'] += modellist + + # Add Ollama models (if any) + if ollama_models: + for modellist in ollama_models: + for model in modellist: + if not "id" in model.keys(): # Relable Ollama models with OpenAI Model.id from Model.name + model['id'] = model.get('name', model.get('id', '')) + else: + model['name'] = model['id'] + models['data'].append(model) + + # Add llama-server models (filter for loaded only, if any) + if llama_models: + for modellist in llama_models: + loaded_models = [item for item in modellist if _is_llama_model_loaded(item)] + for model in loaded_models: + if not "id" in model.keys(): + model['id'] = model.get('name', model.get('id', '')) + else: + model['name'] = model['id'] + models['data'].append(model) # 2. Return a JSONResponse with a deduplicated list of unique models for inference return JSONResponse( @@ -2556,6 +2755,7 @@ async def startup_event() -> None: print( f"Loaded configuration from {config_path}:\n" f" endpoints={config.endpoints},\n" + f" llama_server_endpoints={config.llama_server_endpoints},\n" f" max_concurrent_connections={config.max_concurrent_connections}" ) else: diff --git a/static/index.html b/static/index.html index b8b1328..c3757f3 100644 --- a/static/index.html +++ b/static/index.html @@ -683,7 +683,12 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { showApiKeyModal("Enter the NOMYO Router API key to load the dashboard."); } const body = document.getElementById("endpoints-body"); - body.innerHTML = data.endpoints + + // Build HTML for both endpoints and llama_server_endpoints + let html = ""; + + // Add Ollama endpoints + html += data.endpoints .map((e) => { const statusClass = e.status === "ok" @@ -698,6 +703,27 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { `; }) .join(""); + + // Add llama-server endpoints + if (data.llama_server_endpoints && data.llama_server_endpoints.length > 0) { + html += data.llama_server_endpoints + .map((e) => { + const statusClass = + e.status === "ok" + ? "status-ok" + : "status-error"; + const version = e.version || "N/A"; + return ` + + ${e.url} + ${e.status} + ${version} + `; + }) + .join(""); + } + + body.innerHTML = html; } catch (e) { console.error(e); const body = document.getElementById("endpoints-body"); @@ -898,10 +924,11 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { const params = modelInstances[0]?.details?.parameter_size ?? ""; const quant = modelInstances[0]?.details?.quantization_level ?? ""; const ctx = modelInstances[0]?.context_length ?? ""; + const originalName = modelInstances[0]?.original_name || modelName; const uniqueEndpoints = Array.from(new Set(endpoints)); const endpointsData = encodeURIComponent(JSON.stringify(uniqueEndpoints)); return ` - ${modelName} stats + ${modelName} stats ${renderInstanceList(endpoints)} ${params} ${quant} From 9875eb977a1e6d8c15991ec84531eb5563d91de4 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Tue, 10 Feb 2026 20:21:46 +0100 Subject: [PATCH 6/8] feat: Add tool call normalization and streaming delta accumulation Adds support for correctly handling tool calls in chat requests. Normalizes tool call data (ensuring IDs, types, and JSON arguments) in non-streaming mode and accumulates OpenAI-style deltas during streaming to build the final Ollama response. --- router.py | 126 +++++++++++++++++++++++++++++++++++++++------- static/index.html | 2 +- 2 files changed, 108 insertions(+), 20 deletions(-) diff --git a/router.py b/router.py index 6d56684..e39b3a2 100644 --- a/router.py +++ b/router.py @@ -868,6 +868,7 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No model = model.split(":latest")[0] if messages: messages = transform_images_to_data_urls(messages) + messages = transform_tool_calls_to_openai(messages) params = { "messages": messages, "model": model, @@ -899,8 +900,10 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No if stream: # For streaming, we need to collect all chunks chunks = [] + tc_acc = {} # accumulate tool-call deltas async for chunk in response: chunks.append(chunk) + _accumulate_openai_tc_delta(chunk, tc_acc) if chunk.usage is not None: prompt_tok = chunk.usage.prompt_tokens or 0 comp_tok = chunk.usage.completion_tokens or 0 @@ -909,6 +912,9 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No # Convert to Ollama format if chunks: response = rechunk.openai_chat_completion2ollama(chunks[-1], stream, start_ts) + # Inject fully-accumulated tool calls into the final response + if tc_acc and response.message: + response.message.tool_calls = _build_ollama_tool_calls(tc_acc) else: prompt_tok = response.usage.prompt_tokens or 0 comp_tok = response.usage.completion_tokens or 0 @@ -1062,6 +1068,39 @@ def resize_image_if_needed(image_data): print(f"Error processing image: {e}") return None +def transform_tool_calls_to_openai(message_list): + """ + Ensure tool_calls in assistant messages conform to the OpenAI format: + - Each tool call must have "type": "function" + - Each tool call must have an "id" + - arguments must be a JSON string, not a dict + Also ensure tool-role messages have a tool_call_id. + """ + # Track generated IDs so tool-role messages can reference them + last_tool_call_ids = {} + for msg in message_list: + role = msg.get("role") + if role == "assistant" and "tool_calls" in msg: + for tc in msg["tool_calls"]: + if "type" not in tc: + tc["type"] = "function" + if "id" not in tc: + tc["id"] = f"call_{secrets.token_hex(16)}" + func = tc.get("function", {}) + if isinstance(func.get("arguments"), dict): + func["arguments"] = orjson.dumps(func["arguments"]).decode("utf-8") + # Remember the id for the following tool-role message + name = func.get("name") + if name: + last_tool_call_ids[name] = tc["id"] + elif role == "tool": + if "tool_call_id" not in msg: + # Try to match by name from a preceding assistant tool_call + name = msg.get("name") or msg.get("tool_name") + if name and name in last_tool_call_ids: + msg["tool_call_id"] = last_tool_call_ids.pop(name) + return message_list + def transform_images_to_data_urls(message_list): for message in message_list: if "images" in message: @@ -1089,6 +1128,51 @@ def transform_images_to_data_urls(message_list): return message_list +def _accumulate_openai_tc_delta(chunk, accumulator: dict) -> None: + """Accumulate tool_call deltas from a single OpenAI streaming chunk. + + ``accumulator`` is a dict mapping tool-call *index* to + ``{"id": str, "name": str, "arguments": str}`` where ``arguments`` + is the concatenation of all JSON fragments seen so far. + """ + if not chunk.choices: + return + delta = chunk.choices[0].delta + tc_deltas = getattr(delta, "tool_calls", None) + if not tc_deltas: + return + for tc in tc_deltas: + idx = tc.index + if idx not in accumulator: + accumulator[idx] = { + "id": getattr(tc, "id", None) or f"call_{secrets.token_hex(16)}", + "name": tc.function.name if tc.function else None, + "arguments": "", + } + else: + if getattr(tc, "id", None): + accumulator[idx]["id"] = tc.id + if tc.function and tc.function.name: + accumulator[idx]["name"] = tc.function.name + if tc.function and tc.function.arguments: + accumulator[idx]["arguments"] += tc.function.arguments + +def _build_ollama_tool_calls(accumulator: dict) -> list | None: + """Convert accumulated tool-call data into Ollama-format tool_calls list.""" + if not accumulator: + return None + result = [] + for idx in sorted(accumulator.keys()): + tc = accumulator[idx] + try: + args = orjson.loads(tc["arguments"]) if tc["arguments"] else {} + except (orjson.JSONDecodeError, TypeError): + args = {} + result.append(ollama.Message.ToolCall( + function=ollama.Message.ToolCall.Function(name=tc["name"], arguments=args) + )) + return result + class rechunk: def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.ChatResponse: now = time.perf_counter() @@ -1099,12 +1183,12 @@ class rechunk: done=True, done_reason='stop', total_duration=int((now - start_ts) * 1_000_000_000), - load_duration=100000, + load_duration=100000, prompt_eval_count=int(chunk.usage.prompt_tokens), - prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)), + prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)), eval_count=int(chunk.usage.completion_tokens), eval_duration=int((now - start_ts) * 1_000_000_000), - message={"role": "assistant"} + message=ollama.Message(role="assistant", content=""), ) with_thinking = chunk.choices[0] if chunk.choices[0] else None if stream == True: @@ -1116,24 +1200,22 @@ class rechunk: role = chunk.choices[0].message.role or "assistant" content = chunk.choices[0].message.content or '' # Convert OpenAI tool_calls to Ollama format + # In streaming mode, tool_calls arrive as partial deltas across multiple chunks + # (name only in first delta, arguments as incremental JSON fragments). + # Callers must accumulate deltas and inject the final result; skip here. ollama_tool_calls = None - if stream: - raw_tool_calls = getattr(with_thinking.delta, "tool_calls", None) if with_thinking else None - else: + if not stream: raw_tool_calls = getattr(with_thinking.message, "tool_calls", None) if with_thinking else None - if raw_tool_calls: - ollama_tool_calls = [] - for tc in raw_tool_calls: - try: - args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {}) - except (orjson.JSONDecodeError, TypeError): - args = {} - ollama_tool_calls.append({ - "function": { - "name": tc.function.name, - "arguments": args, - } - }) + if raw_tool_calls: + ollama_tool_calls = [] + for tc in raw_tool_calls: + try: + args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {}) + except (orjson.JSONDecodeError, TypeError): + args = {} + ollama_tool_calls.append(ollama.Message.ToolCall( + function=ollama.Message.ToolCall.Function(name=tc.function.name, arguments=args) + )) assistant_msg = ollama.Message( role=role, content=content, @@ -1528,6 +1610,7 @@ async def chat_proxy(request: Request): model = model[0] if messages: messages = transform_images_to_data_urls(messages) + messages = transform_tool_calls_to_openai(messages) params = { "messages": messages, "model": model, @@ -1564,9 +1647,14 @@ async def chat_proxy(request: Request): else: async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=_format, options=options, keep_alive=keep_alive) if stream == True: + tc_acc = {} # accumulate OpenAI tool-call deltas across chunks async for chunk in async_gen: if use_openai: + _accumulate_openai_tc_delta(chunk, tc_acc) chunk = rechunk.openai_chat_completion2ollama(chunk, stream, start_ts) + # Inject fully-accumulated tool calls only into the final chunk + if chunk.done and tc_acc and chunk.message: + chunk.message.tool_calls = _build_ollama_tool_calls(tc_acc) # `chunk` can be a dict or a pydantic model – dump to JSON safely prompt_tok = chunk.prompt_eval_count or 0 comp_tok = chunk.eval_count or 0 diff --git a/static/index.html b/static/index.html index c3757f3..3d4d364 100644 --- a/static/index.html +++ b/static/index.html @@ -863,7 +863,7 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { const formatUntil = (value) => { if (value === null || value === undefined || value === "") { - return "Forever"; + return "∞"; } let targetTime; From f7ef413090780af8c3c533dd817556529fd1b418 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 12 Feb 2026 16:28:40 +0100 Subject: [PATCH 7/8] replays 3af166c8a4f9bb9332183bdc045a2eb2fa7fd661 to grant merge into main --- db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db.py b/db.py index 9f4efd3..11df49c 100644 --- a/db.py +++ b/db.py @@ -50,7 +50,6 @@ class TokenDatabase: PRIMARY KEY(endpoint, model) ) ''') - await db.execute('CREATE INDEX IF NOT EXISTS idx_token_time_series_timestamp ON token_time_series(timestamp)') await db.execute(''' CREATE TABLE IF NOT EXISTS token_time_series ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -63,6 +62,7 @@ class TokenDatabase: FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) ) ''') + await db.execute('CREATE INDEX IF NOT EXISTS idx_token_time_series_timestamp ON token_time_series(timestamp)') await db.commit() async def update_token_counts(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): From 08b77428b83b377349cec2348c51580c1dfda307 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Fri, 13 Feb 2026 10:11:41 +0100 Subject: [PATCH 8/8] refactor(router): bump cache TTLs and skip error cache for health checks - Increased error and loaded model cache freshness thresholds from 10s to 30s. - Added `skip_error_cache` parameter to `endpoint_details` to prevent cached failures from blocking health checks. - Implemented automatic error recording in `_available_error_cache` on API request failures. --- router.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/router.py b/router.py index e39b3a2..dcbe60b 100644 --- a/router.py +++ b/router.py @@ -644,7 +644,7 @@ class fetch: # Check error cache with lock protection async with _available_error_cache_lock: if endpoint in _available_error_cache: - if _is_fresh(_available_error_cache[endpoint], 10): + if _is_fresh(_available_error_cache[endpoint], 30): # Still within the short error TTL – pretend nothing is available return set() # Error expired – remove it @@ -755,7 +755,7 @@ class fetch: models, cached_at = _loaded_models_cache[endpoint] # FRESH: < 10s old - return immediately - if _is_fresh(cached_at, 10): + if _is_fresh(cached_at, 30): return models # STALE: 10-60s old - return stale data and refresh in background @@ -770,7 +770,7 @@ class fetch: # Check error cache with lock protection async with _loaded_error_cache_lock: if endpoint in _loaded_error_cache: - if _is_fresh(_loaded_error_cache[endpoint], 10): + if _is_fresh(_loaded_error_cache[endpoint], 30): return set() # Error expired - remove it del _loaded_error_cache[endpoint] @@ -795,16 +795,27 @@ class fetch: if _inflight_loaded_models.get(endpoint) == task: _inflight_loaded_models.pop(endpoint, None) - async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: + async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None, skip_error_cache: bool = False) -> List[dict]: """ Query / to fetch and return a List of dicts with details for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail. + + When ``skip_error_cache`` is False (the default), the call is short-circuited + if the endpoint recently failed (recorded in ``_available_error_cache``). + Pass ``skip_error_cache=True`` from health-check routes that must always probe. """ + # Fast-fail if the endpoint is known to be down (unless caller opts out) + if not skip_error_cache: + async with _available_error_cache_lock: + if endpoint in _available_error_cache: + if _is_fresh(_available_error_cache[endpoint], 30): + return [] + client: aiohttp.ClientSession = app_state["session"] headers = None if api_key is not None: headers = {"Authorization": "Bearer " + api_key} - + request_url = f"{endpoint}{route}" try: async with client.get(request_url, headers=headers) as resp: @@ -816,6 +827,9 @@ class fetch: # If anything goes wrong we cannot reply details message = _format_connection_issue(request_url, e) print(f"[fetch.endpoint_details] {message}") + # Record failure so subsequent calls skip this endpoint briefly + async with _available_error_cache_lock: + _available_error_cache[endpoint] = time.time() return [] def ep2base(ep): @@ -2778,7 +2792,7 @@ async def health_proxy(request: Request): * The HTTP status code is 200 when everything is healthy, 503 otherwise. """ # Run all health checks in parallel - tasks = [fetch.endpoint_details(ep, "/api/version", "version") for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)] + tasks = [fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True) for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)] results = await asyncio.gather(*tasks, return_exceptions=True)