mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
feat: write historic sql pattern shards
This commit is contained in:
parent
2a91ea521f
commit
02b621be72
2 changed files with 129 additions and 3 deletions
|
|
@ -236,4 +236,123 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
|
|||
expect(tableJson).toContain('[REDACTED]');
|
||||
expect(patternsJson).toContain('[REDACTED]');
|
||||
});
|
||||
|
||||
it('preserves full patterns audit input and writes bounded cross-table pattern shards', async () => {
|
||||
const stagedDir = await tempDir();
|
||||
const largeSql = `select * from public.orders o join public.customers c on c.id = o.customer_id where payload = '${'x'.repeat(8000)}'`;
|
||||
const reader: HistoricSqlReader = {
|
||||
async probe() {
|
||||
return { warnings: [], info: [] };
|
||||
},
|
||||
async *fetchAggregated() {
|
||||
yield aggregate({
|
||||
templateId: 'orders-customers-a',
|
||||
canonicalSql: largeSql,
|
||||
stats: {
|
||||
executions: 25,
|
||||
distinctUsers: 4,
|
||||
firstSeen: '2026-05-01T00:00:00.000Z',
|
||||
lastSeen: '2026-05-11T00:00:00.000Z',
|
||||
p50RuntimeMs: 15,
|
||||
p95RuntimeMs: 90,
|
||||
errorRate: 0,
|
||||
rowsProduced: 250,
|
||||
},
|
||||
});
|
||||
yield aggregate({
|
||||
templateId: 'orders-customers-b',
|
||||
canonicalSql: largeSql.replace('payload', 'payload_b'),
|
||||
stats: {
|
||||
executions: 22,
|
||||
distinctUsers: 3,
|
||||
firstSeen: '2026-05-01T00:00:00.000Z',
|
||||
lastSeen: '2026-05-11T00:00:00.000Z',
|
||||
p50RuntimeMs: 20,
|
||||
p95RuntimeMs: 95,
|
||||
errorRate: 0,
|
||||
rowsProduced: 220,
|
||||
},
|
||||
});
|
||||
yield aggregate({
|
||||
templateId: 'orders-single-table',
|
||||
canonicalSql: 'select count(*) from public.orders',
|
||||
stats: {
|
||||
executions: 30,
|
||||
distinctUsers: 2,
|
||||
firstSeen: '2026-05-01T00:00:00.000Z',
|
||||
lastSeen: '2026-05-11T00:00:00.000Z',
|
||||
p50RuntimeMs: 10,
|
||||
p95RuntimeMs: 20,
|
||||
errorRate: 0,
|
||||
rowsProduced: 30,
|
||||
},
|
||||
});
|
||||
},
|
||||
};
|
||||
const sqlAnalysis: SqlAnalysisPort = {
|
||||
analyzeForFingerprint: vi.fn(),
|
||||
analyzeBatch: vi.fn(async () => new Map([
|
||||
[
|
||||
'orders-customers-a',
|
||||
{
|
||||
tablesTouched: ['public.orders', 'public.customers'],
|
||||
columnsByClause: {
|
||||
select: [],
|
||||
where: ['payload'],
|
||||
join: ['customer_id', 'id'],
|
||||
groupBy: [],
|
||||
},
|
||||
},
|
||||
],
|
||||
[
|
||||
'orders-customers-b',
|
||||
{
|
||||
tablesTouched: ['public.orders', 'public.customers'],
|
||||
columnsByClause: {
|
||||
select: [],
|
||||
where: ['payload_b'],
|
||||
join: ['customer_id', 'id'],
|
||||
groupBy: [],
|
||||
},
|
||||
},
|
||||
],
|
||||
[
|
||||
'orders-single-table',
|
||||
{
|
||||
tablesTouched: ['public.orders'],
|
||||
columnsByClause: {
|
||||
select: [],
|
||||
where: [],
|
||||
join: [],
|
||||
groupBy: [],
|
||||
},
|
||||
},
|
||||
],
|
||||
])),
|
||||
};
|
||||
|
||||
await stageHistoricSqlAggregatedSnapshot({
|
||||
stagedDir,
|
||||
connectionId: 'warehouse',
|
||||
queryClient: {},
|
||||
reader,
|
||||
sqlAnalysis,
|
||||
pullConfig: { dialect: 'postgres' },
|
||||
now: new Date('2026-05-11T12:00:00.000Z'),
|
||||
});
|
||||
|
||||
const audit = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
|
||||
expect(audit.templates.map((entry: any) => entry.id)).toEqual([
|
||||
'orders-customers-a',
|
||||
'orders-customers-b',
|
||||
'orders-single-table',
|
||||
]);
|
||||
|
||||
const firstShard = await readJson<Record<string, any>>(stagedDir, 'patterns-input/part-0001.json');
|
||||
expect(firstShard.templates.map((entry: any) => entry.id)).toEqual(['orders-customers-a', 'orders-customers-b']);
|
||||
expect(firstShard.templates.some((entry: any) => entry.id === 'orders-single-table')).toBe(false);
|
||||
|
||||
const manifest = await readJson<Record<string, any>>(stagedDir, 'manifest.json');
|
||||
expect(manifest.warnings).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import {
|
|||
bucketP95Runtime,
|
||||
bucketRecency,
|
||||
} from './buckets.js';
|
||||
import { splitHistoricSqlPatternInputs } from './pattern-inputs.js';
|
||||
import {
|
||||
compileHistoricSqlRedactionPatterns,
|
||||
redactHistoricSqlText,
|
||||
|
|
@ -283,7 +284,13 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
|
|||
for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) {
|
||||
await writeJson(input.stagedDir, `tables/${table}.json`, toStagedTable(acc, now));
|
||||
}
|
||||
await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates));
|
||||
const patternsInput = toPatternsInput(parsedTemplates);
|
||||
const patternInputSplit = splitHistoricSqlPatternInputs(patternsInput);
|
||||
const allWarnings = [...warnings, ...patternInputSplit.warnings];
|
||||
await writeJson(input.stagedDir, 'patterns-input.json', patternInputSplit.auditInput);
|
||||
for (const shard of patternInputSplit.shards) {
|
||||
await writeJson(input.stagedDir, shard.path, shard.input);
|
||||
}
|
||||
await writeJson(input.stagedDir, 'manifest.json', {
|
||||
source: HISTORIC_SQL_SOURCE_KEY,
|
||||
connectionId: input.connectionId,
|
||||
|
|
@ -293,8 +300,8 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
|
|||
windowEnd: now.toISOString(),
|
||||
snapshotRowCount,
|
||||
touchedTableCount: byTable.size,
|
||||
parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length,
|
||||
warnings,
|
||||
parseFailures: allWarnings.filter((warning) => warning.startsWith('parse_failed:')).length,
|
||||
warnings: allWarnings,
|
||||
probeWarnings: probe.warnings,
|
||||
staleArchiveAfterDays: config.staleArchiveAfterDays,
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue