sqlite: adding connection pooling and WAL
This commit is contained in:
parent
aa23a4dd81
commit
45d1d442ee
3 changed files with 116 additions and 109 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -64,4 +64,4 @@ cython_debug/
|
|||
config.yaml
|
||||
|
||||
# SQLite
|
||||
*.db
|
||||
*.db*
|
||||
220
db.py
220
db.py
|
|
@ -15,47 +15,53 @@ class TokenDatabase:
|
|||
db_dir = Path(self.db_path).parent
|
||||
if not db_dir.exists():
|
||||
db_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
async def _get_connection(self):
|
||||
"""Return a connection with WAL mode enabled."""
|
||||
conn= await aiosqlite.connect(self.db_path)
|
||||
await conn.execute("PRAGMA journal_mode=WAL;")
|
||||
return conn
|
||||
|
||||
async def init_db(self):
|
||||
"""Initialize the database tables."""
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute('''
|
||||
CREATE TABLE IF NOT EXISTS token_counts (
|
||||
endpoint TEXT,
|
||||
model TEXT,
|
||||
input_tokens INTEGER DEFAULT 0,
|
||||
output_tokens INTEGER DEFAULT 0,
|
||||
total_tokens INTEGER DEFAULT 0,
|
||||
PRIMARY KEY(endpoint, model)
|
||||
)
|
||||
''')
|
||||
await db.execute('''
|
||||
CREATE TABLE IF NOT EXISTS token_time_series (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
endpoint TEXT,
|
||||
model TEXT,
|
||||
input_tokens INTEGER,
|
||||
output_tokens INTEGER,
|
||||
total_tokens INTEGER,
|
||||
timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision
|
||||
FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model)
|
||||
)
|
||||
''')
|
||||
await db.commit()
|
||||
db = await self._get_connection()
|
||||
await db.execute('''
|
||||
CREATE TABLE IF NOT EXISTS token_counts (
|
||||
endpoint TEXT,
|
||||
model TEXT,
|
||||
input_tokens INTEGER DEFAULT 0,
|
||||
output_tokens INTEGER DEFAULT 0,
|
||||
total_tokens INTEGER DEFAULT 0,
|
||||
PRIMARY KEY(endpoint, model)
|
||||
)
|
||||
''')
|
||||
await db.execute('''
|
||||
CREATE TABLE IF NOT EXISTS token_time_series (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
endpoint TEXT,
|
||||
model TEXT,
|
||||
input_tokens INTEGER,
|
||||
output_tokens INTEGER,
|
||||
total_tokens INTEGER,
|
||||
timestamp INTEGER, -- Unix timestamp with approximate minute/hour precision
|
||||
FOREIGN KEY(endpoint, model) REFERENCES token_counts(endpoint, model)
|
||||
)
|
||||
''')
|
||||
await db.commit()
|
||||
|
||||
async def update_token_counts(self, endpoint: str, model: str, input_tokens: int, output_tokens: int):
|
||||
"""Update token counts for a specific endpoint and model."""
|
||||
total_tokens = input_tokens + output_tokens
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute('''
|
||||
INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(endpoint, model) DO UPDATE SET
|
||||
input_tokens = input_tokens + ?,
|
||||
output_tokens = output_tokens + ?,
|
||||
total_tokens = total_tokens + ?
|
||||
''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens))
|
||||
await db.commit()
|
||||
db = await self._get_connection()
|
||||
await db.execute('''
|
||||
INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(endpoint, model) DO UPDATE SET
|
||||
input_tokens = input_tokens + ?,
|
||||
output_tokens = output_tokens + ?,
|
||||
total_tokens = total_tokens + ?
|
||||
''', (endpoint, model, input_tokens, output_tokens, total_tokens, input_tokens, output_tokens, total_tokens))
|
||||
await db.commit()
|
||||
|
||||
async def add_time_series_entry(self, endpoint: str, model: str, input_tokens: int, output_tokens: int):
|
||||
"""Add a time series entry with approximate timestamp."""
|
||||
|
|
@ -64,93 +70,93 @@ class TokenDatabase:
|
|||
now = datetime.now(tz=timezone.utc)
|
||||
timestamp = int(datetime(now.year, now.month, now.day, now.hour, now.minute).timestamp())
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute('''
|
||||
INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp))
|
||||
await db.commit()
|
||||
db = await self._get_connection()
|
||||
await db.execute('''
|
||||
INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp))
|
||||
await db.commit()
|
||||
|
||||
async def update_batched_counts(self, counts: dict):
|
||||
"""Update multiple token counts in a single transaction."""
|
||||
if not counts:
|
||||
return
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
for endpoint, models in counts.items():
|
||||
for model, (input_tokens, output_tokens) in models.items():
|
||||
total_tokens = input_tokens + output_tokens
|
||||
await db.execute('''
|
||||
INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(endpoint, model) DO UPDATE SET
|
||||
input_tokens = input_tokens + ?,
|
||||
output_tokens = output_tokens + ?,
|
||||
total_tokens = total_tokens + ?
|
||||
''', (endpoint, model, input_tokens, output_tokens, total_tokens,
|
||||
input_tokens, output_tokens, total_tokens))
|
||||
await db.commit()
|
||||
db = await self._get_connection()
|
||||
for endpoint, models in counts.items():
|
||||
for model, (input_tokens, output_tokens) in models.items():
|
||||
total_tokens = input_tokens + output_tokens
|
||||
await db.execute('''
|
||||
INSERT INTO token_counts (endpoint, model, input_tokens, output_tokens, total_tokens)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(endpoint, model) DO UPDATE SET
|
||||
input_tokens = input_tokens + ?,
|
||||
output_tokens = output_tokens + ?,
|
||||
total_tokens = total_tokens + ?
|
||||
''', (endpoint, model, input_tokens, output_tokens, total_tokens,
|
||||
input_tokens, output_tokens, total_tokens))
|
||||
await db.commit()
|
||||
|
||||
async def add_batched_time_series(self, entries: list):
|
||||
"""Add multiple time series entries in a single transaction."""
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
for entry in entries:
|
||||
await db.execute('''
|
||||
INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (entry['endpoint'], entry['model'], entry['input_tokens'],
|
||||
entry['output_tokens'], entry['total_tokens'], entry['timestamp']))
|
||||
await db.commit()
|
||||
db = await self._get_connection()
|
||||
for entry in entries:
|
||||
await db.execute('''
|
||||
INSERT INTO token_time_series (endpoint, model, input_tokens, output_tokens, total_tokens, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
''', (entry['endpoint'], entry['model'], entry['input_tokens'],
|
||||
entry['output_tokens'], entry['total_tokens'], entry['timestamp']))
|
||||
await db.commit()
|
||||
|
||||
async def load_token_counts(self):
|
||||
"""Load all token counts from database."""
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor:
|
||||
async for row in cursor:
|
||||
yield {
|
||||
'endpoint': row[0],
|
||||
'model': row[1],
|
||||
'input_tokens': row[2],
|
||||
'output_tokens': row[3],
|
||||
'total_tokens': row[4]
|
||||
}
|
||||
db = await self._get_connection()
|
||||
async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts') as cursor:
|
||||
async for row in cursor:
|
||||
yield {
|
||||
'endpoint': row[0],
|
||||
'model': row[1],
|
||||
'input_tokens': row[2],
|
||||
'output_tokens': row[3],
|
||||
'total_tokens': row[4]
|
||||
}
|
||||
|
||||
async def get_latest_time_series(self, limit: int = 100):
|
||||
"""Get the latest time series entries."""
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute('''
|
||||
SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp
|
||||
FROM token_time_series
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ?
|
||||
''', (limit,)) as cursor:
|
||||
async for row in cursor:
|
||||
yield {
|
||||
'endpoint': row[0],
|
||||
'model': row[1],
|
||||
'input_tokens': row[2],
|
||||
'output_tokens': row[3],
|
||||
'total_tokens': row[4],
|
||||
'timestamp': row[5]
|
||||
}
|
||||
db = await self._get_connection()
|
||||
async with db.execute('''
|
||||
SELECT endpoint, model, input_tokens, output_tokens, total_tokens, timestamp
|
||||
FROM token_time_series
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ?
|
||||
''', (limit,)) as cursor:
|
||||
async for row in cursor:
|
||||
yield {
|
||||
'endpoint': row[0],
|
||||
'model': row[1],
|
||||
'input_tokens': row[2],
|
||||
'output_tokens': row[3],
|
||||
'total_tokens': row[4],
|
||||
'timestamp': row[5]
|
||||
}
|
||||
|
||||
async def get_token_counts_for_model(self, model):
|
||||
"""Get token counts for a specific model, aggregated across all endpoints."""
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor:
|
||||
total_input = 0
|
||||
total_output = 0
|
||||
total_tokens = 0
|
||||
async for row in cursor:
|
||||
total_input += row[2]
|
||||
total_output += row[3]
|
||||
total_tokens += row[4]
|
||||
|
||||
if total_input > 0 or total_output > 0:
|
||||
return {
|
||||
'endpoint': 'aggregated',
|
||||
'model': model,
|
||||
'input_tokens': total_input,
|
||||
'output_tokens': total_output,
|
||||
'total_tokens': total_tokens
|
||||
}
|
||||
return None
|
||||
db = await self._get_connection()
|
||||
async with db.execute('SELECT endpoint, model, input_tokens, output_tokens, total_tokens FROM token_counts WHERE model = ?', (model,)) as cursor:
|
||||
total_input = 0
|
||||
total_output = 0
|
||||
total_tokens = 0
|
||||
async for row in cursor:
|
||||
total_input += row[2]
|
||||
total_output += row[3]
|
||||
total_tokens += row[4]
|
||||
|
||||
if total_input > 0 or total_output > 0:
|
||||
return {
|
||||
'endpoint': 'aggregated',
|
||||
'model': model,
|
||||
'input_tokens': total_input,
|
||||
'output_tokens': total_output,
|
||||
'total_tokens': total_tokens
|
||||
}
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -122,7 +122,8 @@
|
|||
}
|
||||
.copy-link,
|
||||
.delete-link,
|
||||
.show-link {
|
||||
.show-link,
|
||||
.stats-link {
|
||||
font-size: 0.9em;
|
||||
margin-left: 0.5em;
|
||||
cursor: pointer;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue