From 45d1d442eeebc8848bbd18d624ab96fa67f49589 Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Thu, 20 Nov 2025 15:37:04 +0100 Subject: [PATCH] sqlite: adding connection pooling and WAL --- .gitignore | 2 +- db.py | 220 ++++++++++++++++++++++++---------------------- static/index.html | 3 +- 3 files changed, 116 insertions(+), 109 deletions(-) diff --git a/.gitignore b/.gitignore index 74eef7d..100cc12 100644 --- a/.gitignore +++ b/.gitignore @@ -64,4 +64,4 @@ cython_debug/ config.yaml # SQLite -*.db \ No newline at end of file +*.db* \ No newline at end of file diff --git a/db.py b/db.py index 264bcfa..af0f7e0 100644 --- a/db.py +++ b/db.py @@ -15,47 +15,53 @@ class TokenDatabase: db_dir = Path(self.db_path).parent if not db_dir.exists(): db_dir.mkdir(parents=True, exist_ok=True) + + async def _get_connection(self): + """Return a connection with WAL mode enabled.""" + conn= await aiosqlite.connect(self.db_path) + await conn.execute("PRAGMA journal_mode=WAL;") + return conn async def init_db(self): """Initialize the database tables.""" - async with aiosqlite.connect(self.db_path) as db: - await db.execute(''' - CREATE TABLE IF NOT EXISTS token_counts ( - endpoint TEXT, - model TEXT, - input_tokens INTEGER DEFAULT 0, - output_tokens INTEGER DEFAULT 0, - total_tokens INTEGER DEFAULT 0, - PRIMARY KEY(endpoint, model) - ) - ''') - await db.execute(''' - CREATE TABLE IF NOT EXISTS token_time_series ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - endpoint TEXT, - model TEXT, - input_tokens INTEGER, - output_tokens INTEGER, - total_tokens INTEGER, - timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision - FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) - ) - ''') - await db.commit() + db = await self._get_connection() + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_counts ( + endpoint TEXT, + model TEXT, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + total_tokens INTEGER DEFAULT 0, + PRIMARY KEY(endpoint, model) + ) + ''') + await db.execute(''' + CREATE TABLE IF NOT EXISTS token_time_series ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + endpoint TEXT, + model TEXT, + input_tokens INTEGER, + output_tokens INTEGER, + total_tokens INTEGER, + timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision + FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model) + ) + ''') + await db.commit() async def update_token_counts(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): """Update token counts for a specific endpoint and model.""" total_tokens = input_tokens + output_tokens - async with aiosqlite.connect(self.db_path) as db: - await db.execute(''' - INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(endpoint, model) DO UPDATE SET - input_tokens = input_tokens + ?, - output_tokens = output_tokens + ?, - total_tokens = total_tokens + ? - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) - await db.commit() + db = await self._get_connection() + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens)) + await db.commit() async def add_time_series_entry(self, endpoint: str, model: str, input_tokens: int, output_tokens: int): """Add a time series entry with approximate timestamp.""" @@ -64,93 +70,93 @@ class TokenDatabase: now = datetime.now(tz=timezone.utc) timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp()) - async with aiosqlite.connect(self.db_path) as db: - await db.execute(''' - INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) - await db.commit() + db = await self._get_connection() + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)) + await db.commit() async def update_batched_counts(self, counts: dict): """Update multiple token counts in a single transaction.""" if not counts: return - async with aiosqlite.connect(self.db_path) as db: - for endpoint, models in counts.items(): - for model, (input_tokens, output_tokens) in models.items(): - total_tokens = input_tokens + output_tokens - await db.execute(''' - INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT(endpoint, model) DO UPDATE SET - input_tokens = input_tokens + ?, - output_tokens = output_tokens + ?, - total_tokens = total_tokens + ? - ''', (endpoint, model, input_tokens, output_tokens, total_tokens, - input_tokens, output_tokens, total_tokens)) - await db.commit() + db = await self._get_connection() + for endpoint, models in counts.items(): + for model, (input_tokens, output_tokens) in models.items(): + total_tokens = input_tokens + output_tokens + await db.execute(''' + INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(endpoint, model) DO UPDATE SET + input_tokens = input_tokens + ?, + output_tokens = output_tokens + ?, + total_tokens = total_tokens + ? + ''', (endpoint, model, input_tokens, output_tokens, total_tokens, + input_tokens, output_tokens, total_tokens)) + await db.commit() async def add_batched_time_series(self, entries: list): """Add multiple time series entries in a single transaction.""" - async with aiosqlite.connect(self.db_path) as db: - for entry in entries: - await db.execute(''' - INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ''', (entry['endpoint'], entry['model'], entry['input_tokens'], - entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) - await db.commit() + db = await self._get_connection() + for entry in entries: + await db.execute(''' + INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (entry['endpoint'], entry['model'], entry['input_tokens'], + entry['output_tokens'], entry['total_tokens'], entry['timestamp'])) + await db.commit() async def load_token_counts(self): """Load all token counts from database.""" - async with aiosqlite.connect(self.db_path) as db: - async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: - async for row in cursor: - yield { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4] - } + db = await self._get_connection() + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4] + } async def get_latest_time_series(self, limit: int = 100): """Get the latest time series entries.""" - async with aiosqlite.connect(self.db_path) as db: - async with db.execute(''' - SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp - FROM token_time_series - ORDER BY timestamp DESC - LIMIT ? - ''', (limit,)) as cursor: - async for row in cursor: - yield { - 'endpoint': row[0], - 'model': row[1], - 'input_tokens': row[2], - 'output_tokens': row[3], - 'total_tokens': row[4], - 'timestamp': row[5] - } + db = await self._get_connection() + async with db.execute(''' + SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp + FROM token_time_series + ORDER BY timestamp DESC + LIMIT ? + ''', (limit,)) as cursor: + async for row in cursor: + yield { + 'endpoint': row[0], + 'model': row[1], + 'input_tokens': row[2], + 'output_tokens': row[3], + 'total_tokens': row[4], + 'timestamp': row[5] + } async def get_token_counts_for_model(self, model): """Get token counts for a specific model, aggregated across all endpoints.""" - async with aiosqlite.connect(self.db_path) as db: - async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: - total_input = 0 - total_output = 0 - total_tokens = 0 - async for row in cursor: - total_input += row[2] - total_output += row[3] - total_tokens += row[4] - - if total_input > 0 or total_output > 0: - return { - 'endpoint': 'aggregated', - 'model': model, - 'input_tokens': total_input, - 'output_tokens': total_output, - 'total_tokens': total_tokens - } - return None + db = await self._get_connection() + async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor: + total_input = 0 + total_output = 0 + total_tokens = 0 + async for row in cursor: + total_input += row[2] + total_output += row[3] + total_tokens += row[4] + + if total_input > 0 or total_output > 0: + return { + 'endpoint': 'aggregated', + 'model': model, + 'input_tokens': total_input, + 'output_tokens': total_output, + 'total_tokens': total_tokens + } + return None diff --git a/static/index.html b/static/index.html index a93ea91..bb49d21 100644 --- a/static/index.html +++ b/static/index.html @@ -122,7 +122,8 @@ } .copy-link, .delete-link, - .show-link { + .show-link, + .stats-link { font-size: 0.9em; margin-left: 0.5em; cursor: pointer;