Merge pull request 'dev-0.9.x -> main' (#76) from dev-0.9.x into main
Some checks failed
Build and Publish Docker Image / build (arm64, linux/arm64, docker-arm64) (push) Waiting to run
Build and Publish Docker Image / merge (push) Blocked by required conditions
Build and Publish Docker Image (Semantic Cache) / build (amd64, linux/amd64, docker-amd64) (push) Failing after 3m55s
Build and Publish Docker Image / build (amd64, linux/amd64, docker-amd64) (push) Successful in 1m22s
Build and Publish Docker Image (Semantic Cache) / build (arm64, linux/arm64, docker-arm64) (push) Successful in 14m44s
Build and Publish Docker Image (Semantic Cache) / merge (push) Has been skipped

Reviewed-on: https://bitfreedom.net/code/code/nomyo-ai/nomyo-router/pulls/76
This commit is contained in:
Alpha Nerd 2026-05-15 09:16:09 +02:00
commit 648b016629
8 changed files with 611 additions and 84 deletions

View file

@ -0,0 +1,32 @@
name: NYX Security Scan
on:
pull_request:
branches: [main]
jobs:
nyx-scan:
runs-on: docker-amd64
steps:
- name: Checkout PR
run: |
git clone --depth=1 \
"https://oauth2:${{ github.token }}@bitfreedom.net/code/${{ github.repository }}.git" \
.
git fetch --depth=1 origin ${{ github.sha }}
git checkout ${{ github.sha }}
- name: Fetch action source
run: |
git clone --depth=1 --branch master \
"https://oauth2:${{ github.token }}@bitfreedom.net/code/nomyo-ai/actions.git" \
./.nyx-action
- uses: ./.nyx-action/nyx-scan
with:
forgejo_push_token: ${{ secrets.FORGEJO_PUSH_TOKEN }}
repository: ${{ github.repository }}
pr_number: ${{ github.event.pull_request.number }}
sha: ${{ github.sha }}
fail_on: HIGH

View file

@ -0,0 +1,62 @@
name: opencode
on:
issue_comment:
types: [created]
pull_request_review_comment:
types: [created]
pull_request_review:
types: [submitted]
jobs:
opencode:
if: |
contains(github.event.comment.body, '/oc') ||
contains(github.event.comment.body, '/opencode')
runs-on: docker-amd64
container:
image: node:lts-bookworm
permissions:
id-token: write
contents: write
pull-requests: write
issues: write
steps:
- name: Install git, curl and Docker
run: |
apt-get update -qq
apt-get install -y -qq git curl unzip docker.io
- name: Start Docker daemon
run: |
dockerd --host=unix:///var/run/docker.sock --iptables=false --dns=8.8.8.8 --dns=8.8.4.4 > /tmp/dockerd.log 2>&1 &
for i in $(seq 1 30); do
sleep 2
docker info > /dev/null 2>&1 && echo "Docker daemon ready" && exit 0
echo "Waiting for Docker daemon... ($i/30)"
done
echo "=== dockerd failed to start, logs: ==="
cat /tmp/dockerd.log
exit 1
- name: Checkout repository
run: |
git clone --depth=1 --branch "${{ github.ref_name }}" \
"https://oauth2:${{ github.token }}@bitfreedom.net/code/${{ github.repository }}.git" \
.
- name: Fetch action source
run: |
git clone --depth=1 --branch v1 \
"https://oauth2:${{ github.token }}@bitfreedom.net/code/nomyo-ai/actions.git" \
./.opencode-action
- name: Run opencode
uses: ./.opencode-action
with:
nomyo_api_key: ${{ secrets.NOMYO_API_KEY }}
model: nomyo/unsloth/Qwen3.6-35B-A3B-GGUF:UD-Q4_K_M
forgejo_api_url: https://bitfreedom.net/code/
forgejo_token: ${{ secrets.FORGEJO_TOKEN }}
forgejo_push_token: ${{ secrets.FORGEJO_PUSH_TOKEN }}

24
.nyx/triage.json Normal file
View file

@ -0,0 +1,24 @@
{
"version": 1,
"decisions": [],
"suppression_rules": [
{
"by": "rule",
"value": "py.auth.token_override_without_validation",
"state": "suppressed",
"note": "false_positive: token validation handled upstream by middleware"
},
{
"by": "rule",
"value": "state-resource-leak",
"state": "suppressed",
"note": "false_positive: resource lifecycle managed externally"
},
{
"by": "rule",
"value": "py.crypto.sha1",
"state": "suppressed",
"note": "accepted_risk: used for non-security checksum only"
}
]
}

View file

@ -26,6 +26,26 @@ max_concurrent_connections: 2
# When false (default), equally-idle endpoints are chosen at random.
# priority_routing: true
# Conversation affinity (optional, default: false).
# Pins a conversation to the endpoint that served its first turn so the
# llama.cpp / Ollama prompt cache (KV cache) stays warm — first turn pays
# the cold prefill, every follow-up turn reuses the same prefix.
#
# Fingerprint = sha1(model + leading system messages + first user turn).
# Same chat → same fingerprint on every follow-up turn → same pin, TTL
# refreshed on each reuse. Soft preference: if the pinned endpoint no
# longer has the model loaded or has no free slot, the standard algorithm
# takes over (no failure, just a cache miss).
#
# Heads-up: most chat UIs (Open WebUI, LibreChat, …) fire side requests for
# title / tag / follow-up generation. Those have their own first turn and
# therefore their own pin, so a single visible "chat" may show several dots
# in the dashboard's Affinity column. That is correct — each pin matches a
# real warm KV prefix on the backend. See doc/configuration.md for details.
conversation_affinity: true
conversation_affinity_ttl: 300 # seconds of inactivity before a pin expires;
# bumped on every reuse. Matches Ollama's default keep_alive.
# Optional router-level API key that gates router/API/web UI access (leave empty to disable)
nomyo-router-api-key: ""

View file

@ -166,6 +166,91 @@ With this config the primary handles up to 4 concurrent requests before the seco
---
### `conversation_affinity`
**Type**: `bool` (optional)
**Default**: `false`
**Companion setting**: [`conversation_affinity_ttl`](#conversation_affinity_ttl)
**Description**: When enabled, the router prefers to send follow-up requests of the same conversation back to the endpoint that already served the first turn. This keeps the backend's prompt cache (the llama.cpp / Ollama **KV cache**) warm: the first user turn pays the cold prefill cost, every later turn reuses the same prefix and only generates new tokens. It is a **soft preference** — when the previously-chosen endpoint is no longer eligible (model unloaded, no free slot), the router falls back to the standard selection algorithm (`priority_routing` or random).
#### How a conversation is identified
The router does **not** track session IDs or auth tokens. It computes a stable fingerprint per request from:
```
SHA1( model
+ every leading message with role="system"
+ the first message with role="user" )
```
Anything after the first user turn is ignored — those later messages extend the same KV prefix, so they don't change the cache identity.
**What this means in practice**
| You send… | Fingerprint behaves like… |
|---|---|
| Turn 2 of the same chat (history grows but first system+user are unchanged) | **Same** as turn 1 → pin is reused and TTL refreshed |
| Turn 1 of a fresh chat | **New** fingerprint → new pin |
| Same first user prompt but a different model | **New** fingerprint (model is part of the hash) |
| Same chat but the client mutates the system prompt between turns (e.g. injects a fresh timestamp) | **New** fingerprint — the affinity will not stick |
#### TTL and refresh
Every time `choose_endpoint` returns a pinned endpoint, the entry's expiry is bumped to `now + conversation_affinity_ttl`. An idle conversation drops out of the map once that window elapses without traffic. Default 300 s matches Ollama's default `keep_alive` — once the backend has unloaded the model, the KV cache is gone too, so a stale pin would be pointless anyway.
#### Why the dashboard may show more than one dot per visible conversation
The fingerprint is computed per **HTTP request**, not per chat-window. Most chat UIs (Open WebUI in particular) fire several **auxiliary** requests alongside the real conversation:
- *Title generation* — synthetic system prompt + the user message as content
- *Follow-up question suggestion* — synthetic system prompt + the conversation as content
- *Tag generation*, *memory extraction*, *retrieval query rewriting*, etc.
Each of those has its own `(system + first user turn)` and therefore its own fingerprint and its own pin in [the affinity dot matrix](monitoring.md#affinity-stats-conversation-affinity). They all *correctly* refer to a real warm KV-cache prefix on the backend, so the routing they drive is right — they just don't visually map 1:1 to a user-perceived "conversation."
#### Example
```yaml
endpoints:
- http://gpu-primary:11434
- http://gpu-secondary:11434
conversation_affinity: true
conversation_affinity_ttl: 300
```
With this configuration, a chat that starts on `gpu-primary` will keep returning to `gpu-primary` for follow-up turns as long as the model is still loaded there and a slot is free, even if `gpu-secondary` happens to be more idle at that moment. Cold-prefill cost is paid once instead of once per turn.
#### When to enable
- ✅ Interactive chat workloads with long histories — the prefill savings on every follow-up turn are substantial.
- ✅ Multi-endpoint deployments where models are loaded on more than one node.
- ❌ Pure one-shot / single-turn workloads (no KV-cache to keep warm).
- ❌ When you specifically want strict load-balancing parity — affinity intentionally biases against perfect balance.
---
### `conversation_affinity_ttl`
**Type**: `int` (seconds, optional)
**Default**: `300`
**Description**: How long a conversation stays pinned to its endpoint after the last request that touched it. Refreshed on every reuse — so an actively-used conversation keeps its pin indefinitely; an abandoned one expires after `conversation_affinity_ttl` seconds of silence.
**Recommendation**: leave this aligned with the backend's `keep_alive` window. If the model is unloaded by the backend, the KV cache is gone and there is no benefit to keeping the pin.
**Example**:
```yaml
conversation_affinity: true
conversation_affinity_ttl: 600 # half an hour of inactivity before un-pinning
```
---
### `router_api_key`
**Type**: `str` (optional)

View file

@ -166,6 +166,39 @@ curl -X POST http://localhost:12434/api/cache/invalidate
Clears all cached entries and resets hit/miss counters.
### Affinity Stats (Conversation Affinity)
```bash
curl http://localhost:12434/api/affinity_stats
```
Response when [`conversation_affinity`](configuration.md#conversation_affinity) is enabled:
```json
{
"enabled": true,
"ttl": 300,
"entries": [
{ "endpoint": "http://gpu-primary:11434", "model": "llama3.2:latest", "remaining": 287.4 },
{ "endpoint": "http://gpu-primary:11434", "model": "llama3.2:latest", "remaining": 113.0 },
{ "endpoint": "http://gpu-secondary:11434", "model": "qwen2.5-coder:7b", "remaining": 44.8 }
]
}
```
Response when the feature is disabled:
```json
{ "enabled": false, "ttl": 300, "entries": [] }
```
- One element per **live pinned conversation** (no fingerprints or content — just the endpoint/model the pin points to and how many seconds it has left before expiry).
- Aggregation by `(endpoint, model)` is left to the consumer: the dashboard does this client-side.
- The endpoint is gated by the same `nomyo-router-api-key` middleware as the rest of `/api/*`.
The dashboard's **Running Models (PS) → Affinity** column is rendered from this data. The column auto-hides when `enabled: false`. Each row shows one dot per live pin against that `(endpoint, model)` pair; dot opacity = `remaining / ttl` (floor 0.15), so freshly-routed pins are solid and pins close to expiry fade out. A `+N` overflow badge appears once a single (endpoint, model) holds more than 12 active pins; an em-dash (`—`) marks an `(endpoint, model)` with no live pins.
> Multiple dots for what looks like "one chat window" is normal — most chat UIs (Open WebUI, LibreChat, …) fire auxiliary requests (title generation, follow-up suggestions, tag extraction) that have their own first-turn fingerprint and therefore their own pin. See [Conversation Affinity → Why the dashboard may show more than one dot per visible conversation](configuration.md#conversation_affinity) for the details.
### Real-time Usage Stream
```bash

305
router.py
View file

@ -2,11 +2,11 @@
title: NOMYO Router - an (O)llama and OpenAI API v1 Proxy with Endpoint:Model aware routing
author: alpha-nerd-nomyo
author_url: https://github.com/nomyo-ai
version: 0.7
version: 0.9
license: AGPL
"""
# -------------------------------------------------------------
import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx
import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io, enhance, secrets, math, socket, httpx, hashlib
try:
import truststore; truststore.inject_into_ssl()
except ImportError:
@ -223,6 +223,15 @@ class Config(BaseSettings):
# When True, config order = priority; routes by utilization ratio + config index (WRR)
priority_routing: bool = Field(default=False)
# Conversation affinity: route the same conversation back to the endpoint that
# previously served it, to keep the llama.cpp / Ollama prompt cache (KV cache) warm.
# Soft preference — falls back to the standard algorithm when the affine endpoint
# is saturated or no longer has the model loaded.
conversation_affinity: bool = Field(default=False)
# TTL (seconds) for affinity entries. Defaults to Ollama's default keep_alive (5 min):
# if the backend has already evicted the model, the KV cache is cold anyway.
conversation_affinity_ttl: int = Field(default=300)
api_keys: Dict[str, str] = Field(default_factory=dict)
# Optional router-level API key used to gate access to this service and dashboard
router_api_key: Optional[str] = Field(default=None, env="NOMYO_ROUTER_API_KEY")
@ -247,9 +256,8 @@ class Config(BaseSettings):
cache_history_weight: float = Field(default=0.3)
class Config:
# Load from `config.yaml` first, then from env variables
# YAML loading is handled manually via Config.from_yaml(); env vars use this prefix.
env_prefix = "NOMYO_ROUTER_"
yaml_file = Path("config.yaml") # relative to cwd
@classmethod
def _expand_env_refs(cls, obj):
@ -436,6 +444,47 @@ token_usage_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(
usage_lock = asyncio.Lock() # protects access to usage_counts
token_usage_lock = asyncio.Lock()
# Conversation affinity map: fingerprint -> (endpoint, model, expires_at_monotonic).
# Keeps the same conversation pinned to the endpoint that already has its
# KV-cache prefix warm. Model is stored so the dashboard can aggregate live
# entries per (endpoint, model) without recomputing fingerprints.
# Never held together with usage_lock.
_affinity_map: Dict[str, tuple[str, str, float]] = {}
_affinity_lock = asyncio.Lock()
_AFFINITY_MAX_ENTRIES = 10000
def _conversation_fingerprint(model: str, messages: Optional[list],
prompt: Optional[str]) -> Optional[str]:
"""
Stable hash over (model, first system + first user turn). That prefix
determines whether the backend's prompt cache is reusable; later turns
don't influence the routing decision because they extend the same prefix.
Returns None when there is no usable prefix.
"""
parts: list[str] = [model or "_"]
if messages:
for m in messages:
role = m.get("role") if isinstance(m, dict) else None
if role not in ("system", "user"):
continue
content = m.get("content")
if isinstance(content, list): # OpenAI multimodal parts
content = "".join(
p.get("text", "") for p in content
if isinstance(p, dict) and p.get("type") == "text"
)
if not isinstance(content, str):
continue
parts.append(f"{role}:{content}")
if role == "user":
break
elif prompt:
parts.append(f"user:{prompt}")
else:
return None
return hashlib.sha1("\x1f".join(parts).encode("utf-8", "replace")).hexdigest()
# Database instance
db: "TokenDatabase" = None
@ -1369,30 +1418,30 @@ def resize_image_if_needed(image_data):
pass
# Decode the base64 image data
image_bytes = base64.b64decode(image_data)
image = Image.open(io.BytesIO(image_bytes))
if image.mode not in ("RGB", "L"):
image = image.convert("RGB")
with Image.open(io.BytesIO(image_bytes)) as image:
if image.mode not in ("RGB", "L"):
image = image.convert("RGB")
# Get current size
width, height = image.size
# Get current size
width, height = image.size
# Calculate the new dimensions while maintaining aspect ratio
if width > 512 or height > 512:
aspect_ratio = width / height
if aspect_ratio > 1: # Width is larger
new_width = 512
new_height = int(512 / aspect_ratio)
else: # Height is larger
new_height = 512
new_width = int(512 * aspect_ratio)
# Calculate the new dimensions while maintaining aspect ratio
if width > 512 or height > 512:
aspect_ratio = width / height
if aspect_ratio > 1: # Width is larger
new_width = 512
new_height = int(512 / aspect_ratio)
else: # Height is larger
new_height = 512
new_width = int(512 * aspect_ratio)
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Encode the resized image back to base64
buffered = io.BytesIO()
image.save(buffered, format="PNG")
resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
return resized_image_data
# Encode the resized image back to base64
buffered = io.BytesIO()
image.save(buffered, format="PNG")
resized_image_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
return resized_image_data
except Exception as e:
print(f"Error processing image: {e}")
@ -1738,7 +1787,8 @@ def get_max_connections(ep: str) -> int:
"max_concurrent_connections", config.max_concurrent_connections
)
async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
async def choose_endpoint(model: str, reserve: bool = True,
affinity_key: Optional[str] = None) -> tuple[str, str]:
"""
Determine which endpoint to use for the given model while respecting
the `max_concurrent_connections` per endpointmodel pair **and**
@ -1748,10 +1798,14 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
1 Query every endpoint for its advertised models (`/api/tags`).
2 Build a list of endpoints that contain the requested model.
2.5 If conversation affinity is enabled and the caller passes
``affinity_key``, prefer the endpoint that previously served the
same conversation but only when it still has the model loaded
and a free slot. Otherwise fall through to the standard logic.
3 For those endpoints, find those that have the model loaded
(`/api/ps`) *and* still have a free slot.
4 If none are both loaded and free, fall back to any endpoint
from the filtered list that simply has a free slot and randomly
from the filtered list that simply has a free slot and randomly
select one.
5 If all are saturated, pick any endpoint from the filtered list
(the request will queue on that endpoint).
@ -1799,6 +1853,19 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints]
loaded_sets = await asyncio.gather(*load_tasks)
# Look up a possible affinity hint *before* taking usage_lock. The two
# locks are never held together to avoid lock-ordering issues.
affine_ep: Optional[str] = None
if config.conversation_affinity and affinity_key:
async with _affinity_lock:
entry = _affinity_map.get(affinity_key)
if entry is not None:
ep, _stored_model, expires_at = entry
if expires_at < time.monotonic():
_affinity_map.pop(affinity_key, None)
else:
affine_ep = ep
# Protect all reads/writes of usage_counts with the lock so that selection
# and reservation are atomic — concurrent callers see each other's pending load.
async with usage_lock:
@ -1814,59 +1881,75 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
# Priority map: position in all_endpoints list (lower = higher priority)
ep_priority = {ep: i for i, ep in enumerate(all_endpoints)}
# 3⃣ Endpoints that have the model loaded *and* a free slot
loaded_and_free = [
ep for ep, models in zip(candidate_endpoints, loaded_sets)
if model in models and tracking_usage(ep) < get_max_connections(ep)
]
selected: Optional[str] = None
if loaded_and_free:
if config.priority_routing:
# WRR: sort by config order first (stable), then by utilization ratio.
# Stable sort preserves priority for equal-ratio endpoints.
loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999))
loaded_and_free.sort(key=utilization_ratio)
selected = loaded_and_free[0]
else:
# Sort ascending for load balancing — all endpoints here already have the
# model loaded, so there is no model-switching cost to optimise for.
loaded_and_free.sort(key=tracking_usage)
# When all candidates are equally idle, randomise to avoid always picking
# the first entry in a stable sort.
if all(tracking_usage(ep) == 0 for ep in loaded_and_free):
selected = random.choice(loaded_and_free)
else:
selected = loaded_and_free[0]
else:
# 4⃣ Endpoints among the candidates that simply have a free slot
endpoints_with_free_slot = [
ep for ep in candidate_endpoints
if tracking_usage(ep) < get_max_connections(ep)
# 2⃣.5 Conversation affinity preference — only honour the hint when
# the affine endpoint still advertises the model loaded *and* has a
# free slot. Otherwise fall back to the standard algorithm.
if affine_ep:
ep_loaded = {
ep: set(models)
for ep, models in zip(candidate_endpoints, loaded_sets)
}
if (affine_ep in candidate_endpoints
and model in ep_loaded.get(affine_ep, set())
and tracking_usage(affine_ep) < get_max_connections(affine_ep)):
selected = affine_ep
if selected is None:
# 3⃣ Endpoints that have the model loaded *and* a free slot
loaded_and_free = [
ep for ep, models in zip(candidate_endpoints, loaded_sets)
if model in models and tracking_usage(ep) < get_max_connections(ep)
]
if endpoints_with_free_slot:
if loaded_and_free:
if config.priority_routing:
endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999))
endpoints_with_free_slot.sort(key=utilization_ratio)
selected = endpoints_with_free_slot[0]
# WRR: sort by config order first (stable), then by utilization ratio.
# Stable sort preserves priority for equal-ratio endpoints.
loaded_and_free.sort(key=lambda ep: ep_priority.get(ep, 999))
loaded_and_free.sort(key=utilization_ratio)
selected = loaded_and_free[0]
else:
# Sort by total endpoint load (ascending) to prefer idle endpoints.
endpoints_with_free_slot.sort(
key=lambda ep: sum(usage_counts.get(ep, {}).values())
)
if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot):
selected = random.choice(endpoints_with_free_slot)
# Sort ascending for load balancing — all endpoints here already have the
# model loaded, so there is no model-switching cost to optimise for.
loaded_and_free.sort(key=tracking_usage)
# When all candidates are equally idle, randomise to avoid always picking
# the first entry in a stable sort.
if all(tracking_usage(ep) == 0 for ep in loaded_and_free):
selected = random.choice(loaded_and_free)
else:
selected = endpoints_with_free_slot[0]
selected = loaded_and_free[0]
else:
# 5⃣ All candidate endpoints are saturated pick the least-busy one (will queue)
if config.priority_routing:
selected = min(
candidate_endpoints,
key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)),
)
# 4⃣ Endpoints among the candidates that simply have a free slot
endpoints_with_free_slot = [
ep for ep in candidate_endpoints
if tracking_usage(ep) < get_max_connections(ep)
]
if endpoints_with_free_slot:
if config.priority_routing:
endpoints_with_free_slot.sort(key=lambda ep: ep_priority.get(ep, 999))
endpoints_with_free_slot.sort(key=utilization_ratio)
selected = endpoints_with_free_slot[0]
else:
# Sort by total endpoint load (ascending) to prefer idle endpoints.
endpoints_with_free_slot.sort(
key=lambda ep: sum(usage_counts.get(ep, {}).values())
)
if all(tracking_usage(ep) == 0 for ep in endpoints_with_free_slot):
selected = random.choice(endpoints_with_free_slot)
else:
selected = endpoints_with_free_slot[0]
else:
selected = min(candidate_endpoints, key=tracking_usage)
# 5⃣ All candidate endpoints are saturated pick the least-busy one (will queue)
if config.priority_routing:
selected = min(
candidate_endpoints,
key=lambda ep: (utilization_ratio(ep), ep_priority.get(ep, 999)),
)
else:
selected = min(candidate_endpoints, key=tracking_usage)
tracking_model = get_tracking_model(selected, model)
snapshot = None
@ -1875,6 +1958,15 @@ async def choose_endpoint(model: str, reserve: bool = True) -> tuple[str, str]:
snapshot = _capture_snapshot()
if snapshot is not None:
await _distribute_snapshot(snapshot)
# Record / refresh affinity *after* releasing usage_lock.
if reserve and config.conversation_affinity and affinity_key:
expires_at = time.monotonic() + config.conversation_affinity_ttl
async with _affinity_lock:
_affinity_map[affinity_key] = (selected, model, expires_at)
if len(_affinity_map) > _AFFINITY_MAX_ENTRIES:
now = time.monotonic()
for k in [k for k, v in _affinity_map.items() if v[2] < now]:
_affinity_map.pop(k, None)
return selected, tracking_model
# -------------------------------------------------------------
@ -1925,7 +2017,8 @@ async def proxy(request: Request):
yield _cached
return StreamingResponse(_serve_cached_generate(), media_type="application/json")
endpoint, tracking_model = await choose_endpoint(model)
_affinity_key = _conversation_fingerprint(model, None, prompt)
endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key)
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
@ -2095,7 +2188,8 @@ async def chat_proxy(request: Request):
opt = True
else:
opt = False
endpoint, tracking_model = await choose_endpoint(model)
_affinity_key = _conversation_fingerprint(model, messages, None)
endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key)
use_openai = is_openai_compatible(endpoint)
if use_openai:
if ":latest" in model:
@ -3010,6 +3104,43 @@ async def ps_details_proxy(request: Request):
return JSONResponse(content={"models": models}, status_code=200)
# -------------------------------------------------------------
# 18b. Conversation-affinity stats feeds the PS-table dot matrix
# -------------------------------------------------------------
@app.get("/api/affinity_stats")
async def affinity_stats(request: Request):
"""
Aggregate live conversation-affinity pins, one entry per pinned conversation.
Each entry exposes only the endpoint, model, and remaining TTL in seconds
no fingerprints or content. When conversation_affinity is disabled the
`entries` list is always empty.
"""
if not config.conversation_affinity:
return {"enabled": False, "ttl": config.conversation_affinity_ttl, "entries": []}
now = time.monotonic()
entries: list[dict] = []
llama_eps = set(config.llama_server_endpoints)
async with _affinity_lock:
for fp, (ep, mdl, expires_at) in list(_affinity_map.items()):
remaining = expires_at - now
if remaining <= 0:
_affinity_map.pop(fp, None)
continue
# Mirror the normalisation used by /api/ps_details so the dashboard
# can join affinity entries to PS rows by (endpoint, model).
display_model = _normalize_llama_model_name(mdl) if ep in llama_eps else mdl
entries.append({
"endpoint": ep,
"model": display_model,
"remaining": round(remaining, 2),
})
return {
"enabled": True,
"ttl": config.conversation_affinity_ttl,
"entries": entries,
}
# -------------------------------------------------------------
# 19. Proxy usage route for monitoring
# -------------------------------------------------------------
@ -3228,7 +3359,8 @@ async def openai_chat_completions_proxy(request: Request):
return StreamingResponse(_serve_cached_ochat_json(), media_type="application/json")
# 2. Endpoint logic
endpoint, tracking_model = await choose_endpoint(model)
_affinity_key = _conversation_fingerprint(model, messages, None)
endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key)
oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
# 3. Helpers and API call — done in handler scope so try/except works reliably
async def _normalize_images_in_messages(msgs: list) -> list:
@ -3538,7 +3670,8 @@ async def openai_completions_proxy(request: Request):
return StreamingResponse(_serve_cached_ocompl_json(), media_type="application/json")
# 2. Endpoint logic
endpoint, tracking_model = await choose_endpoint(model)
_affinity_key = _conversation_fingerprint(model, None, prompt)
endpoint, tracking_model = await choose_endpoint(model, affinity_key=_affinity_key)
oclient = _make_openai_client(endpoint, default_headers=default_headers, api_key=config.api_keys.get(endpoint, "no-key"))
# 3. Async generator that streams completions data and decrements the counter
@ -4028,6 +4161,16 @@ async def startup_event() -> None:
@app.on_event("shutdown")
async def shutdown_event() -> None:
await close_all_sse_queues()
# Stop background tasks first so they stop touching the DB before we close it.
for t in (token_worker_task, flush_task):
if t is not None:
t.cancel()
try:
await t
except (asyncio.CancelledError, Exception):
pass
await flush_remaining_buffers()
await app_state["session"].close()
@ -4047,7 +4190,11 @@ async def shutdown_event() -> None:
except Exception as e:
print(f"[shutdown] Error closing httpx client {ep}: {e}")
if token_worker_task is not None:
token_worker_task.cancel()
if flush_task is not None:
flush_task.cancel()
# Close the aiosqlite connection last — its worker thread is non-daemon
# and would otherwise keep the interpreter alive after lifespan completes.
if db is not None:
try:
await db.close()
print("[shutdown] Closed token DB connection.")
except Exception as e:
print(f"[shutdown] Error closing DB: {e}")

View file

@ -121,6 +121,45 @@
.ps-subrow + .ps-subrow {
margin-top: 2px;
}
#ps-table .affinity-col,
#ps-table .affinity-cell {
display: none;
}
#ps-table.affinity-on .affinity-col,
#ps-table.affinity-on .affinity-cell {
display: table-cell;
width: 90px;
text-align: center;
padding-left: 6px;
padding-right: 6px;
}
#ps-table.affinity-on .affinity-dots {
max-width: 78px;
}
.affinity-dots {
display: inline-flex;
flex-wrap: wrap;
gap: 3px;
align-items: center;
line-height: 1;
}
.affinity-dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: #2e7d32;
display: inline-block;
transition: opacity 1s linear;
}
.affinity-overflow {
font-size: 10px;
color: #555;
margin-left: 2px;
}
.affinity-empty {
color: #bbb;
font-size: 11px;
}
#ps-table {
width: max-content;
min-width: 100%;
@ -131,13 +170,13 @@
max-width: 300px;
white-space: nowrap;
}
/* Optimize narrow columns */
#ps-table th:nth-child(3),
#ps-table td:nth-child(3),
/* Optimize narrow columns (Params / Quant / Ctx) */
#ps-table th:nth-child(4),
#ps-table td:nth-child(4),
#ps-table th:nth-child(5),
#ps-table td:nth-child(5) {
#ps-table td:nth-child(5),
#ps-table th:nth-child(6),
#ps-table td:nth-child(6) {
width: 80px;
text-align: center;
}
@ -395,6 +434,7 @@
<tr>
<th class="model-col">Model</th>
<th>Endpoint</th>
<th class="affinity-col" title="Live conversation-affinity pins (KV-cache warm). One dot per pinned conversation; opacity fades toward TTL expiry.">Affinity</th>
<th>Params</th>
<th>Quant</th>
<th>Ctx</th>
@ -406,7 +446,7 @@
</thead>
<tbody id="ps-body">
<tr>
<td colspan="6" class="loading">Loading…</td>
<td colspan="10" class="loading">Loading…</td>
</tr>
</tbody>
</table>
@ -932,6 +972,14 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
return items.map((item) => `<div class="ps-subrow">${item || ""}</div>`).join("");
};
const escapeAttr = (s) => String(s).replace(/&/g, "&amp;").replace(/"/g, "&quot;").replace(/</g, "&lt;").replace(/>/g, "&gt;");
const renderAffinitySlots = (endpoints, modelName) => {
if (!endpoints.length) return "";
return endpoints
.map((ep) => `<div class="ps-subrow"><span class="affinity-dots" data-endpoint="${escapeAttr(ep)}" data-model="${escapeAttr(modelName)}"></span></div>`)
.join("");
};
body.innerHTML = Array.from(grouped.entries())
.map(([modelName, modelInstances]) => {
const existingRow = psRows.get(modelName);
@ -955,6 +1003,7 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
return `<tr data-model="${modelName}" data-endpoints="${endpointsData}">
<td class="model"><span style="color:${getColor(modelName)}">${modelName}</span> <a href="#" class="stats-link" data-model="${modelName}">stats</a></td>
<td>${renderInstanceList(endpoints)}</td>
<td class="affinity-cell">${renderAffinitySlots(endpoints, modelName)}</td>
<td>${params}</td>
<td>${quant}</td>
<td>${ctx}</td>
@ -972,11 +1021,83 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
const model = row.dataset.model;
if (model) psRows.set(model, row);
});
renderAffinityDots();
} catch (e) {
console.error(e);
}
}
/* ---------- Conversation-affinity dots ---------- */
const AFFINITY_MAX_DOTS = 12;
let affinityIndex = new Map(); // `${endpoint}|${model}` -> array of {expiresAt}
let affinityTtl = 300;
let affinityEnabled = false;
async function loadAffinity() {
try {
const data = await fetchJSON("/api/affinity_stats");
affinityEnabled = !!data.enabled;
affinityTtl = Number(data.ttl) || 300;
const now = Date.now() / 1000;
const idx = new Map();
for (const e of data.entries || []) {
const key = `${e.endpoint}|${e.model}`;
if (!idx.has(key)) idx.set(key, []);
idx.get(key).push({ expiresAt: now + Number(e.remaining) });
}
affinityIndex = idx;
applyAffinityColumnVisibility();
renderAffinityDots();
} catch (err) {
// Endpoint may 404 on older deployments — silently degrade.
affinityEnabled = false;
affinityIndex = new Map();
applyAffinityColumnVisibility();
renderAffinityDots();
}
}
function applyAffinityColumnVisibility() {
const table = document.getElementById("ps-table");
if (!table) return;
table.classList.toggle("affinity-on", affinityEnabled);
}
function renderAffinityDots() {
const spans = document.querySelectorAll(".affinity-dots");
if (!spans.length) return;
const now = Date.now() / 1000;
spans.forEach((span) => {
const ep = span.dataset.endpoint;
const mdl = span.dataset.model;
const key = `${ep}|${mdl}`;
const pins = (affinityIndex.get(key) || []).filter((p) => p.expiresAt > now);
if (pins.length !== (affinityIndex.get(key) || []).length) {
if (pins.length) affinityIndex.set(key, pins);
else affinityIndex.delete(key);
}
if (!pins.length) {
span.innerHTML = affinityEnabled
? `<span class="affinity-empty"></span>`
: "";
return;
}
// Sort freshest first so visible dots are the most "recent".
pins.sort((a, b) => b.expiresAt - a.expiresAt);
const visible = pins.slice(0, AFFINITY_MAX_DOTS);
const overflow = pins.length - visible.length;
const dotsHtml = visible
.map((p) => {
const remaining = Math.max(0, p.expiresAt - now);
const opacity = Math.max(0.15, Math.min(1, remaining / affinityTtl));
const secs = Math.round(remaining);
return `<span class="affinity-dot" style="opacity:${opacity.toFixed(2)}" title="pin expires in ${secs}s"></span>`;
})
.join("");
span.innerHTML = dotsHtml + (overflow > 0 ? `<span class="affinity-overflow">+${overflow}</span>` : "");
});
}
/* ---------- Usage Chart (stackedpercentage) ---------- */
function getColor(seed) {
const h = Math.abs(hashString(seed) % 360);
@ -1173,10 +1294,13 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
loadEndpoints();
loadTags();
loadPS();
loadAffinity();
loadUsage();
initHeaderChart();
setInterval(tickTpsChart, 1000);
setInterval(loadPS, 60_000);
setInterval(loadAffinity, 15_000);
setInterval(renderAffinityDots, 2_000);
setInterval(loadEndpoints, 300_000);
/* show logic */