feat: shard historic sql pattern inputs

This commit is contained in:
Andrey Avtomonov 2026-05-11 20:19:47 +02:00
parent 1ba7be8f91
commit 2a91ea521f
2 changed files with 188 additions and 0 deletions

View file

@ -0,0 +1,89 @@
import { describe, expect, it } from 'vitest';
import {
HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES,
isHistoricSqlPatternInputShardPath,
serializedStagedPatternsInputByteLength,
splitHistoricSqlPatternInputs,
} from './pattern-inputs.js';
import type { StagedPatternsInput } from './types.js';
type PatternTemplate = StagedPatternsInput['templates'][number];
function template(id: string, tablesTouched: string[], canonicalSql = 'select 1'): PatternTemplate {
return {
id,
canonicalSql,
tablesTouched,
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
};
}
describe('historic-SQL pattern input sharding', () => {
it('keeps the audit input complete while sharding only cross-table pattern candidates', () => {
const largeSql = `select * from public.orders join public.customers on true where marker = '${'x'.repeat(260)}'`;
const input: StagedPatternsInput = {
templates: [
template('single-table-orders', ['public.orders']),
template('orders-customers-2', ['public.orders', 'public.customers'], largeSql),
template('orders-customers-1', ['public.customers', 'public.orders'], largeSql),
template('orders-customers-payments', ['public.orders', 'public.customers', 'public.payments'], largeSql),
],
};
const result = splitHistoricSqlPatternInputs(input, { maxBytes: 760 });
expect(result.auditInput.templates.map((entry) => entry.id)).toEqual([
'orders-customers-1',
'orders-customers-2',
'orders-customers-payments',
'single-table-orders',
]);
expect(result.shards.length).toBeGreaterThan(1);
expect(result.shards.map((shard) => shard.path)).toEqual([
'patterns-input/part-0001.json',
'patterns-input/part-0002.json',
'patterns-input/part-0003.json',
]);
expect(result.shards.flatMap((shard) => shard.input.templates.map((entry) => entry.id))).toEqual([
'orders-customers-payments',
'orders-customers-1',
'orders-customers-2',
]);
expect(result.shards.every((shard) => shard.byteLength <= 760)).toBe(true);
expect(result.shards.flatMap((shard) => shard.input.templates).some((entry) => entry.id === 'single-table-orders')).toBe(false);
expect(result.warnings).toEqual([]);
});
it('omits a single oversized template from shards and reports a manifest warning', () => {
const input: StagedPatternsInput = {
templates: [
template(
'oversized-cross-table',
['public.orders', 'public.customers'],
`select * from public.orders join public.customers on true where payload = '${'x'.repeat(500)}'`,
),
],
};
const result = splitHistoricSqlPatternInputs(input, { maxBytes: 240 });
expect(result.auditInput.templates.map((entry) => entry.id)).toEqual(['oversized-cross-table']);
expect(result.shards).toEqual([]);
expect(result.warnings).toEqual(['patterns_input_template_too_large:oversized-cross-table']);
});
it('recognizes only generated pattern shard paths', () => {
expect(isHistoricSqlPatternInputShardPath('patterns-input/part-0001.json')).toBe(true);
expect(isHistoricSqlPatternInputShardPath('patterns-input/part-0012.json')).toBe(true);
expect(isHistoricSqlPatternInputShardPath('patterns-input.json')).toBe(false);
expect(isHistoricSqlPatternInputShardPath('patterns-input/part-1.json')).toBe(false);
expect(isHistoricSqlPatternInputShardPath('patterns-input/readme.md')).toBe(false);
});
it('uses a production byte budget below read_raw_file maximum size', () => {
expect(HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES).toBeLessThan(120_000);
expect(serializedStagedPatternsInputByteLength({ templates: [] })).toBeGreaterThan(0);
});
});

View file

@ -0,0 +1,99 @@
import { Buffer } from 'node:buffer';
import type { StagedPatternsInput } from './types.js';
export const HISTORIC_SQL_PATTERN_WORKUNIT_DIR = 'patterns-input';
export const HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES = 110_000;
export const HISTORIC_SQL_PATTERN_WORKUNIT_PATH_RE = /^patterns-input\/part-\d{4}\.json$/;
type PatternTemplate = StagedPatternsInput['templates'][number];
export interface HistoricSqlPatternInputShard {
path: string;
input: StagedPatternsInput;
byteLength: number;
}
export interface HistoricSqlPatternInputSplitResult {
auditInput: StagedPatternsInput;
shards: HistoricSqlPatternInputShard[];
warnings: string[];
}
export interface HistoricSqlPatternInputSplitOptions {
maxBytes?: number;
}
export function isHistoricSqlPatternInputShardPath(path: string): boolean {
return HISTORIC_SQL_PATTERN_WORKUNIT_PATH_RE.test(path);
}
export function serializeStagedPatternsInput(input: StagedPatternsInput): string {
return `${JSON.stringify(input, null, 2)}\n`;
}
export function serializedStagedPatternsInputByteLength(input: StagedPatternsInput): number {
return Buffer.byteLength(serializeStagedPatternsInput(input), 'utf-8');
}
function sortedAuditTemplates(templates: readonly PatternTemplate[]): PatternTemplate[] {
return [...templates].sort((left, right) => left.id.localeCompare(right.id));
}
function sortedPatternCandidates(templates: readonly PatternTemplate[]): PatternTemplate[] {
return [...templates]
.filter((template) => template.tablesTouched.length >= 2)
.map((template) => ({ ...template, tablesTouched: [...template.tablesTouched].sort() }))
.sort((left, right) => {
const cardinality = right.tablesTouched.length - left.tablesTouched.length;
if (cardinality !== 0) return cardinality;
const tableSignature = left.tablesTouched.join('\0').localeCompare(right.tablesTouched.join('\0'));
if (tableSignature !== 0) return tableSignature;
return left.id.localeCompare(right.id);
});
}
function shardPath(index: number): string {
return `${HISTORIC_SQL_PATTERN_WORKUNIT_DIR}/part-${String(index).padStart(4, '0')}.json`;
}
export function splitHistoricSqlPatternInputs(
input: StagedPatternsInput,
options: HistoricSqlPatternInputSplitOptions = {},
): HistoricSqlPatternInputSplitResult {
const maxBytes = options.maxBytes ?? HISTORIC_SQL_PATTERN_WORKUNIT_MAX_BYTES;
const auditInput: StagedPatternsInput = { templates: sortedAuditTemplates(input.templates) };
const warnings: string[] = [];
const shards: HistoricSqlPatternInputShard[] = [];
let current: PatternTemplate[] = [];
const flush = () => {
if (current.length === 0) {
return;
}
const shardInput: StagedPatternsInput = { templates: current };
shards.push({
path: shardPath(shards.length + 1),
input: shardInput,
byteLength: serializedStagedPatternsInputByteLength(shardInput),
});
current = [];
};
for (const template of sortedPatternCandidates(input.templates)) {
const singleInput: StagedPatternsInput = { templates: [template] };
if (serializedStagedPatternsInputByteLength(singleInput) > maxBytes) {
warnings.push(`patterns_input_template_too_large:${template.id}`);
continue;
}
const nextInput: StagedPatternsInput = { templates: [...current, template] };
if (current.length > 0 && serializedStagedPatternsInputByteLength(nextInput) > maxBytes) {
flush();
}
current.push(template);
}
flush();
return { auditInput, shards, warnings };
}