Merge pull request #25 from nomyo-ai/dev-v0.6

- updated reasoning handling
- improved model and error caches
- fixed openai tool calling incl. ollama translations
- direct support for llama.cpp's llama_server via llama_server_endpoint config
- basic llama_server model info in dashboard
- improved endpoint info fetching behaviour in error cases
This commit is contained in:
Alpha Nerd 2026-02-13 10:34:42 +01:00 committed by GitHub
commit 9ef1b770ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 528 additions and 164 deletions

View file

@ -1,40 +1,32 @@
# Sample NOMYO Router Configuration
# Basic single endpoint configuration
# config.yaml
# Ollama endpoints
endpoints:
- http://localhost:11434
- http://192.168.0.50:11434
- http://192.168.0.51:11434
- http://192.168.0.52:11434
# External OpenAI-compatible endpoints (will NOT be queried for /api/ps /api/ps_details)
- https://api.openai.com/v1
# llama-server endpoints (OpenAI-compatible with /v1/models status info)
# These endpoints will be queried for /api/tags, /api/ps, /api/ps_details
# and included in the model selection pool for inference routing
llama_server_endpoints:
- http://localhost:8080/v1
- http://localhost:8081/v1
# Maximum concurrent connections *per endpointmodel pair* (equals to OLLAMA_NUM_PARALLEL)
max_concurrent_connections: 2
# Optional router-level API key to secure the router and dashboard (leave blank to disable)
# Optional router-level API key that gates router/API/web UI access (leave empty to disable)
nomyo-router-api-key: ""
# Multi-endpoint configuration with local Ollama instances
# endpoints:
# - http://ollama-worker1:11434
# - http://ollama-worker2:11434
# - http://ollama-worker3:11434
# Mixed configuration with Ollama and OpenAI endpoints
# endpoints:
# - http://localhost:11434
# - https://api.openai.com/v1
# API keys for remote endpoints
# Use ${VAR_NAME} syntax to reference environment variables
# Set an environment variable like OPENAI_KEY
# Confirm endpoints are exactly as in endpoints block
api_keys:
# Local Ollama instances typically don't require authentication
"http://localhost:11434": "ollama"
# Remote Ollama instances
# "http://remote-ollama:11434": "ollama"
# OpenAI API
# "https://api.openai.com/v1": "${OPENAI_KEY}"
# Anthropic API
# "https://api.anthropic.com/v1": "${ANTHROPIC_KEY}"
# Other OpenAI-compatible endpoints
# "https://api.mistral.ai/v1": "${MISTRAL_KEY}"
"http://192.168.0.50:11434": "ollama"
"http://192.168.0.51:11434": "ollama"
"http://192.168.0.52:11434": "ollama"
"https://api.openai.com/v1": "${OPENAI_KEY}"
"http://localhost:8080/v1": "llama-server" # Optional API key for llama-server
"http://localhost:8081/v1": "llama-server"

533
router.py
View file

