From 2a91ea521fde5a966dee6df0471116da39140611 Mon Sep 17 00:00:00 2001
From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
Date: Mon, 11 May 2026 20:19:47 +0200
Subject: [PATCH] feat: shard historic sql pattern inputs

---
Review notes (not part of the commit message):
- HISTORIC_SQL_PATTERN_WORKUNIT_PATH_RE now also accepts shard numbers above 9999, which
  shardPath() emits as five or more digits because padStart never truncates.
- TSDoc was added to pattern-inputs.ts and two shard-path assertions were added to the test.
- Hunk sizes and the diffstat reflect those edits; the index blob ids are carried over from
  the original patch and no longer describe the new file contents.

 .../historic-sql/pattern-inputs.test.ts       |  91 ++++++++++++
 .../adapters/historic-sql/pattern-inputs.ts   | 144 +++++++++++++++++++
 2 files changed, 235 insertions(+)
 create mode 100644 packages/context/src/ingest/adapters/historic-sql/pattern-inputs.test.ts
 create mode 100644 packages/context/src/ingest/adapters/historic-sql/pattern-inputs.ts

diff --git a/packages/context/src/ingest/adapters/historic-sql/pattern-inputs.test.ts b/packages/context/src/ingest/adapters/historic-sql/pattern-inputs.test.ts
new file mode 100644
index 00000000..d37ed193
--- /dev/null
+++ b/packages/context/src/ingest/adapters/historic-sql/pattern-inputs.test.ts
@@ -0,0 +1,91 @@
+import { describe, expect, it } from 'vitest';
+import {
+  HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES,
+  isHistoricSqlPatternInputShardPath,
+  serializedStagedPatternsInputByteLength,
+  splitHistoricSqlPatternInputs,
+} from './pattern-inputs.js';
+import type { StagedPatternsInput } from './types.js';
+
+type PatternTemplate = StagedPatternsInput['templates'][number];
+
+function template(id: string, tablesTouched: string[], canonicalSql = 'select 1'): PatternTemplate {
+  return {
+    id,
+    canonicalSql,
+    tablesTouched,
+    executionsBucket: '10-100',
+    distinctUsersBucket: '2-5',
+    dialect: 'postgres',
+  };
+}
+
+describe('historic-SQL pattern input sharding', () => {
+  it('keeps the audit input complete while sharding only cross-table pattern candidates', () => {
+    const largeSql = `select * from public.orders join public.customers on true where marker = '${'x'.repeat(260)}'`;
+    const input: StagedPatternsInput = {
+      templates: [
+        template('single-table-orders', ['public.orders']),
+        template('orders-customers-2', ['public.orders', 'public.customers'], largeSql),
+        template('orders-customers-1', ['public.customers', 'public.orders'], largeSql),
+        template('orders-customers-payments', ['public.orders', 'public.customers', 'public.payments'], largeSql),
+      ],
+    };
+
+    const result = splitHistoricSqlPatternInputs(input, { maxBytes: 760 });
+
+    expect(result.auditInput.templates.map((entry) => entry.id)).toEqual([
+      'orders-customers-1',
+      'orders-customers-2',
+      'orders-customers-payments',
+      'single-table-orders',
+    ]);
+    expect(result.shards.length).toBeGreaterThan(1);
+    expect(result.shards.map((shard) => shard.path)).toEqual([
+      'patterns-input/part-0001.json',
+      'patterns-input/part-0002.json',
+      'patterns-input/part-0003.json',
+    ]);
+    expect(result.shards.flatMap((shard) => shard.input.templates.map((entry) => entry.id))).toEqual([
+      'orders-customers-payments',
+      'orders-customers-1',
+      'orders-customers-2',
+    ]);
+    expect(result.shards.every((shard) => shard.byteLength <= 760)).toBe(true);
+    expect(result.shards.flatMap((shard) => shard.input.templates).some((entry) => entry.id === 'single-table-orders')).toBe(false);
+    expect(result.warnings).toEqual([]);
+  });
+
+  it('omits a single oversized template from shards and reports a manifest warning', () => {
+    const input: StagedPatternsInput = {
+      templates: [
+        template(
+          'oversized-cross-table',
+          ['public.orders', 'public.customers'],
+          `select * from public.orders join public.customers on true where payload = '${'x'.repeat(500)}'`,
+        ),
+      ],
+    };
+
+    const result = splitHistoricSqlPatternInputs(input, { maxBytes: 240 });
+
+    expect(result.auditInput.templates.map((entry) => entry.id)).toEqual(['oversized-cross-table']);
+    expect(result.shards).toEqual([]);
+    expect(result.warnings).toEqual(['patterns_input_template_too_large:oversized-cross-table']);
+  });
+
+  it('recognizes only generated pattern shard paths', () => {
+    expect(isHistoricSqlPatternInputShardPath('patterns-input/part-0001.json')).toBe(true);
+    expect(isHistoricSqlPatternInputShardPath('patterns-input/part-0012.json')).toBe(true);
+    expect(isHistoricSqlPatternInputShardPath('patterns-input/part-10000.json')).toBe(true);
+    expect(isHistoricSqlPatternInputShardPath('patterns-input.json')).toBe(false);
+    expect(isHistoricSqlPatternInputShardPath('patterns-input/part-1.json')).toBe(false);
+    expect(isHistoricSqlPatternInputShardPath('patterns-input/part-00001.json')).toBe(false);
+    expect(isHistoricSqlPatternInputShardPath('patterns-input/readme.md')).toBe(false);
+  });
+
+  it('uses a production byte budget below read_raw_file maximum size', () => {
+    expect(HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES).toBeLessThan(120_000);
+    expect(serializedStagedPatternsInputByteLength({ templates: [] })).toBeGreaterThan(0);
+  });
+});
diff --git a/packages/context/src/ingest/adapters/historic-sql/pattern-inputs.ts b/packages/context/src/ingest/adapters/historic-sql/pattern-inputs.ts
new file mode 100644
index 00000000..c9380239
--- /dev/null
+++ b/packages/context/src/ingest/adapters/historic-sql/pattern-inputs.ts
@@ -0,0 +1,144 @@
+import { Buffer } from 'node:buffer';
+import type { StagedPatternsInput } from './types.js';
+
+/** Directory prefix of every generated pattern-input shard path. */
+export const HISTORIC_SQL_PATTERN_WORKUNIT_DIR = 'patterns-input';
+/** Default upper bound, in serialized UTF-8 bytes, for a single shard. */
+export const HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES = 110_000;
+/**
+ * Matches the shard paths `shardPath` generates. Shard numbers are zero-padded to four
+ * digits, but `padStart` never truncates, so numbers above 9999 are five or more digits wide.
+ */
+export const HISTORIC_SQL_PATTERN_WORKUNIT_PATH_RE = /^patterns-input\/part-(?:\d{4}|[1-9]\d{4,})\.json$/;
+
+type PatternTemplate = StagedPatternsInput['templates'][number];
+
+/** One generated shard: its path, its templates, and its serialized size. */
+export interface HistoricSqlPatternInputShard {
+  /** Path of the form `patterns-input/part-NNNN.json`. */
+  path: string;
+  /** Templates assigned to this shard. */
+  input: StagedPatternsInput;
+  /** UTF-8 byte length of `input` as serialized by `serializeStagedPatternsInput`. */
+  byteLength: number;
+}
+
+/** Result of `splitHistoricSqlPatternInputs`. */
+export interface HistoricSqlPatternInputSplitResult {
+  /** Every input template, sorted by id. */
+  auditInput: StagedPatternsInput;
+  /** Size-bounded shards holding only templates that touch two or more tables. */
+  shards: HistoricSqlPatternInputShard[];
+  /** One `patterns_input_template_too_large:` entry (suffixed with the template id) per omitted candidate. */
+  warnings: string[];
+}
+
+/** Options for `splitHistoricSqlPatternInputs`. */
+export interface HistoricSqlPatternInputSplitOptions {
+  /** Per-shard byte budget; defaults to `HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES`. */
+  maxBytes?: number;
+}
+
+/** Returns true when `path` has the shape of a shard path generated by this module. */
+export function isHistoricSqlPatternInputShardPath(path: string): boolean {
+  return HISTORIC_SQL_PATTERN_WORKUNIT_PATH_RE.test(path);
+}
+
+/** Serializes `input` as two-space-indented JSON followed by a newline. */
+export function serializeStagedPatternsInput(input: StagedPatternsInput): string {
+  return `${JSON.stringify(input, null, 2)}\n`;
+}
+
+/** Returns the UTF-8 byte length of `serializeStagedPatternsInput(input)`. */
+export function serializedStagedPatternsInputByteLength(input: StagedPatternsInput): number {
+  return Buffer.byteLength(serializeStagedPatternsInput(input), 'utf-8');
+}
+
+/** Returns a copy of `templates` ordered by id; the argument is left untouched. */
+function sortedAuditTemplates(templates: readonly PatternTemplate[]): PatternTemplate[] {
+  return [...templates].sort((left, right) => left.id.localeCompare(right.id));
+}
+
+/**
+ * Selects the templates that touch at least two tables and orders them: most tables first,
+ * then by sorted table list, then by id. Each result is a shallow copy whose `tablesTouched`
+ * is sorted, so the argument's templates are not mutated.
+ *
+ * NOTE(review): `localeCompare` follows the runtime's default locale; confirm that is
+ * acceptable if shard contents must be identical across environments.
+ */
+function sortedPatternCandidates(templates: readonly PatternTemplate[]): PatternTemplate[] {
+  return [...templates]
+    .filter((template) => template.tablesTouched.length >= 2)
+    .map((template) => ({ ...template, tablesTouched: [...template.tablesTouched].sort() }))
+    .sort((left, right) => {
+      const cardinality = right.tablesTouched.length - left.tablesTouched.length;
+      if (cardinality !== 0) return cardinality;
+      const tableSignature = left.tablesTouched.join('\0').localeCompare(right.tablesTouched.join('\0'));
+      if (tableSignature !== 0) return tableSignature;
+      return left.id.localeCompare(right.id);
+    });
+}
+
+/** Builds the path for the 1-based shard `index`, zero-padded to at least four digits. */
+function shardPath(index: number): string {
+  return `${HISTORIC_SQL_PATTERN_WORKUNIT_DIR}/part-${String(index).padStart(4, '0')}.json`;
+}
+
+/**
+ * Splits staged templates into a complete audit input plus size-bounded shards.
+ *
+ * Only templates touching two or more tables are sharded. Templates are appended greedily in
+ * candidate order, and a new shard is started whenever adding the next template would push
+ * the serialized shard above the budget.
+ *
+ * @param input - Staged templates to split; not mutated.
+ * @param options - Optional overrides; `maxBytes` defaults to
+ *   `HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES`.
+ * @returns The id-sorted audit input, the shards, and a warning for every candidate that is
+ *   too large to fit in a shard on its own.
+ */
+export function splitHistoricSqlPatternInputs(
+  input: StagedPatternsInput,
+  options: HistoricSqlPatternInputSplitOptions = {},
+): HistoricSqlPatternInputSplitResult {
+  const maxBytes = options.maxBytes ?? HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES;
+  const auditInput: StagedPatternsInput = { templates: sortedAuditTemplates(input.templates) };
+  const warnings: string[] = [];
+  const shards: HistoricSqlPatternInputShard[] = [];
+  let current: PatternTemplate[] = [];
+
+  // Emits the templates collected so far as the next shard and starts a fresh batch.
+  const flush = () => {
+    if (current.length === 0) {
+      return;
+    }
+    const shardInput: StagedPatternsInput = { templates: current };
+    shards.push({
+      path: shardPath(shards.length + 1),
+      input: shardInput,
+      byteLength: serializedStagedPatternsInputByteLength(shardInput),
+    });
+    current = [];
+  };
+
+  for (const template of sortedPatternCandidates(input.templates)) {
+    // A template that exceeds the budget on its own can never fit; report it and move on.
+    const singleInput: StagedPatternsInput = { templates: [template] };
+    if (serializedStagedPatternsInputByteLength(singleInput) > maxBytes) {
+      warnings.push(`patterns_input_template_too_large:${template.id}`);
+      continue;
+    }
+
+    // Close the current shard first if this template would push it over the budget.
+    const nextInput: StagedPatternsInput = { templates: [...current, template] };
+    if (current.length > 0 && serializedStagedPatternsInputByteLength(nextInput) > maxBytes) {
+      flush();
+    }
+    current.push(template);
+  }
+
+  flush();
+
+  return { auditInput, shards, warnings };
+}