Merge pull request 'dev-v0.7.x -> prod' (#19) from dev-v0.7.x into main
All checks were successful
Build and Publish Docker Image (Semantic Cache) / build (amd64, linux/amd64, docker-amd64) (push) Successful in 37s
Build and Publish Docker Image / build (amd64, linux/amd64, docker-amd64) (push) Successful in 35s
Build and Publish Docker Image (Semantic Cache) / build (arm64, linux/arm64, docker-arm64) (push) Successful in 10m8s
Build and Publish Docker Image (Semantic Cache) / merge (push) Successful in 34s
Build and Publish Docker Image / build (arm64, linux/arm64, docker-arm64) (push) Successful in 10m4s
Build and Publish Docker Image / merge (push) Successful in 34s

Reviewed-on: https://bitfreedom.net/code/code/nomyo-ai/nomyo-router/pulls/19
This commit is contained in:
Alpha Nerd 2026-04-07 16:25:56 +02:00
commit 4086b1eab8
4 changed files with 54 additions and 24 deletions

View file

@@ -145,3 +145,13 @@ jobs:
             $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \
             ${{ env.REGISTRY }}/${{ env.CACHE_IMAGE }}:platform-semantic-amd64 \
             ${{ env.REGISTRY }}/${{ env.CACHE_IMAGE }}:platform-semantic-arm64
+      - name: Delete intermediate platform tags
+        run: |
+          CACHE_ENCODED=$(echo "${{ env.CACHE_IMAGE }}" | sed 's|/|%2F|g')
+          for tag in platform-semantic-amd64 platform-semantic-arm64; do
+            curl -s -X DELETE \
+              -H "Authorization: token ${{ secrets.REGISTRY_TOKEN }}" \
+              "https://${{ env.REGISTRY }}/api/v1/packages/${{ github.repository_owner }}/container/${CACHE_ENCODED}/${tag}" \
+              && echo "Deleted ${tag}" || echo "Failed to delete ${tag} (ignored)"
+          done
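Outside the workflow context, the cleanup step boils down to percent-encoding the image path and building one DELETE URL per intermediate tag. A side-effect-free sketch (hypothetical `REGISTRY`/`CACHE_IMAGE`/`OWNER` values; the real step reads them from env and secrets and issues the request via `curl` with a token header):

```python
from urllib.parse import quote

# Hypothetical values; the workflow step takes these from env vars and secrets.
REGISTRY = "bitfreedom.net"
CACHE_IMAGE = "nomyo-ai/nomyo-router-semantic"
OWNER = "nomyo-ai"

# Gitea's package API expects '/' in the image path percent-encoded (%2F),
# which is what the step's sed 's|/|%2F|g' does.
cache_encoded = quote(CACHE_IMAGE, safe="")

for tag in ("platform-semantic-amd64", "platform-semantic-arm64"):
    url = f"https://{REGISTRY}/api/v1/packages/{OWNER}/container/{cache_encoded}/{tag}"
    print("DELETE", url)  # the real step sends this instead of printing it
```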

View file

@@ -77,15 +77,17 @@ uvicorn router:app --host 127.0.0.1 --port 12434 --loop uvloop
 Pre-built multi-arch images (`linux/amd64`, `linux/arm64`) are published automatically on every release.
 **Lean image** (exact-match cache, ~300 MB):
 ```sh
-docker pull ghcr.io/nomyo-ai/nomyo-router:latest
-docker pull ghcr.io/nomyo-ai/nomyo-router:0.7.0
+docker pull bitfreedom.net/nomyo-ai/nomyo-router:latest
+docker pull bitfreedom.net/nomyo-ai/nomyo-router:v0.7.0
 ```
 **Semantic image** (semantic cache with `all-MiniLM-L6-v2` pre-baked, ~800 MB):
 ```sh
-docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic
-docker pull ghcr.io/nomyo-ai/nomyo-router:0.7.0-semantic
+docker pull bitfreedom.net/nomyo-ai/nomyo-router:latest-semantic
+docker pull bitfreedom.net/nomyo-ai/nomyo-router:0.7.0-semantic
 ```

 ### Build the container image locally
@@ -155,6 +157,7 @@ cache_history_weight: 0.3
 ```
 Pull the semantic image:
 ```bash
 docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic
 ```
@@ -162,6 +165,7 @@ docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic
 ### Cache key strategy
 Each request is keyed on `model + system_prompt` (exact) combined with a weighted-mean embedding of BM25-weighted chat history (30%) and the last user message (70%). This means:
 - Different system prompts → always separate cache namespaces (no cross-tenant leakage)
 - Same question, different phrasing → cache hit (semantic mode)
 - MOE requests (`moe-*`) → always bypass the cache
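The keying scheme described in that README section can be sketched in a few lines. This is a toy illustration only: `embed` is a stand-in hash, not the real `all-MiniLM-L6-v2` encoder, all names are hypothetical, and the real semantic lookup is a nearest-neighbour search over the embedding part rather than exact tuple matching:

```python
import hashlib

def embed(text: str, dim: int = 8) -> list[float]:
    # Toy deterministic "embedding" so the sketch is self-contained;
    # the router actually uses a sentence-transformer model here.
    h = hashlib.sha256(text.encode()).digest()
    return [b / 255.0 for b in h[:dim]]

def cache_key(model: str, system_prompt: str,
              history: list[str], last_user_msg: str) -> tuple:
    # Exact part: a different model or system prompt yields a disjoint
    # namespace, so tenants with different system prompts never collide.
    exact = (model, hashlib.sha256(system_prompt.encode()).hexdigest())
    # Semantic part: weighted mean of chat history (30%) and last message (70%).
    hist_vec = embed(" ".join(history)) if history else [0.0] * 8
    last_vec = embed(last_user_msg)
    semantic = tuple(0.3 * h + 0.7 * l for h, l in zip(hist_vec, last_vec))
    return exact, semantic
```

In semantic mode the second tuple element would be compared by cosine similarity against stored keys, which is how "same question, different phrasing" can still hit.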

View file

@@ -1,7 +1,6 @@
 aiohappyeyeballs==2.6.1
-aiohttp==3.13.3
+aiohttp==3.13.4
 aiosignal==1.4.0
-annotated-doc==0.0.3
 annotated-types==0.7.0
 anyio==4.10.0
 async-timeout==5.0.1
@@ -22,6 +21,7 @@ multidict==6.6.4
 ollama==0.6.1
 openai==1.102.0
 orjson>=3.11.5
+numpy>=1.26
 pillow==12.1.1
 propcache==0.3.2
 pydantic==2.11.7

View file

@@ -590,7 +590,8 @@ async def token_worker() -> None:
                 # Update in-memory counts for immediate reporting
                 async with token_usage_lock:
                     token_usage_counts[endpoint][model] += (prompt + comp)
-                    await publish_snapshot()
+                    snapshot = _capture_snapshot()
+                await _distribute_snapshot(snapshot)
     except asyncio.CancelledError:
         # Gracefully handle task cancellation during shutdown
         print("[token_worker] Task cancelled, processing remaining queue items...")
@@ -617,7 +618,8 @@ async def token_worker() -> None:
                 })
                 async with token_usage_lock:
                     token_usage_counts[endpoint][model] += (prompt + comp)
-                    await publish_snapshot()
+                    snapshot = _capture_snapshot()
+                await _distribute_snapshot(snapshot)
             except asyncio.QueueEmpty:
                 break
         print("[token_worker] Task cancelled, remaining items processed.")
@@ -1007,9 +1009,6 @@ class fetch:
             # If anything goes wrong we cannot reply details
             message = _format_connection_issue(request_url, e)
             print(f"[fetch.endpoint_details] {message}")
-            # Record failure so subsequent calls skip this endpoint briefly
-            async with _available_error_cache_lock:
-                _available_error_cache[endpoint] = time.time()
             return []

 def ep2base(ep):
@@ -1036,7 +1035,8 @@ def dedupe_on_keys(dicts, key_fields):
 async def increment_usage(endpoint: str, model: str) -> None:
     async with usage_lock:
         usage_counts[endpoint][model] += 1
-        await publish_snapshot()
+        snapshot = _capture_snapshot()
+    await _distribute_snapshot(snapshot)

 async def decrement_usage(endpoint: str, model: str) -> None:
     async with usage_lock:
@@ -1049,7 +1049,8 @@ async def decrement_usage(endpoint: str, model: str) -> None:
             usage_counts[endpoint].pop(model, None)
         #if not usage_counts[endpoint]:
         #    usage_counts.pop(endpoint, None)
-        await publish_snapshot()
+        snapshot = _capture_snapshot()
+    await _distribute_snapshot(snapshot)

 async def _make_chat_request(model: str, messages: list, tools=None, stream: bool = False, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse:
     """
@@ -1062,8 +1063,10 @@ async def _make_chat_request(model: str, messages: list, tools=None, stream: boo
     if ":latest" in model:
         model = model.split(":latest")[0]
     if messages:
-        messages = transform_images_to_data_urls(messages)
+        if any("images" in m for m in messages):
+            messages = await asyncio.to_thread(transform_images_to_data_urls, messages)
         messages = transform_tool_calls_to_openai(messages)
+        messages = _strip_assistant_prefill(messages)
     params = {
         "messages": messages,
         "model": model,
@@ -1295,6 +1298,14 @@ def resize_image_if_needed(image_data):
         print(f"Error processing image: {e}")
         return None

+def _strip_assistant_prefill(messages: list) -> list:
+    """Remove a trailing assistant message used as prefill.
+    OpenAI-compatible endpoints (including Claude) do not support prefill and
+    will reject requests where the last message has role 'assistant'."""
+    if messages and messages[-1].get("role") == "assistant":
+        return messages[:-1]
+    return messages
+
 def transform_tool_calls_to_openai(message_list):
     """
     Ensure tool_calls in assistant messages conform to the OpenAI format:
@@ -1573,18 +1584,17 @@ class rechunk:
 # ------------------------------------------------------------------
 # SSE Helpers
 # ------------------------------------------------------------------
-async def publish_snapshot():
-    # NOTE: This function assumes usage_lock OR token_usage_lock is already held by the caller
-    # Create a snapshot without acquiring the lock (caller must hold it)
-    snapshot = orjson.dumps({
-        "usage_counts": dict(usage_counts),  # Create a copy
+def _capture_snapshot() -> str:
+    """Capture current usage counts as a JSON string. Caller must hold at least one of usage_lock/token_usage_lock."""
+    return orjson.dumps({
+        "usage_counts": dict(usage_counts),
         "token_usage_counts": dict(token_usage_counts)
     }, option=orjson.OPT_SORT_KEYS).decode("utf-8")
-    # Distribute the snapshot (no lock needed here since we have a copy)
+
+async def _distribute_snapshot(snapshot: str) -> None:
+    """Push a pre-captured snapshot to all SSE subscribers. Must be called outside any usage lock."""
     async with _subscribers_lock:
         for q in _subscribers:
-            # If the queue is full, drop the message to avoid backpressure.
             if q.full():
                 try:
                     await q.get()
@@ -1729,10 +1739,13 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
     selected = min(candidate_endpoints, key=tracking_usage)
     tracking_model = get_tracking_model(selected, model)
+    snapshot = None
     if reserve:
         usage_counts[selected][tracking_model] += 1
-        await publish_snapshot()
-    return selected, tracking_model
+        snapshot = _capture_snapshot()
+    if snapshot is not None:
+        await _distribute_snapshot(snapshot)
+    return selected, tracking_model

 # -------------------------------------------------------------
 # 6. API route Generate
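The `publish_snapshot` split above follows a general capture-under-lock, fan-out-outside-lock pattern: serialize a consistent copy of the counters while holding the lock, then push it to subscriber queues after releasing it so slow consumers cannot block counter updates. A minimal standalone sketch with toy state, `json` in place of `orjson`, and `get_nowait` standing in for the router's awaited drop (all names illustrative):

```python
import asyncio
import json

state = {"hits": 0}
state_lock = asyncio.Lock()
subscribers: list[asyncio.Queue] = []

def capture_snapshot() -> str:
    # Caller must hold state_lock: serialize a consistent copy of the counters.
    return json.dumps(dict(state), sort_keys=True)

async def distribute_snapshot(snapshot: str) -> None:
    # Runs outside the lock, so a slow SSE consumer can't stall counter updates.
    for q in subscribers:
        if q.full():
            q.get_nowait()  # drop the oldest snapshot instead of backpressuring
        q.put_nowait(snapshot)

async def record_hit() -> None:
    async with state_lock:
        state["hits"] += 1
        snapshot = capture_snapshot()
    await distribute_snapshot(snapshot)
```

Subscribers always receive the freshest snapshot: a full queue sheds its oldest entry rather than applying backpressure to the hot path.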
@@ -1959,8 +1972,10 @@ async def chat_proxy(request: Request):
         model = model.split(":latest")
         model = model[0]
     if messages:
-        messages = transform_images_to_data_urls(messages)
+        if any("images" in m for m in messages):
+            messages = await asyncio.to_thread(transform_images_to_data_urls, messages)
         messages = transform_tool_calls_to_openai(messages)
+        messages = _strip_assistant_prefill(messages)
     params = {
         "messages": messages,
         "model": model,
@@ -3027,6 +3042,7 @@ async def openai_chat_completions_proxy(request: Request):
         model = model.split(":latest")
         model = model[0]
+    messages = _strip_assistant_prefill(messages)
     params = {
         "messages": messages,
         "model": model,