From baf5d98318a0918a777c0ebd94a50644ad7b4eb2 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Tue, 18 Nov 2025 11:16:21 +0100 Subject: [PATCH 01/12] adding token timeseries counting in db for future data viz --- .gitignore | 5 +- Dockerfile | 6 +++ db.py | 134 ++++++++++++++++++++++++++++++++++++++++++++++ entrypoint.sh | 7 +++ requirements.txt | 1 + router.py | 87 +++++++++++++++++++++++++++--- static/index.html | 6 ++- 7 files changed, 236 insertions(+), 10 deletions(-) create mode 100644 db.py diff --git a/.gitignore b/.gitignore index 702c855..74eef7d 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,7 @@ cython_debug/ *.sqlite3 # Config -config.yaml \ No newline at end of file +config.yaml + +# SQLite +*.db \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index e456af3..073496d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,11 +3,17 @@ FROM python:3.13-slim ENV PYTHONUNBUFFERED=1 \ PYTHONDONTWRITEBYTECODE=1 +# Install SQLite +RUN apt-get update && apt-get install -y sqlite3 + WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir --upgrade pip \ && pip install --no-cache-dir -r requirements.txt +# Create database directory and set permissions +RUN mkdir -p /app/data && chown -R www-data:www-data /app/data + COPY . . RUN chmod +x /app/entrypoint.sh diff --git a/db.py b/db.py new file mode 100644 index 0000000..0816c17 --- /dev/null +++ b/db.py @@ -0,0 +1,134 @@ +import aiosqlite +import os +import asyncio +from pathlib import Path +from datetime import datetime, timezone +from collections import defaultdict + +class TokenDatabase: + def __init__(self, db_path: str = "token_counts.db"): + self.db_path = db_path + self._ensure_db_directory() + + def _ensure_db_directory(self): + """Ensure the directory for the database exists.""" + db_dir = Path(self.db_path).parent + if not db_dir.exists(): + db_dir.mkdir(parents=True, exist_ok=True) + + async def init_db(self): + """Initialize the database tables.""" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_counts ( + endpoint TEXT, + model TEXT, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + PRIMARY KEY(endpoint, model) + ) + ''') + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_time_series ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + endpoint TEXT, + model TEXT, + input_tokens INTEGER, + output_tokens INTEGER, + total_tokens INTEGER, + timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision + FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) + ) + ''') + await db.commit() + + async def update_token_counts(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): + """Update token counts for a specific endpoint and model.""" + total_tokens = input_tokens + output_tokens + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) + await db.commit() + + async def add_time_series_entry(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): + """Add a time series entry with approximate timestamp.""" + total_tokens = input_tokens + output_tokens + # Use current minute/hour as approximate timestamp + now = datetime.now(tz=timezone.utc) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp()) + + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) + await db.commit() + + async def update_batched_counts(self, counts: dict): + """Update multiple token counts in a single transaction.""" + if not counts: + return + async with aiosqlite.connect(self.db_path) as db: + for endpoint, models in counts.items(): + for model, (input_tokens, output_tokens) in models.items(): + total_tokens = input_tokens + output_tokens + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, + input_tokens, output_tokens, total_tokens)) + await db.commit() + + async def add_batched_time_series(self, entries: list): + """Add multiple time series entries in a single transaction.""" + async with aiosqlite.connect(self.db_path) as db: + for entry in entries: + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (entry['endpoint'], entry['model'], entry['input_tokens'], + entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) + await db.commit() + + async def load_token_counts(self): + """Load all token counts from database.""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4] + } + + async def get_latest_time_series(self, limit: int = 100): + """Get the latest time series entries.""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute(''' + SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp + FROM token_time_series + ORDER BY timestamp DESC + LIMIT ? + ''', (limit,)) as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4], + 'timestamp': row[5] + } diff --git a/entrypoint.sh b/entrypoint.sh index cee2f17..6682851 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,6 +1,13 @@ #!/usr/bin/env sh set -e +# Create database directory if it doesn't exist +mkdir -p /app/data +chown -R www-data:www-data /app/data + +# Set database path environment variable +export NOMYO_ROUTER_DB_PATH="/app/data/token_counts.db" + CONFIG_PATH_ARG="" SHOW_HELP=0 diff --git a/requirements.txt b/requirements.txt index e296839..a0c7ab6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,3 +36,4 @@ typing-inspection==0.4.1 typing_extensions==4.14.1 uvicorn==0.38.0 yarl==1.20.1 +aiosqlite diff --git a/router.py b/router.py index dff8aa9..1b0af1c 100644 --- a/router.py +++ b/router.py @@ -6,7 +6,8 @@ version: 0.4 license: AGPL """ # ------------------------------------------------------------- -import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, datetime, random, base64, io +import orjson, time, asyncio, yaml, ollama, openai, os, re, aiohttp, ssl, random, base64, io +from datetime import datetime, timezone from pathlib import Path from typing import Dict, Set, List, Optional from urllib.parse import urlparse @@ -45,6 +46,18 @@ app_state = { "connector": None, } token_worker_task: asyncio.Task | None = None +flush_task: asyncio.Task | None = None + +# ------------------------------------------------------------------ +# Token Count Buffer (for write-behind pattern) +# ------------------------------------------------------------------ +# Structure: {endpoint: {model: (input_tokens, output_tokens)}} +token_buffer: dict[str, dict[str, tuple[int, int]]] = defaultdict(lambda: defaultdict(tuple)) +# Time series buffer with timestamp +time_series_buffer: list[dict[str, int | str]] = [] + +# Configuration for periodic flushing +FLUSH_INTERVAL = 10 # seconds # ------------------------------------------------------------- # 1. Configuration loader @@ -61,6 +74,9 @@ class Config(BaseSettings): api_keys: Dict[str, str] = Field(default_factory=dict) + # Database configuration + db_path: str = Field(default=os.getenv("NOMYO_ROUTER_DB_PATH", "token_counts.db")) + class Config: # Load from `config.yaml` first, then from env variables env_prefix = "NOMYO_ROUTER_" @@ -101,6 +117,8 @@ def _config_path_from_env() -> Path: return Path(candidate).expanduser() return Path("config.yaml") +from db import TokenDatabase + # Create the global config object – it will be overwritten on startup config = Config.from_yaml(_config_path_from_env()) @@ -130,6 +148,9 @@ token_usage_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict( usage_lock = asyncio.Lock() # protects access to usage_counts token_usage_lock = asyncio.Lock() +# Database instance +db: "TokenDatabase" = None + # ------------------------------------------------------------- # 4. Helperfunctions # ------------------------------------------------------------- @@ -181,7 +202,7 @@ def _format_connection_issue(url: str, error: Exception) -> str: ) return f"Error while contacting {url}: {error}" - + def is_ext_openai_endpoint(endpoint: str) -> bool: if "/v1" not in endpoint: return False @@ -199,9 +220,43 @@ def is_ext_openai_endpoint(endpoint: str) -> bool: async def token_worker() -> None: while True: endpoint, model, prompt, comp = await token_queue.get() + # Accumulate counts in memory buffer + token_buffer[endpoint][model] = ( + token_buffer[endpoint].get(model, (0, 0))[0] + prompt, + token_buffer[endpoint].get(model, (0, 0))[1] + comp + ) + + # Add to time series buffer with timestamp + now = datetime.now(tz=timezone.utc) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp()) + time_series_buffer.append({ + 'endpoint': endpoint, + 'model': model, + 'input_tokens': prompt, + 'output_tokens': comp, + 'total_tokens': prompt + comp, + 'timestamp': timestamp + }) + + # Update in-memory counts for immediate reporting async with token_usage_lock: token_usage_counts[endpoint][model] += (prompt + comp) - await publish_snapshot() + await publish_snapshot() + +async def flush_buffer() -> None: + """Periodically flush accumulated token counts to the database.""" + while True: + await asyncio.sleep(FLUSH_INTERVAL) + + # Flush token counts + if token_buffer: + await db.update_batched_counts(token_buffer) + token_buffer.clear() + + # Flush time series entries + if time_series_buffer: + await db.add_batched_time_series(time_series_buffer) + time_series_buffer.clear() class fetch: async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: @@ -366,7 +421,7 @@ async def decrement_usage(endpoint: str, model: str) -> None: def iso8601_ns(): ns = time.time_ns() sec, ns_rem = divmod(ns, 1_000_000_000) - dt = datetime.datetime.fromtimestamp(sec, tz=datetime.timezone.utc) + dt = datetime.fromtimestamp(sec, tz=timezone.utc) return ( f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}T" f"{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}." @@ -628,7 +683,6 @@ async def choose_endpoint(model: str) -> str: f"None of the configured endpoints ({', '.join(config.endpoints)}) " f"advertise the model '{model}'." ) - # 3️⃣ Among the candidates, find those that have the model *loaded* # (concurrently, but only for the filtered list) load_tasks = [fetch.loaded_models(ep) for ep in candidate_endpoints] @@ -1772,7 +1826,7 @@ async def usage_stream(request: Request): # ------------------------------------------------------------- @app.on_event("startup") async def startup_event() -> None: - global config + global config, db # Load YAML config (or use defaults if not present) config_path = _config_path_from_env() config = Config.from_yaml(config_path) @@ -1787,7 +1841,21 @@ async def startup_event() -> None: f"No configuration file found at {config_path}. " "Falling back to default settings." ) - + + # Initialize database + db = TokenDatabase(config.db_path) + await db.init_db() + + # Load existing token counts from database + async for count_entry in db.load_token_counts(): + endpoint = count_entry['endpoint'] + model = count_entry['model'] + input_tokens = count_entry['input_tokens'] + output_tokens = count_entry['output_tokens'] + total_tokens = count_entry['total_tokens'] + + token_usage_counts[endpoint][model] = total_tokens + ssl_context = ssl.create_default_context() connector = aiohttp.TCPConnector(limit=0, limit_per_host=512, ssl=ssl_context) timeout = aiohttp.ClientTimeout(total=60, connect=15, sock_read=120, sock_connect=15) @@ -1796,10 +1864,13 @@ async def startup_event() -> None: app_state["connector"] = connector app_state["session"] = session token_worker_task = asyncio.create_task(token_worker()) + flush_task = asyncio.create_task(flush_buffer()) @app.on_event("shutdown") async def shutdown_event() -> None: await close_all_sse_queues() await app_state["session"].close() if token_worker_task is not None: - token_worker_task.cancel() \ No newline at end of file + token_worker_task.cancel() + if flush_task is not None: + flush_task.cancel() diff --git a/static/index.html b/static/index.html index 6f11ccb..b4f5466 100644 --- a/static/index.html +++ b/static/index.html @@ -442,12 +442,16 @@ const tokenValue = existingRow ? existingRow.querySelector(".token-usage")?.textContent ?? 0 : 0; + const digest = m.digest || ""; + const shortDigest = digest.length > 24 + ? `${digest.slice(0, 12)}...${digest.slice(-12)}` + : digest; return ` ${m.name} ${m.details.parameter_size} ${m.details.quantization_level} ${m.context_length} - ${m.digest} + ${shortDigest} ${tokenValue} `; }) From 541f2826e0a2ba079b0c096e2d7b2ffd1a322b6b Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Tue, 18 Nov 2025 19:02:36 +0100 Subject: [PATCH 02/12] fixing token_queue, prepping chart view --- db.py | 14 ++++++++++ router.py | 69 ++++++++++++++++++++++++++++++++++++++++++----- static/index.html | 62 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 137 insertions(+), 8 deletions(-) diff --git a/db.py b/db.py index 0816c17..9a96ca2 100644 --- a/db.py +++ b/db.py @@ -132,3 +132,17 @@ class TokenDatabase: 'total_tokens': row[4], 'timestamp': row[5] } + + async def get_token_counts_for_model(self, model): + """Get token counts for a specific model.""" + async with aiosqlite.connect(self.db_path) as db: + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: + async for row in cursor: + return { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4] + } + return None diff --git a/router.py b/router.py index 1b0af1c..94831f9 100644 --- a/router.py +++ b/router.py @@ -797,7 +797,8 @@ async def proxy(request: Request): chunk = rechunk.openai_completion2ollama(chunk, stream, start_ts) prompt_tok = chunk.prompt_eval_count or 0 comp_tok = chunk.eval_count or 0 - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() else: @@ -811,7 +812,8 @@ async def proxy(request: Request): response = async_gen.model_dump_json() prompt_tok = async_gen.prompt_eval_count or 0 comp_tok = async_gen.eval_count or 0 - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) json_line = ( response if hasattr(async_gen, "model_dump_json") @@ -913,7 +915,8 @@ async def chat_proxy(request: Request): # `chunk` can be a dict or a pydantic model – dump to JSON safely prompt_tok = chunk.prompt_eval_count or 0 comp_tok = chunk.eval_count or 0 - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) if hasattr(chunk, "model_dump_json"): json_line = chunk.model_dump_json() else: @@ -927,7 +930,8 @@ async def chat_proxy(request: Request): response = async_gen.model_dump_json() prompt_tok = async_gen.prompt_eval_count or 0 comp_tok = async_gen.eval_count or 0 - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) json_line = ( response if hasattr(async_gen, "model_dump_json") @@ -1140,7 +1144,7 @@ async def show_proxy(request: Request, model: Optional[str] = None): if not model: payload = orjson.loads(body_bytes.decode("utf-8")) model = payload.get("model") - + if not model: raise HTTPException( status_code=400, detail="Missing required field 'model'" @@ -1159,6 +1163,55 @@ async def show_proxy(request: Request, model: Optional[str] = None): # 4. Return ShowResponse return show +# ------------------------------------------------------------- +# 12. API route – Stats +# ------------------------------------------------------------- +@app.post("/api/stats") +async def stats_proxy(request: Request, model: Optional[str] = None): + """ + Return token usage statistics for a specific model. + """ + try: + body_bytes = await request.body() + + if not model: + payload = orjson.loads(body_bytes.decode("utf-8")) + model = payload.get("model") + + if not model: + raise HTTPException( + status_code=400, detail="Missing required field 'model'" + ) + except orjson.JSONDecodeError as e: + raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}") from e + + # Get token counts from database + token_data = await db.get_token_counts_for_model(model) + + if not token_data: + raise HTTPException( + status_code=404, detail="No token data found for this model" + ) + + # Get time series data + time_series = [] + async for entry in db.get_latest_time_series(limit=10): + if entry['model'] == model: + time_series.append({ + 'timestamp': entry['timestamp'], + 'input_tokens': entry['input_tokens'], + 'output_tokens': entry['output_tokens'], + 'total_tokens': entry['total_tokens'] + }) + + return { + 'model': model, + 'input_tokens': token_data['input_tokens'], + 'output_tokens': token_data['output_tokens'], + 'total_tokens': token_data['total_tokens'], + 'time_series': time_series + } + # ------------------------------------------------------------- # 12. API route – Copy # ------------------------------------------------------------- @@ -1584,7 +1637,8 @@ async def openai_chat_completions_proxy(request: Request): else: prompt_tok = async_gen.usage.prompt_tokens or 0 comp_tok = async_gen.usage.completion_tokens or 0 - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) json_line = ( async_gen.model_dump_json() if hasattr(async_gen, "model_dump_json") @@ -1690,7 +1744,8 @@ async def openai_completions_proxy(request: Request): else: prompt_tok = async_gen.usage.prompt_tokens or 0 comp_tok = async_gen.usage.completion_tokens or 0 - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + if prompt_tok != 0 or comp_tok != 0: + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) json_line = ( async_gen.model_dump_json() if hasattr(async_gen, "model_dump_json") diff --git a/static/index.html b/static/index.html index b4f5466..0ecbf7d 100644 --- a/static/index.html +++ b/static/index.html @@ -447,7 +447,7 @@ ? `${digest.slice(0, 12)}...${digest.slice(-12)}` : digest; return ` - ${m.name} + ${m.name} stats ${m.details.parameter_size} ${m.details.quantization_level} ${m.context_length} @@ -636,6 +636,56 @@ modal.style.display = "none"; } }); + + /* stats logic */ + document.body.addEventListener("click", async (e) => { + if (!e.target.matches(".stats-link")) return; + e.preventDefault(); + const model = e.target.dataset.model; + try { + const resp = await fetch( + `/api/stats?model=${encodeURIComponent(model)}`, + { method: "POST" }, + ); + if (!resp.ok) + throw new Error(`Status ${resp.status}`); + const data = await resp.json(); + const content = document.getElementById("stats-content"); + content.innerHTML = ` +

