From 3e11e33b8a56e172bf4b779bdcb83b2c89162719 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 20:20:58 +0200 Subject: [PATCH] feat: emit historic sql pattern shard work units --- .../historic-sql/chunk-unified.test.ts | 78 ++++++++++++++++--- .../adapters/historic-sql/chunk-unified.ts | 28 ++++--- 2 files changed, 87 insertions(+), 19 deletions(-) diff --git a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts index 1b5716f4..d8c0187f 100644 --- a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts @@ -45,8 +45,20 @@ async function writeUnifiedStagedDir(root: string): Promise { templates: [ { id: 'orders', - canonicalSql: 'select * from public.orders', - tablesTouched: ['public.orders'], + canonicalSql: 'select * from public.orders join public.customers on true', + tablesTouched: ['public.orders', 'public.customers'], + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + dialect: 'postgres', + }, + ], + }); + await writeJson(root, 'patterns-input/part-0001.json', { + templates: [ + { + id: 'orders', + canonicalSql: 'select * from public.orders join public.customers on true', + tablesTouched: ['public.orders', 'public.customers'], executionsBucket: '10-100', distinctUsersBucket: '2-5', dialect: 'postgres', @@ -71,11 +83,11 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => { notes: expect.stringContaining('historic_sql_table_digest'), }), expect.objectContaining({ - unitKey: 'historic-sql-patterns', - displayLabel: 'Historic SQL cross-table patterns', - rawFiles: ['patterns-input.json'], + unitKey: 'historic-sql-patterns-part-0001', + displayLabel: 'Historic SQL cross-table patterns: part-0001', + rawFiles: ['patterns-input/part-0001.json'], dependencyPaths: ['manifest.json'], - notes: expect.stringContaining('historic_sql_patterns'), + notes: expect.stringContaining('patterns-input/part-0001.json'), }), ]); expect(result.workUnits[0]?.notes).toContain('emit_historic_sql_evidence'); @@ -92,7 +104,7 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => { added: [], modified: ['tables/public.orders.json'], deleted: [], - unchanged: ['manifest.json', 'patterns-input.json'], + unchanged: ['manifest.json', 'patterns-input.json', 'patterns-input/part-0001.json'], }), ).resolves.toMatchObject({ workUnits: [expect.objectContaining({ unitKey: 'historic-sql-table-public-orders' })], @@ -101,12 +113,23 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => { await expect( chunkHistoricSqlUnifiedStagedDir(stagedDir, { added: [], - modified: ['patterns-input.json'], + modified: ['patterns-input/part-0001.json'], deleted: [], - unchanged: ['manifest.json', 'tables/public.orders.json'], + unchanged: ['manifest.json', 'patterns-input.json', 'tables/public.orders.json'], }), ).resolves.toMatchObject({ - workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns' })], + workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns-part-0001' })], + }); + + await expect( + chunkHistoricSqlUnifiedStagedDir(stagedDir, { + added: [], + modified: ['patterns-input.json'], + deleted: [], + unchanged: ['manifest.json', 'patterns-input/part-0001.json', 'tables/public.orders.json'], + }), + ).resolves.toMatchObject({ + workUnits: [], }); }); @@ -118,7 +141,42 @@ describe('chunkHistoricSqlUnifiedStagedDir', () => { expect(scope.isPathInScope('manifest.json')).toBe(true); expect(scope.isPathInScope('patterns-input.json')).toBe(true); + expect(scope.isPathInScope('patterns-input/part-0001.json')).toBe(true); + expect(scope.isPathInScope('patterns-input/part-1.json')).toBe(false); expect(scope.isPathInScope('tables/public.orders.json')).toBe(true); expect(scope.isPathInScope('templates/old/page.md')).toBe(false); }); + + it('emits one patterns WorkUnit per changed shard', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + await writeJson(stagedDir, 'patterns-input/part-0002.json', { + templates: [ + { + id: 'line-items', + canonicalSql: 'select * from public.orders join public.line_items on true', + tablesTouched: ['public.orders', 'public.line_items'], + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + dialect: 'postgres', + }, + ], + }); + + const result = await chunkHistoricSqlUnifiedStagedDir(stagedDir, { + added: ['patterns-input/part-0002.json'], + modified: ['patterns-input/part-0001.json'], + deleted: [], + unchanged: ['manifest.json', 'patterns-input.json', 'tables/public.orders.json'], + }); + + expect(result.workUnits.map((unit) => unit.unitKey)).toEqual([ + 'historic-sql-patterns-part-0001', + 'historic-sql-patterns-part-0002', + ]); + expect(result.workUnits.map((unit) => unit.rawFiles)).toEqual([ + ['patterns-input/part-0001.json'], + ['patterns-input/part-0002.json'], + ]); + }); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts index 5ed17c52..4e6dfeda 100644 --- a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts +++ b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts @@ -2,6 +2,7 @@ import { createHash } from 'node:crypto'; import { readFile, readdir } from 'node:fs/promises'; import { join, relative } from 'node:path'; import type { ChunkResult, DiffSet, ScopeDescriptor, WorkUnit } from '../../types.js'; +import { isHistoricSqlPatternInputShardPath } from './pattern-inputs.js'; import { stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema } from './types.js'; async function walk(root: string): Promise { @@ -46,20 +47,26 @@ export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSe }); } - if (files.includes('patterns-input.json') && touchedPath('patterns-input.json', touched)) { - stagedPatternsInputSchema.parse(await readJson(stagedDir, 'patterns-input.json')); + for (const path of files.filter(isHistoricSqlPatternInputShardPath)) { + if (!touchedPath(path, touched)) { + continue; + } + stagedPatternsInputSchema.parse(await readJson(stagedDir, path)); + const shardLabel = path.replace(/^patterns-input\//, '').replace(/\.json$/, ''); workUnits.push({ - unitKey: 'historic-sql-patterns', - displayLabel: 'Historic SQL cross-table patterns', - rawFiles: ['patterns-input.json'], + unitKey: `historic-sql-patterns-${safeUnitKey(shardLabel)}`, + displayLabel: `Historic SQL cross-table patterns: ${shardLabel}`, + rawFiles: [path], dependencyPaths: ['manifest.json'], - peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(), + peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(), notes: - 'Use historic_sql_patterns. Read patterns-input.json and emit pattern objects with emit_historic_sql_evidence. Do not call wiki_write or sl_write_source.', + `Use historic_sql_patterns. Read ${path} and emit pattern objects with emit_historic_sql_evidence using rawPath "${path}". Do not call wiki_write or sl_write_source.`, }); } - const deleted = diffSet?.deleted.filter((path) => path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path)).sort(); + const deleted = diffSet?.deleted + .filter((path) => isHistoricSqlPatternInputShardPath(path) || /^tables\/.+\.json$/.test(path)) + .sort(); return { workUnits, eviction: deleted && deleted.length > 0 ? { deletedRawPaths: deleted } : undefined, @@ -84,6 +91,9 @@ export async function describeHistoricSqlUnifiedScope(stagedDir: string): Promis return { fingerprint, isPathInScope: (rawPath) => - rawPath === 'manifest.json' || rawPath === 'patterns-input.json' || /^tables\/.+\.json$/.test(rawPath), + rawPath === 'manifest.json' || + rawPath === 'patterns-input.json' || + isHistoricSqlPatternInputShardPath(rawPath) || + /^tables\/.+\.json$/.test(rawPath), }; }