From 28afa4e9c04adcb1985da7c4ca9cc5805eddaecd Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Mon, 6 Apr 2026 11:32:47 +0200 Subject: [PATCH 1/7] fix: missing requirement fix: strip assistant prefill when ollama -> openai translation + openai guard --- requirements.txt | 2 +- router.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a3befbe..222dfc8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ aiohappyeyeballs==2.6.1 aiohttp==3.13.3 aiosignal==1.4.0 -annotated-doc==0.0.3 annotated-types==0.7.0 anyio==4.10.0 async-timeout==5.0.1 @@ -22,6 +21,7 @@ multidict==6.6.4 ollama==0.6.1 openai==1.102.0 orjson>=3.11.5 +numpy>=1.26 pillow==12.1.1 propcache==0.3.2 pydantic==2.11.7 diff --git a/router.py b/router.py index a7f6a75..047bc57 100644 --- a/router.py +++ b/router.py @@ -1064,6 +1064,7 @@ async def _make_chat_request(model: str, messages: list, tools=None, stream: boo if messages: messages = transform_images_to_data_urls(messages) messages = transform_tool_calls_to_openai(messages) + messages = _strip_assistant_prefill(messages) params = { "messages": messages, "model": model, @@ -1295,6 +1296,14 @@ def resize_image_if_needed(image_data): print(f"Error processing image: {e}") return None +def _strip_assistant_prefill(messages: list) -> list: + """Remove a trailing assistant message used as prefill. 
+ OpenAI-compatible endpoints (including Claude) do not support prefill and + will reject requests where the last message has role 'assistant'.""" + if messages and messages[-1].get("role") == "assistant": + return messages[:-1] + return messages + def transform_tool_calls_to_openai(message_list): """ Ensure tool_calls in assistant messages conform to the OpenAI format: @@ -1961,6 +1970,7 @@ async def chat_proxy(request: Request): if messages: messages = transform_images_to_data_urls(messages) messages = transform_tool_calls_to_openai(messages) + messages = _strip_assistant_prefill(messages) params = { "messages": messages, "model": model, @@ -3027,6 +3037,7 @@ async def openai_chat_completions_proxy(request: Request): model = model.split(":latest") model = model[0] + messages = _strip_assistant_prefill(messages) params = { "messages": messages, "model": model, From 6bc4157d65c8ccf9e5a85322b17784ce5ac32da6 Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Mon, 6 Apr 2026 15:13:01 +0200 Subject: [PATCH 2/7] feat: add workflow cleanup --- .forgejo/workflows/docker-publish-semantic.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.forgejo/workflows/docker-publish-semantic.yml b/.forgejo/workflows/docker-publish-semantic.yml index 56631b9..2174f21 100644 --- a/.forgejo/workflows/docker-publish-semantic.yml +++ b/.forgejo/workflows/docker-publish-semantic.yml @@ -145,3 +145,13 @@ jobs: $(jq -cr '.tags | map("-t " + .) 
| join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ ${{ env.REGISTRY }}/${{ env.CACHE_IMAGE }}:platform-semantic-amd64 \ ${{ env.REGISTRY }}/${{ env.CACHE_IMAGE }}:platform-semantic-arm64 + + - name: Delete intermediate platform tags + run: | + CACHE_ENCODED=$(echo "${{ env.CACHE_IMAGE }}" | sed 's|/|%2F|g') + for tag in platform-semantic-amd64 platform-semantic-arm64; do + curl -s -X DELETE \ + -H "Authorization: token ${{ secrets.REGISTRY_TOKEN }}" \ + "https://${{ env.REGISTRY }}/api/v1/packages/${{ github.repository_owner }}/container/${CACHE_ENCODED}/${tag}" \ + && echo "Deleted ${tag}" || echo "Failed to delete ${tag} (ignored)" + done From e912b71b5c83a86a512b724e73524797e2b3a9b9 Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Tue, 7 Apr 2026 09:14:00 +0200 Subject: [PATCH 3/7] doc: update readme --- README.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0a3b341..337b4c3 100644 --- a/README.md +++ b/README.md @@ -77,15 +77,17 @@ uvicorn router:app --host 127.0.0.1 --port 12434 --loop uvloop Pre-built multi-arch images (`linux/amd64`, `linux/arm64`) are published automatically on every release. 
**Lean image** (exact-match cache, ~300 MB): + ```sh -docker pull ghcr.io/nomyo-ai/nomyo-router:latest -docker pull ghcr.io/nomyo-ai/nomyo-router:0.7.0 +docker pull bitfreedom.net/nomyo-ai/nomyo-router:latest +docker pull bitfreedom.net/nomyo-ai/nomyo-router:0.7.0 ``` **Semantic image** (semantic cache with `all-MiniLM-L6-v2` pre-baked, ~800 MB): + ```sh -docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic -docker pull ghcr.io/nomyo-ai/nomyo-router:0.7.0-semantic +docker pull bitfreedom.net/nomyo-ai/nomyo-router:latest-semantic +docker pull bitfreedom.net/nomyo-ai/nomyo-router:0.7.0-semantic ``` ### Build the container image locally @@ -155,6 +157,7 @@ cache_history_weight: 0.3 ``` Pull the semantic image: + ```bash docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic ``` @@ -162,6 +165,7 @@ docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic ### Cache key strategy Each request is keyed on `model + system_prompt` (exact) combined with a weighted-mean embedding of BM25-weighted chat history (30%) and the last user message (70%). 
This means: + - Different system prompts → always separate cache namespaces (no cross-tenant leakage) - Same question, different phrasing → cache hit (semantic mode) - MOE requests (`moe-*`) → always bypass the cache From 5170162a80e39b7f0ba34cd41c5e46078248f8c7 Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Tue, 7 Apr 2026 09:18:12 +0200 Subject: [PATCH 4/7] fix: make image transform non-blocking --- router.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/router.py b/router.py index 047bc57..9785401 100644 --- a/router.py +++ b/router.py @@ -1062,7 +1062,7 @@ async def _make_chat_request(model: str, messages: list, tools=None, stream: boo if ":latest" in model: model = model.split(":latest")[0] if messages: - messages = transform_images_to_data_urls(messages) + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) messages = transform_tool_calls_to_openai(messages) messages = _strip_assistant_prefill(messages) params = { @@ -1968,7 +1968,7 @@ async def chat_proxy(request: Request): model = model.split(":latest") model = model[0] if messages: - messages = transform_images_to_data_urls(messages) + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) messages = transform_tool_calls_to_openai(messages) messages = _strip_assistant_prefill(messages) params = { From 81013ec3b16831cb66e876e3e95b0fe78e7ae7b9 Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Tue, 7 Apr 2026 09:32:53 +0200 Subject: [PATCH 5/7] fix: available_error_cache poisoning --- router.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/router.py b/router.py index 9785401..08a2e18 100644 --- a/router.py +++ b/router.py @@ -1007,9 +1007,6 @@ class fetch: # If anything goes wrong we cannot reply details message = _format_connection_issue(request_url, e) print(f"[fetch.endpoint_details] {message}") - # Record failure so subsequent calls skip this endpoint briefly - async with _available_error_cache_lock: - 
_available_error_cache[endpoint] = time.time() return [] def ep2base(ep): From 2c87472483ff2a6f1c72d310a0249497834fccc1 Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Tue, 7 Apr 2026 13:28:34 +0200 Subject: [PATCH 6/7] fix: conditional to_thread for the image_transform to relieve threadpool pressure --- router.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/router.py b/router.py index 08a2e18..94cf1c2 100644 --- a/router.py +++ b/router.py @@ -1059,7 +1059,8 @@ async def _make_chat_request(model: str, messages: list, tools=None, stream: boo if ":latest" in model: model = model.split(":latest")[0] if messages: - messages = await asyncio.to_thread(transform_images_to_data_urls, messages) + if any("images" in m for m in messages): + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) messages = transform_tool_calls_to_openai(messages) messages = _strip_assistant_prefill(messages) params = { @@ -1965,7 +1966,8 @@ async def chat_proxy(request: Request): model = model.split(":latest") model = model[0] if messages: - messages = await asyncio.to_thread(transform_images_to_data_urls, messages) + if any("images" in m for m in messages): + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) messages = transform_tool_calls_to_openai(messages) messages = _strip_assistant_prefill(messages) params = { From e7cd8d4d68827d5a0a5427fc21795da62101bfbe Mon Sep 17 00:00:00 2001 From: alpha nerd Date: Tue, 7 Apr 2026 15:30:52 +0200 Subject: [PATCH 7/7] fix: usage locks now release before the subscriber queue awaits --- requirements.txt | 2 +- router.py | 32 +++++++++++++++++++------------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/requirements.txt b/requirements.txt index 222dfc8..2db1ba4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ aiohappyeyeballs==2.6.1 -aiohttp==3.13.3 +aiohttp==3.13.4 aiosignal==1.4.0 annotated-types==0.7.0 anyio==4.10.0 diff --git a/router.py 
b/router.py index 94cf1c2..c87c5ca 100644 --- a/router.py +++ b/router.py @@ -590,7 +590,8 @@ async def token_worker() -> None: # Update in-memory counts for immediate reporting async with token_usage_lock: token_usage_counts[endpoint][model] += (prompt + comp) - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) except asyncio.CancelledError: # Gracefully handle task cancellation during shutdown print("[token_worker] Task cancelled, processing remaining queue items...") @@ -617,7 +618,8 @@ async def token_worker() -> None: }) async with token_usage_lock: token_usage_counts[endpoint][model] += (prompt + comp) - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) except asyncio.QueueEmpty: break print("[token_worker] Task cancelled, remaining items processed.") @@ -1033,7 +1035,8 @@ def dedupe_on_keys(dicts, key_fields): async def increment_usage(endpoint: str, model: str) -> None: async with usage_lock: usage_counts[endpoint][model] += 1 - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) async def decrement_usage(endpoint: str, model: str) -> None: async with usage_lock: @@ -1046,7 +1049,8 @@ async def decrement_usage(endpoint: str, model: str) -> None: usage_counts[endpoint].pop(model, None) #if not usage_counts[endpoint]: # usage_counts.pop(endpoint, None) - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) async def _make_chat_request(model: str, messages: list, tools=None, stream: bool = False, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: """ @@ -1580,18 +1584,17 @@ class rechunk: # ------------------------------------------------------------------ # SSE Helpser # ------------------------------------------------------------------ -async def publish_snapshot(): - # NOTE: This function assumes usage_lock OR token_usage_lock is 
already held by the caller - # Create a snapshot without acquiring the lock (caller must hold it) - snapshot = orjson.dumps({ - "usage_counts": dict(usage_counts), # Create a copy +def _capture_snapshot() -> str: + """Capture current usage counts as a JSON string. Caller must hold at least one of usage_lock/token_usage_lock.""" + return orjson.dumps({ + "usage_counts": dict(usage_counts), "token_usage_counts": dict(token_usage_counts) }, option=orjson.OPT_SORT_KEYS).decode("utf-8") - # Distribute the snapshot (no lock needed here since we have a copy) +async def _distribute_snapshot(snapshot: str) -> None: + """Push a pre-captured snapshot to all SSE subscribers. Must be called outside any usage lock.""" async with _subscribers_lock: for q in _subscribers: - # If the queue is full, drop the message to avoid back‑pressure. if q.full(): try: await q.get() @@ -1736,10 +1739,13 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: selected = min(candidate_endpoints, key=tracking_usage) tracking_model = get_tracking_model(selected, model) + snapshot = None if reserve: usage_counts[selected][tracking_model] += 1 - await publish_snapshot() - return selected, tracking_model + snapshot = _capture_snapshot() + if snapshot is not None: + await _distribute_snapshot(snapshot) + return selected, tracking_model # ------------------------------------------------------------- # 6. API route – Generate