@ -34,16 +34,18 @@ from PIL import Image
# Successful results are cached for 300s
_models_cache: dict[str, tuple[Set[str], float]] = {}
_loaded_models_cache: dict[str, tuple[Set[str], float]] = {}
# Transient errors are cached for 1s the key stays until the
# timeout expires, after which the endpoint will be queried again.
_error_cache: dict[str, float] = {}
# Transient errors are cached separately per concern so that a failure
# in one path does not poison the other.
_available_error_cache: dict[str, float] = {}
_loaded_error_cache: dict[str, float] = {}
# ------------------------------------------------------------------
# Cache locks
# ------------------------------------------------------------------
_models_cache_lock = asyncio.Lock()
_loaded_models_cache_lock = asyncio.Lock()
_error_cache_lock = asyncio.Lock()
_available_error_cache_lock = asyncio.Lock()
_loaded_error_cache_lock = asyncio.Lock()
# ------------------------------------------------------------------
# In-flight request tracking (prevents cache stampede)
@ -106,6 +108,8 @@ class Config(BaseSettings):
"http://localhost:11434",
]
)
# List of llama-server endpoints (OpenAI-compatible with /v1/models status info)
llama_server_endpoints: List[str] = Field(default_factory=list)
# Max concurrent connections per endpointmodel pair, see OLLAMA_NUM_PARALLEL
max_concurrent_connections: int = 1
@ -254,13 +258,19 @@ async def enforce_router_api_key(request: Request, call_next):
# Strip the api_key query param from scope so access logs do not leak it
_strip_api_key_from_scope(request)
if provided_key is None:
response = await call_next(request)
# Add CORS headers for API key authentication requests
# No key provided but authentication is required - return 401
headers = {}
if "/api/" in path and path != "/api/usage-stream":
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Headers"] = "Authorization, Content-Type"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
return response
headers = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Authorization, Content-Type",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
}
return JSONResponse(
content={"detail": "Missing NOMYO Router API key"},
status_code=401,
headers=headers,
)
if not secrets.compare_digest(str(provided_key), str(expected_key)):
return JSONResponse(
@ -339,7 +349,50 @@ def _format_connection_issue(url: str, error: Exception) -> str:
return f"Error while contacting {url}: {error}"
def _normalize_llama_model_name(name: str) -> str:
"""Extract the model name from a huggingface-style identifier.
e.g. 'unsloth/gpt-oss-20b-GGUF:F16' -> 'gpt-oss-20b-GGUF'
"""
if "/" in name:
name = name.rsplit("/", 1)[1]
if ":" in name:
name = name.split(":")[0]
return name
def _extract_llama_quant(name: str) -> str:
"""Extract the quantization level from a huggingface-style identifier.
e.g. 'unsloth/gpt-oss-20b-GGUF:Q8_0' -> 'Q8_0'
Returns empty string if no quant suffix is present.
"""
if ":" in name:
return name.rsplit(":", 1)[1]
return ""
def _is_llama_model_loaded(item: dict) -> bool:
"""Return True if a llama-server /v1/models item has status 'loaded'.
Handles both dict format ({"value": "loaded"}) and plain string ("loaded")."""
status = item.get("status")
if isinstance(status, dict):
return status.get("value") == "loaded"
if isinstance(status, str):
return status == "loaded"
return False
def is_ext_openai_endpoint(endpoint: str) -> bool:
"""
Determine if an endpoint is an external OpenAI-compatible endpoint (not Ollama or llama-server).
Returns True for:
- External services like OpenAI.com, Groq, etc.
Returns False for:
- Ollama endpoints (without /v1, or with /v1 but default port 11434)
- llama-server endpoints (explicitly configured in llama_server_endpoints)
"""
# Check if it's a llama-server endpoint (has /v1 and is in the configured list)
if endpoint in config.llama_server_endpoints:
return False
if "/v1" not in endpoint:
return False
@ -353,6 +406,13 @@ def is_ext_openai_endpoint(endpoint: str) -> bool:
return True # It's an external OpenAI endpoint
def is_openai_compatible(endpoint: str) -> bool:
"""
Return True if the endpoint speaks the OpenAI API (not native Ollama).
This includes external OpenAI endpoints AND llama-server endpoints.
"""
return "/v1" in endpoint or endpoint in config.llama_server_endpoints
async def token_worker() -> None:
try:
while True:
@ -508,7 +568,10 @@ class fetch:
if api_key is not None:
headers = {"Authorization": "Bearer " + api_key}
if "/v1" in endpoint:
if endpoint in config.llama_server_endpoints and "/v1" not in endpoint:
endpoint_url = f"{endpoint}/v1/models"
key = "data"
elif "/v1" in endpoint or endpoint in config.llama_server_endpoints:
endpoint_url = f"{endpoint}/models"
key = "data"
else:
@ -533,10 +596,21 @@ class fetch:
message = _format_connection_issue(endpoint_url, e)
print(f"[fetch.available_models] {message}")
# Update error cache with lock protection
async with _error_cache_lock:
_error_cache[endpoint] = time.time()
async with _available_error_cache_lock:
_available_error_cache[endpoint] = time.time()
return set()
async def _refresh_available_models(endpoint: str, api_key: Optional[str] = None) -> None:
"""
Background task to refresh available models cache without blocking the caller.
Used for stale-while-revalidate pattern.
"""
try:
await fetch._fetch_available_models_internal(endpoint, api_key)
except Exception as e:
# Silently fail - cache will remain stale but functional
print(f"[fetch._refresh_available_models] Background refresh failed for {endpoint}: {e}")
async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]:
"""
Query <endpoint>/api/tags and return a set of all model names that the
@ -547,6 +621,10 @@ class fetch:
Uses request coalescing to prevent cache stampede: if multiple requests
arrive when cache is expired, only one actual HTTP request is made.
Uses stale-while-revalidate: when the cache is between 300-600s old,
the stale data is returned immediately while a background refresh runs.
This prevents model blackouts caused by transient timeouts.
If the request fails (e.g. timeout, 5xx, or malformed response), an empty
set is returned.
"""
@ -554,19 +632,27 @@ class fetch:
async with _models_cache_lock:
if endpoint in _models_cache:
models, cached_at = _models_cache[endpoint]
# FRESH: < 300s old - return immediately
if _is_fresh(cached_at, 300):
return models
# Stale entry - remove it
# STALE: 300-600s old - return stale data and refresh in background
if _is_fresh(cached_at, 600):
asyncio.create_task(fetch._refresh_available_models(endpoint, api_key))
return models # Return stale data immediately
# EXPIRED: > 600s old - too stale, must refresh synchronously
del _models_cache[endpoint]
# Check error cache with lock protection
async with _error_cache_lock:
if endpoint in _error_cache:
if _is_fresh(_error_cache[endpoint], 10):
async with _available_error_cache_lock:
if endpoint in _available_error_cache:
if _is_fresh(_available_error_cache[endpoint], 30):
# Still within the short error TTL pretend nothing is available
return set()
# Error expired remove it
del _error_cache[endpoint]
del _available_error_cache[endpoint]
# Request coalescing: check if another request is already fetching this endpoint
async with _inflight_lock:
@ -593,8 +679,39 @@ class fetch:
"""
Internal function that performs the actual HTTP request to fetch loaded models.
This is called by loaded_models() after checking caches and in-flight requests.
For Ollama endpoints: queries /api/ps and returns model names
For llama-server endpoints: queries /v1/models and filters for status.value == "loaded"
"""
client: aiohttp.ClientSession = app_state["session"]
# Check if this is a llama-server endpoint
if endpoint in config.llama_server_endpoints:
# Query /v1/models for llama-server
try:
async with client.get(f"{endpoint}/models") as resp:
await _ensure_success(resp)
data = await resp.json()
# Filter for loaded models only
items = data.get("data", [])
models = {
item.get("id")
for item in items
if item.get("id") and _is_llama_model_loaded(item)
}
# Update cache with lock protection
async with _loaded_models_cache_lock:
_loaded_models_cache[endpoint] = (models, time.time())
return models
except Exception as e:
# If anything goes wrong we simply assume the endpoint has no models
message = _format_connection_issue(f"{endpoint}/models", e)
print(f"[fetch.loaded_models] {message}")
return set()
else:
# Original Ollama /api/ps logic
try:
async with client.get(f"{endpoint}/api/ps") as resp:
await _ensure_success(resp)
@ -642,7 +759,7 @@ class fetch:
models, cached_at = _loaded_models_cache[endpoint]
# FRESH: < 10s old - return immediately
if _is_fresh(cached_at, 10):
if _is_fresh(cached_at, 30):
return models
# STALE: 10-60s old - return stale data and refresh in background
@ -655,12 +772,12 @@ class fetch:
del _loaded_models_cache[endpoint]
# Check error cache with lock protection
async with _error_cache_lock:
if endpoint in _error_cache:
if _is_fresh(_error_cache[endpoint], 10):
async with _loaded_error_cache_lock:
if endpoint in _loaded_error_cache:
if _is_fresh(_loaded_error_cache[endpoint], 30):
return set()
# Error expired - remove it
del _error_cache[endpoint]
del _loaded_error_cache[endpoint]
# Request coalescing: check if another request is already fetching this endpoint
async with _inflight_lock:
@ -682,11 +799,22 @@ class fetch:
if _inflight_loaded_models.get(endpoint) == task:
_inflight_loaded_models.pop(endpoint, None)
async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None) -> List[dict]:
async def endpoint_details(endpoint: str, route: str, detail: str, api_key: Optional[str] = None, skip_error_cache: bool = False) -> List[dict]:
"""
Query <endpoint>/<route> to fetch <detail> and return a List of dicts with details
for the corresponding Ollama endpoint. If the request fails we respond with "N/A" for detail.
When ``skip_error_cache`` is False (the default), the call is short-circuited
if the endpoint recently failed (recorded in ``_available_error_cache``).
Pass ``skip_error_cache=True`` from health-check routes that must always probe.
"""
# Fast-fail if the endpoint is known to be down (unless caller opts out)
if not skip_error_cache:
async with _available_error_cache_lock:
if endpoint in _available_error_cache:
if _is_fresh(_available_error_cache[endpoint], 30):
return []
client: aiohttp.ClientSession = app_state["session"]
headers = None
if api_key is not None:
@ -703,6 +831,9 @@ class fetch:
# If anything goes wrong we cannot reply details
message = _format_connection_issue(request_url, e)
print(f"[fetch.endpoint_details] {message}")
# Record failure so subsequent calls skip this endpoint briefly
async with _available_error_cache_lock:
_available_error_cache[endpoint] = time.time()
return []
def ep2base(ep):
@ -749,13 +880,13 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No
Helper function to make a chat request to a specific endpoint.
Handles endpoint selection, client creation, usage tracking, and request execution.
"""
is_openai_endpoint = "/v1" in endpoint
if is_openai_endpoint:
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
model = model.split(":latest")[0]
if messages:
messages = transform_images_to_data_urls(messages)
messages = transform_tool_calls_to_openai(messages)
params = {
"messages": messages,
"model": model,
@ -774,21 +905,23 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No
"response_format": {"type": "json_schema", "json_schema": format} if format is not None else None
}
params.update({k: v for k, v in optional_params.items() if v is not None})
oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint])
oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
else:
client = ollama.AsyncClient(host=endpoint)
await increment_usage(endpoint, model)
try:
if is_openai_endpoint:
if use_openai:
start_ts = time.perf_counter()
response = await oclient.chat.completions.create(**params)
if stream:
# For streaming, we need to collect all chunks
chunks = []
tc_acc = {} # accumulate tool-call deltas
async for chunk in response:
chunks.append(chunk)
_accumulate_openai_tc_delta(chunk, tc_acc)
if chunk.usage is not None:
prompt_tok = chunk.usage.prompt_tokens or 0
comp_tok = chunk.usage.completion_tokens or 0
@ -797,6 +930,9 @@ async def _make_chat_request(endpoint: str, model: str, messages: list, tools=No
# Convert to Ollama format
if chunks:
response = rechunk.openai_chat_completion2ollama(chunks[-1], stream, start_ts)
# Inject fully-accumulated tool calls into the final response
if tc_acc and response.message:
response.message.tool_calls = _build_ollama_tool_calls(tc_acc)
else:
prompt_tok = response.usage.prompt_tokens or 0
comp_tok = response.usage.completion_tokens or 0
@ -950,6 +1086,39 @@ def resize_image_if_needed(image_data):
print(f"Error processing image: {e}")
return None
def transform_tool_calls_to_openai(message_list):
"""
Ensure tool_calls in assistant messages conform to the OpenAI format:
- Each tool call must have "type": "function"
- Each tool call must have an "id"
- arguments must be a JSON string, not a dict
Also ensure tool-role messages have a tool_call_id.
"""
# Track generated IDs so tool-role messages can reference them
last_tool_call_ids = {}
for msg in message_list:
role = msg.get("role")
if role == "assistant" and "tool_calls" in msg:
for tc in msg["tool_calls"]:
if "type" not in tc:
tc["type"] = "function"
if "id" not in tc:
tc["id"] = f"call_{secrets.token_hex(16)}"
func = tc.get("function", {})
if isinstance(func.get("arguments"), dict):
func["arguments"] = orjson.dumps(func["arguments"]).decode("utf-8")
# Remember the id for the following tool-role message
name = func.get("name")
if name:
last_tool_call_ids[name] = tc["id"]
elif role == "tool":
if "tool_call_id" not in msg:
# Try to match by name from a preceding assistant tool_call
name = msg.get("name") or msg.get("tool_name")
if name and name in last_tool_call_ids:
msg["tool_call_id"] = last_tool_call_ids.pop(name)
return message_list
def transform_images_to_data_urls(message_list):
for message in message_list:
if "images" in message:
@ -977,6 +1146,51 @@ def transform_images_to_data_urls(message_list):
return message_list
def _accumulate_openai_tc_delta(chunk, accumulator: dict) -> None:
"""Accumulate tool_call deltas from a single OpenAI streaming chunk.
``accumulator`` is a dict mapping tool-call *index* to
``{"id": str, "name": str, "arguments": str}`` where ``arguments``
is the concatenation of all JSON fragments seen so far.
"""
if not chunk.choices:
return
delta = chunk.choices[0].delta
tc_deltas = getattr(delta, "tool_calls", None)
if not tc_deltas:
return
for tc in tc_deltas:
idx = tc.index
if idx not in accumulator:
accumulator[idx] = {
"id": getattr(tc, "id", None) or f"call_{secrets.token_hex(16)}",
"name": tc.function.name if tc.function else None,
"arguments": "",
}
else:
if getattr(tc, "id", None):
accumulator[idx]["id"] = tc.id
if tc.function and tc.function.name:
accumulator[idx]["name"] = tc.function.name
if tc.function and tc.function.arguments:
accumulator[idx]["arguments"] += tc.function.arguments
def _build_ollama_tool_calls(accumulator: dict) -> list | None:
"""Convert accumulated tool-call data into Ollama-format tool_calls list."""
if not accumulator:
return None
result = []
for idx in sorted(accumulator.keys()):
tc = accumulator[idx]
try:
args = orjson.loads(tc["arguments"]) if tc["arguments"] else {}
except (orjson.JSONDecodeError, TypeError):
args = {}
result.append(ollama.Message.ToolCall(
function=ollama.Message.ToolCall.Function(name=tc["name"], arguments=args)
))
return result
class rechunk:
def openai_chat_completion2ollama(chunk: dict, stream: bool, start_ts: float) -> ollama.ChatResponse:
now = time.perf_counter()
@ -992,24 +1206,41 @@ class rechunk:
prompt_eval_duration=int((now - start_ts) * 1_000_000_000 * (chunk.usage.prompt_tokens / chunk.usage.completion_tokens / 100)),
eval_count=int(chunk.usage.completion_tokens),
eval_duration=int((now - start_ts) * 1_000_000_000),
message={"role": "assistant"}
message=ollama.Message(role="assistant", content=""),
)
with_thinking = chunk.choices[0] if chunk.choices[0] else None
if stream == True:
thinking = getattr(with_thinking.delta, "reasoning", None) if with_thinking else None
thinking = (getattr(with_thinking.delta, "reasoning_content", None) or getattr(with_thinking.delta, "reasoning", None)) if with_thinking else None
role = chunk.choices[0].delta.role or "assistant"
content = chunk.choices[0].delta.content or ''
else:
thinking = getattr(with_thinking, "reasoning", None) if with_thinking else None
thinking = (getattr(with_thinking.message, "reasoning_content", None) or getattr(with_thinking.message, "reasoning", None)) if with_thinking else None
role = chunk.choices[0].message.role or "assistant"
content = chunk.choices[0].message.content or ''
# Convert OpenAI tool_calls to Ollama format
# In streaming mode, tool_calls arrive as partial deltas across multiple chunks
# (name only in first delta, arguments as incremental JSON fragments).
# Callers must accumulate deltas and inject the final result; skip here.
ollama_tool_calls = None
if not stream:
raw_tool_calls = getattr(with_thinking.message, "tool_calls", None) if with_thinking else None
if raw_tool_calls:
ollama_tool_calls = []
for tc in raw_tool_calls:
try:
args = orjson.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else (tc.function.arguments or {})
except (orjson.JSONDecodeError, TypeError):
args = {}
ollama_tool_calls.append(ollama.Message.ToolCall(
function=ollama.Message.ToolCall.Function(name=tc.function.name, arguments=args)
))
assistant_msg = ollama.Message(
role=role,
content=content,
thinking=thinking,
images=None,
tool_name=None,
tool_calls=None)
tool_calls=ollama_tool_calls)
rechunk = ollama.ChatResponse(
model=chunk.model,
created_at=iso8601_ns(),
@ -1134,35 +1365,40 @@ async def choose_endpoint(model: str) -> str:
6 If no endpoint advertises the model at all, raise an error.
"""
# 1⃣ Gather advertisedmodel sets for all endpoints concurrently
tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if "/v1" not in ep]
tag_tasks += [fetch.available_models(ep, config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep]
# Include both config.endpoints and config.llama_server_endpoints
llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints]
all_endpoints = config.endpoints + llama_eps_extra
tag_tasks = [fetch.available_models(ep) for ep in config.endpoints if not is_openai_compatible(ep)]
tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in config.endpoints if is_openai_compatible(ep)]
tag_tasks += [fetch.available_models(ep, config.api_keys.get(ep)) for ep in llama_eps_extra]
advertised_sets = await asyncio.gather(*tag_tasks)
# 2⃣ Filter endpoints that advertise the requested model
candidate_endpoints = [
ep for ep, models in zip(config.endpoints, advertised_sets)
ep for ep, models in zip(all_endpoints, advertised_sets)
if model in models
]
# 6
if not candidate_endpoints:
if ":latest" in model: #ollama naming convention not applicable to openai
if ":latest" in model: #ollama naming convention not applicable to openai/llama-server
model_without_latest = model.split(":latest")[0]
candidate_endpoints = [
ep for ep, models in zip(config.endpoints, advertised_sets)
if model_without_latest in models and is_ext_openai_endpoint(ep)
ep for ep, models in zip(all_endpoints, advertised_sets)
if model_without_latest in models and (is_ext_openai_endpoint(ep) or ep in config.llama_server_endpoints)
]
if not candidate_endpoints:
# Only add :latest suffix if model doesn't already have a version suffix
if ":" not in model:
model = model + ":latest"
candidate_endpoints = [
ep for ep, models in zip(config.endpoints, advertised_sets)
ep for ep, models in zip(all_endpoints, advertised_sets)
if model in models
]
if not candidate_endpoints:
raise RuntimeError(
f"None of the configured endpoints ({', '.join(config.endpoints)}) "
f"None of the configured endpoints ({', '.join(all_endpoints)}) "
f"advertise the model '{model}'."
)
# 3⃣ Among the candidates, find those that have the model *loaded*
@ -1210,7 +1446,7 @@ async def choose_endpoint(model: str) -> str:
# Then by total endpoint usage (ascending) to balance idle endpoints
endpoints_with_free_slot.sort(
key=lambda ep: (
-usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections)
#-usage_counts.get(ep, {}).get(model, 0), # Primary: per-model usage (descending - prefer endpoints with connections)
sum(usage_counts.get(ep, {}).values()) # Secondary: total endpoint usage (ascending - prefer idle endpoints)
)
)
@ -1266,9 +1502,8 @@ async def proxy(request: Request):
endpoint = await choose_endpoint(model)
is_openai_endpoint = "/v1" in endpoint
if is_openai_endpoint:
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
model = model.split(":latest")
model = model[0]
@ -1289,7 +1524,7 @@ async def proxy(request: Request):
"suffix": suffix,
}
params.update({k: v for k, v in optional_params.items() if v is not None})
oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint])
oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
else:
client = ollama.AsyncClient(host=endpoint)
await increment_usage(endpoint, model)
@ -1297,14 +1532,14 @@ async def proxy(request: Request):
# 4. Async generator that streams data and decrements the counter
async def stream_generate_response():
try:
if is_openai_endpoint:
if use_openai:
start_ts = time.perf_counter()
async_gen = await oclient.completions.create(**params)
else:
async_gen = await client.generate(model=model, prompt=prompt, suffix=suffix, system=system, template=template, context=context, stream=stream, think=think, raw=raw, format=_format, images=images, options=options, keep_alive=keep_alive)
if stream == True:
async for chunk in async_gen:
if is_openai_endpoint:
if use_openai:
chunk = rechunk.openai_completion2ollama(chunk, stream, start_ts)
prompt_tok = chunk.prompt_eval_count or 0
comp_tok = chunk.eval_count or 0
@ -1316,7 +1551,7 @@ async def proxy(request: Request):
json_line = orjson.dumps(chunk)
yield json_line.encode("utf-8") + b"\n"
else:
if is_openai_endpoint:
if use_openai:
response = rechunk.openai_completion2ollama(async_gen, stream, start_ts)
response = response.model_dump_json()
else:
@ -1386,14 +1621,14 @@ async def chat_proxy(request: Request):
else:
opt = False
endpoint = await choose_endpoint(model)
is_openai_endpoint = "/v1" in endpoint
if is_openai_endpoint:
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
model = model.split(":latest")
model = model[0]
if messages:
messages = transform_images_to_data_urls(messages)
messages = transform_tool_calls_to_openai(messages)
params = {
"messages": messages,
"model": model,
@ -1412,7 +1647,7 @@ async def chat_proxy(request: Request):
"response_format": {"type": "json_schema", "json_schema": _format} if _format is not None else None
}
params.update({k: v for k, v in optional_params.items() if v is not None})
oclient = openai.AsyncOpenAI(base_url=endpoint, default_headers=default_headers, api_key=config.api_keys[endpoint])
oclient = openai.AsyncOpenAI(base_url=ep2base(endpoint), default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
else:
client = ollama.AsyncClient(host=endpoint)
await increment_usage(endpoint, model)
@ -1420,7 +1655,7 @@ async def chat_proxy(request: Request):
async def stream_chat_response():
try:
# The chat method returns a generator of dicts (or GenerateResponse)
if is_openai_endpoint:
if use_openai:
start_ts = time.perf_counter()
async_gen = await oclient.chat.completions.create(**params)
else:
@ -1430,9 +1665,14 @@ async def chat_proxy(request: Request):
else:
async_gen = await client.chat(model=model, messages=messages, tools=tools, stream=stream, think=think, format=_format, options=options, keep_alive=keep_alive)
if stream == True:
tc_acc = {} # accumulate OpenAI tool-call deltas across chunks
async for chunk in async_gen:
if is_openai_endpoint:
if use_openai:
_accumulate_openai_tc_delta(chunk, tc_acc)
chunk = rechunk.openai_chat_completion2ollama(chunk, stream, start_ts)
# Inject fully-accumulated tool calls only into the final chunk
if chunk.done and tc_acc and chunk.message:
chunk.message.tool_calls = _build_ollama_tool_calls(tc_acc)
# `chunk` can be a dict or a pydantic model dump to JSON safely
prompt_tok = chunk.prompt_eval_count or 0
comp_tok = chunk.eval_count or 0
@ -1444,7 +1684,7 @@ async def chat_proxy(request: Request):
json_line = orjson.dumps(chunk)
yield json_line.encode("utf-8") + b"\n"
else:
if is_openai_endpoint:
if use_openai:
response = rechunk.openai_chat_completion2ollama(async_gen, stream, start_ts)
response = response.model_dump_json()
else:
@ -1503,13 +1743,12 @@ async def embedding_proxy(request: Request):
# 2. Endpoint logic
endpoint = await choose_endpoint(model)
is_openai_endpoint = "/v1" in endpoint
if is_openai_endpoint:
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
model = model.split(":latest")
model = model[0]
client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
client = openai.AsyncOpenAI(base_url=ep2base(endpoint), api_key=config.api_keys.get(endpoint, "no-key"))
else:
client = ollama.AsyncClient(host=endpoint)
await increment_usage(endpoint, model)
@ -1517,7 +1756,7 @@ async def embedding_proxy(request: Request):
async def stream_embedding_response():
try:
# The chat method returns a generator of dicts (or GenerateResponse)
if is_openai_endpoint:
if use_openai:
async_gen = await client.embeddings.create(input=prompt, model=model)
async_gen = rechunk.openai_embeddings2ollama(async_gen)
else:
@ -1570,13 +1809,12 @@ async def embed_proxy(request: Request):
# 2. Endpoint logic
endpoint = await choose_endpoint(model)
is_openai_endpoint = is_ext_openai_endpoint(endpoint) #"/v1" in endpoint
if is_openai_endpoint:
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
model = model.split(":latest")
model = model[0]
client = openai.AsyncOpenAI(base_url=endpoint, api_key=config.api_keys[endpoint])
client = openai.AsyncOpenAI(base_url=ep2base(endpoint), api_key=config.api_keys.get(endpoint, "no-key"))
else:
client = ollama.AsyncClient(host=endpoint)
await increment_usage(endpoint, model)
@ -1584,7 +1822,7 @@ async def embed_proxy(request: Request):
async def stream_embedding_response():
try:
# The chat method returns a generator of dicts (or GenerateResponse)
if is_openai_endpoint:
if use_openai:
async_gen = await client.embeddings.create(input=_input, model=model)
async_gen = rechunk.openai_embed2ollama(async_gen, model)
else:
@ -1983,6 +2221,9 @@ async def tags_proxy(request: Request):
# 1. Query all endpoints for models
tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep]
tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep]
# Also query llama-server endpoints not already covered by config.endpoints
llama_eps_for_tags = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints]
tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)) for ep in llama_eps_for_tags]
all_models = await asyncio.gather(*tasks)
models = {'models': []}
@ -2010,18 +2251,47 @@ async def tags_proxy(request: Request):
@app.get("/api/ps")
async def ps_proxy(request: Request):
"""
Proxy a ps request to all Ollama endpoints and reply a unique list of all running models.
Proxy a ps request to all Ollama and llama-server endpoints and reply a unique list of all running models.
For Ollama endpoints: queries /api/ps
For llama-server endpoints: queries /v1/models with status.value == "loaded"
"""
# 1. Query all endpoints for running models
tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep]
loaded_models = await asyncio.gather(*tasks)
# 1. Query Ollama endpoints for running models via /api/ps
ollama_tasks = [fetch.endpoint_details(ep, "/api/ps", "models") for ep in config.endpoints if "/v1" not in ep]
# 2. Query llama-server endpoints for loaded models via /v1/models
# Also query endpoints from llama_server_endpoints that may not be in config.endpoints
all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints)
llama_tasks = [
fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep))
for ep in all_llama_endpoints
]
ollama_loaded = await asyncio.gather(*ollama_tasks) if ollama_tasks else []
llama_loaded = await asyncio.gather(*llama_tasks) if llama_tasks else []
models = {'models': []}
for modellist in loaded_models:
# Add Ollama models (if any)
if ollama_loaded:
for modellist in ollama_loaded:
models['models'] += modellist
# Add llama-server models (filter for loaded only, if any)
if llama_loaded:
for modellist in llama_loaded:
loaded_models = [item for item in modellist if _is_llama_model_loaded(item)]
# Convert llama-server format to Ollama-like format for consistency
for item in loaded_models:
raw_id = item.get("id", "")
normalized = _normalize_llama_model_name(raw_id)
quant = _extract_llama_quant(raw_id)
models['models'].append({
"name": normalized,
"id": normalized,
"digest": "",
"status": item.get("status"),
"details": {"quantization_level": quant} if quant else {}
})
# 2. Return a JSONResponse with deduplicated currently deployed models
# 3. Return a JSONResponse with deduplicated currently deployed models
return JSONResponse(
content={"models": dedupe_on_keys(models['models'], ['digest'])},
status_code=200,
@ -2033,20 +2303,64 @@ async def ps_proxy(request: Request):
@app.get("/api/ps_details")
async def ps_details_proxy(request: Request):
"""
Proxy a ps request to all Ollama endpoints and reply with per-endpoint instances.
Proxy a ps request to all Ollama and llama-server endpoints and reply with per-endpoint instances.
This keeps /api/ps backward compatible while providing richer data.
For Ollama endpoints: queries /api/ps
For llama-server endpoints: queries /v1/models with status info
"""
tasks = [(ep, fetch.endpoint_details(ep, "/api/ps", "models")) for ep in config.endpoints if "/v1" not in ep]
loaded_models = await asyncio.gather(*[task for _, task in tasks])
# 1. Query Ollama endpoints via /api/ps
ollama_tasks = [(ep, fetch.endpoint_details(ep, "/api/ps", "models")) for ep in config.endpoints if "/v1" not in ep]
# 2. Query llama-server endpoints via /v1/models
# Also query endpoints from llama_server_endpoints that may not be in config.endpoints
all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints)
llama_tasks = [
(ep, fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep)))
for ep in all_llama_endpoints
]
ollama_loaded = await asyncio.gather(*[task for _, task in ollama_tasks]) if ollama_tasks else []
llama_loaded = await asyncio.gather(*[task for _, task in llama_tasks]) if llama_tasks else []
models: list[dict] = []
for (endpoint, modellist) in zip([ep for ep, _ in tasks], loaded_models):
# Add Ollama models with endpoint info (if any)
if ollama_loaded:
for (endpoint, modellist) in zip([ep for ep, _ in ollama_tasks], ollama_loaded):
for model in modellist:
if isinstance(model, dict):
model_with_endpoint = dict(model)
model_with_endpoint["endpoint"] = endpoint
models.append(model_with_endpoint)
# Add llama-server models with endpoint info and full status metadata (if any)
if llama_loaded:
for (endpoint, modellist) in zip([ep for ep, _ in llama_tasks], llama_loaded):
# Filter for loaded models only
loaded_models = [item for item in modellist if _is_llama_model_loaded(item)]
for item in loaded_models:
if isinstance(item, dict) and item.get("id"):
raw_id = item["id"]
normalized = _normalize_llama_model_name(raw_id)
quant = _extract_llama_quant(raw_id)
model_with_endpoint = {
"name": normalized,
"id": normalized,
"original_name": raw_id,
"digest": "",
"details": {"quantization_level": quant} if quant else {},
"endpoint": endpoint,
"status": item.get("status"),
"created": item.get("created"),
"owned_by": item.get("owned_by")
}
# Include full llama-server status details (args, preset)
status_info = item.get("status", {})
if isinstance(status_info, dict):
model_with_endpoint["llama_status_args"] = status_info.get("args")
model_with_endpoint["llama_status_preset"] = status_info.get("preset")
models.append(model_with_endpoint)
return JSONResponse(content={"models": models}, status_code=200)
# -------------------------------------------------------------
@ -2068,7 +2382,7 @@ async def usage_proxy(request: Request):
async def config_proxy(request: Request):
"""
Return a simple JSON object that contains the configured
Ollama endpoints. The frontend uses this to display
Ollama endpoints and llama_server_endpoints. The frontend uses this to display
which endpoints are being proxied.
"""
async def check_endpoint(url: str):
@ -2092,9 +2406,17 @@ async def config_proxy(request: Request):
detail = _format_connection_issue(target_url, e)
return {"url": url, "status": "error", "detail": detail}
results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints])
# Check Ollama endpoints
ollama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.endpoints])
# Check llama-server endpoints
llama_results = []
if config.llama_server_endpoints:
llama_results = await asyncio.gather(*[check_endpoint(ep) for ep in config.llama_server_endpoints])
return {
"endpoints": results,
"endpoints": ollama_results,
"llama_server_endpoints": llama_results,
"require_router_api_key": bool(config.router_api_key),
}
@ -2129,8 +2451,8 @@ async def openai_embedding_proxy(request: Request):
# 2. Endpoint logic
endpoint = await choose_endpoint(model)
await increment_usage(endpoint, model)
if "/v1" in endpoint: # and is_ext_openai_endpoint(endpoint):
api_key = config.api_keys[endpoint]
if is_openai_compatible(endpoint):
api_key = config.api_keys.get(endpoint, "no-key")
else:
api_key = "ollama"
base_url = ep2base(endpoint)
@ -2215,8 +2537,7 @@ async def openai_chat_completions_proxy(request: Request):
endpoint = await choose_endpoint(model)
await increment_usage(endpoint, model)
base_url = ep2base(endpoint)
oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint])
oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
# 3. Async generator that streams completions data and decrements the counter
async def stream_ochat_response():
try:
@ -2341,8 +2662,7 @@ async def openai_completions_proxy(request: Request):
endpoint = await choose_endpoint(model)
await increment_usage(endpoint, model)
base_url = ep2base(endpoint)
oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint])
oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
# 3. Async generator that streams completions data and decrements the counter
async def stream_ocompletions_response(model=model):
@ -2398,22 +2718,46 @@ async def openai_completions_proxy(request: Request):
@app.get("/v1/models")
async def openai_models_proxy(request: Request):
"""
Proxy an OpenAI API models request to Ollama endpoints and reply with a unique list of all models.
Proxy an OpenAI API models request to Ollama and llama-server endpoints and reply with a unique list of models.
For Ollama endpoints: queries /api/tags (all models)
For llama-server endpoints: queries /v1/models and filters for status.value == "loaded"
"""
# 1. Query all endpoints for models
tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep]
tasks += [fetch.endpoint_details(ep, "/models", "data", config.api_keys[ep]) for ep in config.endpoints if "/v1" in ep]
all_models = await asyncio.gather(*tasks)
# 1. Query Ollama endpoints for all models via /api/tags
ollama_tasks = [fetch.endpoint_details(ep, "/api/tags", "models") for ep in config.endpoints if "/v1" not in ep]
# 2. Query llama-server endpoints for loaded models via /v1/models
# Also query endpoints from llama_server_endpoints that may not be in config.endpoints
all_llama_endpoints = set(config.llama_server_endpoints) | set(ep for ep in config.endpoints if ep in config.llama_server_endpoints)
llama_tasks = [
fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep))
for ep in all_llama_endpoints
]
ollama_models = await asyncio.gather(*ollama_tasks) if ollama_tasks else []
llama_models = await asyncio.gather(*llama_tasks) if llama_tasks else []
models = {'data': []}
for modellist in all_models:
# Add Ollama models (if any)
if ollama_models:
for modellist in ollama_models:
for model in modellist:
if not "id" in model.keys(): # Relable Ollama models with OpenAI Model.id from Model.name
model['id'] = model['name']
model['id'] = model.get('name', model.get('id', ''))
else:
model['name'] = model['id']
models['data'] += modellist
models['data'].append(model)
# Add llama-server models (filter for loaded only, if any)
if llama_models:
for modellist in llama_models:
loaded_models = [item for item in modellist if _is_llama_model_loaded(item)]
for model in loaded_models:
if not "id" in model.keys():
model['id'] = model.get('name', model.get('id', ''))
else:
model['name'] = model['id']
models['data'].append(model)
# 2. Return a JSONResponse with a deduplicated list of unique models for inference
return JSONResponse(
@ -2459,7 +2803,7 @@ async def health_proxy(request: Request):
* The HTTP status code is 200 when everything is healthy, 503 otherwise.
"""
# Run all health checks in parallel
tasks = [fetch.endpoint_details(ep, "/api/version", "version") for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)]
tasks = [fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True) for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)]
results = await asyncio.gather(*tasks, return_exceptions=True)
@ -2524,6 +2868,7 @@ async def startup_event() -> None:
print(
f"Loaded configuration from {config_path}:\n"
f" endpoints={config.endpoints},\n"
f" llama_server_endpoints={config.llama_server_endpoints},\n"
f" max_concurrent_connections={config.max_concurrent_connections}"
)
else:

View file

@ -379,7 +379,7 @@
<th>Quant</th>
<th>Ctx</th>
<th>Size</th>
<th>Until</th>
<th>Unload</th>
<th>Digest</th>
<th>Tokens</th>
</tr>
@ -683,7 +683,12 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
showApiKeyModal("Enter the NOMYO Router API key to load the dashboard.");
}
const body = document.getElementById("endpoints-body");
body.innerHTML = data.endpoints
// Build HTML for both endpoints and llama_server_endpoints
let html = "";
// Add Ollama endpoints
html += data.endpoints
.map((e) => {
const statusClass =
e.status === "ok"
@ -698,6 +703,27 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
</tr>`;
})
.join("");
// Add llama-server endpoints
if (data.llama_server_endpoints && data.llama_server_endpoints.length > 0) {
html += data.llama_server_endpoints
.map((e) => {
const statusClass =
e.status === "ok"
? "status-ok"
: "status-error";
const version = e.version || "N/A";
return `
<tr>
<td class="endpoint">${e.url}</td>
<td class="status ${statusClass}">${e.status}</td>
<td class="version">${version}</td>
</tr>`;
})
.join("");
}
body.innerHTML = html;
} catch (e) {
console.error(e);
const body = document.getElementById("endpoints-body");
@ -837,7 +863,7 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
const formatUntil = (value) => {
if (value === null || value === undefined || value === "") {
return "Forever";
return "";
}
let targetTime;
@ -898,10 +924,11 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
const params = modelInstances[0]?.details?.parameter_size ?? "";
const quant = modelInstances[0]?.details?.quantization_level ?? "";
const ctx = modelInstances[0]?.context_length ?? "";
const originalName = modelInstances[0]?.original_name || modelName;
const uniqueEndpoints = Array.from(new Set(endpoints));
const endpointsData = encodeURIComponent(JSON.stringify(uniqueEndpoints));
return `<tr data-model="${modelName}" data-endpoints="${endpointsData}">
<td class="model">${modelName} <a href="#" class="stats-link" data-model="${modelName}">stats</a></td>
<td class="model">${modelName} <a href="#" class="stats-link" data-model="${originalName}">stats</a></td>
<td>${renderInstanceList(endpoints)}</td>
<td>${params}</td>
<td>${quant}</td>