From 93752d57197b281cad5ca8f4533f9700fa8f64ee Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 18:04:13 +0200 Subject: [PATCH] feat: stage historic sql aggregate snapshots --- .../historic-sql/stage-unified.test.ts | 167 +++++++++++ .../adapters/historic-sql/stage-unified.ts | 281 ++++++++++++++++++ packages/context/src/ingest/index.ts | 1 + 3 files changed, 449 insertions(+) create mode 100644 packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts create mode 100644 packages/context/src/ingest/adapters/historic-sql/stage-unified.ts diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts new file mode 100644 index 00000000..ad11bace --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts @@ -0,0 +1,167 @@ +import { mkdtemp, readFile, readdir } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it, vi } from 'vitest'; +import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; +import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js'; +import type { AggregatedTemplate, HistoricSqlReader } from './types.js'; + +/** Creates a unique temporary directory under the OS temp dir to receive staged output. */ async function tempDir(): Promise { + return mkdtemp(join(tmpdir(), 'historic-sql-unified-stage-')); +} + +/** Reads root/relPath as UTF-8 text and returns the parsed JSON, cast to the caller-chosen type without validation. */ async function readJson(root: string, relPath: string): Promise { + return JSON.parse(await readFile(join(root, relPath), 'utf-8')) as T; +} + +/** Test fixture builder: templateId and canonicalSql are required, while dialect, stats and topUsers fall back to fixed defaults when omitted. */ function aggregate(overrides: Partial & { templateId: string; canonicalSql: string }): AggregatedTemplate { + return { + templateId: overrides.templateId, + canonicalSql: overrides.canonicalSql, + dialect: overrides.dialect ?? 'postgres', + stats: overrides.stats ??
{ + executions: 42, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 20, + p95RuntimeMs: 80, + errorRate: 0, + rowsProduced: 100, + }, + topUsers: overrides.topUsers ?? [{ user: 'analyst', executions: 40 }], + }; +} + +describe('stageHistoricSqlAggregatedSnapshot', () => { + it('batch parses templates and writes stable table and patterns artifacts', async () => { + const stagedDir = await tempDir(); + const reader: HistoricSqlReader = { + async probe() { + return { warnings: ['pg_stat_statements.max is low; aggregation still proceeds'] }; + }, + async *fetchAggregated() { + yield aggregate({ + templateId: 'orders-by-status', + canonicalSql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status', + }); + /* Every execution of this template comes from svc_loader, which matches the ^svc_ exclude pattern configured below, so it should never reach analyzeBatch. */ yield aggregate({ + templateId: 'service-account-only', + canonicalSql: 'select * from public.orders where id = $1', + stats: { + executions: 20, + distinctUsers: 1, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 5, + p95RuntimeMs: 10, + errorRate: 0, + rowsProduced: 1, + }, + topUsers: [{ user: 'svc_loader', executions: 20 }], + }); + yield aggregate({ + templateId: 'bad-parse', + canonicalSql: 'select broken from', + }); + }, + }; + /* Stubbed analyzer: one successful result and one entry that carries an error. */ const sqlAnalysis: SqlAnalysisPort = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(async () => new Map([ + [ + 'orders-by-status', + { + tablesTouched: ['public.orders', 'public.customers'], + columnsByClause: { + select: ['status'], + where: ['created_at'], + join: ['customer_id'], + groupBy: ['status'], + }, + }, + ], + ['bad-parse', { tablesTouched: [], columnsByClause: {}, error: 'parse failed' }], + ])), + }; + + await stageHistoricSqlAggregatedSnapshot({ + stagedDir, + connectionId: 'warehouse', + queryClient: {}, + reader, + sqlAnalysis, + pullConfig: { + dialect: 'postgres', + filters: { + serviceAccounts: {
patterns: ['^svc_'], mode: 'exclude' }, + }, + }, + now: new Date('2026-05-11T12:00:00.000Z'), + }); + + /* All templates that survive filtering must go through a single batched analysis call. */ expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledTimes(1); + expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith( + [ + { + id: 'orders-by-status', + sql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status', + }, + { id: 'bad-parse', sql: 'select broken from' }, + ], + 'postgres', + ); + + expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.customers.json', 'public.orders.json']); + + /* snapshotRowCount counts every fetched row, including the filtered service-account template. */ const manifest = await readJson>(stagedDir, 'manifest.json'); + expect(manifest).toMatchObject({ + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + snapshotRowCount: 3, + touchedTableCount: 2, + parseFailures: 1, + warnings: ['parse_failed:bad-parse'], + probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'], + }); + + const orders = await readJson>(stagedDir, 'tables/public.orders.json'); + expect(orders).toMatchObject({ + table: 'public.orders', + stats: { + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + errorRateBucket: 'none', + p95RuntimeBucket: '<100ms', + recencyBucket: 'current', + }, + columnsByClause: { + select: [['status', 'high']], + where: [['created_at', 'high']], + join: [['customer_id', 'high']], + groupBy: [['status', 'high']], + }, + observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }], + topTemplates: [ + { + id: 'orders-by-status', + topUsers: [{ user: 'analyst' }], + }, + ], + }); + expect(orders.topTemplates[0].canonicalSql).toContain('group by o.status'); + + const patterns = await readJson>(stagedDir, 'patterns-input.json'); + expect(patterns.templates).toEqual([ + { + id: 'orders-by-status', + canonicalSql: expect.stringContaining('public.orders'), + tablesTouched: ['public.customers', 'public.orders'], + executionsBucket: '10-100', + distinctUsersBucket:
'2-5', + dialect: 'postgres', + }, + ]); + }); +}); diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts new file mode 100644 index 00000000..ee9f04f6 --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts @@ -0,0 +1,281 @@ +import { mkdir, writeFile } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; +import { + bucketDistinctUsers, + bucketErrorRate, + bucketExecutions, + bucketFrequency, + bucketP95Runtime, + bucketRecency, +} from './buckets.js'; +import { + HISTORIC_SQL_SOURCE_KEY, + aggregatedTemplateSchema, + historicSqlUnifiedPullConfigSchema, + type AggregatedTemplate, + type HistoricSqlReader, + type HistoricSqlUnifiedPullConfig, + type StagedPatternsInput, + type StagedTableInput, +} from './types.js'; + +/** Arguments for stageHistoricSqlAggregatedSnapshot. pullConfig is untyped here and is run through historicSqlUnifiedPullConfigSchema before use; now defaults to the current time when omitted. */ interface StageHistoricSqlAggregatedSnapshotInput { + stagedDir: string; + connectionId: string; + queryClient: unknown; + reader: HistoricSqlReader; + sqlAnalysis: SqlAnalysisPort; + pullConfig: unknown; + now?: Date; +} + +/** A template that survived filtering and analysis, paired with its deduplicated, sorted tables and per-clause columns. */ interface ParsedTemplate { + template: AggregatedTemplate; + tablesTouched: string[]; + columnsByClause: Record; +} + +/** Mutable per-table running totals, filled in by addTemplate and rendered by toStagedTable. */ interface TableAccumulator { + table: string; + executions: number; + distinctUsers: number; + errorRateNumerator: number; + p95RuntimeMs: number | null; + lastSeen: string; + columnsByClause: Map>; + observedJoins: Map>; + topTemplates: AggregatedTemplate[]; +} + +/* Bare probes: SELECT 1, SELECT NOW(), SELECT CURRENT_TIMESTAMP or SELECT VERSION(), optionally followed by a semicolon. */ const TRIVIAL_SQL_RE = /^\s*SELECT\s+(1|NOW\(\)|CURRENT_TIMESTAMP|VERSION\(\))\s*;?\s*$/i; +/* Statements whose first keyword is SHOW, DESCRIBE, DESC, EXPLAIN, USE or SET. */ const NOISE_PREFIX_RE = /^\s*(SHOW|DESCRIBE|DESC|EXPLAIN|USE|SET)\b/i; +/* NOTE(review): the pg_ and system. alternatives match at any word boundary in the SQL text, so a user table whose name starts with pg_ is dropped as well - confirm this is intended. */ const SYSTEM_TABLE_RE = /\b(INFORMATION_SCHEMA|SNOWFLAKE\.ACCOUNT_USAGE|pg_|system\.)/i; + +/** Writes value as JSON (2-space indent, trailing newline, UTF-8) to root/relPath, creating the parent directories first. */ function writeJson(root: string, relPath: string, value: unknown): Promise { + const target = join(root, relPath); + return mkdir(dirname(target), { recursive: true }).then(() =>
writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'), + ); +} + +/** Turns each pattern string into a RegExp with no flags, so matching is case-sensitive. */ function compilePatterns(patterns: string[]): RegExp[] { + return patterns.map((pattern) => new RegExp(pattern)); +} + +/** Returns true when value is a non-empty string and at least one pattern matches it; null and the empty string never match. */ function matchesAny(value: string | null, patterns: RegExp[]): boolean { + return !!value && patterns.some((pattern) => pattern.test(value)); +} + +/** Drops SQL that starts with a noise keyword or mentions a system table, and drops trivial probes unless filters.dropTrivialProbes is explicitly false. */ function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean { + if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true; + if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true; + return false; +} + +/** Applies the service-account filter. Nothing is dropped when the filter is absent, in mark-only mode, or has no patterns. Otherwise a template counts as service-only when all executions listed in topUsers belong to matching users; exclude mode drops service-only templates and any other mode drops the rest. Only topUsers entries are summed, not stats.executions. NOTE(review): the patterns are recompiled on every call, i.e. once per template - consider hoisting. */ function shouldDropByUsers(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { + const service = config.filters.serviceAccounts; + if (!service || service.mode === 'mark-only' || service.patterns.length === 0) return false; + const patterns = compilePatterns(service.patterns); + const matchingExecutions = template.topUsers + .filter((entry) => matchesAny(entry.user, patterns)) + .reduce((sum, entry) => sum + entry.executions, 0); + const allExecutions = template.topUsers.reduce((sum, entry) => sum + entry.executions, 0); + const serviceOnly = allExecutions > 0 && matchingExecutions >= allExecutions; + return service.mode === 'exclude' ?
serviceOnly : !serviceOnly; +} + +/** Drops a template only when filters.dropFailedBelow is set and the template is both above its errorRate threshold and below its executions threshold. */ function shouldDropByFailure(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { + const failed = config.filters.dropFailedBelow; + return !!failed && template.stats.errorRate > failed.errorRate && template.stats.executions < failed.executions; +} + +/** Combines the SQL, user and failure filters; a template is dropped as soon as any one of them says so. */ function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { + if (shouldDropBySql(template.canonicalSql, config)) return true; + if (shouldDropByUsers(template, config)) return true; + if (shouldDropByFailure(template, config)) return true; + return false; +} + +/** Adds executions to the running count kept for column under clause, creating the per-clause map on first use. */ function recordColumn(acc: TableAccumulator, clause: string, column: string, executions: number): void { + const byColumn = acc.columnsByClause.get(clause) ?? new Map(); + byColumn.set(column, (byColumn.get(column) ?? 0) + executions); + acc.columnsByClause.set(clause, byColumn); +} + +/** Adds executions to the count for a join with otherTable, keyed by the deduplicated, sorted, comma-joined column list; an empty column list records nothing. NOTE(review): toStagedTable splits this key on commas again, so a column name that itself contains a comma would be broken apart. */ function recordJoin(acc: TableAccumulator, otherTable: string, columns: string[], executions: number): void { + const byColumns = acc.observedJoins.get(otherTable) ?? new Map(); + const key = [...new Set(columns)].sort().join(','); + if (key.length > 0) { + byColumns.set(key, (byColumns.get(key) ?? 0) + executions); + acc.observedJoins.set(otherTable, byColumns); + } +} + +/** Creates an empty accumulator for table. lastSeen starts at the Unix epoch so that the string comparison in addTemplate lets any later ISO timestamp replace it. */ function accumulatorFor(table: string): TableAccumulator { + return { + table, + executions: 0, + distinctUsers: 0, + errorRateNumerator: 0, + p95RuntimeMs: null, + lastSeen: '1970-01-01T00:00:00.000Z', + columnsByClause: new Map(), + observedJoins: new Map(), + topTemplates: [], + }; +} + +/** Folds one parsed template into the accumulator of a table it touches. Executions are summed, distinctUsers keeps the largest single-template value (not a union across templates), the error rate is accumulated weighted by executions, and p95 keeps the largest non-null value. */ function addTemplate(acc: TableAccumulator, parsed: ParsedTemplate): void { + const executions = parsed.template.stats.executions; + acc.executions += executions; + acc.distinctUsers = Math.max(acc.distinctUsers, parsed.template.stats.distinctUsers); + acc.errorRateNumerator += parsed.template.stats.errorRate * executions; + acc.p95RuntimeMs = + acc.p95RuntimeMs === null + ?
parsed.template.stats.p95RuntimeMs + : parsed.template.stats.p95RuntimeMs === null + ? acc.p95RuntimeMs + : Math.max(acc.p95RuntimeMs, parsed.template.stats.p95RuntimeMs); + acc.lastSeen = parsed.template.stats.lastSeen > acc.lastSeen ? parsed.template.stats.lastSeen : acc.lastSeen; + /* Every column of the template is counted against this table; the analysis result seen here does not say which table a column belongs to. */ for (const [clause, columns] of Object.entries(parsed.columnsByClause)) { + for (const column of columns) { + recordColumn(acc, clause, column, executions); + } + } + /* The same join column list is attributed to each other table the template touches. */ const joinColumns = parsed.columnsByClause.join ?? []; + for (const otherTable of parsed.tablesTouched.filter((table) => table !== acc.table)) { + recordJoin(acc, otherTable, joinColumns, executions); + } + acc.topTemplates.push(parsed.template); +} + +/** Renders an accumulator as the staged table shape. Raw numbers are turned into buckets by the bucket helpers; clauses are sorted by name, columns by descending count then name, joins by table then column list; at most five templates (by executions, then id) are kept, each with at most five user names and no per-user counts. */ function toStagedTable(acc: TableAccumulator, now: Date): StagedTableInput { + const errorRate = acc.executions > 0 ? acc.errorRateNumerator / acc.executions : 0; + const columnsByClause = Object.fromEntries( + [...acc.columnsByClause.entries()] + .sort(([left], [right]) => left.localeCompare(right)) + .map(([clause, counts]) => [ + clause, + [...counts.entries()] + .sort((left, right) => right[1] - left[1] || left[0].localeCompare(right[0])) + .map(([column, count]) => [column, bucketFrequency(count, acc.executions)]), + ]), + ); + const observedJoins = [...acc.observedJoins.entries()] + .flatMap(([withTable, byColumns]) => + [...byColumns.entries()].map(([columns, count]) => ({ + withTable, + on: columns.split(',').filter(Boolean), + freq: bucketFrequency(count, acc.executions), + })), + ) + .sort((left, right) => left.withTable.localeCompare(right.withTable) || left.on.join(',').localeCompare(right.on.join(','))); + const topTemplates = [...acc.topTemplates] + .sort((left, right) => right.stats.executions - left.stats.executions || left.templateId.localeCompare(right.templateId)) + .slice(0, 5) + .map((template) => ({ + id: template.templateId, + canonicalSql: template.canonicalSql, + topUsers: template.topUsers.slice(0, 5).map((entry) => ({ user: entry.user })), + }));
+ + return { + table: acc.table, + stats: { + executionsBucket: bucketExecutions(acc.executions), + distinctUsersBucket: bucketDistinctUsers(acc.distinctUsers), + errorRateBucket: bucketErrorRate(errorRate), + p95RuntimeBucket: bucketP95Runtime(acc.p95RuntimeMs), + recencyBucket: bucketRecency(acc.lastSeen, now), + }, + columnsByClause, + observedJoins, + topTemplates, + }; +} + +/** Builds the patterns input: one entry per parsed template with its sorted table list and bucketed execution and user counts, ordered by template id. */ function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput { + return { + templates: parsedTemplates + .map(({ template, tablesTouched }) => ({ + id: template.templateId, + canonicalSql: template.canonicalSql, + tablesTouched: [...tablesTouched].sort(), + executionsBucket: bucketExecutions(template.stats.executions), + distinctUsersBucket: bucketDistinctUsers(template.stats.distinctUsers), + dialect: template.dialect, + })) + .sort((left, right) => left.id.localeCompare(right.id)), + }; +} + +/** Stages one aggregated historic-SQL snapshot under input.stagedDir. It fetches templates from the reader for the window of config.windowDays days ending at now, filters them, analyzes the survivors in a single analyzeBatch call, and writes one file per touched table under tables/, plus patterns-input.json and manifest.json. The pull config and every fetched row go through their schema parse methods, which presumably throw on invalid input - confirm against types.js. */ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise { + const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig); + const now = input.now ??
new Date(); + const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000); + const probe = await input.reader.probe(input.queryClient); + const snapshot: AggregatedTemplate[] = []; + let snapshotRowCount = 0; + + /* Count every fetched row, but keep only the templates that pass the drop filters. */ for await (const row of input.reader.fetchAggregated(input.queryClient, { start: windowStart, end: now }, config)) { + snapshotRowCount += 1; + const parsed = aggregatedTemplateSchema.parse(row); + if (!shouldDropTemplate(parsed, config)) { + snapshot.push(parsed); + } + } + + const analysis = await input.sqlAnalysis.analyzeBatch( + snapshot.map((template) => ({ id: template.templateId, sql: template.canonicalSql })), + config.dialect, + ); + const warnings: string[] = []; + const parsedTemplates: ParsedTemplate[] = []; + for (const template of snapshot) { + const parsed = analysis.get(template.templateId); + /* A missing or errored analysis result becomes a parse_failed warning and the template is skipped. */ if (!parsed || parsed.error) { + warnings.push(`parse_failed:${template.templateId}`); + continue; + } + const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort(); + /* Templates that touch no table are skipped silently: no warning is recorded for them. */ if (tablesTouched.length === 0) { + continue; + } + parsedTemplates.push({ + template, + tablesTouched, + columnsByClause: Object.fromEntries( + Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]), + ), + }); + } + + /* Each template contributes to the accumulator of every table it touches. */ const byTable = new Map(); + for (const parsed of parsedTemplates) { + for (const table of parsed.tablesTouched) { + const acc = byTable.get(table) ??
accumulatorFor(table); + addTemplate(acc, parsed); + byTable.set(table, acc); + } + } + + await mkdir(input.stagedDir, { recursive: true }); + /* Files are written in sorted table order. NOTE(review): the table name from SQL analysis is interpolated into the file path as is, so a name containing a path separator or .. would change where the file lands - confirm names are sanitized upstream. Files already present in stagedDir are not removed here. */ for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) { + await writeJson(input.stagedDir, `tables/${table}.json`, toStagedTable(acc, now)); + } + await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates)); + /* snapshotRowCount includes rows removed by the filters; parseFailures is derived from the parse_failed warnings. */ await writeJson(input.stagedDir, 'manifest.json', { + source: HISTORIC_SQL_SOURCE_KEY, + connectionId: input.connectionId, + dialect: config.dialect, + fetchedAt: now.toISOString(), + windowStart: windowStart.toISOString(), + windowEnd: now.toISOString(), + snapshotRowCount, + touchedTableCount: byTable.size, + parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length, + warnings, + probeWarnings: probe.warnings, + }); +} diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index a106bdd8..3362d88e 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -330,6 +330,7 @@ export { BigQueryHistoricSqlQueryHistoryReader } from './adapters/historic-sql/b export type { BigQueryHistoricSqlQueryHistoryReaderOptions } from './adapters/historic-sql/bigquery-query-history-reader.js'; export { PostgresPgssQueryHistoryReader } from './adapters/historic-sql/postgres-pgss-query-history-reader.js'; export { SnowflakeHistoricSqlQueryHistoryReader } from './adapters/historic-sql/snowflake-query-history-reader.js'; +export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js'; export { stageHistoricSqlTemplates } from './adapters/historic-sql/stage.js'; export { patternOutputSchema,