Token Usage

+

Input tokens: ${data.input_tokens}

+

Output tokens: ${data.output_tokens}

+

Total tokens: ${data.total_tokens}

+

Usage Over Time

+
+ ${data.time_series.length > 0 ? + data.time_series.map(ts => ` +
+ ${new Date(ts.timestamp * 1000).toLocaleString()} +

Input: ${ts.input_tokens}, Output: ${ts.output_tokens}, Total: ${ts.total_tokens}

+
+ `).join('') : + '

No time series data available

' + } +
+ `; + document.getElementById("stats-modal").style.display = "flex"; + } catch (err) { + console.error(err); + alert(`Could not load model stats: ${err.message}`); + } + }); + + /* stats modal close */ + const statsModal = document.getElementById("stats-modal"); + statsModal.addEventListener("click", (e) => { + if ( + e.target === statsModal || + e.target.matches(".close-btn") + ) { + statsModal.style.display = "none"; + } + }); }); @@ -646,5 +696,15 @@

             
         
+
+        
     
 

From 79a7ca972b6c180ea6de487b5708dde093206907 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo 
Date: Wed, 19 Nov 2025 17:05:25 +0100
Subject: [PATCH 03/12] initial chart view

---
 entrypoint.sh     |   0
 router.py         |   5 +-
 static/index.html | 236 ++++++++++++++++++++++++++++++++++++++++------
 3 files changed, 208 insertions(+), 33 deletions(-)
 mode change 100644 => 100755 entrypoint.sh

diff --git a/entrypoint.sh b/entrypoint.sh
old mode 100644
new mode 100755
diff --git a/router.py b/router.py
index 94831f9..9a1f0c8 100644
--- a/router.py
+++ b/router.py
@@ -1193,9 +1193,10 @@ async def stats_proxy(request: Request, model: Optional[str] = None):
             status_code=404, detail="No token data found for this model"
         )
 
-    # Get time series data
+    # Get time series data for the last 30 days (43200 minutes = 30 days)
+    # Assuming entries are grouped by minute, 30 days = 43200 entries max
     time_series = []
-    async for entry in db.get_latest_time_series(limit=10):
+    async for entry in db.get_latest_time_series(limit=50000):
         if entry['model'] == model:
             time_series.append({
                 'timestamp': entry['timestamp'],
diff --git a/static/index.html b/static/index.html
index 0ecbf7d..42a7cd7 100644
--- a/static/index.html
+++ b/static/index.html
@@ -3,6 +3,7 @@
     
         
         NOMYO Router Dashboard
+        
         
     
     
@@ -267,7 +289,7 @@
                             Quant
                             Ctx
                             Digest
-                            Token 
+                            Token
                         
                     
                     
@@ -300,7 +322,97 @@
         
 
         
 

From 3f77a8ec621857c6c6b8a1c28afd016e9a5c0b42 Mon Sep 17 00:00:00 2001
From: alpha-nerd-nomyo 
Date: Wed, 19 Nov 2025 17:28:31 +0100
Subject: [PATCH 04/12] chart enhancements

---
 router.py         |   8 +++-
 static/index.html | 104 +++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 95 insertions(+), 17 deletions(-)

diff --git a/router.py b/router.py
index 9a1f0c8..8c3be12 100644
--- a/router.py
+++ b/router.py
@@ -1196,21 +1196,27 @@ async def stats_proxy(request: Request, model: Optional[str] = None):
     # Get time series data for the last 30 days (43200 minutes = 30 days)
     # Assuming entries are grouped by minute, 30 days = 43200 entries max
     time_series = []
+    endpoint_totals = defaultdict(int)  # Track tokens per endpoint
+    
     async for entry in db.get_latest_time_series(limit=50000):
         if entry['model'] == model:
             time_series.append({
+                'endpoint': entry['endpoint'],
                 'timestamp': entry['timestamp'],
                 'input_tokens': entry['input_tokens'],
                 'output_tokens': entry['output_tokens'],
                 'total_tokens': entry['total_tokens']
             })
+            # Accumulate total tokens per endpoint
+            endpoint_totals[entry['endpoint']] += entry['total_tokens']
 
     return {
         'model': model,
         'input_tokens': token_data['input_tokens'],
         'output_tokens': token_data['output_tokens'],
         'total_tokens': token_data['total_tokens'],
-        'time_series': time_series
+        'time_series': time_series,
+        'endpoint_distribution': dict(endpoint_totals)
     }
 
 # -------------------------------------------------------------
diff --git a/static/index.html b/static/index.html
index 42a7cd7..17e2ce0 100644
--- a/static/index.html
+++ b/static/index.html
@@ -232,6 +232,12 @@
                 height: 300px;
                 margin-top: 1rem;
             }
+            .pie-chart-container {
+                position: relative;
+                height: 250px;
+                margin-top: 1rem;
+                max-width: 400px;
+            }
         
     
     
@@ -376,23 +382,32 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
         return;
     }
 
-    // Filter data based on selected timeframe
-    const now = new Date();
-    const cutoffTime = now.getTime() - (minutes * 60 * 1000);
+    // Filter data based on selected timeframe (use UTC for consistency)
+    const now = Date.now();
+    const cutoffTime = now - (minutes * 60 * 1000);
 
     // Group data by hour for better visualization
     const groupedData = {};
     timeSeriesData.forEach(item => {
-        const timestamp = item.timestamp * 1000;
-        if (timestamp >= cutoffTime) {
-            const date = new Date(timestamp);
-            // Group by hour (local time)
-            const hourStr = date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' });
+        // Database stores UTC timestamps, multiply by 1000 to get milliseconds
+        const timestampMs = item.timestamp * 1000;
+        
+        if (timestampMs >= cutoffTime) {
+            // Convert UTC timestamp to local time for display
+            const date = new Date(timestampMs);
+            // Group by hour and minute in local time
+            const hourStr = date.toLocaleString([], { 
+                month: 'short', 
+                day: 'numeric', 
+                hour: '2-digit', 
+                minute: '2-digit' 
+            });
 
             if (!groupedData[hourStr]) {
                 groupedData[hourStr] = {
                     input: 0,
-                    output: 0
+                    output: 0,
+                    timestamp: timestampMs  // Keep for sorting
                 };
             }
             groupedData[hourStr].input += item.input_tokens || 0;
@@ -400,10 +415,11 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
         }
     });
 
-    // Convert to arrays for chart
-    const labels = Object.keys(groupedData).sort();
-    const inputData = labels.map(hour => groupedData[hour].input);
-    const outputData = labels.map(hour => groupedData[hour].output);
+    // Convert to arrays for chart, sorted by timestamp
+    const sortedEntries = Object.entries(groupedData).sort((a, b) => a[1].timestamp - b[1].timestamp);
+    const labels = sortedEntries.map(([label]) => label);
+    const inputData = sortedEntries.map(([, data]) => data.input);
+    const outputData = sortedEntries.map(([, data]) => data.output);
 
     console.log('Chart updated with', labels.length, 'data points');
 
@@ -770,6 +786,10 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) {
                             

Input tokens: ${data.input_tokens}

Output tokens: ${data.output_tokens}

Total tokens: ${data.total_tokens}

+

Endpoint Distribution

+
+ +

Usage Over Time

@@ -783,8 +803,8 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { `; document.getElementById("stats-modal").style.display = "flex"; - // Initialise the chart (ensures fresh canvas and chart instance) - initStatsChart(data.time_series); + // Initialise the charts (time-series + pie chart) + initStatsChart(data.time_series, data.endpoint_distribution); } catch (err) { console.error(err); alert(`Could not load model stats: ${err.message}`); @@ -792,8 +812,9 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { }); /* ---------- Helper to initialise or refresh the stats chart ---------- */ -function initStatsChart(timeSeriesData) { +function initStatsChart(timeSeriesData, endpointDistribution) { console.log('initStatsChart called with payload:', timeSeriesData); + console.log('Endpoint distribution:', endpointDistribution); // Destroy any existing chart instance if (statsChart) { statsChart.destroy(); @@ -856,6 +877,57 @@ function initStatsChart(timeSeriesData) { renderTimeSeriesChart(rawTimeSeries, statsChart, minutes); }); }); + + // Create endpoint distribution pie chart + if (endpointDistribution && Object.keys(endpointDistribution).length > 0) { + const pieCanvas = document.getElementById('endpoint-pie-chart'); + const pieCtx = pieCanvas.getContext('2d'); + + const endpoints = Object.keys(endpointDistribution); + const tokenCounts = Object.values(endpointDistribution); + const colors = endpoints.map(ep => getColor(ep)); + + new Chart(pieCtx, { + type: 'pie', + data: { + labels: endpoints, + datasets: [{ + data: tokenCounts, + backgroundColor: colors, + borderWidth: 1, + borderColor: '#fff' + }] + }, + options: { + responsive: true, + maintainAspectRatio: false, + plugins: { + legend: { + position: 'right', + labels: { + boxWidth: 12, + font: { size: 11 } + } + }, + title: { + display: true, + text: 'Total Tokens per Endpoint' + }, + tooltip: { + callbacks: { + label: function(context) { + const label = context.label || ''; + const value = context.parsed || 0; + const total = context.dataset.data.reduce((a, b) => a + b, 0); + const percentage = ((value / total) * 100).toFixed(1); + return `${label}: ${value.toLocaleString()} tokens (${percentage}%)`; + } + } + } + } + } + }); + } } /* stats modal close */ // The close handler is already attached during initial page load. From e0c6861f2fa8e00f340f88bf963f3811f4267fb7 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 20 Nov 2025 09:22:45 +0100 Subject: [PATCH 05/12] aggregating token_counts for stats over all endpoints and adjusting the color mapping --- db.py | 20 +++++++++++----- static/index.html | 61 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 56 insertions(+), 25 deletions(-) diff --git a/db.py b/db.py index 9a96ca2..264bcfa 100644 --- a/db.py +++ b/db.py @@ -134,15 +134,23 @@ class TokenDatabase: } async def get_token_counts_for_model(self, model): - """Get token counts for a specific model.""" + """Get token counts for a specific model, aggregated across all endpoints.""" async with aiosqlite.connect(self.db_path) as db: async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: + total_input = 0 + total_output = 0 + total_tokens = 0 async for row in cursor: + total_input += row[2] + total_output += row[3] + total_tokens += row[4] + + if total_input > 0 or total_output > 0: return { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4] + 'endpoint': 'aggregated', + 'model': model, + 'input_tokens': total_input, + 'output_tokens': total_output, + 'total_tokens': total_tokens } return None diff --git a/static/index.html b/static/index.html index 17e2ce0..50747c9 100644 --- a/static/index.html +++ b/static/index.html @@ -238,6 +238,23 @@ margin-top: 1rem; max-width: 400px; } + /* ---------- Stats Modal Layout ---------- */ + .stats-content-wrapper { + display: flex; + flex-direction: row; + gap: 20px; + } + .main-stats-content { + flex: 1; + } + .endpoint-distribution-container { + flex: 0 0 auto; + width: 400px; + position: relative; + } + .endpoint-distribution-container h3 { + margin-top: 0; + } @@ -602,9 +619,9 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { return `hsl(${h}, 80%, 30%)`; } function hashString(str) { - let hash = 0; + let hash = 42; for (let i = 0; i < str.length; i++) { - hash = (hash << 5) - hash + str.charCodeAt(i); + hash = ((hash << 5) + hash) + str.charCodeAt(i); hash |= 0; } return Math.abs(hash); @@ -782,23 +799,29 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { const data = await resp.json(); const content = document.getElementById("stats-content"); content.innerHTML = ` -

Token Usage

-

Input tokens: ${data.input_tokens}

-

Output tokens: ${data.output_tokens}

-

Total tokens: ${data.total_tokens}

-

Endpoint Distribution

-
- -
-

Usage Over Time

-
- - - - -
-
- +
+
+

Token Usage

+

Input tokens: ${data.input_tokens}

+

Output tokens: ${data.output_tokens}

+

Total tokens: ${data.total_tokens}

+

Usage Over Time

+
+ + + + +
+
+ +
+
+
+

Endpoint Distribution

+
+ +
+
`; document.getElementById("stats-modal").style.display = "flex"; From 0d187e91b9e9fe7325973a89a2e691c7c672773d Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 20 Nov 2025 09:53:28 +0100 Subject: [PATCH 06/12] fixing chart timescales --- static/index.html | 111 ++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 58 deletions(-) diff --git a/static/index.html b/static/index.html index 50747c9..061dbb0 100644 --- a/static/index.html +++ b/static/index.html @@ -229,7 +229,7 @@ } .chart-container { position: relative; - height: 300px; + height: 600px; margin-top: 1rem; } .pie-chart-container { @@ -387,64 +387,59 @@ document.addEventListener('DOMContentLoaded', () => { /* ---------- Global renderTimeSeriesChart ---------- */ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { - console.log('renderTimeSeriesChart called with minutes:', minutes, 'data length:', timeSeriesData?.length); - - // Safety check - if (!timeSeriesData || !Array.isArray(timeSeriesData)) { - console.warn('No valid time series data provided'); - chart.data.labels = []; - chart.data.datasets[0].data = []; - chart.data.datasets[1].data = []; - chart.update(); - return; - } - - // Filter data based on selected timeframe (use UTC for consistency) - const now = Date.now(); - const cutoffTime = now - (minutes * 60 * 1000); - - // Group data by hour for better visualization - const groupedData = {}; - timeSeriesData.forEach(item => { - // Database stores UTC timestamps, multiply by 1000 to get milliseconds - const timestampMs = item.timestamp * 1000; - - if (timestampMs >= cutoffTime) { - // Convert UTC timestamp to local time for display - const date = new Date(timestampMs); - // Group by hour and minute in local time - const hourStr = date.toLocaleString([], { - month: 'short', - day: 'numeric', - hour: '2-digit', - minute: '2-digit' - }); - - if (!groupedData[hourStr]) { - groupedData[hourStr] = { - input: 0, - output: 0, - timestamp: timestampMs // Keep for sorting - }; - } - groupedData[hourStr].input += item.input_tokens || 0; - groupedData[hourStr].output += item.output_tokens || 0; - } - }); - - // Convert to arrays for chart, sorted by timestamp - const sortedEntries = Object.entries(groupedData).sort((a, b) => a[1].timestamp - b[1].timestamp); - const labels = sortedEntries.map(([label]) => label); - const inputData = sortedEntries.map(([, data]) => data.input); - const outputData = sortedEntries.map(([, data]) => data.output); - - console.log('Chart updated with', labels.length, 'data points'); - - // Update chart data - chart.data.labels = labels; - chart.data.datasets[0].data = inputData; - chart.data.datasets[1].data = outputData; + // Guard clause + if (!Array.isArray(timeSeriesData) || !timeSeriesData.length) { + chart.data.labels = []; + chart.data.datasets[0].data = []; + chart.data.datasets[1].data = []; chart.update(); + return; + } + + /* ── 1️⃣ Cut‑off & bucket interval ──────────────────────────────── */ + const nowMs = Date.now(); // UTC millis + const cutoffMs = nowMs - minutes * 60 * 1000; // UTC window start + const intervalMs = 60 * 60 * 1000; // 1 h buckets + + /* ── 2️⃣ Build ordered bucket slots (UTC) ───────────────────────────── */ + const slots = []; + for (let ts = cutoffMs; ts <= nowMs; ts += intervalMs) { + slots.push(ts); + } + + /* ── 3️⃣ Aggregate raw rows into those slots ───────────────────────────── */ + const bucketMap = {}; // epoch ms → {input, output} + timeSeriesData.forEach(row => { + // If your DB already stores ms, drop the * 1000 + const tsMs = row.timestamp * 1000; // <-- keep *1000 **only** if the DB stores seconds + if (tsMs < cutoffMs || tsMs > nowMs) return; + + const slot = Math.floor((tsMs - cutoffMs) / intervalMs) * intervalMs + cutoffMs; + if (!bucketMap[slot]) bucketMap[slot] = { input: 0, output: 0 }; + bucketMap[slot].input += row.input_tokens || 0; + bucketMap[slot].output += row.output_tokens || 0; + }); + + /* ── 4️⃣ Build labels & data arrays (UTC labels) ──────────────────────── */ + const labels = slots.map(ts => { + const d = new Date(ts); // UTC millisecond timestamp + return d.toLocaleString(undefined, { // <-- force UTC for display + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + timeZoneName: 'short' + }).replace(/UTC$/, 'UTC'); // keep the “UTC” suffix if you like + }); + + const inputData = slots.map(ts => (bucketMap[ts]?.input ?? 0)); + const outputData = slots.map(ts => (bucketMap[ts]?.output ?? 0)); + + /* ── 5️⃣ Push into the Chart.js instance ─────────────────────────────── */ + chart.data.labels = labels; + chart.data.datasets[0].data = inputData; + chart.data.datasets[1].data = outputData; + chart.update(); } /* ---------- Utility ---------- */ async function fetchJSON(url) { From aa23a4dd817fa1d641cc22f6d2d16ca7d2974df0 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 20 Nov 2025 12:53:18 +0100 Subject: [PATCH 07/12] fixing timezone issues --- router.py | 4 +- static/index.html | 114 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 31 deletions(-) diff --git a/router.py b/router.py index 8c3be12..fbc5b6e 100644 --- a/router.py +++ b/router.py @@ -226,9 +226,9 @@ async def token_worker() -> None: token_buffer[endpoint].get(model, (0, 0))[1] + comp ) - # Add to time series buffer with timestamp + # Add to time series buffer with timestamp (UTC) now = datetime.now(tz=timezone.utc) - timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp()) + timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute, tzinfo=timezone.utc).timestamp()) time_series_buffer.append({ 'endpoint': endpoint, 'model': model, diff --git a/static/index.html b/static/index.html index 061dbb0..a93ea91 100644 --- a/static/index.html +++ b/static/index.html @@ -396,46 +396,102 @@ function renderTimeSeriesChart(timeSeriesData, chart, minutes) { return; } - /* ── 1️⃣ Cut‑off & bucket interval ──────────────────────────────── */ - const nowMs = Date.now(); // UTC millis - const cutoffMs = nowMs - minutes * 60 * 1000; // UTC window start - const intervalMs = 60 * 60 * 1000; // 1 h buckets - - /* ── 2️⃣ Build ordered bucket slots (UTC) ───────────────────────────── */ - const slots = []; - for (let ts = cutoffMs; ts <= nowMs; ts += intervalMs) { - slots.push(ts); + /* ── 1️⃣ Determine bucket interval based on timeframe ──────────────────── */ + let intervalMs; + let timeFormat; + + if (minutes <= 60) { + // 1 hour: 5-minute buckets + intervalMs = 5 * 60 * 1000; + timeFormat = { hour: '2-digit', minute: '2-digit' }; + } else if (minutes <= 1440) { + // 1 day: 1-hour buckets + intervalMs = 60 * 60 * 1000; + timeFormat = { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }; + } else if (minutes <= 10080) { + // 7 days: 6-hour buckets + intervalMs = 6 * 60 * 60 * 1000; + timeFormat = { month: 'short', day: 'numeric', hour: '2-digit' }; + } else { + // 30 days: 1-day buckets + intervalMs = 24 * 60 * 60 * 1000; + timeFormat = { month: 'short', day: 'numeric' }; } - /* ── 3️⃣ Aggregate raw rows into those slots ───────────────────────────── */ - const bucketMap = {}; // epoch ms → {input, output} - timeSeriesData.forEach(row => { - // If your DB already stores ms, drop the * 1000 - const tsMs = row.timestamp * 1000; // <-- keep *1000 **only** if the DB stores seconds - if (tsMs < cutoffMs || tsMs > nowMs) return; + /* ── 2️⃣ Get current time in local timezone ──────────────────────────── */ + const now = new Date(); + const nowMs = now.getTime(); + const cutoffMs = nowMs - minutes * 60 * 1000; - const slot = Math.floor((tsMs - cutoffMs) / intervalMs) * intervalMs + cutoffMs; - if (!bucketMap[slot]) bucketMap[slot] = { input: 0, output: 0 }; - bucketMap[slot].input += row.input_tokens || 0; - bucketMap[slot].output += row.output_tokens || 0; + /* ── 3️⃣ Build ordered bucket slots aligned to local time boundaries ───── */ + const slots = []; + + // Round cutoff down to nearest bucket interval in local time + const cutoffDate = new Date(cutoffMs); + let startDate = new Date(cutoffDate); + + if (minutes <= 60) { + // Align to 5-minute boundaries + startDate.setMinutes(Math.floor(startDate.getMinutes() / 5) * 5, 0, 0); + } else if (minutes <= 1440) { + // Align to hour boundaries + startDate.setMinutes(0, 0, 0); + } else if (minutes <= 10080) { + // Align to 6-hour boundaries (00:00, 06:00, 12:00, 18:00) + startDate.setHours(Math.floor(startDate.getHours() / 6) * 6, 0, 0, 0); + } else { + // Align to day boundaries + startDate.setHours(0, 0, 0, 0); + } + + let slotTime = startDate.getTime(); + while (slotTime <= nowMs) { + slots.push(slotTime); + slotTime += intervalMs; + } + + /* ── 4️⃣ Aggregate raw rows into local time buckets ───────────────────── */ + const bucketMap = {}; + + timeSeriesData.forEach(row => { + // Database stores UTC timestamps in seconds, convert to local time milliseconds + const utcTimestampMs = row.timestamp * 1000; + + // Check if within our time window + if (utcTimestampMs < cutoffMs || utcTimestampMs > nowMs) return; + + // Find which bucket this timestamp belongs to + let closestSlot = null; + let minDiff = Infinity; + + for (const slot of slots) { + const diff = Math.abs(utcTimestampMs - slot); + if (diff < minDiff && diff < intervalMs) { + minDiff = diff; + closestSlot = slot; + } + } + + if (closestSlot !== null) { + if (!bucketMap[closestSlot]) bucketMap[closestSlot] = { input: 0, output: 0 }; + bucketMap[closestSlot].input += row.input_tokens || 0; + bucketMap[closestSlot].output += row.output_tokens || 0; + } }); - /* ── 4️⃣ Build labels & data arrays (UTC labels) ──────────────────────── */ - const labels = slots.map(ts => { - const d = new Date(ts); // UTC millisecond timestamp - return d.toLocaleString(undefined, { // <-- force UTC for display - month: 'short', - day: 'numeric', - hour: '2-digit', - minute: '2-digit', + /* ── 5️⃣ Build labels in local timezone ───────────────────────────────── */ + const labels = slots.map(ts => { + const d = new Date(ts); + return d.toLocaleString(undefined, { + ...timeFormat, timeZoneName: 'short' - }).replace(/UTC$/, 'UTC'); // keep the “UTC” suffix if you like + }); }); const inputData = slots.map(ts => (bucketMap[ts]?.input ?? 0)); const outputData = slots.map(ts => (bucketMap[ts]?.output ?? 0)); - /* ── 5️⃣ Push into the Chart.js instance ─────────────────────────────── */ + /* ── 6️⃣ Push into the Chart.js instance ─────────────────────────────── */ chart.data.labels = labels; chart.data.datasets[0].data = inputData; chart.data.datasets[1].data = outputData; From 45d1d442eeebc8848bbd18d624ab96fa67f49589 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 20 Nov 2025 15:37:04 +0100 Subject: [PATCH 08/12] sqlite: adding connection pooling and WAL --- .gitignore | 2 +- db.py | 220 ++++++++++++++++++++++++---------------------- static/index.html | 3 +- 3 files changed, 116 insertions(+), 109 deletions(-) diff --git a/.gitignore b/.gitignore index 74eef7d..100cc12 100644 --- a/.gitignore +++ b/.gitignore @@ -64,4 +64,4 @@ cython_debug/ config.yaml # SQLite -*.db \ No newline at end of file +*.db* \ No newline at end of file diff --git a/db.py b/db.py index 264bcfa..af0f7e0 100644 --- a/db.py +++ b/db.py @@ -15,47 +15,53 @@ class TokenDatabase: db_dir = Path(self.db_path).parent if not db_dir.exists(): db_dir.mkdir(parents=True, exist_ok=True) + + async def _get_connection(self): + """Return a connection with WAL mode enabled.""" + conn= await aiosqlite.connect(self.db_path) + await conn.execute("PRAGMA journal_mode=WAL;") + return conn async def init_db(self): """Initialize the database tables.""" - async with aiosqlite.connect(self.db_path) as db: - await db.execute(''' - CREATE TABLE IF NOT EXISTS token_counts ( - endpoint TEXT, - model TEXT, - input_tokens INTEGER DEFAULT 0, - output_tokens INTEGER DEFAULT 0, - total_tokens INTEGER DEFAULT 0, - PRIMARY KEY(endpoint, model) - ) - ''') - await db.execute(''' - CREATE TABLE IF NOT EXISTS token_time_series ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - endpoint TEXT, - model TEXT, - input_tokens INTEGER, - output_tokens INTEGER, - total_tokens INTEGER, - timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision - FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) - ) - ''') - await db.commit() + db = await self._get_connection() + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_counts ( + endpoint TEXT, + model TEXT, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + PRIMARY KEY(endpoint, model) + ) + ''') + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_time_series ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + endpoint TEXT, + model TEXT, + input_tokens INTEGER, + output_tokens INTEGER, + total_tokens INTEGER, + timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision + FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) + ) + ''') + await db.commit() async def update_token_counts(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): """Update token counts for a specific endpoint and model.""" total_tokens = input_tokens + output_tokens - async with aiosqlite.connect(self.db_path) as db: - await db.execute(''' - INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(endpoint, model) DO UPDATE SET - input_tokens = input_tokens + ?, - output_tokens = output_tokens + ?, - total_tokens = total_tokens + ? - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) - await db.commit() + db = await self._get_connection() + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) + await db.commit() async def add_time_series_entry(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): """Add a time series entry with approximate timestamp.""" @@ -64,93 +70,93 @@ class TokenDatabase: now = datetime.now(tz=timezone.utc) timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp()) - async with aiosqlite.connect(self.db_path) as db: - await db.execute(''' - INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) - await db.commit() + db = await self._get_connection() + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) + await db.commit() async def update_batched_counts(self, counts: dict): """Update multiple token counts in a single transaction.""" if not counts: return - async with aiosqlite.connect(self.db_path) as db: - for endpoint, models in counts.items(): - for model, (input_tokens, output_tokens) in models.items(): - total_tokens = input_tokens + output_tokens - await db.execute(''' - INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(endpoint, model) DO UPDATE SET - input_tokens = input_tokens + ?, - output_tokens = output_tokens + ?, - total_tokens = total_tokens + ? - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, - input_tokens, output_tokens, total_tokens)) - await db.commit() + db = await self._get_connection() + for endpoint, models in counts.items(): + for model, (input_tokens, output_tokens) in models.items(): + total_tokens = input_tokens + output_tokens + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, + input_tokens, output_tokens, total_tokens)) + await db.commit() async def add_batched_time_series(self, entries: list): """Add multiple time series entries in a single transaction.""" - async with aiosqlite.connect(self.db_path) as db: - for entry in entries: - await db.execute(''' - INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ''', (entry['endpoint'], entry['model'], entry['input_tokens'], - entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) - await db.commit() + db = await self._get_connection() + for entry in entries: + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (entry['endpoint'], entry['model'], entry['input_tokens'], + entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) + await db.commit() async def load_token_counts(self): """Load all token counts from database.""" - async with aiosqlite.connect(self.db_path) as db: - async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: - async for row in cursor: - yield { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4] - } + db = await self._get_connection() + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4] + } async def get_latest_time_series(self, limit: int = 100): """Get the latest time series entries.""" - async with aiosqlite.connect(self.db_path) as db: - async with db.execute(''' - SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp - FROM token_time_series - ORDER BY timestamp DESC - LIMIT ? - ''', (limit,)) as cursor: - async for row in cursor: - yield { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4], - 'timestamp': row[5] - } + db = await self._get_connection() + async with db.execute(''' + SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp + FROM token_time_series + ORDER BY timestamp DESC + LIMIT ? + ''', (limit,)) as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4], + 'timestamp': row[5] + } async def get_token_counts_for_model(self, model): """Get token counts for a specific model, aggregated across all endpoints.""" - async with aiosqlite.connect(self.db_path) as db: - async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: - total_input = 0 - total_output = 0 - total_tokens = 0 - async for row in cursor: - total_input += row[2] - total_output += row[3] - total_tokens += row[4] - - if total_input > 0 or total_output > 0: - return { - 'endpoint': 'aggregated', - 'model': model, - 'input_tokens': total_input, - 'output_tokens': total_output, - 'total_tokens': total_tokens - } - return None + db = await self._get_connection() + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: + total_input = 0 + total_output = 0 + total_tokens = 0 + async for row in cursor: + total_input += row[2] + total_output += row[3] + total_tokens += row[4] + + if total_input > 0 or total_output > 0: + return { + 'endpoint': 'aggregated', + 'model': model, + 'input_tokens': total_input, + 'output_tokens': total_output, + 'total_tokens': total_tokens + } + return None diff --git a/static/index.html b/static/index.html index a93ea91..bb49d21 100644 --- a/static/index.html +++ b/static/index.html @@ -122,7 +122,8 @@ } .copy-link, .delete-link, - .show-link { + .show-link, + .stats-link { font-size: 0.9em; margin-left: 0.5em; cursor: pointer; From 7b50a5a2994b02306a10d4497cb1fffe0e8c4e60 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Fri, 21 Nov 2025 09:56:42 +0100 Subject: [PATCH 09/12] adding usage metrics to /v1 endpoints if stream == True --- db.py | 4 +--- router.py | 31 +++++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/db.py b/db.py index af0f7e0..21b0b0a 100644 --- a/db.py +++ b/db.py @@ -1,6 +1,4 @@ -import aiosqlite -import os -import asyncio +import aiosqlite, os, asyncio from pathlib import Path from datetime import datetime, timezone from collections import defaultdict diff --git a/router.py b/router.py index fbc5b6e..46b6e7e 100644 --- a/router.py +++ b/router.py @@ -1596,7 +1596,7 @@ async def openai_chat_completions_proxy(request: Request): optional_params = { "tools": tools, "response_format": response_format, - "stream_options": stream_options, + "stream_options": stream_options or {"include_usage": True }, "max_completion_tokens": max_completion_tokens, "max_tokens": max_tokens, "temperature": temperature, @@ -1638,8 +1638,17 @@ async def openai_chat_completions_proxy(request: Request): if hasattr(chunk, "model_dump_json") else orjson.dumps(chunk) ) - if chunk.choices[0].delta.content is not None: - yield f"data: {data}\n\n".encode("utf-8") + if chunk.choices: + if chunk.choices[0].delta.content is not None: + yield f"data: {data}\n\n".encode("utf-8") + if chunk.usage is not None: + prompt_tok = chunk.usage.prompt_tokens or 0 + comp_tok = chunk.usage.completion_tokens or 0 + if prompt_tok != 0 or comp_tok != 0: + if not is_ext_openai_endpoint(endpoint): + if not ":" in model: + model = model+":latest" + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) yield b"data: [DONE]\n\n" else: prompt_tok = async_gen.usage.prompt_tokens or 0 @@ -1706,7 +1715,7 @@ async def openai_completions_proxy(request: Request): "seed": seed, "stop": stop, "stream": stream, - "stream_options": stream_options, + "stream_options": stream_options or {"include_usage": True }, "temperature": temperature, "top_p": top_p, "max_tokens": max_tokens, @@ -1734,7 +1743,7 @@ async def openai_completions_proxy(request: Request): oclient = openai.AsyncOpenAI(base_url=base_url, default_headers=default_headers, api_key=config.api_keys[endpoint]) # 3. Async generator that streams completions data and decrements the counter - async def stream_ocompletions_response(): + async def stream_ocompletions_response(model=model): try: # The chat method returns a generator of dicts (or GenerateResponse) async_gen = await oclient.completions.create(**params) @@ -1745,7 +1754,17 @@ async def openai_completions_proxy(request: Request): if hasattr(chunk, "model_dump_json") else orjson.dumps(chunk) ) - yield f"data: {data}\n\n".encode("utf-8") + if chunk.choices: + if chunk.choices[0].finish_reason == None: + yield f"data: {data}\n\n".encode("utf-8") + if chunk.usage is not None: + prompt_tok = chunk.usage.prompt_tokens or 0 + comp_tok = chunk.usage.completion_tokens or 0 + if prompt_tok != 0 or comp_tok != 0: + if not is_ext_openai_endpoint(endpoint): + if not ":" in model: + model = model+":latest" + await token_queue.put((endpoint, model, prompt_tok, comp_tok)) # Final DONE event yield b"data: [DONE]\n\n" else: From 1c3f9a9dc4b945bfed818e6b179daf57938d490d Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Mon, 24 Nov 2025 09:33:54 +0100 Subject: [PATCH 10/12] fix model naming to allow correct decrement usage counter in /v1 endpoints --- router.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/router.py b/router.py index 46b6e7e..cc332f5 100644 --- a/router.py +++ b/router.py @@ -1647,8 +1647,8 @@ async def openai_chat_completions_proxy(request: Request): if prompt_tok != 0 or comp_tok != 0: if not is_ext_openai_endpoint(endpoint): if not ":" in model: - model = model+":latest" - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + local_model = model+":latest" + await token_queue.put((endpoint, local_model, prompt_tok, comp_tok)) yield b"data: [DONE]\n\n" else: prompt_tok = async_gen.usage.prompt_tokens or 0 @@ -1763,8 +1763,8 @@ async def openai_completions_proxy(request: Request): if prompt_tok != 0 or comp_tok != 0: if not is_ext_openai_endpoint(endpoint): if not ":" in model: - model = model+":latest" - await token_queue.put((endpoint, model, prompt_tok, comp_tok)) + local_model = model+":latest" + await token_queue.put((endpoint, local_model, prompt_tok, comp_tok)) # Final DONE event yield b"data: [DONE]\n\n" else: From 0ffb3211548cf74ca65c9572533e6b1667597329 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Fri, 28 Nov 2025 14:59:29 +0100 Subject: [PATCH 11/12] fixing total stats model, button, labels and code clean up --- router.py | 17 ++++- static/index.html | 189 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 160 insertions(+), 46 deletions(-) diff --git a/router.py b/router.py index cc332f5..c28f750 100644 --- a/router.py +++ b/router.py @@ -2,7 +2,7 @@ title: NOMYO Router - an Ollama Proxy with Endpoint:Model aware routing author: alpha-nerd-nomyo author_url: https://github.com/nomyo-ai -version: 0.4 +version: 0.5 license: AGPL """ # ------------------------------------------------------------- @@ -1164,6 +1164,21 @@ async def show_proxy(request: Request, model: Optional[str] = None): return show # ------------------------------------------------------------- +@app.get("/api/token_counts") +async def token_counts_proxy(): + breakdown = [] + total = 0 + async for entry in db.load_token_counts(): + total += entry['total_tokens'] + breakdown.append({ + "endpoint": entry["endpoint"], + "model": entry["model"], + "input_tokens": entry["input_tokens"], + "output_tokens": entry["output_tokens"], + "total_tokens": entry["total_tokens"], + }) + return {"total_tokens": total, "breakdown": breakdown} + # 12. API route – Stats # ------------------------------------------------------------- @app.post("/api/stats") diff --git a/static/index.html b/static/index.html index bb49d21..a962157 100644 --- a/static/index.html +++ b/static/index.html @@ -256,13 +256,21 @@ .endpoint-distribution-container h3 { margin-top: 0; } + .header-row { + display: flex; + align-items: center; /* vertically center the button with the headline */ + gap: 1rem; + } -

Router Dashboard

+
+

Router Dashboard

+ +
- + From 59a8ef3abbb0c4286f531c07ee6bf9732d6561f6 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Tue, 2 Dec 2025 12:18:23 +0100 Subject: [PATCH 12/12] refactor: use a persistent WAL-enabled connection with async locks - Introduce a lazily initialized, shared aiosqlite connection stored in self._db and two asyncio locks (_db_lock, _operation_lock) for safe concurrent access - Ensure the database directory exists before connecting and enable WAL journaling and foreign keys on first connect - Add close method to gracefully close the persistent connection - Guard initialization and write operations with _operation_lock to ensure single-threaded schema setup - Switch to ON CONFLICT UPSERT for token_counts updates and initialize token_time_series table - Add typing for _db (Optional[aiosqlite.Connection]) and adjust imports accordingly addition: Frontend button with total stats aggregation task and feedback span element to keep user informed and a small database footprint --- db.py | 321 ++++++++++++++++++++++++++++++---------------- router.py | 46 ++++++- static/index.html | 21 ++- 3 files changed, 278 insertions(+), 110 deletions(-) diff --git a/db.py b/db.py index 21b0b0a..aaea508 100644 --- a/db.py +++ b/db.py @@ -1,4 +1,6 @@ -import aiosqlite, os, asyncio +import aiosqlite +import asyncio +from typing import Optional from pathlib import Path from datetime import datetime, timezone from collections import defaultdict @@ -6,155 +8,260 @@ from collections import defaultdict class TokenDatabase: def __init__(self, db_path: str = "token_counts.db"): self.db_path = db_path - self._ensure_db_directory() + self._db: Optional[aiosqlite.Connection] = None + self._db_lock = asyncio.Lock() + self._operation_lock = asyncio.Lock() def _ensure_db_directory(self): """Ensure the directory for the database exists.""" db_dir = Path(self.db_path).parent if not db_dir.exists(): db_dir.mkdir(parents=True, exist_ok=True) - - async def _get_connection(self): - """Return a connection with WAL mode enabled.""" - conn= await aiosqlite.connect(self.db_path) - await conn.execute("PRAGMA journal_mode=WAL;") - return conn + + async def _get_connection(self) -> aiosqlite.Connection: + """Return a persistent connection with WAL mode and FK enforcement enabled.""" + if self._db is None: + async with self._db_lock: + if self._db is None: + self._ensure_db_directory() + self._db = await aiosqlite.connect(self.db_path) + # Enable WAL and foreign keys for reliability and integrity + await self._db.execute("PRAGMA journal_mode=WAL;") + await self._db.execute("PRAGMA foreign_keys = ON;") + await self._db.commit() + return self._db + + async def close(self): + """Close the persistent database connection, if open.""" + if self._db is not None: + await self._db.close() + self._db = None async def init_db(self): """Initialize the database tables.""" db = await self._get_connection() - await db.execute(''' - CREATE TABLE IF NOT EXISTS token_counts ( - endpoint TEXT, - model TEXT, - input_tokens INTEGER DEFAULT 0, - output_tokens INTEGER DEFAULT 0, - total_tokens INTEGER DEFAULT 0, - PRIMARY KEY(endpoint, model) - ) - ''') - await db.execute(''' - CREATE TABLE IF NOT EXISTS token_time_series ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - endpoint TEXT, - model TEXT, - input_tokens INTEGER, - output_tokens INTEGER, - total_tokens INTEGER, - timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision - FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) - ) - ''') - await db.commit() + async with self._operation_lock: + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_counts ( + endpoint TEXT, + model TEXT, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + PRIMARY KEY(endpoint, model) + ) + ''') + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_time_series ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + endpoint TEXT, + model TEXT, + input_tokens INTEGER, + output_tokens INTEGER, + total_tokens INTEGER, + timestamp INTEGER, + FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) + ) + ''') + await db.commit() async def update_token_counts(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): """Update token counts for a specific endpoint and model.""" total_tokens = input_tokens + output_tokens db = await self._get_connection() - await db.execute(''' - INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(endpoint, model) DO UPDATE SET - input_tokens = input_tokens + ?, - output_tokens = output_tokens + ?, - total_tokens = total_tokens + ? - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) - await db.commit() + async with self._operation_lock: + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) + await db.commit() async def add_time_series_entry(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): """Add a time series entry with approximate timestamp.""" total_tokens = input_tokens + output_tokens - # Use current minute/hour as approximate timestamp + # Use current minute/hour as approximate timestamp in UTC now = datetime.now(tz=timezone.utc) timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp()) db = await self._get_connection() - await db.execute(''' - INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) - await db.commit() + async with self._operation_lock: + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) + await db.commit() async def update_batched_counts(self, counts: dict): """Update multiple token counts in a single transaction.""" if not counts: return db = await self._get_connection() - for endpoint, models in counts.items(): - for model, (input_tokens, output_tokens) in models.items(): - total_tokens = input_tokens + output_tokens - await db.execute(''' - INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(endpoint, model) DO UPDATE SET - input_tokens = input_tokens + ?, - output_tokens = output_tokens + ?, - total_tokens = total_tokens + ? - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, - input_tokens, output_tokens, total_tokens)) - await db.commit() + async with self._operation_lock: + try: + await db.execute('BEGIN') + for endpoint, models in counts.items(): + for model, (input_tokens, output_tokens) in models.items(): + total_tokens = input_tokens + output_tokens + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, + input_tokens, output_tokens, total_tokens)) + await db.commit() + except Exception: + # Rollback on error to maintain consistency + try: + await db.execute('ROLLBACK') + except Exception: + pass + raise async def add_batched_time_series(self, entries: list): """Add multiple time series entries in a single transaction.""" db = await self._get_connection() - for entry in entries: - await db.execute(''' - INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ''', (entry['endpoint'], entry['model'], entry['input_tokens'], - entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) - await db.commit() + async with self._operation_lock: + try: + await db.execute('BEGIN') + for entry in entries: + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (entry['endpoint'], entry['model'], entry['input_tokens'], + entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) + await db.commit() + except Exception: + try: + await db.execute('ROLLBACK') + except Exception: + pass + raise async def load_token_counts(self): """Load all token counts from database.""" db = await self._get_connection() - async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: - async for row in cursor: - yield { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4] - } + async with self._operation_lock: + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4] + } async def get_latest_time_series(self, limit: int = 100): """Get the latest time series entries.""" db = await self._get_connection() - async with db.execute(''' - SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp - FROM token_time_series - ORDER BY timestamp DESC - LIMIT ? - ''', (limit,)) as cursor: - async for row in cursor: - yield { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4], - 'timestamp': row[5] - } + async with self._operation_lock: + async with db.execute(''' + SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp + FROM token_time_series + ORDER BY timestamp DESC + LIMIT ? + ''', (limit,)) as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4], + 'timestamp': row[5] + } async def get_token_counts_for_model(self, model): """Get token counts for a specific model, aggregated across all endpoints.""" db = await self._get_connection() - async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: - total_input = 0 - total_output = 0 - total_tokens = 0 - async for row in cursor: - total_input += row[2] - total_output += row[3] - total_tokens += row[4] - - if total_input > 0 or total_output > 0: - return { - 'endpoint': 'aggregated', - 'model': model, - 'input_tokens': total_input, - 'output_tokens': total_output, - 'total_tokens': total_tokens - } + async with self._operation_lock: + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: + total_input = 0 + total_output = 0 + total_tokens = 0 + async for row in cursor: + total_input += row[2] + total_output += row[3] + total_tokens += row[4] + + if total_input > 0 or total_output > 0: + return { + 'endpoint': 'aggregated', + 'model': model, + 'input_tokens': total_input, + 'output_tokens': total_output, + 'total_tokens': total_tokens + } return None + + async def aggregate_time_series_older_than(self, days: int, trim_old: bool = False) -> int: + """ + Aggregate time_series entries older than 'days' days into daily aggregates by + endpoint, model and UTC date (YYYY-MM-DD). The results are stored in + token_time_series_daily with a UNIQUE constraint on (endpoint, model, date). + + Returns the number of aggregated groups (distinct (endpoint, model, date) tuples) + that were created/updated. + """ + if not isinstance(days, int) or days <= 0: + days = 30 + + cutoff_ts = int(datetime.now(tz=timezone.utc).timestamp()) - (days * 86400) + + db = await self._get_connection() + aggregated_count = 0 + + async with self._operation_lock: + # Ensure daily table exists + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_time_series_daily ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + endpoint TEXT, + model TEXT, + date TEXT, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + UNIQUE(endpoint, model, date) + ) + ''') + await db.commit() + + cursor = await db.execute(''' + SELECT endpoint, model, date(timestamp, 'unixepoch') as day, + SUM(input_tokens) as in_sum, + SUM(output_tokens) as out_sum, + SUM(total_tokens) as tot_sum + FROM token_time_series + WHERE timestamp < ? + GROUP BY endpoint, model, day + ''', (cutoff_ts,)) + rows = await cursor.fetchall() + + for row in rows: + endpoint, model, day, in_sum, out_sum, tot_sum = row + await db.execute(''' + INSERT INTO token_time_series_daily (endpoint, model, date, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (endpoint, model, date) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, day, int(in_sum or 0), int(out_sum or 0), int(tot_sum or 0), + int(in_sum or 0), int(out_sum or 0), int(tot_sum or 0))) + aggregated_count += 1 + + # Trim old entries if requested + if trim_old: + await db.execute('DELETE FROM token_time_series WHERE timestamp < ?', (cutoff_ts,)) + + await db.commit() + + return aggregated_count diff --git a/router.py b/router.py index c28f750..3b60770 100644 --- a/router.py +++ b/router.py @@ -52,7 +52,7 @@ flush_task: asyncio.Task | None = None # Token Count Buffer (for write-behind pattern) # ------------------------------------------------------------------ # Structure: {endpoint: {model: (input_tokens, output_tokens)}} -token_buffer: dict[str, dict[str, tuple[int, int]]] = defaultdict(lambda: defaultdict(tuple)) +token_buffer: dict[str, dict[str, tuple[int, int]]] = defaultdict(lambda: defaultdict(lambda: (0, 0))) # Time series buffer with timestamp time_series_buffer: list[dict[str, int | str]] = [] @@ -258,6 +258,29 @@ async def flush_buffer() -> None: await db.add_batched_time_series(time_series_buffer) time_series_buffer.clear() +async def flush_remaining_buffers() -> None: + """ + Flush any in-memory buffers to the database on shutdown. + This is designed to be safely invoked during shutdown and should not raise. + """ + try: + flushed_entries = 0 + if token_buffer: + await db.update_batched_counts(token_buffer) + flushed_entries += sum(len(v) for v in token_buffer.values()) + token_buffer.clear() + if time_series_buffer: + await db.add_batched_time_series(time_series_buffer) + flushed_entries += len(time_series_buffer) + time_series_buffer.clear() + if flushed_entries: + print(f"[shutdown] Flushed {flushed_entries} in-memory entries to DB on shutdown.") + else: + print("[shutdown] No in-memory entries to flush on shutdown.") + except Exception as e: + # Do not raise during shutdown – log and continue teardown + print(f"[shutdown] Error flushing remaining buffers: {e}") + class fetch: async def available_models(endpoint: str, api_key: Optional[str] = None) -> Set[str]: """ @@ -1179,6 +1202,26 @@ async def token_counts_proxy(): }) return {"total_tokens": total, "breakdown": breakdown} +@app.post("/api/aggregate_time_series_days") +async def aggregate_time_series_days_proxy(request: Request): + """ + Aggregate time_series entries older than days into daily aggregates by endpoint/model/date. + """ + try: + body_bytes = await request.body() + if not body_bytes: + days = 30 + trim_old = False + else: + payload = orjson.loads(body_bytes.decode("utf-8")) + days = int(payload.get("days", 30)) + trim_old = bool(payload.get("trim_old", False)) + except Exception: + days = 30 + trim_old = False + aggregated = await db.aggregate_time_series_older_than(days, trim_old=trim_old) + return {"status": "ok", "days": days, "trim_old": trim_old, "aggregated_groups": aggregated} + # 12. API route – Stats # ------------------------------------------------------------- @app.post("/api/stats") @@ -1965,6 +2008,7 @@ async def startup_event() -> None: @app.on_event("shutdown") async def shutdown_event() -> None: await close_all_sse_queues() + await flush_remaining_buffers() await app_state["session"].close() if token_worker_task is not None: token_worker_task.cancel() diff --git a/static/index.html b/static/index.html index a962157..0d22721 100644 --- a/static/index.html +++ b/static/index.html @@ -269,7 +269,7 @@ />

Router Dashboard

- +