From 02b621be720cd112759cf722e41c68fcc80e45ec Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 20:20:22 +0200 Subject: [PATCH] feat: write historic sql pattern shards --- .../historic-sql/stage-unified.test.ts | 119 ++++++++++++++++++ .../adapters/historic-sql/stage-unified.ts | 13 +- 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts index 913e42f9..421970bf 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts @@ -236,4 +236,123 @@ describe('stageHistoricSqlAggregatedSnapshot', () => { expect(tableJson).toContain('[REDACTED]'); expect(patternsJson).toContain('[REDACTED]'); }); + + it('preserves full patterns audit input and writes bounded cross-table pattern shards', async () => { + const stagedDir = await tempDir(); + const largeSql = `select * from public.orders o join public.customers c on c.id = o.customer_id where payload = '${'x'.repeat(8000)}'`; + const reader: HistoricSqlReader = { + async probe() { + return { warnings: [], info: [] }; + }, + async *fetchAggregated() { + yield aggregate({ + templateId: 'orders-customers-a', + canonicalSql: largeSql, + stats: { + executions: 25, + distinctUsers: 4, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 15, + p95RuntimeMs: 90, + errorRate: 0, + rowsProduced: 250, + }, + }); + yield aggregate({ + templateId: 'orders-customers-b', + canonicalSql: largeSql.replace('payload', 'payload_b'), + stats: { + executions: 22, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 20, + p95RuntimeMs: 95, + errorRate: 0, + rowsProduced: 220, + }, + }); + yield aggregate({ + templateId: 'orders-single-table', + canonicalSql: 'select count(*) from public.orders', + stats: { + executions: 30, + distinctUsers: 2, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 10, + p95RuntimeMs: 20, + errorRate: 0, + rowsProduced: 30, + }, + }); + }, + }; + const sqlAnalysis: SqlAnalysisPort = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(async () => new Map([ + [ + 'orders-customers-a', + { + tablesTouched: ['public.orders', 'public.customers'], + columnsByClause: { + select: [], + where: ['payload'], + join: ['customer_id', 'id'], + groupBy: [], + }, + }, + ], + [ + 'orders-customers-b', + { + tablesTouched: ['public.orders', 'public.customers'], + columnsByClause: { + select: [], + where: ['payload_b'], + join: ['customer_id', 'id'], + groupBy: [], + }, + }, + ], + [ + 'orders-single-table', + { + tablesTouched: ['public.orders'], + columnsByClause: { + select: [], + where: [], + join: [], + groupBy: [], + }, + }, + ], + ])), + }; + + await stageHistoricSqlAggregatedSnapshot({ + stagedDir, + connectionId: 'warehouse', + queryClient: {}, + reader, + sqlAnalysis, + pullConfig: { dialect: 'postgres' }, + now: new Date('2026-05-11T12:00:00.000Z'), + }); + + const audit = await readJson>(stagedDir, 'patterns-input.json'); + expect(audit.templates.map((entry: any) => entry.id)).toEqual([ + 'orders-customers-a', + 'orders-customers-b', + 'orders-single-table', + ]); + + const firstShard = await readJson>(stagedDir, 'patterns-input/part-0001.json'); + expect(firstShard.templates.map((entry: any) => entry.id)).toEqual(['orders-customers-a', 'orders-customers-b']); + expect(firstShard.templates.some((entry: any) => entry.id === 'orders-single-table')).toBe(false); + + const manifest = await readJson>(stagedDir, 'manifest.json'); + expect(manifest.warnings).toEqual([]); + }); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts index 7ee96d27..a95052d1 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts @@ -9,6 +9,7 @@ import { bucketP95Runtime, bucketRecency, } from './buckets.js'; +import { splitHistoricSqlPatternInputs } from './pattern-inputs.js'; import { compileHistoricSqlRedactionPatterns, redactHistoricSqlText, @@ -283,7 +284,13 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) { await writeJson(input.stagedDir, `tables/${table}.json`, toStagedTable(acc, now)); } - await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates)); + const patternsInput = toPatternsInput(parsedTemplates); + const patternInputSplit = splitHistoricSqlPatternInputs(patternsInput); + const allWarnings = [...warnings, ...patternInputSplit.warnings]; + await writeJson(input.stagedDir, 'patterns-input.json', patternInputSplit.auditInput); + for (const shard of patternInputSplit.shards) { + await writeJson(input.stagedDir, shard.path, shard.input); + } await writeJson(input.stagedDir, 'manifest.json', { source: HISTORIC_SQL_SOURCE_KEY, connectionId: input.connectionId, @@ -293,8 +300,8 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql windowEnd: now.toISOString(), snapshotRowCount, touchedTableCount: byTable.size, - parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length, - warnings, + parseFailures: allWarnings.filter((warning) => warning.startsWith('parse_failed:')).length, + warnings: allWarnings, probeWarnings: probe.warnings, staleArchiveAfterDays: config.staleArchiveAfterDays, });