feat: stage historic sql aggregate snapshots

This commit is contained in:
Andrey Avtomonov 2026-05-11 18:04:13 +02:00
parent 6a291a1875
commit 93752d5719
3 changed files with 449 additions and 0 deletions

View file

@ -0,0 +1,167 @@
import { mkdtemp, readFile, readdir } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import type { SqlAnalysisPort } from '../../../sql-analysis/index.js';
import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js';
import type { AggregatedTemplate, HistoricSqlReader } from './types.js';
/** Creates a fresh, uniquely named scratch directory under the OS temp dir and resolves to its path. */
async function tempDir(): Promise<string> {
  const prefix = join(tmpdir(), 'historic-sql-unified-stage-');
  const created = await mkdtemp(prefix);
  return created;
}
/**
 * Reads `relPath` (resolved against `root`) as UTF-8 text and returns the parsed JSON.
 * The result is cast to `T`; no runtime validation is performed.
 */
async function readJson<T>(root: string, relPath: string): Promise<T> {
  const absolutePath = join(root, relPath);
  const raw = await readFile(absolutePath, 'utf-8');
  return JSON.parse(raw) as T;
}
/**
 * Test fixture factory for an aggregated template row.
 *
 * `templateId` and `canonicalSql` are mandatory; `dialect`, `stats` and `topUsers`
 * fall back to fixed defaults when the override is null/undefined. Any other
 * property on `overrides` is ignored.
 */
function aggregate(overrides: Partial<AggregatedTemplate> & { templateId: string; canonicalSql: string }): AggregatedTemplate {
  const fallbackStats: AggregatedTemplate['stats'] = {
    executions: 42,
    distinctUsers: 3,
    firstSeen: '2026-05-01T00:00:00.000Z',
    lastSeen: '2026-05-11T00:00:00.000Z',
    p50RuntimeMs: 20,
    p95RuntimeMs: 80,
    errorRate: 0,
    rowsProduced: 100,
  };
  const fallbackTopUsers: AggregatedTemplate['topUsers'] = [{ user: 'analyst', executions: 40 }];
  const { templateId, canonicalSql } = overrides;

  return {
    templateId,
    canonicalSql,
    dialect: overrides.dialect ?? 'postgres',
    stats: overrides.stats ?? fallbackStats,
    topUsers: overrides.topUsers ?? fallbackTopUsers,
  };
}
/**
 * End-to-end test of the unified historic-SQL staging step against stubbed
 * reader and SQL-analysis ports. It checks the single batched analysis call,
 * the files written under the staged directory, and the bucketed contents of
 * the manifest, one table artifact and the patterns input.
 */
