Add files via upload
dashboard polling removed in favour for SSE pub:sub
This commit is contained in:
parent
ef936bb2a0
commit
a23ccafc5a
3 changed files with 115 additions and 15 deletions
72
router.py
72
router.py
|
|
@ -11,6 +11,7 @@ from httpx_aiohttp import AiohttpTransport
|
|||
from pathlib import Path
|
||||
from typing import Dict, Set, List, Optional
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
from fastapi_sse import sse_handler
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from starlette.responses import StreamingResponse, JSONResponse, Response, HTMLResponse, RedirectResponse
|
||||
from pydantic import Field
|
||||
|
|
@ -26,6 +27,12 @@ _models_cache: dict[str, tuple[Set[str], float]] = {}
|
|||
# timeout expires, after which the endpoint will be queried again.
|
||||
_error_cache: dict[str, float] = {}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# SSE Queues
|
||||
# ------------------------------------------------------------------
|
||||
_subscribers: Set[asyncio.Queue] = set()
|
||||
_subscribers_lock = asyncio.Lock()
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 1. Configuration loader
|
||||
# -------------------------------------------------------------
|
||||
|
|
@ -77,6 +84,7 @@ config = Config()
|
|||
# 2. FastAPI application
|
||||
# -------------------------------------------------------------
|
||||
app = FastAPI()
|
||||
sse_handler.app = app
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 3. Global state: per‑endpoint per‑model active connection counters
|
||||
|
|
@ -234,6 +242,7 @@ def dedupe_on_keys(dicts, key_fields):
|
|||
async def increment_usage(endpoint: str, model: str) -> None:
|
||||
async with usage_lock:
|
||||
usage_counts[endpoint][model] += 1
|
||||
await publish_snapshot()
|
||||
|
||||
async def decrement_usage(endpoint: str, model: str) -> None:
|
||||
async with usage_lock:
|
||||
|
|
@ -246,6 +255,41 @@ async def decrement_usage(endpoint: str, model: str) -> None:
|
|||
usage_counts[endpoint].pop(model, None)
|
||||
#if not usage_counts[endpoint]:
|
||||
# usage_counts.pop(endpoint, None)
|
||||
await publish_snapshot()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# SSE Helpser
|
||||
# ------------------------------------------------------------------
|
||||
async def publish_snapshot():
|
||||
snapshot = json.dumps({"usage_counts": usage_counts})
|
||||
async with _subscribers_lock:
|
||||
for q in _subscribers:
|
||||
# If the queue is full, drop the message to avoid back‑pressure.
|
||||
if q.full():
|
||||
continue
|
||||
await q.put(snapshot)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Subscriber helpers
|
||||
# ------------------------------------------------------------------
|
||||
async def subscribe() -> asyncio.Queue:
|
||||
"""
|
||||
Returns a new Queue that will receive every snapshot.
|
||||
"""
|
||||
q: asyncio.Queue = asyncio.Queue(maxsize=10)
|
||||
async with _subscribers_lock:
|
||||
_subscribers.add(q)
|
||||
return q
|
||||
|
||||
async def unsubscribe(q: asyncio.Queue):
|
||||
async with _subscribers_lock:
|
||||
_subscribers.discard(q)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Convenience wrapper – returns the current snapshot (for the proxy)
|
||||
# ------------------------------------------------------------------
|
||||
async def get_usage_counts() -> Dict:
|
||||
return dict(usage_counts) # shallow copy
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 5. Endpoint selection logic (respecting the configurable limit)
|
||||
|
|
@ -1272,7 +1316,33 @@ async def health_proxy(request: Request):
|
|||
return JSONResponse(content=response_payload, status_code=http_status)
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 27. FastAPI startup event – load configuration
|
||||
# 27. SSE route for usage broadcasts
|
||||
# -------------------------------------------------------------
|
||||
@app.get("/api/usage-stream")
|
||||
async def usage_stream(request: Request):
|
||||
"""
|
||||
Server‑Sent‑Events that emits a JSON payload every time the
|
||||
global `usage_counts` dictionary changes.
|
||||
"""
|
||||
async def event_generator():
|
||||
# The queue that receives *every* new snapshot
|
||||
queue = await subscribe()
|
||||
try:
|
||||
while True:
|
||||
# If the client disconnects, cancel the loop
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
data = await queue.get()
|
||||
# Send the data as a single SSE message
|
||||
yield f"data: {data}\n\n"
|
||||
finally:
|
||||
# Clean‑up: unsubscribe from the broadcast channel
|
||||
await unsubscribe(queue)
|
||||
|
||||
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
||||
|
||||
# -------------------------------------------------------------
|
||||
# 28. FastAPI startup event – load configuration
|
||||
# -------------------------------------------------------------
|
||||
@app.on_event("startup")
|
||||
async def startup_event() -> None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue