Compare commits
5 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a3928c9c33 | |||
| 1a2781ac23 | |||
| a3e7e8a007 | |||
| 5ac412eb5c | |||
| 537b757c4a |
4 changed files with 53 additions and 9 deletions
|
|
@ -26,8 +26,8 @@ RUN pip install --root-user-action=ignore --no-cache-dir --upgrade pip \
|
||||||
# CPU-only torch must be installed before sentence-transformers to avoid
|
# CPU-only torch must be installed before sentence-transformers to avoid
|
||||||
# pulling the full CUDA-enabled build (~2.5 GB).
|
# pulling the full CUDA-enabled build (~2.5 GB).
|
||||||
RUN if [ "$SEMANTIC_CACHE" = "true" ]; then \
|
RUN if [ "$SEMANTIC_CACHE" = "true" ]; then \
|
||||||
pip install --no-cache-dir torch --index-url https://download.pytorch.org/whl/cpu && \
|
pip install --root-user-action=ignore --no-cache-dir torch --index-url https://download.pytorch.org/whl/cpu && \
|
||||||
pip install --no-cache-dir sentence-transformers && \
|
pip install --root-user-action=ignore --no-cache-dir sentence-transformers && \
|
||||||
python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-MiniLM-L6-v2')"; \
|
python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-MiniLM-L6-v2')"; \
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -127,6 +127,34 @@ The router can proxy requests to OpenAI-compatible endpoints alongside Ollama en
|
||||||
- Handles authentication with API keys
|
- Handles authentication with API keys
|
||||||
- Maintains consistent behavior across endpoint types
|
- Maintains consistent behavior across endpoint types
|
||||||
|
|
||||||
|
### Reactive Context-Shift
|
||||||
|
|
||||||
|
When a backend returns an `exceed_context_size_error` (context window exceeded), the router automatically trims the conversation history and retries rather than surfacing the error to the client.
|
||||||
|
|
||||||
|
**How it works:**
|
||||||
|
|
||||||
|
1. The error body contains `n_ctx` (the model's context limit) and `n_prompt_tokens` (the actual token count as measured by the backend).
|
||||||
|
2. `_calibrated_trim_target()` computes a tiktoken-scale trim target using the *delta* between actual tokens and the context limit, correcting for the fact that tiktoken counts fewer tokens than the backend tokeniser does.
|
||||||
|
3. `_trim_messages_for_context()` implements a sliding-window drop: system messages are always preserved; the oldest non-system messages are evicted first (FIFO) until the estimated token count fits the target. The most recent message is never dropped. After trimming, leading assistant/tool messages are removed to satisfy chat-template requirements (first non-system message must be a user message).
|
||||||
|
4. Two retry attempts are made:
|
||||||
|
- **Retry 1** — trimmed messages, original tool definitions.
|
||||||
|
- **Retry 2** — trimmed messages with tool definitions also stripped (handles cases where tool schemas alone consume too many tokens).
|
||||||
|
|
||||||
|
**Proactive pre-trimming:**
|
||||||
|
|
||||||
|
Once a context overflow has been observed for an endpoint/model pair whose `n_ctx` ≤ 32 768, the router records that limit in `_endpoint_nctx`. Subsequent requests to the same pair are pre-trimmed before being sent, avoiding the round-trip to the backend entirely for small-context models.
|
||||||
|
|
||||||
|
### Reactive SSE Push
|
||||||
|
|
||||||
|
The `/api/usage-stream` endpoint delivers real-time usage updates using a pub/sub push model rather than client polling.
|
||||||
|
|
||||||
|
**Mechanism:**
|
||||||
|
|
||||||
|
- `subscribe()` creates a bounded `asyncio.Queue` (capacity 10) and registers it in `_subscribers`.
|
||||||
|
- Whenever `usage_counts` or `token_usage_counts` change — on every `increment_usage`, `decrement_usage`, or token-worker flush — `_capture_snapshot()` serialises the current state to JSON while the caller still holds the relevant lock, then `_distribute_snapshot()` pushes the snapshot to every registered queue outside the lock.
|
||||||
|
- If a subscriber's queue is full (slow client), the oldest undelivered snapshot is evicted before the new one is enqueued, so fast producers never block on slow consumers.
|
||||||
|
- `unsubscribe()` removes the queue when the SSE connection closes; `close_all_sse_queues()` sends a `None` sentinel to all subscribers during router shutdown.
|
||||||
|
|
||||||
## Performance Considerations
|
## Performance Considerations
|
||||||
|
|
||||||
### Concurrency Model
|
### Concurrency Model
|
||||||
|
|
@ -145,7 +173,7 @@ The router can proxy requests to OpenAI-compatible endpoints alongside Ollama en
|
||||||
### Memory Management
|
### Memory Management
|
||||||
|
|
||||||
- **Write-behind pattern**: Token counts buffered in memory, flushed periodically
|
- **Write-behind pattern**: Token counts buffered in memory, flushed periodically
|
||||||
- **Queue-based SSE**: Server-Sent Events use bounded queues to prevent memory bloat
|
- **Queue-based SSE**: Bounded per-subscriber queues (capacity 10) with oldest-eviction — see [Reactive SSE Push](#reactive-sse-push)
|
||||||
- **Automatic cleanup**: Zero connection counts are removed from tracking
|
- **Automatic cleanup**: Zero connection counts are removed from tracking
|
||||||
|
|
||||||
## Error Handling
|
## Error Handling
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ ollama==0.6.1
|
||||||
openai==1.102.0
|
openai==1.102.0
|
||||||
orjson>=3.11.5
|
orjson>=3.11.5
|
||||||
numpy>=1.26
|
numpy>=1.26
|
||||||
pillow==12.1.1
|
pillow==12.2.0
|
||||||
propcache==0.3.2
|
propcache==0.3.2
|
||||||
pydantic==2.11.7
|
pydantic==2.11.7
|
||||||
pydantic-settings==2.10.1
|
pydantic-settings==2.10.1
|
||||||
|
|
|
||||||
26
router.py
26
router.py
|
|
@ -3754,22 +3754,38 @@ async def health_proxy(request: Request):
|
||||||
- `endpoints`: a mapping of endpoint URL → `{status, version|detail}`.
|
- `endpoints`: a mapping of endpoint URL → `{status, version|detail}`.
|
||||||
* The HTTP status code is 200 when everything is healthy, 503 otherwise.
|
* The HTTP status code is 200 when everything is healthy, 503 otherwise.
|
||||||
"""
|
"""
|
||||||
# Run all health checks in parallel
|
# Run all health checks in parallel.
|
||||||
tasks = [fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True) for ep in config.endpoints] # if not is_ext_openai_endpoint(ep)]
|
# Ollama endpoints expose /api/version; OpenAI-compatible endpoints (vLLM,
|
||||||
|
# llama-server, external) expose /models. Using /api/version against an
|
||||||
|
# OpenAI-compatible endpoint yields a 404 and noisy log output.
|
||||||
|
all_endpoints = list(config.endpoints)
|
||||||
|
llama_eps_extra = [ep for ep in config.llama_server_endpoints if ep not in config.endpoints]
|
||||||
|
all_endpoints += llama_eps_extra
|
||||||
|
|
||||||
|
tasks = []
|
||||||
|
for ep in all_endpoints:
|
||||||
|
if is_openai_compatible(ep):
|
||||||
|
tasks.append(fetch.endpoint_details(ep, "/models", "data", config.api_keys.get(ep), skip_error_cache=True))
|
||||||
|
else:
|
||||||
|
tasks.append(fetch.endpoint_details(ep, "/api/version", "version", skip_error_cache=True))
|
||||||
|
|
||||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
health_summary = {}
|
health_summary = {}
|
||||||
overall_ok = True
|
overall_ok = True
|
||||||
|
|
||||||
for ep, result in zip(config.endpoints, results):
|
for ep, result in zip(all_endpoints, results):
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
# Endpoint did not respond / returned an error
|
# Endpoint did not respond / returned an error
|
||||||
health_summary[ep] = {"status": "error", "detail": str(result)}
|
health_summary[ep] = {"status": "error", "detail": str(result)}
|
||||||
overall_ok = False
|
overall_ok = False
|
||||||
else:
|
else:
|
||||||
# Successful response – report the reported version
|
# Successful response – report the reported version (Ollama) or
|
||||||
health_summary[ep] = {"status": "ok", "version": result}
|
# indicate the endpoint is reachable (OpenAI-compatible).
|
||||||
|
if is_openai_compatible(ep):
|
||||||
|
health_summary[ep] = {"status": "ok"}
|
||||||
|
else:
|
||||||
|
health_summary[ep] = {"status": "ok", "version": result}
|
||||||
|
|
||||||
response_payload = {
|
response_payload = {
|
||||||
"status": "ok" if overall_ok else "error",
|
"status": "ok" if overall_ok else "error",
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue