diff --git a/.forgejo/workflows/nyxscanner.yml b/.forgejo/workflows/nyxscanner.yml new file mode 100644 index 0000000..9d1a8dc --- /dev/null +++ b/.forgejo/workflows/nyxscanner.yml @@ -0,0 +1,32 @@ +name: NYX Security Scan + +on: + pull_request: + branches: [main] + +jobs: + nyx-scan: + runs-on: docker-amd64 + + steps: + - name: Checkout PR + run: | + git clone --depth=1 \ + "https://oauth2:${{ github.token }}@bitfreedom.net/code/${{ github.repository }}.git" \ + . + git fetch --depth=1 origin ${{ github.sha }} + git checkout ${{ github.sha }} + + - name: Fetch action source + run: | + git clone --depth=1 --branch master \ + "https://oauth2:${{ github.token }}@bitfreedom.net/code/nomyo-ai/actions.git" \ + ./.nyx-action + + - uses: ./.nyx-action/nyx-scan + with: + forgejo_push_token: ${{ secrets.FORGEJO_PUSH_TOKEN }} + repository: ${{ github.repository }} + pr_number: ${{ github.event.pull_request.number }} + sha: ${{ github.sha }} + fail_on: HIGH \ No newline at end of file diff --git a/.forgejo/workflows/opencode.yml b/.forgejo/workflows/opencode.yml new file mode 100644 index 0000000..6d126d3 --- /dev/null +++ b/.forgejo/workflows/opencode.yml @@ -0,0 +1,62 @@ +name: opencode +on: + issue_comment: + types: [created] + pull_request_review_comment: + types: [created] + pull_request_review: + types: [submitted] + +jobs: + opencode: + if: | + contains(github.event.comment.body, '/oc') || + contains(github.event.comment.body, '/opencode') + runs-on: docker-amd64 + container: + image: node:lts-bookworm + permissions: + id-token: write + contents: write + pull-requests: write + issues: write + steps: + - name: Install git, curl and Docker + run: | + apt-get update -qq + apt-get install -y -qq git curl unzip docker.io + + - name: Start Docker daemon + run: | + dockerd --host=unix:///var/run/docker.sock --iptables=false --dns=8.8.8.8 --dns=8.8.4.4 > /tmp/dockerd.log 2>&1 & + for i in $(seq 1 30); do + sleep 2 + docker info > /dev/null 2>&1 && echo "Docker daemon ready" && exit 0 + echo "Waiting for Docker daemon... ($i/30)" + done + echo "=== dockerd failed to start, logs: ===" + cat /tmp/dockerd.log + exit 1 + + - name: Checkout repository + run: | + git clone --depth=1 --branch "${{ github.ref_name }}" \ + "https://oauth2:${{ github.token }}@bitfreedom.net/code/${{ github.repository }}.git" \ + . + + - name: Fetch action source + run: | + git clone --depth=1 --branch v1 \ + "https://oauth2:${{ github.token }}@bitfreedom.net/code/nomyo-ai/actions.git" \ + ./.opencode-action + + - name: Run opencode + uses: ./.opencode-action + with: + nomyo_api_key: ${{ secrets.NOMYO_API_KEY }} + model: nomyo/unsloth/Qwen3.6-35B-A3B-GGUF:UD-Q4_K_M + forgejo_api_url: https://bitfreedom.net/code/ + forgejo_token: ${{ secrets.FORGEJO_TOKEN }} + forgejo_push_token: ${{ secrets.FORGEJO_PUSH_TOKEN }} + + diff --git a/.nyx/triage.json b/.nyx/triage.json new file mode 100644 index 0000000..2c4dc31 --- /dev/null +++ b/.nyx/triage.json @@ -0,0 +1,24 @@ +{ + "version": 1, + "decisions": [], + "suppression_rules": [ + { + "by": "rule", + "value": "py.auth.token_override_without_validation", + "state": "suppressed", + "note": "false_positive: token validation handled upstream by middleware" + }, + { + "by": "rule", + "value": "state-resource-leak", + "state": "suppressed", + "note": "false_positive: resource lifecycle managed externally" + }, + { + "by": "rule", + "value": "py.crypto.sha1", + "state": "suppressed", + "note": "accepted_risk: used for non-security checksum only" + } + ] +} \ No newline at end of file diff --git a/config.yaml b/config.yaml index 76fbbe1..2107a3c 100644 --- a/config.yaml +++ b/config.yaml @@ -26,6 +26,26 @@ max_concurrent_connections: 2 # When false (default), equally-idle endpoints are chosen at random. # priority_routing: true +# Conversation affinity (optional, default: false). +# Pins a conversation to the endpoint that served its first turn so the +# llama.cpp / Ollama prompt cache (KV cache) stays warm — first turn pays +# the cold prefill, every follow-up turn reuses the same prefix. +# +# Fingerprint = sha1(model + leading system messages + first user turn). +# Same chat → same fingerprint on every follow-up turn → same pin, TTL +# refreshed on each reuse. Soft preference: if the pinned endpoint no +# longer has the model loaded or has no free slot, the standard algorithm +# takes over (no failure, just a cache miss). +# +# Heads-up: most chat UIs (Open WebUI, LibreChat, …) fire side requests for +# title / tag / follow-up generation. Those have their own first turn and +# therefore their own pin, so a single visible "chat" may show several dots +# in the dashboard's Affinity column. That is correct — each pin matches a +# real warm KV prefix on the backend. See doc/configuration.md for details. +conversation_affinity: true +conversation_affinity_ttl: 300 # seconds of inactivity before a pin expires; + # bumped on every reuse. Matches Ollama's default keep_alive. + # Optional router-level API key that gates router/API/web UI access (leave empty to disable) nomyo-router-api-key: "" diff --git a/doc/configuration.md b/doc/configuration.md index 7d9986a..1addd66 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -166,6 +166,91 @@ With this config the primary handles up to 4 concurrent requests before the seco --- +### `conversation_affinity` + +**Type**: `bool` (optional) + +**Default**: `false` + +**Companion setting**: [`conversation_affinity_ttl`](#conversation_affinity_ttl) + +**Description**: When enabled, the router prefers to send follow-up requests of the same conversation back to the endpoint that already served the first turn. This keeps the backend's prompt cache (the llama.cpp / Ollama **KV cache**) warm: the first user turn pays the cold prefill cost, every later turn reuses the same prefix and only generates new tokens. It is a **soft preference** — when the previously-chosen endpoint is no longer eligible (model unloaded, no free slot), the router falls back to the standard selection algorithm (`priority_routing` or random). + +#### How a conversation is identified + +The router does **not** track session IDs or auth tokens. It computes a stable fingerprint per request from: + +``` +SHA1( model + + every leading message with role="system" + + the first message with role="user" ) +``` + +Anything after the first user turn is ignored — those later messages extend the same KV prefix, so they don't change the cache identity. + +**What this means in practice** + +| You send… | Fingerprint behaves like… | +|---|---| +| Turn 2 of the same chat (history grows but first system+user are unchanged) | **Same** as turn 1 → pin is reused and TTL refreshed | +| Turn 1 of a fresh chat | **New** fingerprint → new pin | +| Same first user prompt but a different model | **New** fingerprint (model is part of the hash) | +| Same chat but the client mutates the system prompt between turns (e.g. injects a fresh timestamp) | **New** fingerprint — the affinity will not stick | + +#### TTL and refresh + +Every time `choose_endpoint` returns a pinned endpoint, the entry's expiry is bumped to `now + conversation_affinity_ttl`. An idle conversation drops out of the map once that window elapses without traffic. Default 300 s matches Ollama's default `keep_alive` — once the backend has unloaded the model, the KV cache is gone too, so a stale pin would be pointless anyway. + +#### Why the dashboard may show more than one dot per visible conversation + +The fingerprint is computed per **HTTP request**, not per chat-window. Most chat UIs (Open WebUI in particular) fire several **auxiliary** requests alongside the real conversation: + +- *Title generation* — synthetic system prompt + the user message as content +- *Follow-up question suggestion* — synthetic system prompt + the conversation as content +- *Tag generation*, *memory extraction*, *retrieval query rewriting*, etc. + +Each of those has its own `(system + first user turn)` and therefore its own fingerprint and its own pin in [the affinity dot matrix](monitoring.md#affinity-stats-conversation-affinity). They all *correctly* refer to a real warm KV-cache prefix on the backend, so the routing they drive is right — they just don't visually map 1:1 to a user-perceived "conversation." + +#### Example + +```yaml +endpoints: + - http://gpu-primary:11434 + - http://gpu-secondary:11434 + +conversation_affinity: true +conversation_affinity_ttl: 300 +``` + +With this configuration, a chat that starts on `gpu-primary` will keep returning to `gpu-primary` for follow-up turns as long as the model is still loaded there and a slot is free, even if `gpu-secondary` happens to be more idle at that moment. Cold-prefill cost is paid once instead of once per turn. + +#### When to enable + +- ✅ Interactive chat workloads with long histories — the prefill savings on every follow-up turn are substantial. +- ✅ Multi-endpoint deployments where models are loaded on more than one node. +- ❌ Pure one-shot / single-turn workloads (no KV-cache to keep warm). +- ❌ When you specifically want strict load-balancing parity — affinity intentionally biases against perfect balance. + +--- + +### `conversation_affinity_ttl` + +**Type**: `int` (seconds, optional) + +**Default**: `300` + +**Description**: How long a conversation stays pinned to its endpoint after the last request that touched it. Refreshed on every reuse — so an actively-used conversation keeps its pin indefinitely; an abandoned one expires after `conversation_affinity_ttl` seconds of silence. + +**Recommendation**: leave this aligned with the backend's `keep_alive` window. If the model is unloaded by the backend, the KV cache is gone and there is no benefit to keeping the pin. + +**Example**: +```yaml +conversation_affinity: true +conversation_affinity_ttl: 600 # half an hour of inactivity before un-pinning +``` + +--- + ### `router_api_key` **Type**: `str` (optional) diff --git a/doc/monitoring.md b/doc/monitoring.md index b5bcbff..ab75d25 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -166,6 +166,39 @@ curl -X POST http://localhost:12434/api/cache/invalidate Clears all cached entries and resets hit/miss counters. +### Affinity Stats (Conversation Affinity) + +```bash +curl http://localhost:12434/api/affinity_stats +``` + +Response when [`conversation_affinity`](configuration.md#conversation_affinity) is enabled: + +```json +{ + "enabled": true, + "ttl": 300, + "entries": [ + { "endpoint": "http://gpu-primary:11434", "model": "llama3.2:latest", "remaining": 287.4 }, + { "endpoint": "http://gpu-primary:11434", "model": "llama3.2:latest", "remaining": 113.0 }, + { "endpoint": "http://gpu-secondary:11434", "model": "qwen2.5-coder:7b", "remaining": 44.8 } + ] +} +``` + +Response when the feature is disabled: +```json +{ "enabled": false, "ttl": 300, "entries": [] } +``` + +- One element per **live pinned conversation** (no fingerprints or content — just the endpoint/model the pin points to and how many seconds it has left before expiry). +- Aggregation by `(endpoint, model)` is left to the consumer: the dashboard does this client-side. +- The endpoint is gated by the same `nomyo-router-api-key` middleware as the rest of `/api/*`. + +The dashboard's **Running Models (PS) → Affinity** column is rendered from this data. The column auto-hides when `enabled: false`. Each row shows one dot per live pin against that `(endpoint, model)` pair; dot opacity = `remaining / ttl` (floor 0.15), so freshly-routed pins are solid and pins close to expiry fade out. A `+N` overflow badge appears once a single (endpoint, model) holds more than 12 active pins; an em-dash (`—`) marks an `(endpoint, model)` with no live pins. + +> Multiple dots for what looks like "one chat window" is normal — most chat UIs (Open WebUI, LibreChat, …) fire auxiliary requests (title generation, follow-up suggestions, tag extraction) that have their own first-turn fingerprint and therefore their own pin. See [Conversation Affinity → Why the dashboard may show more than one dot per visible conversation](configuration.md#conversation_affinity) for the details. + ### Real-time Usage Stream ```bash diff --git a/router.py b/router.py index 603387e..08225fb 100644 --- a/router.py +++ b/router.py @@ -2,11 +2,11 @@ title: NOMYO Router - an (O)llama and OpenAI API v1 Proxy with Endpoint:Model aware routing author: alpha-nerd-nomyo author_url: https://github.com/nomyo-ai -version: 0.7 +version: 0.9 license: AGPL """ # ------------------------------------------------------------- -import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx +import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx, hashlib try: import truststore; truststore.inject_into_ssl() except ImportError: @@ -223,6 +223,15 @@ class Config(BaseSettings): # When True, config order = priority; routes by utilization ratio + config index (WRR) priority_routing: bool = Field(default=False) + # Conversation affinity: route the same conversation back to the endpoint that + # previously served it, to keep the llama.cpp / Ollama prompt cache (KV cache) warm. + # Soft preference — falls back to the standard algorithm when the affine endpoint + # is saturated or no longer has the model loaded. + conversation_affinity: bool = Field(default=False) + # TTL (seconds) for affinity entries. Defaults to Ollama's default keep_alive (5 min): + # if the backend has already evicted the model, the KV cache is cold anyway. + conversation_affinity_ttl: int = Field(default=300) + api_keys: Dict[str, str] = Field(default_factory=dict) # Optional router-level API key used to gate access to this service and dashboard router_api_key: Optional[str] = Field(default=None, env="NOMYO_ROUTER_API_KEY") @@ -247,9 +256,8 @@ class Config(BaseSettings): cache_history_weight: float = Field(default=0.3) class Config: - # Load from `config.yaml` first, then from env variables + # YAML loading is handled manually via Config.from_yaml(); env vars use this prefix. env_prefix = "NOMYO_ROUTER_" - yaml_file = Path("config.yaml") # relative to cwd @classmethod def _expand_env_refs(cls, obj): @@ -436,6 +444,47 @@ token_usage_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict( usage_lock = asyncio.Lock() # protects access to usage_counts token_usage_lock = asyncio.Lock() +# Conversation affinity map: fingerprint -> (endpoint, model, expires_at_monotonic). +# Keeps the same conversation pinned to the endpoint that already has its +# KV-cache prefix warm. Model is stored so the dashboard can aggregate live +# entries per (endpoint, model) without recomputing fingerprints. +# Never held together with usage_lock. +_affinity_map: Dict[str, tuple[str, str, float]] = {} +_affinity_lock = asyncio.Lock() +_AFFINITY_MAX_ENTRIES = 10000 + + +def _conversation_fingerprint(model: str, messages: Optional[list], + prompt: Optional[str]) -> Optional[str]: + """ + Stable hash over (model, first system + first user turn). That prefix + determines whether the backend's prompt cache is reusable; later turns + don't influence the routing decision because they extend the same prefix. + Returns None when there is no usable prefix. + """ + parts: list[str] = [model or "_"] + if messages: + for m in messages: + role = m.get("role") if isinstance(m, dict) else None + if role not in ("system", "user"): + continue + content = m.get("content") + if isinstance(content, list): # OpenAI multimodal parts + content = "".join( + p.get("text", "") for p in content + if isinstance(p, dict) and p.get("type") == "text" + ) + if not isinstance(content, str): + continue + parts.append(f"{role}:{content}") + if role == "user": + break + elif prompt: + parts.append(f"user:{prompt}") + else: + return None + return hashlib.sha1("\x1f".join(parts).encode("utf-8", "replace")).hexdigest() + # Database instance db: "TokenDatabase" = None @@ -1369,30 +1418,30 @@ def resize_image_if_needed(image_data): pass # Decode the base64 image data image_bytes = base64.b64decode(image_data) - image = Image.open(io.BytesIO(image_bytes)) - if image.mode not in ("RGB", "L"): - image = image.convert("RGB") + with Image.open(io.BytesIO(image_bytes)) as image: + if image.mode not in ("RGB", "L"): + image = image.convert("RGB") - # Get current size - width, height = image.size + # Get current size + width, height = image.size - # Calculate the new dimensions while maintaining aspect ratio - if width > 512 or height > 512: - aspect_ratio = width / height - if aspect_ratio > 1: # Width is larger - new_width = 512 - new_height = int(512 / aspect_ratio) - else: # Height is larger - new_height = 512 - new_width = int(512 * aspect_ratio) + # Calculate the new dimensions while maintaining aspect ratio + if width > 512 or height > 512: + aspect_ratio = width / height + if aspect_ratio > 1: # Width is larger + new_width = 512 + new_height = int(512 / aspect_ratio) + else: # Height is larger + new_height = 512 + new_width = int(512 * aspect_ratio) - image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) + image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) - # Encode the resized image back to base64 - buffered = io.BytesIO() - image.save(buffered, format="PNG") - resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8") - return resized_image_data + # Encode the resized image back to base64 + buffered = io.BytesIO() + image.save(buffered, format="PNG") + resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + return resized_image_data except Exception as e: print(f"Error processing image: {e}") @@ -1738,7 +1787,8 @@ def get_max_connections(ep: str) -> int: "max_concurrent_connections", config.max_concurrent_connections ) -async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: +async def choose_endpoint(model: str, reserve: bool = True, + affinity_key: Optional[str] = None) -> tuple[str, str]: """ Determine which endpoint to use for the given model while respecting the `max_concurrent_connections` per endpoint‑model pair **and** @@ -1748,10 +1798,14 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: 1️⃣ Query every endpoint for its advertised models (`/api/tags`). 2️⃣ Build a list of endpoints that contain the requested model. + 2️⃣.5 If conversation affinity is enabled and the caller passes + ``affinity_key``, prefer the endpoint that previously served the + same conversation — but only when it still has the model loaded + and a free slot. Otherwise fall through to the standard logic. 3️⃣ For those endpoints, find those that have the model loaded (`/api/ps`) *and* still have a free slot. 4️⃣ If none are both loaded and free, fall back to any endpoint - from the filtered list that simply has a free slot and randomly + from the filtered list that simply has a free slot and randomly select one. 5️⃣ If all are saturated, pick any endpoint from the filtered list (the request will queue on that endpoint). @@ -1799,6 +1853,19 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] loaded_sets = await asyncio.gather(*load_tasks) + # Look up a possible affinity hint *before* taking usage_lock. The two + # locks are never held together to avoid lock-ordering issues. + affine_ep: Optional[str] = None + if config.conversation_affinity and affinity_key: + async with _affinity_lock: + entry = _affinity_map.get(affinity_key) + if entry is not None: + ep, _stored_model, expires_at = entry + if expires_at < time.monotonic(): + _affinity_map.pop(affinity_key, None) + else: + affine_ep = ep + # Protect all reads/writes of usage_counts with the lock so that selection # and reservation are atomic — concurrent callers see each other's pending load. async with usage_lock: @@ -1814,59 +1881,75 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: # Priority map: position in all_endpoints list (lower = higher priority) ep_priority = {ep: i for i, ep in enumerate(all_endpoints)} - # 3️⃣ Endpoints that have the model loaded *and* a free slot - loaded_and_free = [ - ep for ep, models in zip(candidate_endpoints, loaded_sets) - if model in models and tracking_usage(ep) < get_max_connections(ep) - ] + selected: Optional[str] = None - if loaded_and_free: - if config.priority_routing: - # WRR: sort by config order first (stable), then by utilization ratio. - # Stable sort preserves priority for equal-ratio endpoints. - loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999)) - loaded_and_free.sort(key=utilization_ratio) - selected = loaded_and_free[0] - else: - # Sort ascending for load balancing — all endpoints here already have the - # model loaded, so there is no model-switching cost to optimise for. - loaded_and_free.sort(key=tracking_usage) - # When all candidates are equally idle, randomise to avoid always picking - # the first entry in a stable sort. - if all(tracking_usage(ep) == 0 for ep in loaded_and_free): - selected = random.choice(loaded_and_free) - else: - selected = loaded_and_free[0] - else: - # 4️⃣ Endpoints among the candidates that simply have a free slot - endpoints_with_free_slot = [ - ep for ep in candidate_endpoints - if tracking_usage(ep) < get_max_connections(ep) + # 2️⃣.5 Conversation affinity preference — only honour the hint when + # the affine endpoint still advertises the model loaded *and* has a + # free slot. Otherwise fall back to the standard algorithm. + if affine_ep: + ep_loaded = { + ep: set(models) + for ep, models in zip(candidate_endpoints, loaded_sets) + } + if (affine_ep in candidate_endpoints + and model in ep_loaded.get(affine_ep, set()) + and tracking_usage(affine_ep) < get_max_connections(affine_ep)): + selected = affine_ep + + if selected is None: + # 3️⃣ Endpoints that have the model loaded *and* a free slot + loaded_and_free = [ + ep for ep, models in zip(candidate_endpoints, loaded_sets) + if model in models and tracking_usage(ep) < get_max_connections(ep) ] - if endpoints_with_free_slot: + if loaded_and_free: if config.priority_routing: - endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999)) - endpoints_with_free_slot.sort(key=utilization_ratio) - selected = endpoints_with_free_slot[0] + # WRR: sort by config order first (stable), then by utilization ratio. + # Stable sort preserves priority for equal-ratio endpoints. + loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999)) + loaded_and_free.sort(key=utilization_ratio) + selected = loaded_and_free[0] else: - # Sort by total endpoint load (ascending) to prefer idle endpoints. - endpoints_with_free_slot.sort( - key=lambda ep: sum(usage_counts.get(ep, {}).values()) - ) - if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot): - selected = random.choice(endpoints_with_free_slot) + # Sort ascending for load balancing — all endpoints here already have the + # model loaded, so there is no model-switching cost to optimise for. + loaded_and_free.sort(key=tracking_usage) + # When all candidates are equally idle, randomise to avoid always picking + # the first entry in a stable sort. + if all(tracking_usage(ep) == 0 for ep in loaded_and_free): + selected = random.choice(loaded_and_free) else: - selected = endpoints_with_free_slot[0] + selected = loaded_and_free[0] else: - # 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue) - if config.priority_routing: - selected = min( - candidate_endpoints, - key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)), - ) + # 4️⃣ Endpoints among the candidates that simply have a free slot + endpoints_with_free_slot = [ + ep for ep in candidate_endpoints + if tracking_usage(ep) < get_max_connections(ep) + ] + + if endpoints_with_free_slot: + if config.priority_routing: + endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999)) + endpoints_with_free_slot.sort(key=utilization_ratio) + selected = endpoints_with_free_slot[0] + else: + # Sort by total endpoint load (ascending) to prefer idle endpoints. + endpoints_with_free_slot.sort( + key=lambda ep: sum(usage_counts.get(ep, {}).values()) + ) + if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot): + selected = random.choice(endpoints_with_free_slot) + else: + selected = endpoints_with_free_slot[0] else: - selected = min(candidate_endpoints, key=tracking_usage) + # 5️⃣ All candidate endpoints are saturated – pick the least-busy one (will queue) + if config.priority_routing: + selected = min( + candidate_endpoints, + key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)), + ) + else: + selected = min(candidate_endpoints, key=tracking_usage) tracking_model = get_tracking_model(selected, model) snapshot = None @@ -1875,6 +1958,15 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]: snapshot = _capture_snapshot() if snapshot is not None: await _distribute_snapshot(snapshot) + # Record / refresh affinity *after* releasing usage_lock. + if reserve and config.conversation_affinity and affinity_key: + expires_at = time.monotonic() + config.conversation_affinity_ttl + async with _affinity_lock: + _affinity_map[affinity_key] = (selected, model, expires_at) + if len(_affinity_map) > _AFFINITY_MAX_ENTRIES: + now = time.monotonic() + for k in [k for k, v in _affinity_map.items() if v[2] < now]: + _affinity_map.pop(k, None) return selected, tracking_model # ------------------------------------------------------------- @@ -1925,7 +2017,8 @@ async def proxy(request: Request): yield _cached return StreamingResponse(_serve_cached_generate(), media_type="application/json") - endpoint, tracking_model = await choose_endpoint(model) + _affinity_key = _conversation_fingerprint(model, None, prompt) + endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) use_openai = is_openai_compatible(endpoint) if use_openai: if ":latest" in model: @@ -2095,7 +2188,8 @@ async def chat_proxy(request: Request): opt = True else: opt = False - endpoint, tracking_model = await choose_endpoint(model) + _affinity_key = _conversation_fingerprint(model, messages, None) + endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) use_openai = is_openai_compatible(endpoint) if use_openai: if ":latest" in model: @@ -3010,6 +3104,43 @@ async def ps_details_proxy(request: Request): return JSONResponse(content={"models": models}, status_code=200) +# ------------------------------------------------------------- +# 18b. Conversation-affinity stats – feeds the PS-table dot matrix +# ------------------------------------------------------------- +@app.get("/api/affinity_stats") +async def affinity_stats(request: Request): + """ + Aggregate live conversation-affinity pins, one entry per pinned conversation. + Each entry exposes only the endpoint, model, and remaining TTL in seconds — + no fingerprints or content. When conversation_affinity is disabled the + `entries` list is always empty. + """ + if not config.conversation_affinity: + return {"enabled": False, "ttl": config.conversation_affinity_ttl, "entries": []} + + now = time.monotonic() + entries: list[dict] = [] + llama_eps = set(config.llama_server_endpoints) + async with _affinity_lock: + for fp, (ep, mdl, expires_at) in list(_affinity_map.items()): + remaining = expires_at - now + if remaining <= 0: + _affinity_map.pop(fp, None) + continue + # Mirror the normalisation used by /api/ps_details so the dashboard + # can join affinity entries to PS rows by (endpoint, model). + display_model = _normalize_llama_model_name(mdl) if ep in llama_eps else mdl + entries.append({ + "endpoint": ep, + "model": display_model, + "remaining": round(remaining, 2), + }) + return { + "enabled": True, + "ttl": config.conversation_affinity_ttl, + "entries": entries, + } + # ------------------------------------------------------------- # 19. Proxy usage route – for monitoring # ------------------------------------------------------------- @@ -3228,7 +3359,8 @@ async def openai_chat_completions_proxy(request: Request): return StreamingResponse(_serve_cached_ochat_json(), media_type="application/json") # 2. Endpoint logic - endpoint, tracking_model = await choose_endpoint(model) + _affinity_key = _conversation_fingerprint(model, messages, None) + endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Helpers and API call — done in handler scope so try/except works reliably async def _normalize_images_in_messages(msgs: list) -> list: @@ -3538,7 +3670,8 @@ async def openai_completions_proxy(request: Request): return StreamingResponse(_serve_cached_ocompl_json(), media_type="application/json") # 2. Endpoint logic - endpoint, tracking_model = await choose_endpoint(model) + _affinity_key = _conversation_fingerprint(model, None, prompt) + endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key) oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key")) # 3. Async generator that streams completions data and decrements the counter @@ -4028,6 +4161,16 @@ async def startup_event() -> None: @app.on_event("shutdown") async def shutdown_event() -> None: await close_all_sse_queues() + + # Stop background tasks first so they stop touching the DB before we close it. + for t in (token_worker_task, flush_task): + if t is not None: + t.cancel() + try: + await t + except (asyncio.CancelledError, Exception): + pass + await flush_remaining_buffers() await app_state["session"].close() @@ -4047,7 +4190,11 @@ async def shutdown_event() -> None: except Exception as e: print(f"[shutdown] Error closing httpx client {ep}: {e}") - if token_worker_task is not None: - token_worker_task.cancel() - if flush_task is not None: - flush_task.cancel() + # Close the aiosqlite connection last — its worker thread is non-daemon + # and would otherwise keep the interpreter alive after lifespan completes. + if db is not None: + try: + await db.close() + print("[shutdown] Closed token DB connection.") + except Exception as e: + print(f"[shutdown] Error closing DB: {e}") diff --git a/static/index.html b/static/index.html index 419d7bb..b29f22b 100644 --- a/static/index.html +++ b/static/index.html @@ -121,6 +121,45 @@ .ps-subrow + .ps-subrow { margin-top: 2px; } + #ps-table .affinity-col, + #ps-table .affinity-cell { + display: none; + } + #ps-table.affinity-on .affinity-col, + #ps-table.affinity-on .affinity-cell { + display: table-cell; + width: 90px; + text-align: center; + padding-left: 6px; + padding-right: 6px; + } + #ps-table.affinity-on .affinity-dots { + max-width: 78px; + } + .affinity-dots { + display: inline-flex; + flex-wrap: wrap; + gap: 3px; + align-items: center; + line-height: 1; + } + .affinity-dot { + width: 8px; + height: 8px; + border-radius: 50%; + background: #2e7d32; + display: inline-block; + transition: opacity 1s linear; + } + .affinity-overflow { + font-size: 10px; + color: #555; + margin-left: 2px; + } + .affinity-empty { + color: #bbb; + font-size: 11px; + } #ps-table { width: max-content; min-width: 100%; @@ -131,13 +170,13 @@ max-width: 300px; white-space: nowrap; } - /* Optimize narrow columns */ - #ps-table th:nth-child(3), - #ps-table td:nth-child(3), + /* Optimize narrow columns (Params / Quant / Ctx) */ #ps-table th:nth-child(4), #ps-table td:nth-child(4), #ps-table th:nth-child(5), - #ps-table td:nth-child(5) { + #ps-table td:nth-child(5), + #ps-table th:nth-child(6), + #ps-table td:nth-child(6) { width: 80px; text-align: center; } @@ -395,6 +434,7 @@ Model Endpoint + Affinity Params Quant Ctx @@ -406,7 +446,7 @@ - Loading… + Loading… @@ -932,6 +972,14 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { return items.map((item) => `
${item || ""}
`).join(""); }; + const escapeAttr = (s) => String(s).replace(/&/g, "&").replace(/"/g, """).replace(//g, ">"); + const renderAffinitySlots = (endpoints, modelName) => { + if (!endpoints.length) return ""; + return endpoints + .map((ep) => `
`) + .join(""); + }; + body.innerHTML = Array.from(grouped.entries()) .map(([modelName, modelInstances]) => { const existingRow = psRows.get(modelName); @@ -955,6 +1003,7 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { return ` ${modelName} stats ${renderInstanceList(endpoints)} + ${renderAffinitySlots(endpoints, modelName)} ${params} ${quant} ${ctx} @@ -972,11 +1021,83 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { const model = row.dataset.model; if (model) psRows.set(model, row); }); + renderAffinityDots(); } catch (e) { console.error(e); } } + /* ---------- Conversation-affinity dots ---------- */ + const AFFINITY_MAX_DOTS = 12; + let affinityIndex = new Map(); // `${endpoint}|${model}` -> array of {expiresAt} + let affinityTtl = 300; + let affinityEnabled = false; + + async function loadAffinity() { + try { + const data = await fetchJSON("/api/affinity_stats"); + affinityEnabled = !!data.enabled; + affinityTtl = Number(data.ttl) || 300; + const now = Date.now() / 1000; + const idx = new Map(); + for (const e of data.entries || []) { + const key = `${e.endpoint}|${e.model}`; + if (!idx.has(key)) idx.set(key, []); + idx.get(key).push({ expiresAt: now + Number(e.remaining) }); + } + affinityIndex = idx; + applyAffinityColumnVisibility(); + renderAffinityDots(); + } catch (err) { + // Endpoint may 404 on older deployments — silently degrade. + affinityEnabled = false; + affinityIndex = new Map(); + applyAffinityColumnVisibility(); + renderAffinityDots(); + } + } + + function applyAffinityColumnVisibility() { + const table = document.getElementById("ps-table"); + if (!table) return; + table.classList.toggle("affinity-on", affinityEnabled); + } + + function renderAffinityDots() { + const spans = document.querySelectorAll(".affinity-dots"); + if (!spans.length) return; + const now = Date.now() / 1000; + spans.forEach((span) => { + const ep = span.dataset.endpoint; + const mdl = span.dataset.model; + const key = `${ep}|${mdl}`; + const pins = (affinityIndex.get(key) || []).filter((p) => p.expiresAt > now); + if (pins.length !== (affinityIndex.get(key) || []).length) { + if (pins.length) affinityIndex.set(key, pins); + else affinityIndex.delete(key); + } + if (!pins.length) { + span.innerHTML = affinityEnabled + ? `` + : ""; + return; + } + // Sort freshest first so visible dots are the most "recent". + pins.sort((a, b) => b.expiresAt - a.expiresAt); + const visible = pins.slice(0, AFFINITY_MAX_DOTS); + const overflow = pins.length - visible.length; + const dotsHtml = visible + .map((p) => { + const remaining = Math.max(0, p.expiresAt - now); + const opacity = Math.max(0.15, Math.min(1, remaining / affinityTtl)); + const secs = Math.round(remaining); + return ``; + }) + .join(""); + span.innerHTML = dotsHtml + (overflow > 0 ? `+${overflow}` : ""); + }); + } + /* ---------- Usage Chart (stacked‑percentage) ---------- */ function getColor(seed) { const h = Math.abs(hashString(seed) % 360); @@ -1173,10 +1294,13 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { loadEndpoints(); loadTags(); loadPS(); + loadAffinity(); loadUsage(); initHeaderChart(); setInterval(tickTpsChart, 1000); setInterval(loadPS, 60_000); + setInterval(loadAffinity, 15_000); + setInterval(renderAffinityDots, 2_000); setInterval(loadEndpoints, 300_000); /* show logic */