diff --git a/.forgejo/workflows/docker-publish-semantic.yml b/.forgejo/workflows/docker-publish-semantic.yml index 56631b9..2174f21 100644 --- a/.forgejo/workflows/docker-publish-semantic.yml +++ b/.forgejo/workflows/docker-publish-semantic.yml @@ -145,3 +145,13 @@ jobs: $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ ${{ env.REGISTRY }}/${{ env.CACHE_IMAGE }}:platform-semantic-amd64 \ ${{ env.REGISTRY }}/${{ env.CACHE_IMAGE }}:platform-semantic-arm64 + + - name: Delete intermediate platform tags + run: | + CACHE_ENCODED=$(echo "${{ env.CACHE_IMAGE }}" | sed 's|/|%2F|g') + for tag in platform-semantic-amd64 platform-semantic-arm64; do + curl -sf -X DELETE \ + -H "Authorization: token ${{ secrets.REGISTRY_TOKEN }}" \ + "https://${{ env.REGISTRY }}/api/v1/packages/${{ github.repository_owner }}/container/${CACHE_ENCODED}/${tag}" \ + && echo "Deleted ${tag}" || echo "Failed to delete ${tag} (ignored)" + done diff --git a/README.md b/README.md index 0a3b341..337b4c3 100644 --- a/README.md +++ b/README.md @@ -77,15 +77,17 @@ uvicorn router:app --host 127.0.0.1 --port 12434 --loop uvloop Pre-built multi-arch images (`linux/amd64`, `linux/arm64`) are published automatically on every release. 
**Lean image** (exact-match cache, ~300 MB): + ```sh -docker pull ghcr.io/nomyo-ai/nomyo-router:latest -docker pull ghcr.io/nomyo-ai/nomyo-router:0.7.0 +docker pull bitfreedom.net/nomyo-ai/nomyo-router:latest +docker pull bitfreedom.net/nomyo-ai/nomyo-router:v0.7.0 ``` **Semantic image** (semantic cache with `all-MiniLM-L6-v2` pre-baked, ~800 MB): + ```sh -docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic -docker pull ghcr.io/nomyo-ai/nomyo-router:0.7.0-semantic +docker pull bitfreedom.net/nomyo-ai/nomyo-router:latest-semantic +docker pull bitfreedom.net/nomyo-ai/nomyo-router:v0.7.0-semantic ``` ### Build the container image locally @@ -155,6 +157,7 @@ cache_history_weight: 0.3 ``` Pull the semantic image: + ```bash docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic ``` @@ -162,6 +165,7 @@ docker pull ghcr.io/nomyo-ai/nomyo-router:latest-semantic ### Cache key strategy Each request is keyed on `model + system_prompt` (exact) combined with a weighted-mean embedding of BM25-weighted chat history (30%) and the last user message (70%). 
This means: + - Different system prompts → always separate cache namespaces (no cross-tenant leakage) - Same question, different phrasing → cache hit (semantic mode) - MOE requests (`moe-*`) → always bypass the cache diff --git a/requirements.txt b/requirements.txt index a3befbe..2db1ba4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ aiohappyeyeballs==2.6.1 -aiohttp==3.13.3 +aiohttp==3.13.4 aiosignal==1.4.0 -annotated-doc==0.0.3 annotated-types==0.7.0 anyio==4.10.0 async-timeout==5.0.1 @@ -22,6 +21,7 @@ multidict==6.6.4 ollama==0.6.1 openai==1.102.0 orjson>=3.11.5 +numpy>=1.26 pillow==12.1.1 propcache==0.3.2 pydantic==2.11.7 diff --git a/router.py b/router.py index a7f6a75..c87c5ca 100644 --- a/router.py +++ b/router.py @@ -590,7 +590,8 @@ async def token_worker() -> None: # Update in-memory counts for immediate reporting async with token_usage_lock: token_usage_counts[endpoint][model] += (prompt + comp) - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) except asyncio.CancelledError: # Gracefully handle task cancellation during shutdown print("[token_worker] Task cancelled, processing remaining queue items...") @@ -617,7 +618,8 @@ async def token_worker() -> None: }) async with token_usage_lock: token_usage_counts[endpoint][model] += (prompt + comp) - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) except asyncio.QueueEmpty: break print("[token_worker] Task cancelled, remaining items processed.") @@ -1007,9 +1009,6 @@ class fetch: # If anything goes wrong we cannot reply details message = _format_connection_issue(request_url, e) print(f"[fetch.endpoint_details] {message}") - # Record failure so subsequent calls skip this endpoint briefly - async with _available_error_cache_lock: - _available_error_cache[endpoint] = time.time() return [] def ep2base(ep): @@ -1036,7 +1035,8 @@ def dedupe_on_keys(dicts, key_fields): async def 
increment_usage(endpoint: str, model: str) -> None: async with usage_lock: usage_counts[endpoint][model] += 1 - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) async def decrement_usage(endpoint: str, model: str) -> None: async with usage_lock: @@ -1049,7 +1049,8 @@ async def decrement_usage(endpoint: str, model: str) -> None: usage_counts[endpoint].pop(model, None) #if not usage_counts[endpoint]: # usage_counts.pop(endpoint, None) - await publish_snapshot() + snapshot = _capture_snapshot() + await _distribute_snapshot(snapshot) async def _make_chat_request(model: str, messages: list, tools=None, stream: bool = False, think: bool = False, format=None, options=None, keep_alive: str = None) -> ollama.ChatResponse: """ @@ -1062,8 +1063,10 @@ async def _make_chat_request(model: str, messages: list, tools=None, stream: boo if ":latest" in model: model = model.split(":latest")[0] if messages: - messages = transform_images_to_data_urls(messages) + if any("images" in m for m in messages): + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) messages = transform_tool_calls_to_openai(messages) + messages = _strip_assistant_prefill(messages) params = { "messages": messages, "model": model, @@ -1295,6 +1298,14 @@ def resize_image_if_needed(image_data): print(f"Error processing image: {e}") return None +def _strip_assistant_prefill(messages: list) -> list: + """Remove a trailing assistant message used as prefill. 
+ OpenAI-compatible endpoints (including Claude) do not support prefill and + will reject requests where the last message has role 'assistant'.""" + if messages and messages[-1].get("role") == "assistant": + return messages[:-1] + return messages + def transform_tool_calls_to_openai(message_list): """ Ensure tool_calls in assistant messages conform to the OpenAI format: @@ -1573,18 +1584,17 @@ class rechunk: # ------------------------------------------------------------------ # SSE Helpser # ------------------------------------------------------------------ -async def publish_snapshot(): - # NOTE: This function assumes usage_lock OR token_usage_lock is already held by the caller - # Create a snapshot without acquiring the lock (caller must hold it) - snapshot = orjson.dumps({ - "usage_counts": dict(usage_counts), # Create a copy +def _capture_snapshot() -> str: + """Capture current usage counts as a JSON string. Caller must hold at least one of usage_lock/token_usage_lock.""" + return orjson.dumps({ + "usage_counts": dict(usage_counts), "token_usage_counts": dict(token_usage_counts) }, option=orjson.OPT_SORT_KEYS).decode("utf-8") - # Distribute the snapshot (no lock needed here since we have a copy) +async def _distribute_snapshot(snapshot: str) -> None: + """Push a pre-captured snapshot to all SSE subscribers. Must be called outside any usage lock.""" async with _subscribers_lock: for q in _subscribers: - # If the queue is full, drop the message to avoid back‑pressure. 
if q.full(): try: await q.get() @@ -1729,10 +1739,13 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: selected = min(candidate_endpoints, key=tracking_usage) tracking_model = get_tracking_model(selected, model) + snapshot = None if reserve: usage_counts[selected][tracking_model] += 1 - await publish_snapshot() - return selected, tracking_model + snapshot = _capture_snapshot() + if snapshot is not None: + await _distribute_snapshot(snapshot) + return selected, tracking_model # ------------------------------------------------------------- # 6. API route – Generate @@ -1959,8 +1972,10 @@ async def chat_proxy(request: Request): model = model.split(":latest") model = model[0] if messages: - messages = transform_images_to_data_urls(messages) + if any("images" in m for m in messages): + messages = await asyncio.to_thread(transform_images_to_data_urls, messages) messages = transform_tool_calls_to_openai(messages) + messages = _strip_assistant_prefill(messages) params = { "messages": messages, "model": model, @@ -3027,6 +3042,7 @@ async def openai_chat_completions_proxy(request: Request): model = model.split(":latest") model = model[0] + messages = _strip_assistant_prefill(messages) params = { "messages": messages, "model": model,