describe('stageHistoricSqlAggregatedSnapshot', () => {
  it('batch parses templates and writes stable table and patterns artifacts', async () => {
    const stagedDir = await tempDir();

    // Reader stub yielding three rows: one normal template, one whose only
    // top user matches the service-account pattern, and one that the analysis
    // stub below reports as a parse failure.
    const reader: HistoricSqlReader = {
      async probe() {
        return { warnings: ['pg_stat_statements.max is low; aggregation still proceeds'] };
      },
      async *fetchAggregated() {
        yield aggregate({
          templateId: 'orders-by-status',
          canonicalSql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status',
        });
        yield aggregate({
          templateId: 'service-account-only',
          canonicalSql: 'select * from public.orders where id = $1',
          stats: {
            executions: 20,
            distinctUsers: 1,
            firstSeen: '2026-05-01T00:00:00.000Z',
            lastSeen: '2026-05-11T00:00:00.000Z',
            p50RuntimeMs: 5,
            p95RuntimeMs: 10,
            errorRate: 0,
            rowsProduced: 1,
          },
          topUsers: [{ user: 'svc_loader', executions: 20 }],
        });
        yield aggregate({
          templateId: 'bad-parse',
          canonicalSql: 'select broken from',
        });
      },
    };

    // Analysis stub: a successful parse for 'orders-by-status' and an error
    // entry for 'bad-parse'.
    const sqlAnalysis: SqlAnalysisPort = {
      analyzeForFingerprint: vi.fn(),
      analyzeBatch: vi.fn(async () => new Map([
        [
          'orders-by-status',
          {
            tablesTouched: ['public.orders', 'public.customers'],
            columnsByClause: {
              select: ['status'],
              where: ['created_at'],
              join: ['customer_id'],
              groupBy: ['status'],
            },
          },
        ],
        ['bad-parse', { tablesTouched: [], columnsByClause: {}, error: 'parse failed' }],
      ])),
    };

    await stageHistoricSqlAggregatedSnapshot({
      stagedDir,
      connectionId: 'warehouse',
      queryClient: {},
      reader,
      sqlAnalysis,
      pullConfig: {
        dialect: 'postgres',
        filters: {
          serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
        },
      },
      now: new Date('2026-05-11T12:00:00.000Z'),
    });

    // The service-account-only template must be filtered out before analysis,
    // and all surviving templates must go through one batched call.
    expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledTimes(1);
    expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
      [
        {
          id: 'orders-by-status',
          sql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status',
        },
        { id: 'bad-parse', sql: 'select broken from' },
      ],
      'postgres',
    );

    // Directory listing order is filesystem-dependent, so sort before
    // comparing; otherwise this assertion can fail on some filesystems.
    const tableFiles = (await readdir(join(stagedDir, 'tables'))).sort();
    expect(tableFiles).toEqual(['public.customers.json', 'public.orders.json']);

    const manifest = await readJson<Record<string, unknown>>(stagedDir, 'manifest.json');
    expect(manifest).toMatchObject({
      source: 'historic-sql',
      connectionId: 'warehouse',
      dialect: 'postgres',
      snapshotRowCount: 3,
      touchedTableCount: 2,
      parseFailures: 1,
      warnings: ['parse_failed:bad-parse'],
      probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'],
    });

    const orders = await readJson<Record<string, any>>(stagedDir, 'tables/public.orders.json');
    expect(orders).toMatchObject({
      table: 'public.orders',
      stats: {
        executionsBucket: '10-100',
        distinctUsersBucket: '2-5',
        errorRateBucket: 'none',
        p95RuntimeBucket: '<100ms',
        recencyBucket: 'current',
      },
      columnsByClause: {
        select: [['status', 'high']],
        where: [['created_at', 'high']],
        join: [['customer_id', 'high']],
        groupBy: [['status', 'high']],
      },
      observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }],
      topTemplates: [
        {
          id: 'orders-by-status',
          topUsers: [{ user: 'analyst' }],
        },
      ],
    });
    expect(orders.topTemplates[0].canonicalSql).toContain('group by o.status');

    // Only the successfully parsed template reaches the patterns input.
    const patterns = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
    expect(patterns.templates).toEqual([
      {
        id: 'orders-by-status',
        canonicalSql: expect.stringContaining('public.orders'),
        tablesTouched: ['public.customers', 'public.orders'],
        executionsBucket: '10-100',
        distinctUsersBucket: '2-5',
        dialect: 'postgres',
      },
    ]);
  });
});

View file

