From 7549872d0faa5cf1df35cb2207667df484da63de Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 18:15:08 +0200 Subject: [PATCH] feat: chunk historic sql unified staging --- .../historic-sql/chunk-unified.test.ts | 122 ++++++++++++++++++ .../adapters/historic-sql/chunk-unified.ts | 89 +++++++++++++ packages/context/src/ingest/index.ts | 1 + 3 files changed, 212 insertions(+) create mode 100644 packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts create mode 100644 packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts diff --git a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts new file mode 100644 index 00000000..f20f22ab --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts @@ -0,0 +1,122 @@ +import { mkdir, mkdtemp, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it } from 'vitest'; +import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js'; + +async function tempDir(): Promise { + return mkdtemp(join(tmpdir(), 'historic-sql-unified-chunk-')); +} + +async function writeJson(root: string, relPath: string, value: unknown): Promise { + const target = join(root, relPath); + await mkdir(join(target, '..'), { recursive: true }); + await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); +} + +async function writeUnifiedStagedDir(root: string): Promise { + await writeJson(root, 'manifest.json', { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 1, + touchedTableCount: 1, + parseFailures: 0, + warnings: [], + probeWarnings: [], + }); + await writeJson(root, 'tables/public.orders.json', { + table: 'public.orders', + stats: { + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + errorRateBucket: 'none', + p95RuntimeBucket: '<100ms', + recencyBucket: 'current', + }, + columnsByClause: { select: [['status', 'high']] }, + observedJoins: [], + topTemplates: [{ id: 'orders', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }], + }); + await writeJson(root, 'patterns-input.json', { + templates: [ + { + id: 'orders', + canonicalSql: 'select * from public.orders', + tablesTouched: ['public.orders'], + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + dialect: 'postgres', + }, + ], + }); +} + +describe('chunkHistoricSqlUnifiedStagedDir', () => { + it('emits one table WorkUnit plus one patterns WorkUnit', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + + const result = await chunkHistoricSqlUnifiedStagedDir(stagedDir); + + expect(result.workUnits).toEqual([ + expect.objectContaining({ + unitKey: 'historic-sql-table-public-orders', + displayLabel: 'Historic SQL usage: public.orders', + rawFiles: ['tables/public.orders.json'], + dependencyPaths: ['manifest.json'], + notes: expect.stringContaining('historic_sql_table_digest'), + }), + expect.objectContaining({ + unitKey: 'historic-sql-patterns', + displayLabel: 'Historic SQL cross-table patterns', + rawFiles: ['patterns-input.json'], + dependencyPaths: ['manifest.json'], + notes: expect.stringContaining('historic_sql_patterns'), + }), + ]); + expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']); + }); + + it('respects diff sets for unchanged table and patterns files', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + + await expect( + chunkHistoricSqlUnifiedStagedDir(stagedDir, { + added: [], + modified: ['tables/public.orders.json'], + deleted: [], + unchanged: ['manifest.json', 'patterns-input.json'], + }), + ).resolves.toMatchObject({ + workUnits: [expect.objectContaining({ unitKey: 'historic-sql-table-public-orders' })], + }); + + await expect( + chunkHistoricSqlUnifiedStagedDir(stagedDir, { + added: [], + modified: ['patterns-input.json'], + deleted: [], + unchanged: ['manifest.json', 'tables/public.orders.json'], + }), + ).resolves.toMatchObject({ + workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns' })], + }); + }); + + it('describes unified staged scope', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + + const scope = await describeHistoricSqlUnifiedScope(stagedDir); + + expect(scope.isPathInScope('manifest.json')).toBe(true); + expect(scope.isPathInScope('patterns-input.json')).toBe(true); + expect(scope.isPathInScope('tables/public.orders.json')).toBe(true); + expect(scope.isPathInScope('templates/old/page.md')).toBe(false); + }); +}); diff --git a/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts new file mode 100644 index 00000000..ff29f3cd --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts @@ -0,0 +1,89 @@ +import { createHash } from 'node:crypto'; +import { readFile, readdir } from 'node:fs/promises'; +import { join, relative } from 'node:path'; +import type { ChunkResult, DiffSet, ScopeDescriptor, WorkUnit } from '../../types.js'; +import { stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema } from './types.js'; + +async function walk(root: string): Promise { + const entries = await readdir(root, { withFileTypes: true, recursive: true }); + return entries + .filter((entry) => entry.isFile()) + .map((entry) => relative(root, join(entry.parentPath, entry.name)).replace(/\\/g, '/')) + .sort(); +} + +async function readJson(stagedDir: string, relPath: string): Promise { + return JSON.parse(await readFile(join(stagedDir, relPath), 'utf-8')) as T; +} + +function safeUnitKey(value: string): string { + return value.replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, ''); +} + +function touchedPath(path: string, touched: Set | null): boolean { + return !touched || touched.has(path); +} + +export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSet?: DiffSet): Promise { + const files = await walk(stagedDir); + const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json')); + const touched = diffSet ? new Set([...diffSet.added, ...diffSet.modified]) : null; + const workUnits: WorkUnit[] = []; + + for (const path of files.filter((file) => /^tables\/.+\.json$/.test(file))) { + if (!touchedPath(path, touched)) { + continue; + } + const table = stagedTableInputSchema.parse(await readJson(stagedDir, path)); + workUnits.push({ + unitKey: `historic-sql-table-${safeUnitKey(table.table)}`, + displayLabel: `Historic SQL usage: ${table.table}`, + rawFiles: [path], + dependencyPaths: ['manifest.json'], + peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(), + notes: + 'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.', + }); + } + + if (files.includes('patterns-input.json') && touchedPath('patterns-input.json', touched)) { + stagedPatternsInputSchema.parse(await readJson(stagedDir, 'patterns-input.json')); + workUnits.push({ + unitKey: 'historic-sql-patterns', + displayLabel: 'Historic SQL cross-table patterns', + rawFiles: ['patterns-input.json'], + dependencyPaths: ['manifest.json'], + peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(), + notes: + 'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.', + }); + } + + const deleted = diffSet?.deleted.filter((path) => path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path)).sort(); + return { + workUnits, + eviction: deleted && deleted.length > 0 ? { deletedRawPaths: deleted } : undefined, + reconcileNotes: [`Historic-SQL touched tables=${manifest.touchedTableCount} parseFailures=${manifest.parseFailures}`], + contextReport: { + capped: false, + warnings: [...manifest.probeWarnings, ...manifest.warnings], + }, + }; +} + +export async function describeHistoricSqlUnifiedScope(stagedDir: string): Promise { + const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json')); + const fingerprint = createHash('sha256') + .update(JSON.stringify({ + connectionId: manifest.connectionId, + dialect: manifest.dialect, + windowStart: manifest.windowStart, + windowEnd: manifest.windowEnd, + })) + .digest('hex'); + return { + fingerprint, + isPathInScope: (rawPath) => + rawPath === 'manifest.json' || rawPath === 'patterns-input.json' || /^tables\/.+\.json$/.test(rawPath), + }; +} diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index 32beb6e8..556ba485 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -319,6 +319,7 @@ export { NotionSourceAdapter, type NotionSourceAdapterDeps } from './adapters/no export { NotionClient, type NotionApi, type NotionBotInfo } from './adapters/notion/notion-client.js'; export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js'; export { chunkHistoricSqlStagedDir, describeHistoricSqlScope } from './adapters/historic-sql/chunk.js'; +export { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './adapters/historic-sql/chunk-unified.js'; export { detectHistoricSqlStagedDir } from './adapters/historic-sql/detect.js'; export { HistoricSqlExtensionMissingError,