diff --git a/doc/examples/sample-config.yaml b/doc/examples/sample-config.yaml index 9dbce09..49ad389 100644 --- a/doc/examples/sample-config.yaml +++ b/doc/examples/sample-config.yaml @@ -1,40 +1,32 @@ -# Sample NOMYO Router Configuration - -# Basic single endpoint configuration +# config.yaml +# Ollama endpoints endpoints: - - http://localhost:11434 + - http://192.168.0.50:11434 + - http://192.168.0.51:11434 + - http://192.168.0.52:11434 + # External OpenAI-compatible endpoints (will NOT be queried for /api/ps /api/ps_details) + - https://api.openai.com/v1 +# llama-server endpoints (OpenAI-compatible with /v1/models status info) +# These endpoints will be queried for /api/tags, /api/ps, /api/ps_details +# and included in the model selection pool for inference routing +llama_server_endpoints: + - http://localhost:8080/v1 + - http://localhost:8081/v1 + +# Maximum concurrent connections *per endpoint‑model pair* (equals to OLLAMA_NUM_PARALLEL) max_concurrent_connections: 2 -# Optional router-level API key to secure the router and dashboard (leave blank to disable) +# Optional router-level API key that gates router/API/web UI access (leave empty to disable) nomyo-router-api-key: "" -# Multi-endpoint configuration with local Ollama instances -# endpoints: -# - http://ollama-worker1:11434 -# - http://ollama-worker2:11434 -# - http://ollama-worker3:11434 - -# Mixed configuration with Ollama and OpenAI endpoints -# endpoints: -# - http://localhost:11434 -# - https://api.openai.com/v1 - - # API keys for remote endpoints -# Use ${VAR_NAME} syntax to reference environment variables +# Set an environment variable like OPENAI_KEY +# Confirm endpoints are exactly as in endpoints block api_keys: - # Local Ollama instances typically don't require authentication - "http://localhost:11434": "ollama" - - # Remote Ollama instances - # "http://remote-ollama:11434": "ollama" - - # OpenAI API - # "https://api.openai.com/v1": "${OPENAI_KEY}" - - # Anthropic API - # "https://api.anthropic.com/v1": "${ANTHROPIC_KEY}" - - # Other OpenAI-compatible endpoints - # "https://api.mistral.ai/v1": "${MISTRAL_KEY}" + "http://192.168.0.50:11434": "ollama" + "http://192.168.0.51:11434": "ollama" + "http://192.168.0.52:11434": "ollama" + "https://api.openai.com/v1": "${OPENAI_KEY}" + "http://localhost:8080/v1": "llama-server" # Optional API key for llama-server + "http://localhost:8081/v1": "llama-server" \ No newline at end of file diff --git a/router.py b/router.py index 9fb0e86..51f5054 100644 --- a/router.py +++ b/router.py @@ -34,16 +34,18 @@ from PIL import Image # Successful results are cached for 300s _models_cache: dict[str, tuple[Set[str], float]] = {} _loaded_models_cache: dict[str, tuple[Set[str], float]] = {} -# Transient errors are cached for 1s – the key stays until the -# timeout expires, after which the endpoint will be queried again. -_error_cache: dict[str, float] = {} +# Transient errors are cached separately per concern so that a failure +# in one path does not poison the other. +_available_error_cache: dict[str, float] = {} +_loaded_error_cache: dict[str, float] = {} # ------------------------------------------------------------------ # Cache locks # ------------------------------------------------------------------ _models_cache_lock = asyncio.Lock() _loaded_models_cache_lock = asyncio.Lock() -_error_cache_lock = asyncio.Lock() +_available_error_cache_lock = asyncio.Lock() +_loaded_error_cache_lock = asyncio.Lock() # ------------------------------------------------------------------ # In-flight request tracking (prevents cache stampede) @@ -106,6 +108,8 @@ class Config(BaseSettings): "http://localhost:11434", ] ) + # List of llama-server endpoints (OpenAI-compatible with /v1/models status info) + llama_server_endpoints: List[str] = Field(default_factory=list) # Max concurrent connections per endpoint‑model pair, see OLLAMA_NUM_PARALLEL max_concurrent_connections: int = 1 @@ -254,13 +258,19 @@ async def enforce_router_api_key(request: Request, call_next): # Strip the api_key query param from scope so access logs do not leak it _strip_api_key_from_scope(request) if provided_key is None: - response = await call_next(request) - # Add CORS headers for API key authentication requests + # No key provided but authentication is required - return 401 + headers = {} if "/api/" in path and path != "/api/usage-stream": - response.headers["Access-Control-Allow-Origin"] = "*" - response.headers["Access-Control-Allow-Headers"] = "Authorization, Content-Type" - response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS" - return response + headers = { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Headers": "Authorization, Content-Type", + "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS", + } + return JSONResponse( + content={"detail": "Missing NOMYO Router API key"}, + status_code=401, + headers=headers, + ) if not secrets.compare_digest(str(provided_key), str(expected_key)): return JSONResponse( @@ -339,7 +349,50 @@ def _format_connection_issue(url: str, error: Exception) -> str: return f"Error while contacting {url}: {error}" +def _normalize_llama_model_name(name: str) -> str: + """Extract the model name from a huggingface-style identifier. + e.g. 'unsloth/gpt-oss-20b-GGUF:F16' -> 'gpt-oss-20b-GGUF' + """ + if "/" in name: + name = name.rsplit("/", 1)[1] + if ":" in name: + name = name.split(":")[0] + return name + +def _extract_llama_quant(name: str) -> str: + """Extract the quantization level from a huggingface-style identifier. + e.g. 'unsloth/gpt-oss-20b-GGUF:Q8_0' -> 'Q8_0' + Returns empty string if no quant suffix is present. + """ + if ":" in name: + return name.rsplit(":", 1)[1] + return "" + +def _is_llama_model_loaded(item: dict) -> bool: + """Return True if a llama-server /v1/models item has status 'loaded'. + Handles both dict format ({"value": "loaded"}) and plain string ("loaded").""" + status = item.get("status") + if isinstance(status, dict): + return status.get("value") == "loaded" + if isinstance(status, str): + return status == "loaded" + return False + def is_ext_openai_endpoint(endpoint: str) -> bool: + """ + Determine if an endpoint is an external OpenAI-compatible endpoint (not Ollama or llama-server). + + Returns True for: + - External services like OpenAI.com, Groq, etc. + + Returns False for: + - Ollama endpoints (without /v1, or with /v1 but default port 11434) + - llama-server endpoints (explicitly configured in llama_server_endpoints) + """ + # Check if it's a llama-server endpoint (has /v1 and is in the configured list) + if endpoint in config.llama_server_endpoints: + return False + if "/v1" not in endpoint: return False @@ -353,6 +406,13 @@ def is_ext_openai_endpoint(endpoint: str) -> bool: return True # It's an external OpenAI endpoint +def is_openai_compatible(endpoint: str) -> bool: + """ + Return True if the endpoint speaks the OpenAI API (not native Ollama). + This includes external OpenAI endpoints AND llama-server endpoints. + """ + return "/v1" in endpoint or endpoint in config.llama_server_endpoints + async def token_worker() -> None: try: while True: @@ -508,7 +568,10 @@ class fetch: if api_key is not None: headers = {"Authorization": "Bearer " + api_key} - if "/v1" in endpoint: + if endpoint in config.llama_server_endpoints and "/v1" not in endpoint: + endpoint_url = f"{endpoint}/v1/models" + key = "data" + elif "/v1" in endpoint or endpoint in config.llama_server_endpoints: endpoint_url = f"{endpoint}/models" key = "data" else: @@ -533,10 +596,21 @@ class fetch: message = _format_connection_issue(endpoint_url, e) print(f"[fetch.available_models] {message}") # Update error cache with lock protection - async with _error_cache_lock: - _error_cache[endpoint] = time.time() + async with _available_error_cache_lock: + _available_error_cache[endpoint] = time.time() return set() + async def _refresh_available_models(endpoint: str, api_key: Optional[str] = None) -> None: + """ + Background task to refresh available models cache without blocking the caller. + Used for stale-while-revalidate pattern. + """ + try: + await fetch._fetch_available_models_internal(endpoint, api_key) + except Exception as e: + # Silently fail - cache will remain stale but functional + print(f"[fetch._refresh_available_models] Background refresh failed for {endpoint}: {e}") + async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: """ Query /api/tags and return a set of all model names that the @@ -547,6 +621,10 @@ class fetch: Uses request coalescing to prevent cache stampede: if multiple requests arrive when cache is expired, only one actual HTTP request is made. + Uses stale-while-revalidate: when the cache is between 300-600s old, + the stale data is returned immediately while a background refresh runs. + This prevents model blackouts caused by transient timeouts. + If the request fails (e.g. timeout, 5xx, or malformed response), an empty set is returned. """ @@ -554,19 +632,27 @@ class fetch: async with _models_cache_lock: if endpoint in _models_cache: models, cached_at = _models_cache[endpoint] + + # FRESH: < 300s old - return immediately if _is_fresh(cached_at, 300): return models - # Stale entry - remove it + + # STALE: 300-600s old - return stale data and refresh in background + if _is_fresh(cached_at, 600): + asyncio.create_task(fetch._refresh_available_models(endpoint, api_key)) + return models # Return stale data immediately + + # EXPIRED: > 600s old - too stale, must refresh synchronously del _models_cache[endpoint] # Check error cache with lock protection - async with _error_cache_lock: - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): + async with _available_error_cache_lock: + if endpoint in _available_error_cache: + if _is_fresh(_available_error_cache[endpoint], 30): # Still within the short error TTL – pretend nothing is available return set() # Error expired – remove it - del _error_cache[endpoint] + del _available_error_cache[endpoint] # Request coalescing: check if another request is already fetching this endpoint async with _inflight_lock: @@ -593,25 +679,56 @@ class fetch: """ Internal function that performs the actual HTTP request to fetch loaded models. This is called by loaded_models() after checking caches and in-flight requests. + + For Ollama endpoints: queries /api/ps and returns model names + For llama-server endpoints: queries /v1/models and filters for status.value == "loaded" """ client: aiohttp.ClientSession = app_state["session"] - try: - async with client.get(f"{endpoint}/api/ps") as resp: - await _ensure_success(resp) - data = await resp.json() - # The response format is: - # {"models": [{"name": "model1"}, {"name": "model2"}]} - models = {m.get("name") for m in data.get("models", []) if m.get("name")} + + # Check if this is a llama-server endpoint + if endpoint in config.llama_server_endpoints: + # Query /v1/models for llama-server + try: + async with client.get(f"{endpoint}/models") as resp: + await _ensure_success(resp) + data = await resp.json() + + # Filter for loaded models only + items = data.get("data", []) + models = { + item.get("id") + for item in items + if item.get("id") and _is_llama_model_loaded(item) + } - # Update cache with lock protection - async with _loaded_models_cache_lock: - _loaded_models_cache[endpoint] = (models, time.time()) - return models - except Exception as e: - # If anything goes wrong we simply assume the endpoint has no models - message = _format_connection_issue(f"{endpoint}/api/ps", e) - print(f"[fetch.loaded_models] {message}") - return set() + # Update cache with lock protection + async with _loaded_models_cache_lock: + _loaded_models_cache[endpoint] = (models, time.time()) + return models + except Exception as e: + # If anything goes wrong we simply assume the endpoint has no models + message = _format_connection_issue(f"{endpoint}/models", e) + print(f"[fetch.loaded_models] {message}") + return set() + else: + # Original Ollama /api/ps logic + try: + async with client.get(f"{endpoint}/api/ps") as resp: + await _ensure_success(resp) + data = await resp.json() + # The response format is: + # {"models": [{"name": "model1"}, {"name": "model2"}]} + models = {m.get("name") for m in data.get("models", []) if m.get("name")} + + # Update cache with lock protection + async with _loaded_models_cache_lock: + _loaded_models_cache[endpoint] = (models, time.time()) + return models + except Exception as e: + # If anything goes wrong we simply assume the endpoint has no models + message = _format_connection_issue(f"{endpoint}/api/ps", e) + print(f"[fetch.loaded_models] {message}") + return set() async def _refresh_loaded_models(endpoint: str) -> None: """ @@ -642,7 +759,7 @@ class fetch: models, cached_at = _loaded_models_cache[endpoint] # FRESH: < 10s old - return immediately - if _is_fresh(cached_at, 10): + if _is_fresh(cached_at, 30): return models # STALE: 10-60s old - return stale data and refresh in background @@ -655,12 +772,12 @@ class fetch: del _loaded_models_cache[endpoint] # Check error cache with lock protection - async with _error_cache_lock: - if endpoint in _error_cache: - if _is_fresh(_error_cache[endpoint], 10): + async with _loaded_error_cache_lock: + if endpoint in _loaded_error_cache: + if _is_fresh(_loaded_error_cache[endpoint], 30): return set() # Error expired - remove it - del _error_cache[endpoint] + del _loaded_error_cache[endpoint] # Request coalescing: check if another request is already fetching this endpoint async with _inflight_lock: @@ -682,16 +799,27 @@ class fetch: if _inflight_loaded_models.get(endpoint) == task: _inflight_loaded_models.pop(endpoint, None) - async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]: + async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None, skip_error_cache: bool = False) -> List[dict]: """ Query / to fetch and return a List of dicts with details for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail. + + When ``skip_error_cache`` is False (the default), the call is short-circuited + if the endpoint recently failed (recorded in ``_available_error_cache``). + Pass ``skip_error_cache=True`` from health-check routes that must always probe. """ + # Fast-fail if the endpoint is known to be down (unless caller opts out) + if not skip_error_cache: + async with _available_error_cache_lock: + if endpoint in _available_error_cache: + if _is_fresh(_available_error_cache[endpoint], 30): + return [] + client: aiohttp.ClientSession = app_state["session"] headers = None if api_key is not None: headers = {"Authorization": "Bearer " + api_key} - + request_url = f"{endpoint}{route}" try: async with client.get(request_url, headers=headers) as resp: @@ -703,6 +831,9 @@ class fetch: # If anything goes wrong we cannot reply details message = _format_connection_issue(request_url, e) print(f"[fetch.endpoint_details] {message}") + # Record failure so subsequent calls skip this endpoint briefly + async with _available_error_cache_lock: + _available_error_cache[endpoint] = time.time() return [] def ep2base(ep): @@ -749,13 +880,13 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No Helper function to make a chat request to a specific endpoint. Handles endpoint selection, client creation, usage tracking, and request execution. """ - is_openai_endpoint = "/v1" in endpoint - - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest")[0] if messages: messages = transform_images_to_data_urls(messages) + messages = transform_tool_calls_to_openai(messages) params = { "messages": messages, "model": model, @@ -774,21 +905,23 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No "response_format": {"type": "json_schema", "json_schema": format} if format is not None else None } params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) try: - if is_openai_endpoint: + if use_openai: start_ts = time.perf_counter() response = await oclient.chat.completions.create(**params) if stream: # For streaming, we need to collect all chunks chunks = [] + tc_acc = {} # accumulate tool-call deltas async for chunk in response: chunks.append(chunk) + _accumulate_openai_tc_delta(chunk, tc_acc) if chunk.usage is not None: prompt_tok = chunk.usage.prompt_tokens or 0 comp_tok = chunk.usage.completion_tokens or 0 @@ -797,6 +930,9 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No # Convert to Ollama format if chunks: response = rechunk.openai_chat_completion2ollama(chunks[-1], stream, start_ts) + # Inject fully-accumulated tool calls into the final response + if tc_acc and response.message: + response.message.tool_calls = _build_ollama_tool_calls(tc_acc) else: prompt_tok = response.usage.prompt_tokens or 0 comp_tok = response.usage.completion_tokens or 0 @@ -950,6 +1086,39 @@ def resize_image_if_needed(image_data): print(f"Error processing image: {e}") return None +def transform_tool_calls_to_openai(message_list): + """ + Ensure tool_calls in assistant messages conform to the OpenAI format: + - Each tool call must have "type": "function" + - Each tool call must have an "id" + - arguments must be a JSON string, not a dict + Also ensure tool-role messages have a tool_call_id. + """ + # Track generated IDs so tool-role messages can reference them + last_tool_call_ids = {} + for msg in message_list: + role = msg.get("role") + if role == "assistant" and "tool_calls" in msg: + for tc in msg["tool_calls"]: + if "type" not in tc: + tc["type"] = "function" + if "id" not in tc: + tc["id"] = f"call_{secrets.token_hex(16)}" + func = tc.get("function", {}) + if isinstance(func.get("arguments"), dict): + func["arguments"] = orjson.dumps(func["arguments"]).decode("utf-8") + # Remember the id for the following tool-role message + name = func.get("name") + if name: + last_tool_call_ids[name] = tc["id"] + elif role == "tool": + if "tool_call_id" not in msg: + # Try to match by name from a preceding assistant tool_call + name = msg.get("name") or msg.get("tool_name") + if name and name in last_tool_call_ids: + msg["tool_call_id"] = last_tool_call_ids.pop(name) + return message_list + def transform_images_to_data_urls(message_list): for message in message_list: if "images" in message: @@ -977,6 +1146,51 @@ def transform_images_to_data_urls(message_list): return message_list +def _accumulate_openai_tc_delta(chunk, accumulator: dict) -> None: + """Accumulate tool_call deltas from a single OpenAI streaming chunk. + + ``accumulator`` is a dict mapping tool-call *index* to + ``{"id": str, "name": str, "arguments": str}`` where ``arguments`` + is the concatenation of all JSON fragments seen so far. + """ + if not chunk.choices: + return + delta = chunk.choices[0].delta + tc_deltas = getattr(delta, "tool_calls", None) + if not tc_deltas: + return + for tc in tc_deltas: + idx = tc.index + if idx not in accumulator: + accumulator[idx] = { + "id": getattr(tc, "id", None) or f"call_{secrets.token_hex(16)}", + "name": tc.function.name if tc.function else None, + "arguments": "", + } + else: + if getattr(tc, "id", None): + accumulator[idx]["id"] = tc.id + if tc.function and tc.function.name: + accumulator[idx]["name"] = tc.function.name + if tc.function and tc.function.arguments: + accumulator[idx]["arguments"] += tc.function.arguments + +def _build_ollama_tool_calls(accumulator: dict) -> list | None: + """Convert accumulated tool-call data into Ollama-format tool_calls list.""" + if not accumulator: + return None + result = [] + for idx in sorted(accumulator.keys()): + tc = accumulator[idx] + try: + args = orjson.loads(tc["arguments"]) if tc["arguments"] else {} + except (orjson.JSONDecodeError, TypeError): + args = {} + result.append(ollama.Message.ToolCall( + function=ollama.Message.ToolCall.Function(name=tc["name"], arguments=args) + )) + return result + class rechunk: def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.ChatResponse: now = time.perf_counter() @@ -987,29 +1201,46 @@ class rechunk: done=True, done_reason='stop', total_duration=int((now - start_ts) * 1_000_000_000), - load_duration=100000, + load_duration=100000, prompt_eval_count=int(chunk.usage.prompt_tokens), - prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)), + prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)), eval_count=int(chunk.usage.completion_tokens), eval_duration=int((now - start_ts) * 1_000_000_000), - message={"role": "assistant"} + message=ollama.Message(role="assistant", content=""), ) with_thinking = chunk.choices[0] if chunk.choices[0] else None if stream == True: - thinking = getattr(with_thinking.delta, "reasoning", None) if with_thinking else None + thinking = (getattr(with_thinking.delta, "reasoning_content", None) or getattr(with_thinking.delta, "reasoning", None)) if with_thinking else None role = chunk.choices[0].delta.role or "assistant" content = chunk.choices[0].delta.content or '' else: - thinking = getattr(with_thinking, "reasoning", None) if with_thinking else None + thinking = (getattr(with_thinking.message, "reasoning_content", None) or getattr(with_thinking.message, "reasoning", None)) if with_thinking else None role = chunk.choices[0].message.role or "assistant" content = chunk.choices[0].message.content or '' + # Convert OpenAI tool_calls to Ollama format + # In streaming mode, tool_calls arrive as partial deltas across multiple chunks + # (name only in first delta, arguments as incremental JSON fragments). + # Callers must accumulate deltas and inject the final result; skip here. + ollama_tool_calls = None + if not stream: + raw_tool_calls = getattr(with_thinking.message, "tool_calls", None) if with_thinking else None + if raw_tool_calls: + ollama_tool_calls = [] + for tc in raw_tool_calls: + try: + args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {}) + except (orjson.JSONDecodeError, TypeError): + args = {} + ollama_tool_calls.append(ollama.Message.ToolCall( + function=ollama.Message.ToolCall.Function(name=tc.function.name, arguments=args) + )) assistant_msg = ollama.Message( role=role, content=content, thinking=thinking, images=None, tool_name=None, - tool_calls=None) + tool_calls=ollama_tool_calls) rechunk = ollama.ChatResponse( model=chunk.model, created_at=iso8601_ns(), @@ -1134,35 +1365,40 @@ async def choose_endpoint(model: str) -> str: 6️⃣ If no endpoint advertises the model at all, raise an error. """ # 1️⃣ Gather advertised‑model sets for all endpoints concurrently - tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if "/v1" not in ep] - tag_tasks += [fetch.available_models(ep, config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + # Include both config.endpoints and config.llama_server_endpoints + llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints] + all_endpoints = config.endpoints + llama_eps_extra + + tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if not is_openai_compatible(ep)] + tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in config.endpoints if is_openai_compatible(ep)] + tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in llama_eps_extra] advertised_sets = await asyncio.gather(*tag_tasks) # 2️⃣ Filter endpoints that advertise the requested model candidate_endpoints = [ - ep for ep, models in zip(config.endpoints, advertised_sets) + ep for ep, models in zip(all_endpoints, advertised_sets) if model in models ] - + # 6️⃣ if not candidate_endpoints: - if ":latest" in model: #ollama naming convention not applicable to openai + if ":latest" in model: #ollama naming convention not applicable to openai/llama-server model_without_latest = model.split(":latest")[0] candidate_endpoints = [ - ep for ep, models in zip(config.endpoints, advertised_sets) - if model_without_latest in models and is_ext_openai_endpoint(ep) + ep for ep, models in zip(all_endpoints, advertised_sets) + if model_without_latest in models and (is_ext_openai_endpoint(ep) or ep in config.llama_server_endpoints) ] if not candidate_endpoints: # Only add :latest suffix if model doesn't already have a version suffix if ":" not in model: model = model + ":latest" candidate_endpoints = [ - ep for ep, models in zip(config.endpoints, advertised_sets) + ep for ep, models in zip(all_endpoints, advertised_sets) if model in models ] if not candidate_endpoints: raise RuntimeError( - f"None of the configured endpoints ({', '.join(config.endpoints)}) " + f"None of the configured endpoints ({', '.join(all_endpoints)}) " f"advertise the model '{model}'." ) # 3️⃣ Among the candidates, find those that have the model *loaded* @@ -1210,7 +1446,7 @@ async def choose_endpoint(model: str) -> str: # Then by total endpoint usage (ascending) to balance idle endpoints endpoints_with_free_slot.sort( key=lambda ep: ( - -usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections) + #-usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections) sum(usage_counts.get(ep, {}).values()) # Secondary: total endpoint usage (ascending - prefer idle endpoints) ) ) @@ -1266,14 +1502,13 @@ async def proxy(request: Request): endpoint = await choose_endpoint(model) - is_openai_endpoint = "/v1" in endpoint - - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] params = { - "prompt": prompt, + "prompt": prompt, "model": model, } @@ -1289,7 +1524,7 @@ async def proxy(request: Request): "suffix": suffix, } params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1297,14 +1532,14 @@ async def proxy(request: Request): # 4. Async generator that streams data and decrements the counter async def stream_generate_response(): try: - if is_openai_endpoint: + if use_openai: start_ts = time.perf_counter() async_gen = await oclient.completions.create(**params) else: async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=_format, images=images, options=options, keep_alive=keep_alive) if stream == True: async for chunk in async_gen: - if is_openai_endpoint: + if use_openai: chunk = rechunk.openai_completion2ollama(chunk, stream, start_ts) prompt_tok = chunk.prompt_eval_count or 0 comp_tok = chunk.eval_count or 0 @@ -1316,7 +1551,7 @@ async def proxy(request: Request): json_line = orjson.dumps(chunk) yield json_line.encode("utf-8") + b"\n" else: - if is_openai_endpoint: + if use_openai: response = rechunk.openai_completion2ollama(async_gen, stream, start_ts) response = response.model_dump_json() else: @@ -1386,16 +1621,16 @@ async def chat_proxy(request: Request): else: opt = False endpoint = await choose_endpoint(model) - is_openai_endpoint = "/v1" in endpoint - - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] if messages: messages = transform_images_to_data_urls(messages) + messages = transform_tool_calls_to_openai(messages) params = { - "messages": messages, + "messages": messages, "model": model, } optional_params = { @@ -1412,7 +1647,7 @@ async def chat_proxy(request: Request): "response_format": {"type": "json_schema", "json_schema": _format} if _format is not None else None } params.update({k: v for k, v in optional_params.items() if v is not None}) - oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1420,7 +1655,7 @@ async def chat_proxy(request: Request): async def stream_chat_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if is_openai_endpoint: + if use_openai: start_ts = time.perf_counter() async_gen = await oclient.chat.completions.create(**params) else: @@ -1430,9 +1665,14 @@ async def chat_proxy(request: Request): else: async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=_format, options=options, keep_alive=keep_alive) if stream == True: + tc_acc = {} # accumulate OpenAI tool-call deltas across chunks async for chunk in async_gen: - if is_openai_endpoint: + if use_openai: + _accumulate_openai_tc_delta(chunk, tc_acc) chunk = rechunk.openai_chat_completion2ollama(chunk, stream, start_ts) + # Inject fully-accumulated tool calls only into the final chunk + if chunk.done and tc_acc and chunk.message: + chunk.message.tool_calls = _build_ollama_tool_calls(tc_acc) # `chunk` can be a dict or a pydantic model – dump to JSON safely prompt_tok = chunk.prompt_eval_count or 0 comp_tok = chunk.eval_count or 0 @@ -1444,7 +1684,7 @@ async def chat_proxy(request: Request): json_line = orjson.dumps(chunk) yield json_line.encode("utf-8") + b"\n" else: - if is_openai_endpoint: + if use_openai: response = rechunk.openai_chat_completion2ollama(async_gen, stream, start_ts) response = response.model_dump_json() else: @@ -1503,13 +1743,12 @@ async def embedding_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) - is_openai_endpoint = "/v1" in endpoint - - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] - client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + client = openai.AsyncOpenAI(base_url=ep2base(endpoint), api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1517,7 +1756,7 @@ async def embedding_proxy(request: Request): async def stream_embedding_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if is_openai_endpoint: + if use_openai: async_gen = await client.embeddings.create(input=prompt, model=model) async_gen = rechunk.openai_embeddings2ollama(async_gen) else: @@ -1570,13 +1809,12 @@ async def embed_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) - is_openai_endpoint = is_ext_openai_endpoint(endpoint) #"/v1" in endpoint - - if is_openai_endpoint: + use_openai = is_openai_compatible(endpoint) + if use_openai: if ":latest" in model: model = model.split(":latest") model = model[0] - client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint]) + client = openai.AsyncOpenAI(base_url=ep2base(endpoint), api_key=config.api_keys.get(endpoint, "no-key")) else: client = ollama.AsyncClient(host=endpoint) await increment_usage(endpoint, model) @@ -1584,7 +1822,7 @@ async def embed_proxy(request: Request): async def stream_embedding_response(): try: # The chat method returns a generator of dicts (or GenerateResponse) - if is_openai_endpoint: + if use_openai: async_gen = await client.embeddings.create(input=_input, model=model) async_gen = rechunk.openai_embed2ollama(async_gen, model) else: @@ -1983,8 +2221,11 @@ async def tags_proxy(request: Request): # 1. Query all endpoints for models tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] + # Also query llama-server endpoints not already covered by config.endpoints + llama_eps_for_tags = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints] + tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) for ep in llama_eps_for_tags] all_models = await asyncio.gather(*tasks) - + models = {'models': []} for modellist in all_models: for model in modellist: @@ -2010,18 +2251,47 @@ async def tags_proxy(request: Request): @app.get("/api/ps") async def ps_proxy(request: Request): """ - Proxy a ps request to all Ollama endpoints and reply a unique list of all running models. + Proxy a ps request to all Ollama and llama-server endpoints and reply a unique list of all running models. + For Ollama endpoints: queries /api/ps + For llama-server endpoints: queries /v1/models with status.value == "loaded" """ - # 1. Query all endpoints for running models - tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep] - loaded_models = await asyncio.gather(*tasks) + # 1. Query Ollama endpoints for running models via /api/ps + ollama_tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep] + # 2. Query llama-server endpoints for loaded models via /v1/models + # Also query endpoints from llama_server_endpoints that may not be in config.endpoints + all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints) + llama_tasks = [ + fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) + for ep in all_llama_endpoints + ] + + ollama_loaded = await asyncio.gather(*ollama_tasks) if ollama_tasks else [] + llama_loaded = await asyncio.gather(*llama_tasks) if llama_tasks else [] models = {'models': []} - for modellist in loaded_models: - models['models'] += modellist + # Add Ollama models (if any) + if ollama_loaded: + for modellist in ollama_loaded: + models['models'] += modellist + # Add llama-server models (filter for loaded only, if any) + if llama_loaded: + for modellist in llama_loaded: + loaded_models = [item for item in modellist if _is_llama_model_loaded(item)] + # Convert llama-server format to Ollama-like format for consistency + for item in loaded_models: + raw_id = item.get("id", "") + normalized = _normalize_llama_model_name(raw_id) + quant = _extract_llama_quant(raw_id) + models['models'].append({ + "name": normalized, + "id": normalized, + "digest": "", + "status": item.get("status"), + "details": {"quantization_level": quant} if quant else {} + }) - # 2. Return a JSONResponse with deduplicated currently deployed models + # 3. Return a JSONResponse with deduplicated currently deployed models return JSONResponse( content={"models": dedupe_on_keys(models['models'], ['digest'])}, status_code=200, @@ -2033,19 +2303,63 @@ async def ps_proxy(request: Request): @app.get("/api/ps_details") async def ps_details_proxy(request: Request): """ - Proxy a ps request to all Ollama endpoints and reply with per-endpoint instances. + Proxy a ps request to all Ollama and llama-server endpoints and reply with per-endpoint instances. This keeps /api/ps backward compatible while providing richer data. + + For Ollama endpoints: queries /api/ps + For llama-server endpoints: queries /v1/models with status info """ - tasks = [(ep, fetch.endpoint_details(ep, "/api/ps", "models")) for ep in config.endpoints if "/v1" not in ep] - loaded_models = await asyncio.gather(*[task for _, task in tasks]) + # 1. Query Ollama endpoints via /api/ps + ollama_tasks = [(ep, fetch.endpoint_details(ep, "/api/ps", "models")) for ep in config.endpoints if "/v1" not in ep] + # 2. Query llama-server endpoints via /v1/models + # Also query endpoints from llama_server_endpoints that may not be in config.endpoints + all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints) + llama_tasks = [ + (ep, fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep))) + for ep in all_llama_endpoints + ] + + ollama_loaded = await asyncio.gather(*[task for _, task in ollama_tasks]) if ollama_tasks else [] + llama_loaded = await asyncio.gather(*[task for _, task in llama_tasks]) if llama_tasks else [] models: list[dict] = [] - for (endpoint, modellist) in zip([ep for ep, _ in tasks], loaded_models): - for model in modellist: - if isinstance(model, dict): - model_with_endpoint = dict(model) - model_with_endpoint["endpoint"] = endpoint - models.append(model_with_endpoint) + + # Add Ollama models with endpoint info (if any) + if ollama_loaded: + for (endpoint, modellist) in zip([ep for ep, _ in ollama_tasks], ollama_loaded): + for model in modellist: + if isinstance(model, dict): + model_with_endpoint = dict(model) + model_with_endpoint["endpoint"] = endpoint + models.append(model_with_endpoint) + + # Add llama-server models with endpoint info and full status metadata (if any) + if llama_loaded: + for (endpoint, modellist) in zip([ep for ep, _ in llama_tasks], llama_loaded): + # Filter for loaded models only + loaded_models = [item for item in modellist if _is_llama_model_loaded(item)] + for item in loaded_models: + if isinstance(item, dict) and item.get("id"): + raw_id = item["id"] + normalized = _normalize_llama_model_name(raw_id) + quant = _extract_llama_quant(raw_id) + model_with_endpoint = { + "name": normalized, + "id": normalized, + "original_name": raw_id, + "digest": "", + "details": {"quantization_level": quant} if quant else {}, + "endpoint": endpoint, + "status": item.get("status"), + "created": item.get("created"), + "owned_by": item.get("owned_by") + } + # Include full llama-server status details (args, preset) + status_info = item.get("status", {}) + if isinstance(status_info, dict): + model_with_endpoint["llama_status_args"] = status_info.get("args") + model_with_endpoint["llama_status_preset"] = status_info.get("preset") + models.append(model_with_endpoint) return JSONResponse(content={"models": models}, status_code=200) @@ -2068,7 +2382,7 @@ async def usage_proxy(request: Request): async def config_proxy(request: Request): """ Return a simple JSON object that contains the configured - Ollama endpoints. The front‑end uses this to display + Ollama endpoints and llama_server_endpoints. The front‑end uses this to display which endpoints are being proxied. """ async def check_endpoint(url: str): @@ -2092,9 +2406,17 @@ async def config_proxy(request: Request): detail = _format_connection_issue(target_url, e) return {"url": url, "status": "error", "detail": detail} - results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) + # Check Ollama endpoints + ollama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints]) + + # Check llama-server endpoints + llama_results = [] + if config.llama_server_endpoints: + llama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.llama_server_endpoints]) + return { - "endpoints": results, + "endpoints": ollama_results, + "llama_server_endpoints": llama_results, "require_router_api_key": bool(config.router_api_key), } @@ -2129,8 +2451,8 @@ async def openai_embedding_proxy(request: Request): # 2. Endpoint logic endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) - if "/v1" in endpoint: # and is_ext_openai_endpoint(endpoint): - api_key = config.api_keys[endpoint] + if is_openai_compatible(endpoint): + api_key = config.api_keys.get(endpoint, "no-key") else: api_key = "ollama" base_url = ep2base(endpoint) @@ -2215,8 +2537,7 @@ async def openai_chat_completions_proxy(request: Request): endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) base_url = ep2base(endpoint) - - oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Async generator that streams completions data and decrements the counter async def stream_ochat_response(): try: @@ -2341,8 +2662,7 @@ async def openai_completions_proxy(request: Request): endpoint = await choose_endpoint(model) await increment_usage(endpoint, model) base_url = ep2base(endpoint) - - oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint]) + oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Async generator that streams completions data and decrements the counter async def stream_ocompletions_response(model=model): @@ -2398,22 +2718,46 @@ async def openai_completions_proxy(request: Request): @app.get("/v1/models") async def openai_models_proxy(request: Request): """ - Proxy an OpenAI API models request to Ollama endpoints and reply with a unique list of all models. - + Proxy an OpenAI API models request to Ollama and llama-server endpoints and reply with a unique list of models. + + For Ollama endpoints: queries /api/tags (all models) + For llama-server endpoints: queries /v1/models and filters for status.value == "loaded" """ - # 1. Query all endpoints for models - tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] - tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep] - all_models = await asyncio.gather(*tasks) + # 1. Query Ollama endpoints for all models via /api/tags + ollama_tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep] + # 2. Query llama-server endpoints for loaded models via /v1/models + # Also query endpoints from llama_server_endpoints that may not be in config.endpoints + all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints) + llama_tasks = [ + fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) + for ep in all_llama_endpoints + ] + + ollama_models = await asyncio.gather(*ollama_tasks) if ollama_tasks else [] + llama_models = await asyncio.gather(*llama_tasks) if llama_tasks else [] models = {'data': []} - for modellist in all_models: - for model in modellist: - if not "id" in model.keys(): # Relable Ollama models with OpenAI Model.id from Model.name - model['id'] = model['name'] - else: - model['name'] = model['id'] - models['data'] += modellist + + # Add Ollama models (if any) + if ollama_models: + for modellist in ollama_models: + for model in modellist: + if not "id" in model.keys(): # Relable Ollama models with OpenAI Model.id from Model.name + model['id'] = model.get('name', model.get('id', '')) + else: + model['name'] = model['id'] + models['data'].append(model) + + # Add llama-server models (filter for loaded only, if any) + if llama_models: + for modellist in llama_models: + loaded_models = [item for item in modellist if _is_llama_model_loaded(item)] + for model in loaded_models: + if not "id" in model.keys(): + model['id'] = model.get('name', model.get('id', '')) + else: + model['name'] = model['id'] + models['data'].append(model) # 2. Return a JSONResponse with a deduplicated list of unique models for inference return JSONResponse( @@ -2459,7 +2803,7 @@ async def health_proxy(request: Request): * The HTTP status code is 200 when everything is healthy, 503 otherwise. """ # Run all health checks in parallel - tasks = [fetch.endpoint_details(ep, "/api/version", "version") for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)] + tasks = [fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True) for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)] results = await asyncio.gather(*tasks, return_exceptions=True) @@ -2524,6 +2868,7 @@ async def startup_event() -> None: print( f"Loaded configuration from {config_path}:\n" f" endpoints={config.endpoints},\n" + f" llama_server_endpoints={config.llama_server_endpoints},\n" f" max_concurrent_connections={config.max_concurrent_connections}" ) else: diff --git a/static/index.html b/static/index.html index 09cc6fe..3d4d364 100644 --- a/static/index.html +++ b/static/index.html @@ -379,7 +379,7 @@ Quant Ctx Size - Until + Unload Digest Tokens @@ -683,7 +683,12 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { showApiKeyModal("Enter the NOMYO Router API key to load the dashboard."); } const body = document.getElementById("endpoints-body"); - body.innerHTML = data.endpoints + + // Build HTML for both endpoints and llama_server_endpoints + let html = ""; + + // Add Ollama endpoints + html += data.endpoints .map((e) => { const statusClass = e.status === "ok" @@ -698,6 +703,27 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { `; }) .join(""); + + // Add llama-server endpoints + if (data.llama_server_endpoints && data.llama_server_endpoints.length > 0) { + html += data.llama_server_endpoints + .map((e) => { + const statusClass = + e.status === "ok" + ? "status-ok" + : "status-error"; + const version = e.version || "N/A"; + return ` + + ${e.url} + ${e.status} + ${version} + `; + }) + .join(""); + } + + body.innerHTML = html; } catch (e) { console.error(e); const body = document.getElementById("endpoints-body"); @@ -837,7 +863,7 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { const formatUntil = (value) => { if (value === null || value === undefined || value === "") { - return "Forever"; + return "∞"; } let targetTime; @@ -898,10 +924,11 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { const params = modelInstances[0]?.details?.parameter_size ?? ""; const quant = modelInstances[0]?.details?.quantization_level ?? ""; const ctx = modelInstances[0]?.context_length ?? ""; + const originalName = modelInstances[0]?.original_name || modelName; const uniqueEndpoints = Array.from(new Set(endpoints)); const endpointsData = encodeURIComponent(JSON.stringify(uniqueEndpoints)); return ` - ${modelName} stats + ${modelName} stats ${renderInstanceList(endpoints)} ${params} ${quant}