@ -0,0 +1,281 @@
import { mkdir, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import type { SqlAnalysisPort } from '../../../sql-analysis/index.js';
import {
bucketDistinctUsers,
bucketErrorRate,
bucketExecutions,
bucketFrequency,
bucketP95Runtime,
bucketRecency,
} from './buckets.js';
import {
HISTORIC_SQL_SOURCE_KEY,
aggregatedTemplateSchema,
historicSqlUnifiedPullConfigSchema,
type AggregatedTemplate,
type HistoricSqlReader,
type HistoricSqlUnifiedPullConfig,
type StagedPatternsInput,
type StagedTableInput,
} from './types.js';
/** Arguments accepted by `stageHistoricSqlAggregatedSnapshot`. */
interface StageHistoricSqlAggregatedSnapshotInput {
/** Directory the artifacts are written into; created recursively if missing. */
stagedDir: string;
/** Copied verbatim into the manifest's `connectionId` field. */
connectionId: string;
/** Opaque handle forwarded unchanged to `reader.probe` and `reader.fetchAggregated`. */
queryClient: unknown;
/** Source of probe warnings and aggregated template rows. */
reader: HistoricSqlReader;
/** Port whose `analyzeBatch` is called once with every template that survives filtering. */
sqlAnalysis: SqlAnalysisPort;
/** Raw pull configuration; parsed with `historicSqlUnifiedPullConfigSchema` before use. */
pullConfig: unknown;
/** Reference time used as the window end and for recency bucketing; defaults to `new Date()`. */
now?: Date;
}
/** A template whose SQL analysis succeeded and touched at least one table. */
interface ParsedTemplate {
/** The schema-validated aggregated row this entry was built from. */
template: AggregatedTemplate;
/** De-duplicated, non-empty, sorted table names reported by the analysis. */
tablesTouched: string[];
/** Clause name to de-duplicated, sorted column names reported by the analysis. */
columnsByClause: Record<string, string[]>;
}
/** Running per-table totals built up by `addTemplate` and flattened by `toStagedTable`. */
interface TableAccumulator {
/** Table name this accumulator belongs to. */
table: string;
/** Sum of `stats.executions` over every template added. */
executions: number;
/** Largest `stats.distinctUsers` seen on any single template (a maximum, not a union count). */
distinctUsers: number;
/** Sum of `errorRate * executions`; divided by `executions` to get a weighted error rate. */
errorRateNumerator: number;
/** Largest non-null `stats.p95RuntimeMs` seen so far, or null if none was non-null. */
p95RuntimeMs: number | null;
/** Greatest `stats.lastSeen` seen so far, compared as strings; starts at the Unix epoch. */
lastSeen: string;
/** Clause name -> column name -> summed executions of templates using that column in that clause. */
columnsByClause: Map<string, Map<string, number>>;
/** Other table name -> comma-joined sorted join-column key -> summed executions. */
observedJoins: Map<string, Map<string, number>>;
/** Every template added to this table, in insertion order (ranked and truncated later). */
topTemplates: AggregatedTemplate[];
}
// Whole-statement match (case-insensitive) for `SELECT 1`, `SELECT NOW()`,
// `SELECT CURRENT_TIMESTAMP` or `SELECT VERSION()`, with an optional trailing semicolon.
const TRIVIAL_SQL_RE = /^\s*SELECT\s+(1|NOW\(\)|CURRENT_TIMESTAMP|VERSION\(\))\s*;?\s*$/i;
// Statements whose first keyword is SHOW, DESCRIBE, DESC, EXPLAIN, USE or SET (case-insensitive).
const NOISE_PREFIX_RE = /^\s*(SHOW|DESCRIBE|DESC|EXPLAIN|USE|SET)\b/i;
// Matches anywhere in the SQL text: INFORMATION_SCHEMA, SNOWFLAKE.ACCOUNT_USAGE, a word starting
// with `pg_`, or `system.` at a word boundary. This is a textual check, so it also fires when the
// token appears inside a string literal or comment.
const SYSTEM_TABLE_RE = /\b(INFORMATION_SCHEMA|SNOWFLAKE\.ACCOUNT_USAGE|pg_|system\.)/i;
/**
 * Serializes `value` as 2-space-indented JSON with a trailing newline and writes it
 * to `relPath` under `root`, creating any missing parent directories first.
 */
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
  const target = join(root, relPath);
  await mkdir(dirname(target), { recursive: true });
  const serialized = `${JSON.stringify(value, null, 2)}\n`;
  await writeFile(target, serialized, 'utf-8');
}
/**
 * Compiles each pattern source into a flag-less `RegExp`, preserving order.
 * An invalid source makes the `RegExp` constructor throw a `SyntaxError`.
 */
function compilePatterns(patterns: string[]): RegExp[] {
  const compiled: RegExp[] = [];
  for (const source of patterns) {
    compiled.push(new RegExp(source));
  }
  return compiled;
}
/** Returns true when `value` is a non-empty string that at least one of `patterns` matches. */
function matchesAny(value: string | null, patterns: RegExp[]): boolean {
  if (!value) {
    return false;
  }
  for (const candidate of patterns) {
    if (candidate.test(value)) {
      return true;
    }
  }
  return false;
}
/**
 * Decides from the SQL text alone whether a template is noise.
 *
 * Noise-prefixed statements and statements mentioning system tables are always
 * dropped. Trivial probes are dropped unless `filters.dropTrivialProbes` is
 * explicitly `false`.
 */
function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean {
  if (NOISE_PREFIX_RE.test(sql)) {
    return true;
  }
  if (SYSTEM_TABLE_RE.test(sql)) {
    return true;
  }
  const dropTrivial = config.filters.dropTrivialProbes !== false;
  return dropTrivial && TRIVIAL_SQL_RE.test(sql);
}
/**
 * Applies the service-account filter to a template.
 *
 * A template counts as "service only" when its `topUsers` have a positive execution
 * total and every one of those executions belongs to a user matching a configured
 * pattern. In `exclude` mode service-only templates are dropped; in any other
 * active mode everything that is not service-only is dropped. With no filter,
 * `mark-only` mode, or an empty pattern list, nothing is dropped.
 */
