diff --git a/.forgejo/workflows/nyxscanner.yml b/.forgejo/workflows/nyxscanner.yml deleted file mode 100644 index 9d1a8dc..0000000 --- a/.forgejo/workflows/nyxscanner.yml +++ /dev/null @@ -1,32 +0,0 @@ -name: NYX Security Scan - -on: - pull_request: - branches: [main] - -jobs: - nyx-scan: - runs-on: docker-amd64 - - steps: - - name: Checkout PR - run: | - git clone --depth=1 \ - "https://oauth2:${{ github.token }}@bitfreedom.net/code/${{ github.repository }}.git" \ - . - git fetch --depth=1 origin ${{ github.sha }} - git checkout ${{ github.sha }} - - - name: Fetch action source - run: | - git clone --depth=1 --branch master \ - "https://oauth2:${{ github.token }}@bitfreedom.net/code/nomyo-ai/actions.git" \ - ./.nyx-action - - - uses: ./.nyx-action/nyx-scan - with: - forgejo_push_token: ${{ secrets.FORGEJO_PUSH_TOKEN }} - repository: ${{ github.repository }} - pr_number: ${{ github.event.pull_request.number }} - sha: ${{ github.sha }} - fail_on: HIGH \ No newline at end of file diff --git a/.forgejo/workflows/opencode.yml b/.forgejo/workflows/opencode.yml deleted file mode 100644 index 6d126d3..0000000 --- a/.forgejo/workflows/opencode.yml +++ /dev/null @@ -1,62 +0,0 @@ -name: opencode -on: - issue_comment: - types: [created] - pull_request_review_comment: - types: [created] - pull_request_review: - types: [submitted] - -jobs: - opencode: - if: | - contains(github.event.comment.body, '/oc') || - contains(github.event.comment.body, '/opencode') - runs-on: docker-amd64 - container: - image: node:lts-bookworm - permissions: - id-token: write - contents: write - pull-requests: write - issues: write - steps: - - name: Install git, curl and Docker - run: | - apt-get update -qq - apt-get install -y -qq git curl unzip docker.io - - - name: Start Docker daemon - run: | - dockerd --host=unix:///var/run/docker.sock --iptables=false --dns=8.8.8.8 --dns=8.8.4.4 > /tmp/dockerd.log 2>&1 & - for i in $(seq 1 30); do - sleep 2 - docker info > /dev/null 2>&1 && echo "Docker daemon ready" && exit 0 - echo "Waiting for Docker daemon... ($i/30)" - done - echo "=== dockerd failed to start, logs: ===" - cat /tmp/dockerd.log - exit 1 - - - name: Checkout repository - run: | - git clone --depth=1 --branch "${{ github.ref_name }}" \ - "https://oauth2:${{ github.token }}@bitfreedom.net/code/${{ github.repository }}.git" \ - . - - - name: Fetch action source - run: | - git clone --depth=1 --branch v1 \ - "https://oauth2:${{ github.token }}@bitfreedom.net/code/nomyo-ai/actions.git" \ - ./.opencode-action - - - name: Run opencode - uses: ./.opencode-action - with: - nomyo_api_key: ${{ secrets.NOMYO_API_KEY }} - model: nomyo/unsloth/Qwen3.6-35B-A3B-GGUF:UD-Q4_K_M - forgejo_api_url: https://bitfreedom.net/code/ - forgejo_token: ${{ secrets.FORGEJO_TOKEN }} - forgejo_push_token: ${{ secrets.FORGEJO_PUSH_TOKEN }} - - diff --git a/.nyx/triage.json b/.nyx/triage.json deleted file mode 100644 index 2c4dc31..0000000 --- a/.nyx/triage.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "version": 1, - "decisions": [], - "suppression_rules": [ - { - "by": "rule", - "value": "py.auth.token_override_without_validation", - "state": "suppressed", - "note": "false_positive: token validation handled upstream by middleware" - }, - { - "by": "rule", - "value": "state-resource-leak", - "state": "suppressed", - "note": "false_positive: resource lifecycle managed externally" - }, - { - "by": "rule", - "value": "py.crypto.sha1", - "state": "suppressed", - "note": "accepted_risk: used for non-security checksum only" - } - ] -} \ No newline at end of file diff --git a/config.yaml b/config.yaml index 2107a3c..76fbbe1 100644 --- a/config.yaml +++ b/config.yaml @@ -26,26 +26,6 @@ max_concurrent_connections: 2 # When false (default), equally-idle endpoints are chosen at random. # priority_routing: true -# Conversation affinity (optional, default: false). -# Pins a conversation to the endpoint that served its first turn so the -# llama.cpp / Ollama prompt cache (KV cache) stays warm — first turn pays -# the cold prefill, every follow-up turn reuses the same prefix. -# -# Fingerprint = sha1(model + leading system messages + first user turn). -# Same chat → same fingerprint on every follow-up turn → same pin, TTL -# refreshed on each reuse. Soft preference: if the pinned endpoint no -# longer has the model loaded or has no free slot, the standard algorithm -# takes over (no failure, just a cache miss). -# -# Heads-up: most chat UIs (Open WebUI, LibreChat, …) fire side requests for -# title / tag / follow-up generation. Those have their own first turn and -# therefore their own pin, so a single visible "chat" may show several dots -# in the dashboard's Affinity column. That is correct — each pin matches a -# real warm KV prefix on the backend. See doc/configuration.md for details. -conversation_affinity: true -conversation_affinity_ttl: 300 # seconds of inactivity before a pin expires; - # bumped on every reuse. Matches Ollama's default keep_alive. - # Optional router-level API key that gates router/API/web UI access (leave empty to disable) nomyo-router-api-key: "" diff --git a/doc/configuration.md b/doc/configuration.md index 1addd66..7d9986a 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -166,91 +166,6 @@ With this config the primary handles up to 4 concurrent requests before the seco --- -### `conversation_affinity` - -**Type**: `bool` (optional) - -**Default**: `false` - -**Companion setting**: [`conversation_affinity_ttl`](#conversation_affinity_ttl) - -**Description**: When enabled, the router prefers to send follow-up requests of the same conversation back to the endpoint that already served the first turn. This keeps the backend's prompt cache (the llama.cpp / Ollama **KV cache**) warm: the first user turn pays the cold prefill cost, every later turn reuses the same prefix and only generates new tokens. It is a **soft preference** — when the previously-chosen endpoint is no longer eligible (model unloaded, no free slot), the router falls back to the standard selection algorithm (`priority_routing` or random). - -#### How a conversation is identified - -The router does **not** track session IDs or auth tokens. It computes a stable fingerprint per request from: - -``` -SHA1( model - + every leading message with role="system" - + the first message with role="user" ) -``` - -Anything after the first user turn is ignored — those later messages extend the same KV prefix, so they don't change the cache identity. - -**What this means in practice** - -| You send… | Fingerprint behaves like… | -|---|---| -| Turn 2 of the same chat (history grows but first system+user are unchanged) | **Same** as turn 1 → pin is reused and TTL refreshed | -| Turn 1 of a fresh chat | **New** fingerprint → new pin | -| Same first user prompt but a different model | **New** fingerprint (model is part of the hash) | -| Same chat but the client mutates the system prompt between turns (e.g. injects a fresh timestamp) | **New** fingerprint — the affinity will not stick | - -#### TTL and refresh - -Every time `choose_endpoint` returns a pinned endpoint, the entry's expiry is bumped to `now + conversation_affinity_ttl`. An idle conversation drops out of the map once that window elapses without traffic. Default 300 s matches Ollama's default `keep_alive` — once the backend has unloaded the model, the KV cache is gone too, so a stale pin would be pointless anyway. - -#### Why the dashboard may show more than one dot per visible conversation - -The fingerprint is computed per **HTTP request**, not per chat-window. Most chat UIs (Open WebUI in particular) fire several **auxiliary** requests alongside the real conversation: - -- *Title generation* — synthetic system prompt + the user message as content -- *Follow-up question suggestion* — synthetic system prompt + the conversation as content -- *Tag generation*, *memory extraction*, *retrieval query rewriting*, etc. - -Each of those has its own `(system + first user turn)` and therefore its own fingerprint and its own pin in [the affinity dot matrix](monitoring.md#affinity-stats-conversation-affinity). They all *correctly* refer to a real warm KV-cache prefix on the backend, so the routing they drive is right — they just don't visually map 1:1 to a user-perceived "conversation." - -#### Example - -```yaml -endpoints: - - http://gpu-primary:11434 - - http://gpu-secondary:11434 - -conversation_affinity: true -conversation_affinity_ttl: 300 -``` - -With this configuration, a chat that starts on `gpu-primary` will keep returning to `gpu-primary` for follow-up turns as long as the model is still loaded there and a slot is free, even if `gpu-secondary` happens to be more idle at that moment. Cold-prefill cost is paid once instead of once per turn. - -#### When to enable - -- ✅ Interactive chat workloads with long histories — the prefill savings on every follow-up turn are substantial. -- ✅ Multi-endpoint deployments where models are loaded on more than one node. -- ❌ Pure one-shot / single-turn workloads (no KV-cache to keep warm). -- ❌ When you specifically want strict load-balancing parity — affinity intentionally biases against perfect balance. - ---- - -### `conversation_affinity_ttl` - -**Type**: `int` (seconds, optional) - -**Default**: `300` - -**Description**: How long a conversation stays pinned to its endpoint after the last request that touched it. Refreshed on every reuse — so an actively-used conversation keeps its pin indefinitely; an abandoned one expires after `conversation_affinity_ttl` seconds of silence. - -**Recommendation**: leave this aligned with the backend's `keep_alive` window. If the model is unloaded by the backend, the KV cache is gone and there is no benefit to keeping the pin. - -**Example**: -```yaml -conversation_affinity: true -conversation_affinity_ttl: 600 # half an hour of inactivity before un-pinning -``` - ---- - ### `router_api_key` **Type**: `str` (optional) diff --git a/doc/monitoring.md b/doc/monitoring.md index ab75d25..b5bcbff 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -166,39 +166,6 @@ curl -X POST http://localhost:12434/api/cache/invalidate Clears all cached entries and resets hit/miss counters. -### Affinity Stats (Conversation Affinity) - -```bash -curl http://localhost:12434/api/affinity_stats -``` - -Response when [`conversation_affinity`](configuration.md#conversation_affinity) is enabled: - -```json -{ - "enabled": true, - "ttl": 300, - "entries": [ - { "endpoint": "http://gpu-primary:11434", "model": "llama3.2:latest", "remaining": 287.4 }, - { "endpoint": "http://gpu-primary:11434", "model": "llama3.2:latest", "remaining": 113.0 }, - { "endpoint": "http://gpu-secondary:11434", "model": "qwen2.5-coder:7b", "remaining": 44.8 } - ] -} -``` - -Response when the feature is disabled: -```json -{ "enabled": false, "ttl": 300, "entries": [] } -``` - -- One element per **live pinned conversation** (no fingerprints or content — just the endpoint/model the pin points to and how many seconds it has left before expiry). -- Aggregation by `(endpoint, model)` is left to the consumer: the dashboard does this client-side. -- The endpoint is gated by the same `nomyo-router-api-key` middleware as the rest of `/api/*`. - -The dashboard's **Running Models (PS) → Affinity** column is rendered from this data. The column auto-hides when `enabled: false`. Each row shows one dot per live pin against that `(endpoint, model)` pair; dot opacity = `remaining / ttl` (floor 0.15), so freshly-routed pins are solid and pins close to expiry fade out. A `+N` overflow badge appears once a single (endpoint, model) holds more than 12 active pins; an em-dash (`—`) marks an `(endpoint, model)` with no live pins. - -> Multiple dots for what looks like "one chat window" is normal — most chat UIs (Open WebUI, LibreChat, …) fire auxiliary requests (title generation, follow-up suggestions, tag extraction) that have their own first-turn fingerprint and therefore their own pin. See [Conversation Affinity → Why the dashboard may show more than one dot per visible conversation](configuration.md#conversation_affinity) for the details. - ### Real-time Usage Stream ```bash diff --git a/router.py b/router.py index 08225fb..603387e 100644 --- a/router.py +++ b/router.py @@ -2,11 +2,11 @@ title: NOMYO Router - an (O)llama and OpenAI API v1 Proxy with Endpoint:Model aware routing author: alpha-nerd-nomyo author_url: https://github.com/nomyo-ai -version: 0.9 +version: 0.7 license: AGPL """ # ------------------------------------------------------------- -import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx, hashlib +import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx try: import truststore; truststore.inject_into_ssl() except ImportError: @@ -223,15 +223,6 @@ class Config(BaseSettings): # When True, config order = priority; routes by utilization ratio + config index (WRR) priority_routing: bool = Field(default=False) - # Conversation affinity: route the same conversation back to the endpoint that - # previously served it, to keep the llama.cpp / Ollama prompt cache (KV cache) warm. - # Soft preference — falls back to the standard algorithm when the affine endpoint - # is saturated or no longer has the model loaded. - conversation_affinity: bool = Field(default=False) - # TTL (seconds) for affinity entries. Defaults to Ollama's default keep_alive (5 min): - # if the backend has already evicted the model, the KV cache is cold anyway. - conversation_affinity_ttl: int = Field(default=300) - api_keys: Dict[str, str] = Field(default_factory=dict) # Optional router-level API key used to gate access to this service and dashboard router_api_key: Optional[str] = Field(default=None, env="NOMYO_ROUTER_API_KEY") @@ -256,8 +247,9 @@ class Config(BaseSettings): cache_history_weight: float = Field(default=0.3) class Config: - # YAML loading is handled manually via Config.from_yaml(); env vars use this prefix. + # Load from `config.yaml` first, then from env variables env_prefix = "NOMYO_ROUTER_" + yaml_file = Path("config.yaml") # relative to cwd @classmethod def _expand_env_refs(cls, obj): @@ -444,47 +436,6 @@ token_usage_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict( usage_lock = asyncio.Lock() # protects access to usage_counts token_usage_lock = asyncio.Lock() -# Conversation affinity map: fingerprint -> (endpoint, model, expires_at_monotonic). -# Keeps the same conversation pinned to the endpoint that already has its -# KV-cache prefix warm. Model is stored so the dashboard can aggregate live -# entries per (endpoint, model) without recomputing fingerprints. -# Never held together with usage_lock. -_affinity_map: Dict[str, tuple[str, str, float]] = {} -_affinity_lock = asyncio.Lock() -_AFFINITY_MAX_ENTRIES = 10000 - - -def _conversation_fingerprint(model: str, messages: Optional[list], - prompt: Optional[str]) -> Optional[str]: - """ - Stable hash over (model, first system + first user turn). That prefix - determines whether the backend's prompt cache is reusable; later turns - don't influence the routing decision because they extend the same prefix. - Returns None when there is no usable prefix. - """ - parts: list[str] = [model or "_"] - if messages: - for m in messages: - role = m.get("role") if isinstance(m, dict) else None - if role not in ("system", "user"): - continue - content = m.get("content") - if isinstance(content, list): # OpenAI multimodal parts - content = "".join( - p.get("text", "") for p in content - if isinstance(p, dict) and p.get("type") == "text" - ) - if not isinstance(content, str): - continue - parts.append(f"{role}:{content}") - if role == "user": - break - elif prompt: - parts.append(f"user:{prompt}") - else: - return None - return hashlib.sha1("\x1f".join(parts).encode("utf-8", "replace")).hexdigest() - # Database instance db: "TokenDatabase" = None @@ -1418,30 +1369,30 @@ def resize_image_if_needed(image_data): pass # Decode the base64 image data image_bytes = base64.b64decode(image_data) - with Image.open(io.BytesIO(image_bytes)) as image: - if image.mode not in ("RGB", "L"): - image = image.convert("RGB") + image = Image.open(io.BytesIO(image_bytes)) + if image.mode not in ("RGB", "L"): + image = image.convert("RGB") - # Get current size - width, height = image.size + # Get current size + width, height = image.size - # Calculate the new dimensions while maintaining aspect ratio - if width > 512 or height > 512: - aspect_ratio = width / height - if aspect_ratio > 1: # Width is larger - new_width = 512 - new_height = int(512 / aspect_ratio) - else: # Height is larger - new_height = 512 - new_width = int(512 * aspect_ratio) + # Calculate the new dimensions while maintaining aspect ratio + if width > 512 or height > 512: + aspect_ratio = width / height + if aspect_ratio > 1: # Width is larger + new_width = 512 + new_height = int(512 / aspect_ratio) + else: # Height is larger + new_height = 512 + new_width = int(512 * aspect_ratio) - image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) + image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) - # Encode the resized image back to base64 - buffered = io.BytesIO() - image.save(buffered, format="PNG") - resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8") - return resized_image_data + # Encode the resized image back to base64 + buffered = io.BytesIO() + image.save(buffered, format="PNG") + resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + return resized_image_data except Exception as e: print(f"Error processing image: {e}") @@ -1787,8 +1738,7 @@ def get_max_connections(ep: str) -> int: "max_concurrent_connections", config.max_concurrent_connections ) -async def choose_endpoint(model: str, reserve: bool = True, - affinity_key: Optional[str] = None) -> tuple[str, str]: +async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: """ Determine which endpoint to use for the given model while respecting the `max_concurrent_connections` per endpoint‑model pair **and** @@ -1798,14 +1748,10 @@ async def choose_endpoint(model: str, reserve: bool = True, 1️⃣ Query every endpoint for its advertised models (`/api/tags`). 2️⃣ Build a list of endpoints that contain the requested model. - 2️⃣.5 If conversation affinity is enabled and the caller passes - ``affinity_key``, prefer the endpoint that previously served the - same conversation — but only when it still has the model loaded - and a free slot. Otherwise fall through to the standard logic. 3️⃣ For those endpoints, find those that have the model loaded (`/api/ps`) *and* still have a free slot. 4️⃣ If none are both loaded and free, fall back to any endpoint - from the filtered list that simply has a free slot and randomly + from the filtered list that simply has a free slot and randomly select one. 5️⃣ If all are saturated, pick any endpoint from the filtered list (the request will queue on that endpoint). @@ -1853,19 +1799,6 @@ async def choose_endpoint(model: str, reserve: bool = True, load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) - # Look up a possible affinity hint *before* taking usage_lock. The two - # locks are never held together to avoid lock-ordering issues. - affine_ep: Optional[str] = None - if config.conversation_affinity and affinity_key: - async with _affinity_lock: - entry = _affinity_map.get(affinity_key) - if entry is not None: - ep, _stored_model, expires_at = entry - if expires_at < time.monotonic(): - _affinity_map.pop(affinity_key, None) - else: - affine_ep = ep - # Protect all reads/writes of usage_counts with the lock so that selection # and reservation are atomic — concurrent callers see each other's pending load. async with usage_lock: @@ -1881,75 +1814,59 @@ async def choose_endpoint(model: str, reserve: bool = True, # Priority map: position in all_endpoints list (lower = higher priority) ep_priority = {ep: i for i, ep in enumerate(all_endpoints)} - selected: Optional[str] = None + # 3️⃣ Endpoints that have the model loaded *and* a free slot + loaded_and_free = [ + ep for ep, models in zip(candidate_endpoints, loaded_sets) + if model in models and tracking_usage(ep) < get_max_connections(ep) + ] - # 2️⃣.5 Conversation affinity preference — only honour the hint when - # the affine endpoint still advertises the model loaded *and* has a - # free slot. Otherwise fall back to the standard algorithm. - if affine_ep: - ep_loaded = { - ep: set(models) - for ep, models in zip(candidate_endpoints, loaded_sets) - } - if (affine_ep in candidate_endpoints - and model in ep_loaded.get(affine_ep, set()) - and tracking_usage(affine_ep) < get_max_connections(affine_ep)): - selected = affine_ep - - if selected is None: - # 3️⃣ Endpoints that have the model loaded *and* a free slot - loaded_and_free = [ - ep for ep, models in zip(candidate_endpoints, loaded_sets) - if model in models and tracking_usage(ep) < get_max_connections(ep) + if loaded_and_free: + if config.priority_routing: + # WRR: sort by config order first (stable), then by utilization ratio. + # Stable sort preserves priority for equal-ratio endpoints. + loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999)) + loaded_and_free.sort(key=utilization_ratio) + selected = loaded_and_free[0] + else: + # Sort ascending for load balancing — all endpoints here already have the + # model loaded, so there is no model-switching cost to optimise for. + loaded_and_free.sort(key=tracking_usage) + # When all candidates are equally idle, randomise to avoid always picking + # the first entry in a stable sort. + if all(tracking_usage(ep) == 0 for ep in loaded_and_free): + selected = random.choice(loaded_and_free) + else: + selected = loaded_and_free[0] + else: + # 4️⃣ Endpoints among the candidates that simply have a free slot + endpoints_with_free_slot = [ + ep for ep in candidate_endpoints + if tracking_usage(ep) < get_max_connections(ep) ] - if loaded_and_free: + if endpoints_with_free_slot: if config.priority_routing: - # WRR: sort by config order first (stable), then by utilization ratio. - # Stable sort preserves priority for equal-ratio endpoints. - loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999)) - loaded_and_free.sort(key=utilization_ratio) - selected = loaded_and_free[0] + endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999)) + endpoints_with_free_slot.sort(key=utilization_ratio) + selected = endpoints_with_free_slot[0] else: - # Sort ascending for load balancing — all endpoints here already have the - # model loaded, so there is no model-switching cost to optimise for. - loaded_and_free.sort(key=tracking_usage) - # When all candidates are equally idle, randomise to avoid always picking - # the first entry in a stable sort. - if all(tracking_usage(ep) == 0 for ep in loaded_and_free): - selected = random.choice(loaded_and_free) + # Sort by total endpoint load (ascending) to prefer idle endpoints. + endpoints_with_free_slot.sort( + key=lambda ep: sum(usage_counts.get(ep, {}).values()) + ) + if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot): + selected = random.choice(endpoints_with_free_slot) else: - selected = loaded_and_free[0] - else: - # 4️⃣ Endpoints among the candidates that simply have a free slot - endpoints_with_free_slot = [ - ep for ep in candidate_endpoints - if tracking_usage(ep) < get_max_connections(ep) - ] - - if endpoints_with_free_slot: - if config.priority_routing: - endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999)) - endpoints_with_free_slot.sort(key=utilization_ratio) selected = endpoints_with_free_slot[0] - else: - # Sort by total endpoint load (ascending) to prefer idle endpoints. - endpoints_with_free_slot.sort( - key=lambda ep: sum(usage_counts.get(ep, {}).values()) - ) - if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot): - selected = random.choice(endpoints_with_free_slot) - else: - selected = endpoints_with_free_slot[0] + else: + # 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue) + if config.priority_routing: + selected = min( + candidate_endpoints, + key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)), + ) else: - # 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue) - if config.priority_routing: - selected = min( - candidate_endpoints, - key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)), - ) - else: - selected = min(candidate_endpoints, key=tracking_usage) + selected = min(candidate_endpoints, key=tracking_usage) tracking_model = get_tracking_model(selected, model) snapshot = None @@ -1958,15 +1875,6 @@ async def choose_endpoint(model: str, reserve: bool = True, snapshot = _capture_snapshot() if snapshot is not None: await _distribute_snapshot(snapshot) - # Record / refresh affinity *after* releasing usage_lock. - if reserve and config.conversation_affinity and affinity_key: - expires_at = time.monotonic() + config.conversation_affinity_ttl - async with _affinity_lock: - _affinity_map[affinity_key] = (selected, model, expires_at) - if len(_affinity_map) > _AFFINITY_MAX_ENTRIES: - now = time.monotonic() - for k in [k for k, v in _affinity_map.items() if v[2] < now]: - _affinity_map.pop(k, None) return selected, tracking_model # ------------------------------------------------------------- @@ -2017,8 +1925,7 @@ async def proxy(request: Request): yield _cached return StreamingResponse(_serve_cached_generate(), media_type="application/json") - _affinity_key = _conversation_fingerprint(model, None, prompt) - endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) + endpoint, tracking_model = await choose_endpoint(model) use_openai = is_openai_compatible(endpoint) if use_openai: if ":latest" in model: @@ -2188,8 +2095,7 @@ async def chat_proxy(request: Request): opt = True else: opt = False - _affinity_key = _conversation_fingerprint(model, messages, None) - endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) + endpoint, tracking_model = await choose_endpoint(model) use_openai = is_openai_compatible(endpoint) if use_openai: if ":latest" in model: @@ -3104,43 +3010,6 @@ async def ps_details_proxy(request: Request): return JSONResponse(content={"models": models}, status_code=200) -# ------------------------------------------------------------- -# 18b. Conversation-affinity stats – feeds the PS-table dot matrix -# ------------------------------------------------------------- -@app.get("/api/affinity_stats") -async def affinity_stats(request: Request): - """ - Aggregate live conversation-affinity pins, one entry per pinned conversation. - Each entry exposes only the endpoint, model, and remaining TTL in seconds — - no fingerprints or content. When conversation_affinity is disabled the - `entries` list is always empty. - """ - if not config.conversation_affinity: - return {"enabled": False, "ttl": config.conversation_affinity_ttl, "entries": []} - - now = time.monotonic() - entries: list[dict] = [] - llama_eps = set(config.llama_server_endpoints) - async with _affinity_lock: - for fp, (ep, mdl, expires_at) in list(_affinity_map.items()): - remaining = expires_at - now - if remaining <= 0: - _affinity_map.pop(fp, None) - continue - # Mirror the normalisation used by /api/ps_details so the dashboard - # can join affinity entries to PS rows by (endpoint, model). - display_model = _normalize_llama_model_name(mdl) if ep in llama_eps else mdl - entries.append({ - "endpoint": ep, - "model": display_model, - "remaining": round(remaining, 2), - }) - return { - "enabled": True, - "ttl": config.conversation_affinity_ttl, - "entries": entries, - } - # ------------------------------------------------------------- # 19. Proxy usage route – for monitoring # ------------------------------------------------------------- @@ -3359,8 +3228,7 @@ async def openai_chat_completions_proxy(request: Request): return StreamingResponse(_serve_cached_ochat_json(), media_type="application/json") # 2. Endpoint logic - _affinity_key = _conversation_fingerprint(model, messages, None) - endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) + endpoint, tracking_model = await choose_endpoint(model) oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Helpers and API call — done in handler scope so try/except works reliably async def _normalize_images_in_messages(msgs: list) -> list: @@ -3670,8 +3538,7 @@ async def openai_completions_proxy(request: Request): return StreamingResponse(_serve_cached_ocompl_json(), media_type="application/json") # 2. Endpoint logic - _affinity_key = _conversation_fingerprint(model, None, prompt) - endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) + endpoint, tracking_model = await choose_endpoint(model) oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Async generator that streams completions data and decrements the counter @@ -4161,16 +4028,6 @@ async def startup_event() -> None: @app.on_event("shutdown") async def shutdown_event() -> None: await close_all_sse_queues() - - # Stop background tasks first so they stop touching the DB before we close it. - for t in (token_worker_task, flush_task): - if t is not None: - t.cancel() - try: - await t - except (asyncio.CancelledError, Exception): - pass - await flush_remaining_buffers() await app_state["session"].close() @@ -4190,11 +4047,7 @@ async def shutdown_event() -> None: except Exception as e: print(f"[shutdown] Error closing httpx client {ep}: {e}") - # Close the aiosqlite connection last — its worker thread is non-daemon - # and would otherwise keep the interpreter alive after lifespan completes. - if db is not None: - try: - await db.close() - print("[shutdown] Closed token DB connection.") - except Exception as e: - print(f"[shutdown] Error closing DB: {e}") + if token_worker_task is not None: + token_worker_task.cancel() + if flush_task is not None: + flush_task.cancel() diff --git a/static/index.html b/static/index.html index b29f22b..419d7bb 100644 --- a/static/index.html +++ b/static/index.html @@ -121,45 +121,6 @@ .ps-subrow + .ps-subrow { margin-top: 2px; } - #ps-table .affinity-col, - #ps-table .affinity-cell { - display: none; - } - #ps-table.affinity-on .affinity-col, - #ps-table.affinity-on .affinity-cell { - display: table-cell; - width: 90px; - text-align: center; - padding-left: 6px; - padding-right: 6px; - } - #ps-table.affinity-on .affinity-dots { - max-width: 78px; - } - .affinity-dots { - display: inline-flex; - flex-wrap: wrap; - gap: 3px; - align-items: center; - line-height: 1; - } - .affinity-dot { - width: 8px; - height: 8px; - border-radius: 50%; - background: #2e7d32; - display: inline-block; - transition: opacity 1s linear; - } - .affinity-overflow { - font-size: 10px; - color: #555; - margin-left: 2px; - } - .affinity-empty { - color: #bbb; - font-size: 11px; - } #ps-table { width: max-content; min-width: 100%; @@ -170,13 +131,13 @@ max-width: 300px; white-space: nowrap; } - /* Optimize narrow columns (Params / Quant / Ctx) */ + /* Optimize narrow columns */ + #ps-table th:nth-child(3), + #ps-table td:nth-child(3), #ps-table th:nth-child(4), #ps-table td:nth-child(4), #ps-table th:nth-child(5), - #ps-table td:nth-child(5), - #ps-table th:nth-child(6), - #ps-table td:nth-child(6) { + #ps-table td:nth-child(5) { width: 80px; text-align: center; } @@ -434,7 +395,6 @@