feat: chunk historic sql unified staging

This commit is contained in:
Andrey Avtomonov 2026-05-11 18:15:08 +02:00
parent 25952e44bc
commit 7549872d0f
3 changed files with 212 additions and 0 deletions

View file

@ -0,0 +1,122 @@
import { mkdir, mkdtemp, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it } from 'vitest';
import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js';
async function tempDir(): Promise<string> {
return mkdtemp(join(tmpdir(), 'historic-sql-unified-chunk-'));
}
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
const target = join(root, relPath);
await mkdir(join(target, '..'), { recursive: true });
await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
}
async function writeUnifiedStagedDir(root: string): Promise<void> {
await writeJson(root, 'manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 1,
touchedTableCount: 1,
parseFailures: 0,
warnings: [],
probeWarnings: [],
});
await writeJson(root, 'tables/public.orders.json', {
table: 'public.orders',
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
errorRateBucket: 'none',
p95RuntimeBucket: '<100ms',
recencyBucket: 'current',
},
columnsByClause: { select: [['status', 'high']] },
observedJoins: [],
topTemplates: [{ id: 'orders', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }],
});
await writeJson(root, 'patterns-input.json', {
templates: [
{
id: 'orders',
canonicalSql: 'select * from public.orders',
tablesTouched: ['public.orders'],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
},
],
});
}
describe('chunkHistoricSqlUnifiedStagedDir', () => {
it('emits one table WorkUnit plus one patterns WorkUnit', async () => {
const stagedDir = await tempDir();
await writeUnifiedStagedDir(stagedDir);
const result = await chunkHistoricSqlUnifiedStagedDir(stagedDir);
expect(result.workUnits).toEqual([
expect.objectContaining({
unitKey: 'historic-sql-table-public-orders',
displayLabel: 'Historic SQL usage: public.orders',
rawFiles: ['tables/public.orders.json'],
dependencyPaths: ['manifest.json'],
notes: expect.stringContaining('historic_sql_table_digest'),
}),
expect.objectContaining({
unitKey: 'historic-sql-patterns',
displayLabel: 'Historic SQL cross-table patterns',
rawFiles: ['patterns-input.json'],
dependencyPaths: ['manifest.json'],
notes: expect.stringContaining('historic_sql_patterns'),
}),
]);
expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']);
});
it('respects diff sets for unchanged table and patterns files', async () => {
const stagedDir = await tempDir();
await writeUnifiedStagedDir(stagedDir);
await expect(
chunkHistoricSqlUnifiedStagedDir(stagedDir, {
added: [],
modified: ['tables/public.orders.json'],
deleted: [],
unchanged: ['manifest.json', 'patterns-input.json'],
}),
).resolves.toMatchObject({
workUnits: [expect.objectContaining({ unitKey: 'historic-sql-table-public-orders' })],
});
await expect(
chunkHistoricSqlUnifiedStagedDir(stagedDir, {
added: [],
modified: ['patterns-input.json'],
deleted: [],
unchanged: ['manifest.json', 'tables/public.orders.json'],
}),
).resolves.toMatchObject({
workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns' })],
});
});
it('describes unified staged scope', async () => {
const stagedDir = await tempDir();
await writeUnifiedStagedDir(stagedDir);
const scope = await describeHistoricSqlUnifiedScope(stagedDir);
expect(scope.isPathInScope('manifest.json')).toBe(true);
expect(scope.isPathInScope('patterns-input.json')).toBe(true);
expect(scope.isPathInScope('tables/public.orders.json')).toBe(true);
expect(scope.isPathInScope('templates/old/page.md')).toBe(false);
});
});

View file

@ -0,0 +1,89 @@
import { createHash } from 'node:crypto';
import { readFile, readdir } from 'node:fs/promises';
import { join, relative } from 'node:path';
import type { ChunkResult, DiffSet, ScopeDescriptor, WorkUnit } from '../../types.js';
import { stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema } from './types.js';
async function walk(root: string): Promise<string[]> {
const entries = await readdir(root, { withFileTypes: true, recursive: true });
return entries
.filter((entry) => entry.isFile())
.map((entry) => relative(root, join(entry.parentPath, entry.name)).replace(/\\/g, '/'))
.sort();
}
async function readJson<T>(stagedDir: string, relPath: string): Promise<T> {
return JSON.parse(await readFile(join(stagedDir, relPath), 'utf-8')) as T;
}
function safeUnitKey(value: string): string {
return value.replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '');
}
function touchedPath(path: string, touched: Set<string> | null): boolean {
return !touched || touched.has(path);
}
export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult> {
const files = await walk(stagedDir);
const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json'));
const touched = diffSet ? new Set([...diffSet.added, ...diffSet.modified]) : null;
const workUnits: WorkUnit[] = [];
for (const path of files.filter((file) => /^tables\/.+\.json$/.test(file))) {
if (!touchedPath(path, touched)) {
continue;
}
const table = stagedTableInputSchema.parse(await readJson(stagedDir, path));
workUnits.push({
unitKey: `historic-sql-table-${safeUnitKey(table.table)}`,
displayLabel: `Historic SQL usage: ${table.table}`,
rawFiles: [path],
dependencyPaths: ['manifest.json'],
peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(),
notes:
'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.',
});
}
if (files.includes('patterns-input.json') && touchedPath('patterns-input.json', touched)) {
stagedPatternsInputSchema.parse(await readJson(stagedDir, 'patterns-input.json'));
workUnits.push({
unitKey: 'historic-sql-patterns',
displayLabel: 'Historic SQL cross-table patterns',
rawFiles: ['patterns-input.json'],
dependencyPaths: ['manifest.json'],
peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(),
notes:
'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.',
});
}
const deleted = diffSet?.deleted.filter((path) => path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path)).sort();
return {
workUnits,
eviction: deleted && deleted.length > 0 ? { deletedRawPaths: deleted } : undefined,
reconcileNotes: [`Historic-SQL touched tables=${manifest.touchedTableCount} parseFailures=${manifest.parseFailures}`],
contextReport: {
capped: false,
warnings: [...manifest.probeWarnings, ...manifest.warnings],
},
};
}
export async function describeHistoricSqlUnifiedScope(stagedDir: string): Promise<ScopeDescriptor> {
const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json'));
const fingerprint = createHash('sha256')
.update(JSON.stringify({
connectionId: manifest.connectionId,
dialect: manifest.dialect,
windowStart: manifest.windowStart,
windowEnd: manifest.windowEnd,
}))
.digest('hex');
return {
fingerprint,
isPathInScope: (rawPath) =>
rawPath === 'manifest.json' || rawPath === 'patterns-input.json' || /^tables\/.+\.json$/.test(rawPath),
};
}

View file

@ -319,6 +319,7 @@ export { NotionSourceAdapter, type NotionSourceAdapterDeps } from './adapters/no
export { NotionClient, type NotionApi, type NotionBotInfo } from './adapters/notion/notion-client.js';
export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js';
export { chunkHistoricSqlStagedDir, describeHistoricSqlScope } from './adapters/historic-sql/chunk.js';
export { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './adapters/historic-sql/chunk-unified.js';
export { detectHistoricSqlStagedDir } from './adapters/historic-sql/detect.js';
export {
HistoricSqlExtensionMissingError,