function shouldDropByUsers(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean {
  const service = config.filters.serviceAccounts;
  if (!service) {
    return false;
  }
  if (service.mode === 'mark-only') {
    return false;
  }
  if (service.patterns.length === 0) {
    return false;
  }

  const patterns = compilePatterns(service.patterns);
  let fromServiceAccounts = 0;
  let total = 0;
  for (const entry of template.topUsers) {
    total += entry.executions;
    if (matchesAny(entry.user, patterns)) {
      fromServiceAccounts += entry.executions;
    }
  }

  const serviceOnly = total > 0 && fromServiceAccounts >= total;
  if (service.mode === 'exclude') {
    return serviceOnly;
  }
  return !serviceOnly;
}
/**
 * Drops a template whose error rate is strictly above `dropFailedBelow.errorRate`
 * and whose execution count is strictly below `dropFailedBelow.executions`.
 * Nothing is dropped when `dropFailedBelow` is not configured.
 */
function shouldDropByFailure(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean {
  const threshold = config.filters.dropFailedBelow;
  if (!threshold) {
    return false;
  }
  const { errorRate, executions } = template.stats;
  const failsTooOften = errorRate > threshold.errorRate;
  const runsTooRarely = executions < threshold.executions;
  return failsTooOften && runsTooRarely;
}
/**
 * Combined filter: a template is dropped as soon as the SQL-text, user, or
 * failure filter (checked in that order) says so.
 */
function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean {
  return (
    shouldDropBySql(template.canonicalSql, config) ||
    shouldDropByUsers(template, config) ||
    shouldDropByFailure(template, config)
  );
}
/** Adds `executions` to the running count for `column` under `clause`, creating the clause bucket on first use. */
function recordColumn(acc: TableAccumulator, clause: string, column: string, executions: number): void {
  let counts = acc.columnsByClause.get(clause);
  if (counts === undefined) {
    counts = new Map<string, number>();
    acc.columnsByClause.set(clause, counts);
  }
  const previous = counts.get(column) ?? 0;
  counts.set(column, previous + executions);
}
/**
 * Records a join between the accumulator's table and `otherTable`.
 *
 * The join key is the de-duplicated, sorted column list joined with commas.
 * Nothing is recorded when that key is empty.
 */
function recordJoin(acc: TableAccumulator, otherTable: string, columns: string[], executions: number): void {
  const key = Array.from(new Set(columns)).sort().join(',');
  if (key.length === 0) {
    return;
  }
  const counts = acc.observedJoins.get(otherTable) ?? new Map<string, number>();
  const previous = counts.get(key) ?? 0;
  counts.set(key, previous + executions);
  acc.observedJoins.set(otherTable, counts);
}
/**
 * Builds an empty accumulator for `table`: zeroed counters, no p95 yet, `lastSeen`
 * at the Unix epoch so any later timestamp string compares greater, and fresh
 * (unshared) collections.
 */
function accumulatorFor(table: string): TableAccumulator {
  const fresh: TableAccumulator = {
    table,
    executions: 0,
    distinctUsers: 0,
    errorRateNumerator: 0,
    p95RuntimeMs: null,
    lastSeen: '1970-01-01T00:00:00.000Z',
    columnsByClause: new Map(),
    observedJoins: new Map(),
    topTemplates: [],
  };
  return fresh;
}
/**
 * Folds one parsed template into a table accumulator.
 *
 * Executions and the weighted error numerator are summed; distinct users and p95
 * runtime keep the maximum seen; `lastSeen` keeps the greatest string. Every column
 * of every clause is counted with the template's executions, and the template's
 * `join` columns are attributed to each other table it touches.
 */
function addTemplate(acc: TableAccumulator, parsed: ParsedTemplate): void {
  const { template, tablesTouched, columnsByClause } = parsed;
  const stats = template.stats;
  const executions = stats.executions;

  acc.executions += executions;
  acc.distinctUsers = Math.max(acc.distinctUsers, stats.distinctUsers);
  acc.errorRateNumerator += stats.errorRate * executions;

  // Keep the largest p95; a null on either side defers to the other value.
  if (acc.p95RuntimeMs === null) {
    acc.p95RuntimeMs = stats.p95RuntimeMs;
  } else if (stats.p95RuntimeMs !== null) {
    acc.p95RuntimeMs = Math.max(acc.p95RuntimeMs, stats.p95RuntimeMs);
  }

  if (stats.lastSeen > acc.lastSeen) {
    acc.lastSeen = stats.lastSeen;
  }

  Object.entries(columnsByClause).forEach(([clause, columns]) => {
    columns.forEach((column) => recordColumn(acc, clause, column, executions));
  });

  const joinColumns = columnsByClause.join ?? [];
  for (const otherTable of tablesTouched) {
    if (otherTable === acc.table) {
      continue;
    }
    recordJoin(acc, otherTable, joinColumns, executions);
  }

  acc.topTemplates.push(template);
}
/**
 * Converts an accumulator into the staged per-table artifact.
 *
 * Raw counts are replaced by buckets. Clauses are sorted by name; columns within a
 * clause by descending count, then name; joins by other table, then join key. Only
 * the five most executed templates are kept (ties broken by id), each with at most
 * five user names.
 */
function toStagedTable(acc: TableAccumulator, now: Date): StagedTableInput {
  const totalExecutions = acc.executions;
  const errorRate = totalExecutions > 0 ? acc.errorRateNumerator / totalExecutions : 0;

  const clauseEntries = Array.from(acc.columnsByClause.entries());
  clauseEntries.sort((left, right) => left[0].localeCompare(right[0]));
  const columnsByClause = Object.fromEntries(
    clauseEntries.map(([clause, counts]) => {
      const ranked = Array.from(counts.entries());
      ranked.sort(
        ([leftColumn, leftCount], [rightColumn, rightCount]) =>
          rightCount - leftCount || leftColumn.localeCompare(rightColumn),
      );
      return [clause, ranked.map(([column, count]) => [column, bucketFrequency(count, totalExecutions)])];
    }),
  );

  const observedJoins: Array<{ withTable: string; on: string[]; freq: ReturnType<typeof bucketFrequency> }> = [];
  for (const [withTable, byColumns] of acc.observedJoins.entries()) {
    for (const [columns, count] of byColumns.entries()) {
      observedJoins.push({
        withTable,
        on: columns.split(',').filter(Boolean),
        freq: bucketFrequency(count, totalExecutions),
      });
    }
  }
  observedJoins.sort((left, right) => {
    const byTable = left.withTable.localeCompare(right.withTable);
    return byTable || left.on.join(',').localeCompare(right.on.join(','));
  });

  const rankedTemplates = acc.topTemplates.slice();
  rankedTemplates.sort((left, right) => {
    const byExecutions = right.stats.executions - left.stats.executions;
    return byExecutions || left.templateId.localeCompare(right.templateId);
  });
  const topTemplates = rankedTemplates.slice(0, 5).map((template) => {
    const topUsers = template.topUsers.slice(0, 5).map((entry) => ({ user: entry.user }));
    return { id: template.templateId, canonicalSql: template.canonicalSql, topUsers };
  });

  const stats = {
    executionsBucket: bucketExecutions(totalExecutions),
    distinctUsersBucket: bucketDistinctUsers(acc.distinctUsers),
    errorRateBucket: bucketErrorRate(errorRate),
    p95RuntimeBucket: bucketP95Runtime(acc.p95RuntimeMs),
    recencyBucket: bucketRecency(acc.lastSeen, now),
  };

  return { table: acc.table, stats, columnsByClause, observedJoins, topTemplates };
}
/**
 * Builds the patterns-input artifact: one entry per parsed template carrying its id,
 * canonical SQL, sorted table list, bucketed execution and user counts, and dialect.
 * Entries are ordered by id.
 */
function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput {
  const templates = parsedTemplates.map((parsed) => {
    const { template, tablesTouched } = parsed;
    const { executions, distinctUsers } = template.stats;
    return {
      id: template.templateId,
      canonicalSql: template.canonicalSql,
      tablesTouched: tablesTouched.slice().sort(),
      executionsBucket: bucketExecutions(executions),
      distinctUsersBucket: bucketDistinctUsers(distinctUsers),
      dialect: template.dialect,
    };
  });
  templates.sort((left, right) => left.id.localeCompare(right.id));
  return { templates };
}
/**
 * Builds the staged-dir-relative path for a table artifact.
 *
 * Table names come from parsed query-history SQL, so a quoted identifier may contain
 * path separators or `../` sequences. Path separators, NUL and `%` are percent-encoded
 * so the file always lands directly inside `tables/` and distinct names stay distinct.
 * Names without those characters (e.g. `public.orders`) are returned unchanged.
 */
function tableArtifactPath(table: string): string {
  const safeName = table.replace(
    /[%/\\\0]/g,
    (char) => `%${char.charCodeAt(0).toString(16).toUpperCase().padStart(2, '0')}`,
  );
  return `tables/${safeName}.json`;
}

/**
 * Pulls aggregated query-history templates from `input.reader`, filters and analyzes
 * them, and writes the staged artifacts into `input.stagedDir`:
 *
 * - `tables/<table>.json` — one bucketed usage summary per touched table,
 * - `patterns-input.json` — the successfully parsed templates,
 * - `manifest.json` — run metadata, counts and warnings.
 *
 * @param input Staging arguments; `pullConfig` is validated with
 *   `historicSqlUnifiedPullConfigSchema` and `now` defaults to the current time.
 * @returns A promise that resolves once every artifact has been written.
 * @throws Whatever the config or row schemas throw on invalid data, and any error
 *   raised by the reader, the SQL-analysis port, or the filesystem.
 */
export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise<void> {
  const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig);
  const now = input.now ?? new Date();
  const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000);
  const probe = await input.reader.probe(input.queryClient);

  // Stream rows from the reader. `snapshotRowCount` counts every row fetched,
  // including those removed by the filters below.
  const snapshot: AggregatedTemplate[] = [];
  let snapshotRowCount = 0;
  for await (const row of input.reader.fetchAggregated(input.queryClient, { start: windowStart, end: now }, config)) {
    snapshotRowCount += 1;
    const parsed = aggregatedTemplateSchema.parse(row);
    if (!shouldDropTemplate(parsed, config)) {
      snapshot.push(parsed);
    }
  }

  // One batched analysis call for all surviving templates.
  const analysis = await input.sqlAnalysis.analyzeBatch(
    snapshot.map((template) => ({ id: template.templateId, sql: template.canonicalSql })),
    config.dialect,
  );

  const warnings: string[] = [];
  const parsedTemplates: ParsedTemplate[] = [];
  for (const template of snapshot) {
    const parsed = analysis.get(template.templateId);
    if (!parsed || parsed.error) {
      warnings.push(`parse_failed:${template.templateId}`);
      continue;
    }
    const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort();
    if (tablesTouched.length === 0) {
      // Templates that touch no table contribute to no artifact.
      continue;
    }
    parsedTemplates.push({
      template,
      tablesTouched,
      columnsByClause: Object.fromEntries(
        Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]),
      ),
    });
  }

  // Each template is folded into the accumulator of every table it touches.
  const byTable = new Map<string, TableAccumulator>();
  for (const parsed of parsedTemplates) {
    for (const table of parsed.tablesTouched) {
      const acc = byTable.get(table) ?? accumulatorFor(table);
      addTemplate(acc, parsed);
      byTable.set(table, acc);
    }
  }

  await mkdir(input.stagedDir, { recursive: true });
  // Write tables in name order, one file at a time.
  for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) {
    await writeJson(input.stagedDir, tableArtifactPath(table), toStagedTable(acc, now));
  }
  await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates));
  await writeJson(input.stagedDir, 'manifest.json', {
    source: HISTORIC_SQL_SOURCE_KEY,
    connectionId: input.connectionId,
    dialect: config.dialect,
    fetchedAt: now.toISOString(),
    windowStart: windowStart.toISOString(),
    windowEnd: now.toISOString(),
    snapshotRowCount,
    touchedTableCount: byTable.size,
    parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length,
    warnings,
    probeWarnings: probe.warnings,
  });
}

View file

@ -330,6 +330,7 @@ export { BigQueryHistoricSqlQueryHistoryReader } from './adapters/historic-sql/b
export type { BigQueryHistoricSqlQueryHistoryReaderOptions } from './adapters/historic-sql/bigquery-query-history-reader.js';
export { PostgresPgssQueryHistoryReader } from './adapters/historic-sql/postgres-pgss-query-history-reader.js';
export { SnowflakeHistoricSqlQueryHistoryReader } from './adapters/historic-sql/snowflake-query-history-reader.js';
export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js';
export { stageHistoricSqlTemplates } from './adapters/historic-sql/stage.js';
export {
patternOutputSchema,