# Historic SQL Unified Hot Path Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Build the deterministic historic-SQL hot path that reads warehouse-aggregated query templates, batch-parses them once, and writes stable table-bucket and pattern-input staged artifacts. **Architecture:** This slice adds the unified reader/stager contracts from the historic-SQL redesign without doing the LLM cold path or projection work. Dialect-specific SQL lives in reader classes; shared TypeScript code filters, batch-parses, bucketizes, and writes `manifest.json`, `tables/*.json`, and `patterns-input.json`. The existing production adapter remains on the legacy path until the follow-up skills/projection cutover can switch it without loading missing skills. **Tech Stack:** TypeScript ESM/NodeNext, zod 4, Vitest, `SqlAnalysisPort.analyzeBatch()`, warehouse query clients. --- ## Starting Point Spec: `docs/superpowers/specs/2026-05-11-historic-sql-redesign-design.md` Plans found that are based on this spec: - `docs/superpowers/plans/2026-05-11-historic-sql-foundations.md` - `docs/superpowers/plans/2026-05-11-historic-sql-search-enrichment.md` Implemented status from this worktree: - `2026-05-11-historic-sql-foundations.md` is implemented. Evidence: `packages/context/src/ingest/adapters/historic-sql/skill-schemas.ts`, `SemanticLayerSource.usage` in `packages/context/src/sl/types.ts`, `mergeUsagePreservingExternal()` in `packages/context/src/ingest/adapters/live-database/manifest.ts`, `SqlAnalysisPort.analyzeBatch()` in `packages/context/src/sql-analysis/ports.ts`, and `/sql/analyze-batch` in `python/ktx-daemon/src/ktx_daemon/app.py`. - `2026-05-11-historic-sql-search-enrichment.md` is implemented. Evidence: `buildSemanticLayerSourceSearchText()` indexes `source.usage` in `packages/context/src/sl/sl-search.service.ts`, SQLite FTS returns `snippet()` in `packages/context/src/sl/sqlite-sl-sources-index.ts`, and local/MCP list results expose `frequencyTier` and `snippet` in `packages/context/src/sl/local-sl.ts` and `packages/context/src/mcp/local-project-ports.ts`. Still not implemented: - `packages/context/src/ingest/adapters/historic-sql/stage.ts` still calls `SqlAnalysisPort.analyzeForFingerprint()` per raw query and emits `templates/*/{metadata.json,page.md,usage.json}`. - `packages/context/src/ingest/adapters/historic-sql/stage-pgss.ts` still owns Postgres baseline-diff state and writes `.ktx/cache/historic-sql/*/pgss-baseline.json`. - `packages/context/src/ingest/adapters/historic-sql/chunk.ts` still emits one WorkUnit per template page for `historic_sql_ingest`. - `packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts` still advertises `historic_sql_ingest` and `historic_sql_curator`. - Old code strings still exist: `stagePgStatStatementsTemplates`, `expandCategoricalTemplates`, `classifySlot`, and `pgss-baseline`. This plan covers the deterministic hot path from the spec: unified aggregate contracts, aggregate readers, batch parsing, table bucketing, pattern input staging, and a new chunker for the new staged shape. It does not switch `HistoricSqlSourceAdapter` to the new WorkUnits; the cutover plan must create `historic_sql_table_digest`, `historic_sql_patterns`, and projection before changing production `skillNames`. ## File Structure Create: - `packages/context/src/ingest/adapters/historic-sql/types.test.ts` Locks the new public zod contracts and the one-release `minCalls` to `minExecutions` config alias. - `packages/context/src/ingest/adapters/historic-sql/buckets.ts` Owns deterministic bucket labels and frequency-tier helpers used by staging. - `packages/context/src/ingest/adapters/historic-sql/buckets.test.ts` Locks stable bucket boundaries so small numeric drift does not churn staged files. - `packages/context/src/ingest/adapters/historic-sql/stage-unified.ts` Implements the new deterministic stager behind `stageHistoricSqlAggregatedSnapshot()`. - `packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts` Tests batch parsing, parse failures, service-account filtering, per-table bucketing, and `patterns-input.json`. - `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts` Implements the new Postgres aggregate reader over `pg_stat_statements`. - `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts` Tests the aggregate PGSS query shape, probe warnings, and row mapping. - `packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts` Implements the new chunker for `tables/*.json` plus `patterns-input.json`. - `packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts` Tests table WorkUnits, the patterns WorkUnit, diff filtering, eviction, and scope detection. Modify: - `packages/context/src/ingest/adapters/historic-sql/types.ts` Adds aggregate input, staged artifact, reader, and manifest schemas. Keeps legacy exported types until adapter cutover, but marks the new contracts as the target API for the next slice. - `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts` Adds `fetchAggregated()` while retaining the existing `fetch()` until the adapter cutover. - `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts` Adds aggregate-query tests. - `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts` Adds `fetchAggregated()` while retaining the existing `fetch()` until the adapter cutover. - `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts` Adds aggregate-query tests. - `packages/context/src/ingest/index.ts` Exports the new hot-path contracts and helpers. - `packages/context/src/package-exports.test.ts` Asserts the new exports exist without removing old exports in this slice. Do not modify in this plan: - `packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts` - `packages/context/skills/historic_sql_ingest/SKILL.md` - `packages/context/skills/historic_sql_curator/SKILL.md` - `packages/context/src/ingest/ingest-runtime-assets.test.ts` Those files change in the cutover/projection plan after the replacement skills exist. ## Task 1: Add Unified Contracts **Files:** - Create: `packages/context/src/ingest/adapters/historic-sql/types.test.ts` - Modify: `packages/context/src/ingest/adapters/historic-sql/types.ts` - Modify: `packages/context/src/ingest/index.ts` - Modify: `packages/context/src/package-exports.test.ts` - [ ] **Step 1: Write failing contract tests** Create `packages/context/src/ingest/adapters/historic-sql/types.test.ts`: ```typescript import { describe, expect, it } from 'vitest'; import { aggregatedTemplateSchema, historicSqlUnifiedPullConfigSchema, stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema, } from './types.js'; describe('historic-sql unified contracts', () => { it('parses minExecutions and accepts minCalls as a one-release alias', () => { expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).toMatchObject({ dialect: 'postgres', minExecutions: 9, windowDays: 90, concurrency: 12, redactionPatterns: [], staleArchiveAfterDays: 90, }); expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minCalls: 7 }).minExecutions).toBe(7); }); it('validates aggregate templates from warehouse readers', () => { const parsed = aggregatedTemplateSchema.parse({ templateId: 'pg:123', canonicalSql: 'select status, count(*) from public.orders group by status', dialect: 'postgres', stats: { executions: 42, distinctUsers: 3, firstSeen: '2026-05-01T00:00:00.000Z', lastSeen: '2026-05-11T00:00:00.000Z', p50RuntimeMs: 12.5, p95RuntimeMs: 40, errorRate: 0, rowsProduced: 100, }, topUsers: [{ user: 'analyst', executions: 40 }], }); expect(parsed.templateId).toBe('pg:123'); expect(parsed.topUsers).toEqual([{ user: 'analyst', executions: 40 }]); }); it('validates staged table, patterns, and manifest artifacts', () => { expect( stagedTableInputSchema.parse({ table: 'public.orders', stats: { executionsBucket: '10-100', distinctUsersBucket: '2-5', errorRateBucket: 'none', p95RuntimeBucket: '<100ms', recencyBucket: 'current', }, columnsByClause: { select: [['status', 'high']], where: [['created_at', 'mid']], }, observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }], topTemplates: [{ id: 'pg:123', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }], }).table, ).toBe('public.orders'); expect( stagedPatternsInputSchema.parse({ templates: [ { id: 'pg:123', canonicalSql: 'select * from public.orders', tablesTouched: ['public.orders'], executionsBucket: '10-100', distinctUsersBucket: '2-5', dialect: 'postgres', }, ], }).templates, ).toHaveLength(1); expect( stagedManifestSchema.parse({ source: 'historic-sql', connectionId: 'warehouse', dialect: 'postgres', fetchedAt: '2026-05-11T00:00:00.000Z', windowStart: '2026-02-10T00:00:00.000Z', windowEnd: '2026-05-11T00:00:00.000Z', snapshotRowCount: 2, touchedTableCount: 1, parseFailures: 1, warnings: ['parse_failed:bad'], probeWarnings: [], }).parseFailures, ).toBe(1); }); }); ``` Add these assertions near the historic-SQL export assertions in `packages/context/src/package-exports.test.ts`: ```typescript expect(ingest.historicSqlUnifiedPullConfigSchema).toBeDefined(); expect(ingest.aggregatedTemplateSchema).toBeDefined(); expect(ingest.stagedTableInputSchema).toBeDefined(); ``` - [ ] **Step 2: Run the contract tests to verify they fail** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/types.test.ts src/package-exports.test.ts ``` Expected: FAIL with missing exports for `historicSqlUnifiedPullConfigSchema`, `aggregatedTemplateSchema`, and `stagedTableInputSchema`. - [ ] **Step 3: Add the new schemas and reader contracts** Insert this block immediately after the existing `historicSqlPullConfigSchema` definition in `packages/context/src/ingest/adapters/historic-sql/types.ts`. Keep `historicSqlPullConfigSchema` and `HistoricSqlPullConfig` unchanged in this plan because the current production adapter still reads `lastSuccessfulCursor`, `maxTemplatesPerRun`, and `minCalls`. ```typescript const filterModeSchema = z.enum(['exclude', 'include', 'mark-only']); function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null && !Array.isArray(value); } export const historicSqlUnifiedPullConfigSchema = z.preprocess((value) => { if (!isRecord(value)) { return value; } if (value.minExecutions === undefined && typeof value.minCalls === 'number') { return { ...value, minExecutions: value.minCalls }; } return value; }, z.object({ dialect: historicSqlDialectSchema, windowDays: z.number().int().positive().default(90), minExecutions: z.number().int().nonnegative().default(5), concurrency: z.number().int().positive().default(12), filters: z.object({ serviceAccounts: z.object({ patterns: z.array(z.string()).default([]), mode: filterModeSchema.default('exclude'), }).optional(), orchestrators: z.object({ mode: filterModeSchema.default('mark-only'), }).optional(), dropTrivialProbes: z.boolean().default(true), dropFailedBelow: z.object({ errorRate: z.number().min(0).max(1), executions: z.number().int().nonnegative(), }).optional(), }).default({}), redactionPatterns: z.array(z.string()).default([]), staleArchiveAfterDays: z.number().int().positive().default(90), })); export type HistoricSqlUnifiedPullConfig = z.infer; export const aggregatedTemplateSchema = z.object({ templateId: z.string().min(1), canonicalSql: z.string().min(1), dialect: historicSqlDialectSchema, stats: z.object({ executions: z.number().int().nonnegative(), distinctUsers: z.number().int().nonnegative(), firstSeen: z.iso.datetime(), lastSeen: z.iso.datetime(), p50RuntimeMs: z.number().nonnegative().nullable(), p95RuntimeMs: z.number().nonnegative().nullable(), errorRate: z.number().min(0).max(1), rowsProduced: z.number().int().nonnegative().nullable(), }), topUsers: z.array(z.object({ user: z.string().nullable(), executions: z.number().int().nonnegative(), })).default([]), }); export type AggregatedTemplate = z.infer; export const stagedTableInputSchema = z.object({ table: z.string().min(1), stats: z.object({ executionsBucket: z.string(), distinctUsersBucket: z.string(), errorRateBucket: z.string(), p95RuntimeBucket: z.string(), recencyBucket: z.string(), }), columnsByClause: z.record(z.string(), z.array(z.tuple([z.string(), z.string()]))), observedJoins: z.array(z.object({ withTable: z.string(), on: z.array(z.string()), freq: z.string(), })), topTemplates: z.array(z.object({ id: z.string(), canonicalSql: z.string(), topUsers: z.array(z.object({ user: z.string().nullable() })), })), }); export type StagedTableInput = z.infer; export const stagedPatternsInputSchema = z.object({ templates: z.array(z.object({ id: z.string(), canonicalSql: z.string(), tablesTouched: z.array(z.string()), executionsBucket: z.string(), distinctUsersBucket: z.string(), dialect: historicSqlDialectSchema, })), }); export type StagedPatternsInput = z.infer; export const stagedManifestSchema = z.object({ source: z.literal(HISTORIC_SQL_SOURCE_KEY), connectionId: z.string().min(1), dialect: historicSqlDialectSchema, fetchedAt: z.iso.datetime(), windowStart: z.iso.datetime(), windowEnd: z.iso.datetime(), snapshotRowCount: z.number().int().nonnegative(), touchedTableCount: z.number().int().nonnegative(), parseFailures: z.number().int().nonnegative(), warnings: z.array(z.string()), probeWarnings: z.array(z.string()), }); export type StagedManifest = z.infer; export interface HistoricSqlProbeResult { warnings: string[]; } export interface HistoricSqlReader { probe(client: unknown): Promise; fetchAggregated( client: unknown, window: HistoricSqlTimeWindow, config: HistoricSqlUnifiedPullConfig, ): AsyncIterable; } ``` - [ ] **Step 4: Export the new contracts** In `packages/context/src/ingest/index.ts`, add exports for the new types and schemas: ```typescript export type { AggregatedTemplate, HistoricSqlProbeResult, HistoricSqlReader, HistoricSqlUnifiedPullConfig, StagedManifest, StagedPatternsInput, StagedTableInput, } from './adapters/historic-sql/types.js'; export { aggregatedTemplateSchema, historicSqlUnifiedPullConfigSchema, stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema, } from './adapters/historic-sql/types.js'; ``` - [ ] **Step 5: Run the contract tests** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/types.test.ts src/package-exports.test.ts ``` Expected: PASS. - [ ] **Step 6: Commit** ```bash git add packages/context/src/ingest/adapters/historic-sql/types.ts packages/context/src/ingest/adapters/historic-sql/types.test.ts packages/context/src/ingest/index.ts packages/context/src/package-exports.test.ts git commit -m "feat: add historic sql unified contracts" ``` ## Task 2: Add Stable Bucket Helpers **Files:** - Create: `packages/context/src/ingest/adapters/historic-sql/buckets.ts` - Create: `packages/context/src/ingest/adapters/historic-sql/buckets.test.ts` - Modify: `packages/context/src/ingest/index.ts` - [ ] **Step 1: Write failing bucket tests** Create `packages/context/src/ingest/adapters/historic-sql/buckets.test.ts`: ```typescript import { describe, expect, it } from 'vitest'; import { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketFrequency, bucketP95Runtime, bucketRecency, } from './buckets.js'; describe('historic-sql bucket helpers', () => { it('uses stable execution buckets', () => { expect([0, 9, 10, 99, 100, 999, 1000, 4999, 5000, 49999, 50000].map(bucketExecutions)).toEqual([ '<10', '<10', '10-100', '10-100', '100-1k', '100-1k', '1k-5k', '1k-5k', '5k-50k', '5k-50k', '>50k', ]); }); it('uses stable distinct-user, error-rate, runtime, and recency buckets', () => { expect([0, 1, 2, 5, 6, 10, 11].map(bucketDistinctUsers)).toEqual([ '0', '1', '2-5', '2-5', '5-10', '5-10', '>10', ]); expect([0, 0.01, 0.05, 0.2].map(bucketErrorRate)).toEqual(['none', 'low', 'low', 'high']); expect([null, 99, 100, 999, 1000, 9999, 10000].map(bucketP95Runtime)).toEqual([ 'unknown', '<100ms', '100ms-1s', '100ms-1s', '1s-10s', '1s-10s', '>10s', ]); expect(bucketRecency('2026-05-11T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('current'); expect(bucketRecency('2026-04-20T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('recent'); expect(bucketRecency('2026-01-01T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('stale'); }); it('maps frequency counts to high, mid, and low labels', () => { expect(bucketFrequency(80, 100)).toBe('high'); expect(bucketFrequency(20, 100)).toBe('mid'); expect(bucketFrequency(1, 100)).toBe('low'); expect(bucketFrequency(0, 0)).toBe('low'); }); }); ``` - [ ] **Step 2: Run the bucket test to verify it fails** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/buckets.test.ts ``` Expected: FAIL because `buckets.js` does not exist. - [ ] **Step 3: Add the bucket helper implementation** Create `packages/context/src/ingest/adapters/historic-sql/buckets.ts`: ```typescript export function bucketExecutions(value: number): string { if (value < 10) return '<10'; if (value < 100) return '10-100'; if (value < 1000) return '100-1k'; if (value < 5000) return '1k-5k'; if (value < 50000) return '5k-50k'; return '>50k'; } export function bucketDistinctUsers(value: number): string { if (value <= 0) return '0'; if (value === 1) return '1'; if (value <= 5) return '2-5'; if (value <= 10) return '5-10'; return '>10'; } export function bucketErrorRate(value: number): string { if (value <= 0) return 'none'; if (value < 0.1) return 'low'; return 'high'; } export function bucketP95Runtime(value: number | null): string { if (value === null) return 'unknown'; if (value < 100) return '<100ms'; if (value < 1000) return '100ms-1s'; if (value < 10000) return '1s-10s'; return '>10s'; } export function bucketRecency(lastSeen: string, now: Date): string { const parsed = new Date(lastSeen); if (Number.isNaN(parsed.getTime())) { return 'unknown'; } const ageDays = (now.getTime() - parsed.getTime()) / (24 * 60 * 60 * 1000); if (ageDays <= 7) return 'current'; if (ageDays <= 45) return 'recent'; return 'stale'; } export function bucketFrequency(count: number, total: number): 'high' | 'mid' | 'low' { if (total <= 0 || count <= 0) return 'low'; const ratio = count / total; if (ratio >= 0.5) return 'high'; if (ratio >= 0.1) return 'mid'; return 'low'; } ``` - [ ] **Step 4: Run the bucket test** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/buckets.test.ts ``` Expected: PASS. - [ ] **Step 5: Export bucket helpers** In `packages/context/src/ingest/index.ts`, add: ```typescript export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js'; ``` - [ ] **Step 6: Commit** ```bash git add packages/context/src/ingest/adapters/historic-sql/buckets.ts packages/context/src/ingest/adapters/historic-sql/buckets.test.ts packages/context/src/ingest/index.ts git commit -m "feat: add historic sql bucket helpers" ``` ## Task 3: Stage Aggregated Snapshots **Files:** - Create: `packages/context/src/ingest/adapters/historic-sql/stage-unified.ts` - Create: `packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts` - Modify: `packages/context/src/ingest/index.ts` - [ ] **Step 1: Write failing staged-artifact tests** Create `packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts`: ```typescript import { mkdtemp, readFile, readdir } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js'; import type { AggregatedTemplate, HistoricSqlReader } from './types.js'; async function tempDir(): Promise { return mkdtemp(join(tmpdir(), 'historic-sql-unified-stage-')); } async function readJson(root: string, relPath: string): Promise { return JSON.parse(await readFile(join(root, relPath), 'utf-8')) as T; } function aggregate(overrides: Partial & { templateId: string; canonicalSql: string }): AggregatedTemplate { return { templateId: overrides.templateId, canonicalSql: overrides.canonicalSql, dialect: overrides.dialect ?? 'postgres', stats: overrides.stats ?? { executions: 42, distinctUsers: 3, firstSeen: '2026-05-01T00:00:00.000Z', lastSeen: '2026-05-11T00:00:00.000Z', p50RuntimeMs: 20, p95RuntimeMs: 80, errorRate: 0, rowsProduced: 100, }, topUsers: overrides.topUsers ?? [{ user: 'analyst', executions: 40 }], }; } describe('stageHistoricSqlAggregatedSnapshot', () => { it('batch parses templates and writes stable table and patterns artifacts', async () => { const stagedDir = await tempDir(); const reader: HistoricSqlReader = { async probe() { return { warnings: ['pg_stat_statements.max is low; aggregation still proceeds'] }; }, async *fetchAggregated() { yield aggregate({ templateId: 'orders-by-status', canonicalSql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status', }); yield aggregate({ templateId: 'service-account-only', canonicalSql: 'select * from public.orders where id = $1', stats: { executions: 20, distinctUsers: 1, firstSeen: '2026-05-01T00:00:00.000Z', lastSeen: '2026-05-11T00:00:00.000Z', p50RuntimeMs: 5, p95RuntimeMs: 10, errorRate: 0, rowsProduced: 1, }, topUsers: [{ user: 'svc_loader', executions: 20 }], }); yield aggregate({ templateId: 'bad-parse', canonicalSql: 'select broken from', }); }, }; const sqlAnalysis: SqlAnalysisPort = { analyzeForFingerprint: vi.fn(), analyzeBatch: vi.fn(async () => new Map([ [ 'orders-by-status', { tablesTouched: ['public.orders', 'public.customers'], columnsByClause: { select: ['status'], where: ['created_at'], join: ['customer_id'], groupBy: ['status'], }, }, ], ['bad-parse', { tablesTouched: [], columnsByClause: {}, error: 'parse failed' }], ])), }; await stageHistoricSqlAggregatedSnapshot({ stagedDir, connectionId: 'warehouse', queryClient: {}, reader, sqlAnalysis, pullConfig: { dialect: 'postgres', filters: { serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' }, }, }, now: new Date('2026-05-11T12:00:00.000Z'), }); expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledTimes(1); expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith( [ { id: 'orders-by-status', sql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status', }, { id: 'bad-parse', sql: 'select broken from' }, ], 'postgres', ); expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.customers.json', 'public.orders.json']); const manifest = await readJson>(stagedDir, 'manifest.json'); expect(manifest).toMatchObject({ source: 'historic-sql', connectionId: 'warehouse', dialect: 'postgres', snapshotRowCount: 3, touchedTableCount: 2, parseFailures: 1, warnings: ['parse_failed:bad-parse'], probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'], }); const orders = await readJson>(stagedDir, 'tables/public.orders.json'); expect(orders).toMatchObject({ table: 'public.orders', stats: { executionsBucket: '10-100', distinctUsersBucket: '2-5', errorRateBucket: 'none', p95RuntimeBucket: '<100ms', recencyBucket: 'current', }, columnsByClause: { select: [['status', 'high']], where: [['created_at', 'high']], join: [['customer_id', 'high']], groupBy: [['status', 'high']], }, observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }], topTemplates: [ { id: 'orders-by-status', topUsers: [{ user: 'analyst' }], }, ], }); expect(orders.topTemplates[0].canonicalSql).toContain('group by o.status'); const patterns = await readJson>(stagedDir, 'patterns-input.json'); expect(patterns.templates).toEqual([ { id: 'orders-by-status', canonicalSql: expect.stringContaining('public.orders'), tablesTouched: ['public.customers', 'public.orders'], executionsBucket: '10-100', distinctUsersBucket: '2-5', dialect: 'postgres', }, ]); }); }); ``` - [ ] **Step 2: Run the stage test to verify it fails** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/stage-unified.test.ts ``` Expected: FAIL because `stage-unified.js` does not exist. - [ ] **Step 3: Add the unified stager** Create `packages/context/src/ingest/adapters/historic-sql/stage-unified.ts` with these exported shapes and helpers: ```typescript import { mkdir, writeFile } from 'node:fs/promises'; import { dirname, join } from 'node:path'; import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; import { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketFrequency, bucketP95Runtime, bucketRecency, } from './buckets.js'; import { HISTORIC_SQL_SOURCE_KEY, aggregatedTemplateSchema, historicSqlUnifiedPullConfigSchema, type AggregatedTemplate, type HistoricSqlReader, type HistoricSqlUnifiedPullConfig, type StagedPatternsInput, type StagedTableInput, } from './types.js'; interface StageHistoricSqlAggregatedSnapshotInput { stagedDir: string; connectionId: string; queryClient: unknown; reader: HistoricSqlReader; sqlAnalysis: SqlAnalysisPort; pullConfig: unknown; now?: Date; } interface ParsedTemplate { template: AggregatedTemplate; tablesTouched: string[]; columnsByClause: Record; } interface TableAccumulator { table: string; executions: number; distinctUsers: number; errorRateNumerator: number; p95RuntimeMs: number | null; lastSeen: string; columnsByClause: Map>; observedJoins: Map>; topTemplates: AggregatedTemplate[]; } const TRIVIAL_SQL_RE = /^\s*SELECT\s+(1|NOW\(\)|CURRENT_TIMESTAMP|VERSION\(\))\s*;?\s*$/i; const NOISE_PREFIX_RE = /^\s*(SHOW|DESCRIBE|DESC|EXPLAIN|USE|SET)\b/i; const SYSTEM_TABLE_RE = /\b(INFORMATION_SCHEMA|SNOWFLAKE\.ACCOUNT_USAGE|pg_|system\.)/i; const ORCHESTRATOR_RE = /\b(dbt|looker|metabase)\b/i; function writeJson(root: string, relPath: string, value: unknown): Promise { const target = join(root, relPath); return mkdir(dirname(target), { recursive: true }).then(() => writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'), ); } function compilePatterns(patterns: string[]): RegExp[] { return patterns.map((pattern) => new RegExp(pattern)); } function matchesAny(value: string | null, patterns: RegExp[]): boolean { return !!value && patterns.some((pattern) => pattern.test(value)); } function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean { if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true; if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true; return false; } function shouldDropByUsers(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { const service = config.filters.serviceAccounts; if (!service || service.mode === 'mark-only' || service.patterns.length === 0) return false; const patterns = compilePatterns(service.patterns); const matchingExecutions = template.topUsers .filter((entry) => matchesAny(entry.user, patterns)) .reduce((sum, entry) => sum + entry.executions, 0); const allExecutions = template.topUsers.reduce((sum, entry) => sum + entry.executions, 0); const serviceOnly = allExecutions > 0 && matchingExecutions >= allExecutions; return service.mode === 'exclude' ? serviceOnly : !serviceOnly; } function shouldDropByFailure(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { const failed = config.filters.dropFailedBelow; return !!failed && template.stats.errorRate > failed.errorRate && template.stats.executions < failed.executions; } function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { if (shouldDropBySql(template.canonicalSql, config)) return true; if (shouldDropByUsers(template, config)) return true; if (shouldDropByFailure(template, config)) return true; return false; } function recordColumn(acc: TableAccumulator, clause: string, column: string, executions: number): void { const byColumn = acc.columnsByClause.get(clause) ?? new Map(); byColumn.set(column, (byColumn.get(column) ?? 0) + executions); acc.columnsByClause.set(clause, byColumn); } function recordJoin(acc: TableAccumulator, otherTable: string, columns: string[], executions: number): void { const byColumns = acc.observedJoins.get(otherTable) ?? new Map(); const key = [...new Set(columns)].sort().join(','); if (key.length > 0) { byColumns.set(key, (byColumns.get(key) ?? 0) + executions); acc.observedJoins.set(otherTable, byColumns); } } function accumulatorFor(table: string): TableAccumulator { return { table, executions: 0, distinctUsers: 0, errorRateNumerator: 0, p95RuntimeMs: null, lastSeen: '1970-01-01T00:00:00.000Z', columnsByClause: new Map(), observedJoins: new Map(), topTemplates: [], }; } function addTemplate(acc: TableAccumulator, parsed: ParsedTemplate): void { const executions = parsed.template.stats.executions; acc.executions += executions; acc.distinctUsers = Math.max(acc.distinctUsers, parsed.template.stats.distinctUsers); acc.errorRateNumerator += parsed.template.stats.errorRate * executions; acc.p95RuntimeMs = acc.p95RuntimeMs === null ? parsed.template.stats.p95RuntimeMs : parsed.template.stats.p95RuntimeMs === null ? acc.p95RuntimeMs : Math.max(acc.p95RuntimeMs, parsed.template.stats.p95RuntimeMs); acc.lastSeen = parsed.template.stats.lastSeen > acc.lastSeen ? parsed.template.stats.lastSeen : acc.lastSeen; for (const [clause, columns] of Object.entries(parsed.columnsByClause)) { for (const column of columns) { recordColumn(acc, clause, column, executions); } } const joinColumns = parsed.columnsByClause.join ?? []; for (const otherTable of parsed.tablesTouched.filter((table) => table !== acc.table)) { recordJoin(acc, otherTable, joinColumns, executions); } acc.topTemplates.push(parsed.template); } ``` In the same file, add the staging function: ```typescript function toStagedTable(acc: TableAccumulator, now: Date): StagedTableInput { const errorRate = acc.executions > 0 ? acc.errorRateNumerator / acc.executions : 0; const columnsByClause = Object.fromEntries( [...acc.columnsByClause.entries()] .sort(([left], [right]) => left.localeCompare(right)) .map(([clause, counts]) => [ clause, [...counts.entries()] .sort((left, right) => right[1] - left[1] || left[0].localeCompare(right[0])) .map(([column, count]) => [column, bucketFrequency(count, acc.executions)]), ]), ); const observedJoins = [...acc.observedJoins.entries()] .flatMap(([withTable, byColumns]) => [...byColumns.entries()].map(([columns, count]) => ({ withTable, on: columns.split(',').filter(Boolean), freq: bucketFrequency(count, acc.executions), })), ) .sort((left, right) => left.withTable.localeCompare(right.withTable) || left.on.join(',').localeCompare(right.on.join(','))); const topTemplates = [...acc.topTemplates] .sort((left, right) => right.stats.executions - left.stats.executions || left.templateId.localeCompare(right.templateId)) .slice(0, 5) .map((template) => ({ id: template.templateId, canonicalSql: template.canonicalSql, topUsers: template.topUsers.slice(0, 5).map((entry) => ({ user: entry.user })), })); return { table: acc.table, stats: { executionsBucket: bucketExecutions(acc.executions), distinctUsersBucket: bucketDistinctUsers(acc.distinctUsers), errorRateBucket: bucketErrorRate(errorRate), p95RuntimeBucket: bucketP95Runtime(acc.p95RuntimeMs), recencyBucket: bucketRecency(acc.lastSeen, now), }, columnsByClause, observedJoins, topTemplates, }; } function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput { return { templates: parsedTemplates .map(({ template, tablesTouched }) => ({ id: template.templateId, canonicalSql: template.canonicalSql, tablesTouched: [...tablesTouched].sort(), executionsBucket: bucketExecutions(template.stats.executions), distinctUsersBucket: bucketDistinctUsers(template.stats.distinctUsers), dialect: template.dialect, })) .sort((left, right) => left.id.localeCompare(right.id)), }; } export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise { const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig); const now = input.now ?? new Date(); const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000); const probe = await input.reader.probe(input.queryClient); const snapshot: AggregatedTemplate[] = []; for await (const row of input.reader.fetchAggregated(input.queryClient, { start: windowStart, end: now }, config)) { const parsed = aggregatedTemplateSchema.parse(row); if (!shouldDropTemplate(parsed, config)) { snapshot.push(parsed); } } const analysis = await input.sqlAnalysis.analyzeBatch( snapshot.map((template) => ({ id: template.templateId, sql: template.canonicalSql })), config.dialect, ); const warnings: string[] = []; const parsedTemplates: ParsedTemplate[] = []; for (const template of snapshot) { const parsed = analysis.get(template.templateId); if (!parsed || parsed.error) { warnings.push(`parse_failed:${template.templateId}`); continue; } const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort(); if (tablesTouched.length === 0) { continue; } parsedTemplates.push({ template, tablesTouched, columnsByClause: Object.fromEntries( Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]), ), }); } const byTable = new Map(); for (const parsed of parsedTemplates) { for (const table of parsed.tablesTouched) { const acc = byTable.get(table) ?? accumulatorFor(table); addTemplate(acc, parsed); byTable.set(table, acc); } } await mkdir(input.stagedDir, { recursive: true }); for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) { await writeJson(input.stagedDir, `tables/${table}.json`, toStagedTable(acc, now)); } await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates)); await writeJson(input.stagedDir, 'manifest.json', { source: HISTORIC_SQL_SOURCE_KEY, connectionId: input.connectionId, dialect: config.dialect, fetchedAt: now.toISOString(), windowStart: windowStart.toISOString(), windowEnd: now.toISOString(), snapshotRowCount: snapshot.length, touchedTableCount: byTable.size, parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length, warnings, probeWarnings: probe.warnings, }); } ``` - [ ] **Step 4: Run the staged-artifact test** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/stage-unified.test.ts ``` Expected: PASS. - [ ] **Step 5: Export the unified stager** In `packages/context/src/ingest/index.ts`, add: ```typescript export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js'; ``` - [ ] **Step 6: Commit** ```bash git add packages/context/src/ingest/adapters/historic-sql/stage-unified.ts packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts packages/context/src/ingest/index.ts git commit -m "feat: stage historic sql aggregate snapshots" ``` ## Task 4: Add Aggregate Readers **Files:** - Create: `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts` - Create: `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts` - Modify: `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts` - Modify: `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts` - Modify: `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts` - Modify: `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts` - Modify: `packages/context/src/ingest/index.ts` - [ ] **Step 1: Write failing Postgres aggregate reader tests** Create `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts`: ```typescript import { describe, expect, it, vi } from 'vitest'; import { PostgresPgssReader } from './postgres-pgss-reader.js'; describe('PostgresPgssReader aggregate path', () => { it('aggregates pg_stat_statements rows by queryid and query', async () => { const executeQuery = vi.fn(async (sql: string, params?: unknown[]) => { if (sql.includes('pg_stat_statements_info')) { return { headers: ['stats_reset', 'dealloc'], rows: [['2026-05-01T00:00:00.000Z', 1]] }; } expect(sql).toContain('GROUP BY queryid, query'); expect(sql).toContain('HAVING SUM(calls) >= $1'); expect(params).toEqual([5]); return { headers: ['template_id', 'canonical_sql', 'executions', 'distinct_users', 'mean_ms', 'rows_produced', 'top_users'], rows: [ [ '123', 'select status from public.orders', '42', '3', '11.5', '100', JSON.stringify([{ user: 'analyst', executions: 40 }]), ], ], }; }); const reader = new PostgresPgssReader(); const rows = []; for await (const row of reader.fetchAggregated( { executeQuery }, { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, { dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, filters: {}, redactionPatterns: [], staleArchiveAfterDays: 90 }, )) { rows.push(row); } expect(rows).toEqual([ { templateId: '123', canonicalSql: 'select status from public.orders', dialect: 'postgres', stats: { executions: 42, distinctUsers: 3, firstSeen: '2026-05-01T00:00:00.000Z', lastSeen: '2026-05-11T00:00:00.000Z', p50RuntimeMs: 11.5, p95RuntimeMs: 11.5, errorRate: 0, rowsProduced: 100, }, topUsers: [{ user: 'analyst', executions: 40 }], }, ]); }); }); ``` - [ ] **Step 2: Add failing BigQuery and Snowflake aggregate assertions** In `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts`, add a test that constructs `new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'demo', region: 'us' })`, calls `fetchAggregated()`, and asserts the SQL contains: ```typescript expect(sql).toContain('COUNT(*) AS executions'); expect(sql).toContain('COUNT(DISTINCT user_email) AS distinct_users'); expect(sql).toContain('GROUP BY query_hash'); expect(sql).toContain('HAVING COUNT(*) >= 5'); ``` Map one returned row with headers: ```typescript [ 'template_id', 'canonical_sql', 'executions', 'distinct_users', 'first_seen', 'last_seen', 'p50_ms', 'p95_ms', 'error_rate', 'rows_produced', 'top_users', ] ``` and assert `templateId`, `stats.executions`, `stats.errorRate`, and `topUsers` match the row. In `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts`, add the same shape but assert the SQL contains: ```typescript expect(sql).toContain('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'); expect(sql).toContain('COUNT(*) AS executions'); expect(sql).toContain('GROUP BY query_hash'); expect(sql).toContain('HAVING COUNT(*) >= 5'); ``` - [ ] **Step 3: Run aggregate reader tests to verify they fail** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts ``` Expected: FAIL because `fetchAggregated()` and `postgres-pgss-reader.js` do not exist. - [ ] **Step 4: Implement the aggregate reader methods** Create `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts` with the same probe behavior currently implemented in `postgres-pgss-query-history-reader.ts`: `queryClient`, `execute`, `indexByHeader`, `value`, `nullableString`, `requiredString`, `requiredFiniteNumber`, `nullableInteger`, `nullableIsoTimestamp`, `firstRow`, `extensionMissingError`, and `grantsMissingError` keep their current behavior. Add this aggregate query and row mapper: ```typescript const AGGREGATE_SQL = ` SELECT queryid::text AS template_id, query AS canonical_sql, SUM(calls)::bigint AS executions, COUNT(DISTINCT userid) AS distinct_users, SUM(total_exec_time) / NULLIF(SUM(calls), 0) AS mean_ms, SUM(rows)::bigint AS rows_produced, COALESCE( json_agg(json_build_object('user', rolname, 'executions', calls) ORDER BY calls DESC) FILTER (WHERE userid IS NOT NULL), '[]'::json )::text AS top_users FROM pg_stat_statements LEFT JOIN pg_roles ON pg_roles.oid = pg_stat_statements.userid WHERE toplevel = true GROUP BY queryid, query HAVING SUM(calls) >= $1 ORDER BY SUM(total_exec_time) DESC `.trim(); ``` The `fetchAggregated()` method must: ```typescript async *fetchAggregated( client: unknown, window: HistoricSqlTimeWindow, config: HistoricSqlUnifiedPullConfig, ): AsyncIterable { const pgClient = queryClient(client); const statsResult = await execute(pgClient, STATS_INFO_SQL); const { row: statsRow, headers: statsHeaders } = firstRow(statsResult, 'stats-info'); const firstSeen = nullableIsoTimestamp(value(statsRow, statsHeaders, 'stats_reset')) ?? window.start.toISOString(); const result = await execute(pgClient, AGGREGATE_SQL, [config.minExecutions]); const indexes = indexByHeader(result.headers); for (const row of result.rows) { yield aggregatedTemplateSchema.parse({ templateId: requiredString(value(row, indexes, 'template_id'), 'template_id'), canonicalSql: requiredString(value(row, indexes, 'canonical_sql'), 'canonical_sql'), dialect: 'postgres', stats: { executions: requiredInteger(value(row, indexes, 'executions'), 'executions'), distinctUsers: requiredInteger(value(row, indexes, 'distinct_users'), 'distinct_users'), firstSeen, lastSeen: window.end.toISOString(), p50RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')), p95RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')), errorRate: 0, rowsProduced: nullableInteger(value(row, indexes, 'rows_produced')), }, topUsers: parseTopUsers(value(row, indexes, 'top_users')), }); } } ``` In `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts`, add this aggregate query inside `fetchAggregated()`: ```typescript const sql = ` SELECT query_hash AS template_id, MIN(query) AS canonical_sql, COUNT(*) AS executions, COUNT(DISTINCT user_email) AS distinct_users, MIN(creation_time) AS first_seen, MAX(creation_time) AS last_seen, APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(50)] AS p50_ms, APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(95)] AS p95_ms, SAFE_DIVIDE(COUNTIF(error_result IS NOT NULL), COUNT(*)) AS error_rate, CAST(NULL AS INT64) AS rows_produced, TO_JSON_STRING(ARRAY_AGG(STRUCT(user_email AS user, 1 AS executions) ORDER BY creation_time DESC LIMIT 5)) AS top_users FROM ${this.viewPath} WHERE job_type = 'QUERY' AND statement_type IN ('SELECT', 'MERGE') AND creation_time >= ${timestampExpression(window.start)} AND creation_time < ${timestampExpression(window.end)} AND query IS NOT NULL GROUP BY query_hash HAVING COUNT(*) >= ${config.minExecutions} ORDER BY executions DESC`.trim(); ``` Map each result row into `aggregatedTemplateSchema.parse({ templateId, canonicalSql, dialect: 'bigquery', stats: { executions, distinctUsers, firstSeen, lastSeen, p50RuntimeMs, p95RuntimeMs, errorRate, rowsProduced }, topUsers })`, where `topUsers` is parsed from the `top_users` JSON string and invalid JSON becomes `[]`. In `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts`, add this aggregate query inside `fetchAggregated()`: ```typescript const sql = ` SELECT query_hash AS template_id, MIN(query_text) AS canonical_sql, COUNT(*) AS executions, COUNT(DISTINCT user_name) AS distinct_users, MIN(start_time) AS first_seen, MAX(start_time) AS last_seen, APPROX_PERCENTILE(total_elapsed_time, 0.50) AS p50_ms, APPROX_PERCENTILE(total_elapsed_time, 0.95) AS p95_ms, DIV0(COUNT_IF(execution_status != 'SUCCESS'), COUNT(*)) AS error_rate, SUM(rows_produced) AS rows_produced, ARRAY_AGG(OBJECT_CONSTRUCT('user', user_name, 'executions', 1)) WITHIN GROUP (ORDER BY start_time DESC)::string AS top_users FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY WHERE query_text IS NOT NULL AND query_type IN ('SELECT', 'MERGE') AND start_time >= ${timestampLiteral(window.start)} AND start_time < ${timestampLiteral(window.end)} GROUP BY query_hash HAVING COUNT(*) >= ${config.minExecutions} ORDER BY executions DESC`.trim(); ``` Map each result row into `aggregatedTemplateSchema.parse({ templateId, canonicalSql, dialect: 'snowflake', stats: { executions, distinctUsers, firstSeen, lastSeen, p50RuntimeMs, p95RuntimeMs, errorRate, rowsProduced }, topUsers })`, where `topUsers` is parsed from the `top_users` JSON string and invalid JSON becomes `[]`. Keep the existing `fetch()` methods unchanged in this plan so current adapter behavior does not move before the skill/projection cutover. - [ ] **Step 5: Export the new Postgres reader** In `packages/context/src/ingest/index.ts`, add: ```typescript export { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js'; ``` - [ ] **Step 6: Run aggregate reader tests** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts ``` Expected: PASS. - [ ] **Step 7: Commit** ```bash git add packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts packages/context/src/ingest/index.ts git commit -m "feat: add historic sql aggregate readers" ``` ## Task 5: Add Unified Chunking **Files:** - Create: `packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts` - Create: `packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts` - Modify: `packages/context/src/ingest/index.ts` - [ ] **Step 1: Write failing unified chunk tests** Create `packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts`: ```typescript import { mkdir, mkdtemp, writeFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js'; async function tempDir(): Promise { return mkdtemp(join(tmpdir(), 'historic-sql-unified-chunk-')); } async function writeJson(root: string, relPath: string, value: unknown): Promise { const target = join(root, relPath); await mkdir(join(target, '..'), { recursive: true }); await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); } async function writeUnifiedStagedDir(root: string): Promise { await writeJson(root, 'manifest.json', { source: 'historic-sql', connectionId: 'warehouse', dialect: 'postgres', fetchedAt: '2026-05-11T00:00:00.000Z', windowStart: '2026-02-10T00:00:00.000Z', windowEnd: '2026-05-11T00:00:00.000Z', snapshotRowCount: 1, touchedTableCount: 1, parseFailures: 0, warnings: [], probeWarnings: [], }); await writeJson(root, 'tables/public.orders.json', { table: 'public.orders', stats: { executionsBucket: '10-100', distinctUsersBucket: '2-5', errorRateBucket: 'none', p95RuntimeBucket: '<100ms', recencyBucket: 'current', }, columnsByClause: { select: [['status', 'high']] }, observedJoins: [], topTemplates: [{ id: 'orders', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }], }); await writeJson(root, 'patterns-input.json', { templates: [ { id: 'orders', canonicalSql: 'select * from public.orders', tablesTouched: ['public.orders'], executionsBucket: '10-100', distinctUsersBucket: '2-5', dialect: 'postgres', }, ], }); } describe('chunkHistoricSqlUnifiedStagedDir', () => { it('emits one table WorkUnit plus one patterns WorkUnit', async () => { const stagedDir = await tempDir(); await writeUnifiedStagedDir(stagedDir); const result = await chunkHistoricSqlUnifiedStagedDir(stagedDir); expect(result.workUnits).toEqual([ expect.objectContaining({ unitKey: 'historic-sql-table-public-orders', displayLabel: 'Historic SQL usage: public.orders', rawFiles: ['tables/public.orders.json'], dependencyPaths: ['manifest.json'], notes: expect.stringContaining('historic_sql_table_digest'), }), expect.objectContaining({ unitKey: 'historic-sql-patterns', displayLabel: 'Historic SQL cross-table patterns', rawFiles: ['patterns-input.json'], dependencyPaths: ['manifest.json'], notes: expect.stringContaining('historic_sql_patterns'), }), ]); expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']); }); it('respects diff sets for unchanged table and patterns files', async () => { const stagedDir = await tempDir(); await writeUnifiedStagedDir(stagedDir); await expect( chunkHistoricSqlUnifiedStagedDir(stagedDir, { added: [], modified: ['tables/public.orders.json'], deleted: [], unchanged: ['manifest.json', 'patterns-input.json'], }), ).resolves.toMatchObject({ workUnits: [expect.objectContaining({ unitKey: 'historic-sql-table-public-orders' })], }); await expect( chunkHistoricSqlUnifiedStagedDir(stagedDir, { added: [], modified: ['patterns-input.json'], deleted: [], unchanged: ['manifest.json', 'tables/public.orders.json'], }), ).resolves.toMatchObject({ workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns' })], }); }); it('describes unified staged scope', async () => { const stagedDir = await tempDir(); await writeUnifiedStagedDir(stagedDir); const scope = await describeHistoricSqlUnifiedScope(stagedDir); expect(scope.isPathInScope('manifest.json')).toBe(true); expect(scope.isPathInScope('patterns-input.json')).toBe(true); expect(scope.isPathInScope('tables/public.orders.json')).toBe(true); expect(scope.isPathInScope('templates/old/page.md')).toBe(false); }); }); ``` - [ ] **Step 2: Run the unified chunk tests to verify they fail** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/chunk-unified.test.ts ``` Expected: FAIL because `chunk-unified.js` does not exist. - [ ] **Step 3: Add the unified chunker** Create `packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts`: ```typescript import { createHash } from 'node:crypto'; import { readFile, readdir } from 'node:fs/promises'; import { join, relative } from 'node:path'; import type { ChunkResult, DiffSet, ScopeDescriptor, WorkUnit } from '../../types.js'; import { stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema } from './types.js'; async function walk(root: string): Promise { const entries = await readdir(root, { withFileTypes: true, recursive: true }); return entries .filter((entry) => entry.isFile()) .map((entry) => relative(root, join(entry.parentPath, entry.name)).replace(/\\/g, '/')) .sort(); } async function readJson(stagedDir: string, relPath: string): Promise { return JSON.parse(await readFile(join(stagedDir, relPath), 'utf-8')) as T; } function safeUnitKey(value: string): string { return value.replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, ''); } function touchedPath(path: string, touched: Set | null): boolean { return !touched || touched.has(path); } export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSet?: DiffSet): Promise { const files = await walk(stagedDir); const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json')); const touched = diffSet ? new Set([...diffSet.added, ...diffSet.modified]) : null; const workUnits: WorkUnit[] = []; for (const path of files.filter((file) => /^tables\/.+\.json$/.test(file))) { if (!touchedPath(path, touched)) { continue; } const table = stagedTableInputSchema.parse(await readJson(stagedDir, path)); workUnits.push({ unitKey: `historic-sql-table-${safeUnitKey(table.table)}`, displayLabel: `Historic SQL usage: ${table.table}`, rawFiles: [path], dependencyPaths: ['manifest.json'], peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(), notes: 'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.', }); } if (files.includes('patterns-input.json') && touchedPath('patterns-input.json', touched)) { stagedPatternsInputSchema.parse(await readJson(stagedDir, 'patterns-input.json')); workUnits.push({ unitKey: 'historic-sql-patterns', displayLabel: 'Historic SQL cross-table patterns', rawFiles: ['patterns-input.json'], dependencyPaths: ['manifest.json'], peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(), notes: 'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.', }); } const deleted = diffSet?.deleted.filter((path) => path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path)).sort(); return { workUnits, eviction: deleted && deleted.length > 0 ? { deletedRawPaths: deleted } : undefined, reconcileNotes: [`Historic-SQL touched tables=${manifest.touchedTableCount} parseFailures=${manifest.parseFailures}`], contextReport: { capped: false, warnings: [...manifest.probeWarnings, ...manifest.warnings], }, }; } export async function describeHistoricSqlUnifiedScope(stagedDir: string): Promise { const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json')); const fingerprint = createHash('sha256') .update(JSON.stringify({ connectionId: manifest.connectionId, dialect: manifest.dialect, windowStart: manifest.windowStart, windowEnd: manifest.windowEnd, })) .digest('hex'); return { fingerprint, isPathInScope: (rawPath) => rawPath === 'manifest.json' || rawPath === 'patterns-input.json' || /^tables\/.+\.json$/.test(rawPath), }; } ``` - [ ] **Step 4: Run the unified chunk tests** Run: ```bash pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/chunk-unified.test.ts ``` Expected: PASS. - [ ] **Step 5: Export the unified chunker** In `packages/context/src/ingest/index.ts`, add: ```typescript export { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './adapters/historic-sql/chunk-unified.js'; ``` - [ ] **Step 6: Commit** ```bash git add packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts packages/context/src/ingest/index.ts git commit -m "feat: chunk historic sql unified staging" ``` ## Task 6: Verify the Hot Path Slice **Files:** - Modify: files changed in Tasks 1-5 - [ ] **Step 1: Run focused historic-SQL tests** Run: ```bash pnpm --filter @ktx/context exec vitest run \ src/ingest/adapters/historic-sql/types.test.ts \ src/ingest/adapters/historic-sql/buckets.test.ts \ src/ingest/adapters/historic-sql/stage-unified.test.ts \ src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts \ src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts \ src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts \ src/ingest/adapters/historic-sql/chunk-unified.test.ts \ src/package-exports.test.ts ``` Expected: PASS. - [ ] **Step 2: Run type-check for the context package** Run: ```bash pnpm --filter @ktx/context run type-check ``` Expected: PASS. - [ ] **Step 3: Confirm legacy production adapter was not switched** Run: ```bash rg -n "historic_sql_ingest|historic_sql_curator|stagePgStatStatementsTemplates" packages/context/src/ingest/adapters/historic-sql packages/context/skills packages/context/src/ingest/ingest-runtime-assets.test.ts ``` Expected: Results still include `historic-sql.adapter.ts`, the old skill files, and runtime-asset tests. This is correct for this plan because the replacement skills and projection are not present yet. - [ ] **Step 4: Confirm new hot-path exports exist** Run: ```bash rg -n "stageHistoricSqlAggregatedSnapshot|chunkHistoricSqlUnifiedStagedDir|PostgresPgssReader|aggregatedTemplateSchema" packages/context/src/ingest/index.ts packages/context/src/ingest/adapters/historic-sql ``` Expected: Results include the new stager, chunker, reader, and schemas. - [ ] **Step 5: Commit verification fixes only when verification changed files** ```bash git status --short ``` Expected: no output. If verification forced a fix, run: ```bash git add packages/context/src/ingest/adapters/historic-sql packages/context/src/ingest/index.ts packages/context/src/package-exports.test.ts git commit -m "test: verify historic sql unified hot path" ``` ## Follow-Up Plan Boundary The next plan after this one should switch the production adapter only after it also creates the cold-path pieces: - `packages/context/skills/historic_sql_table_digest/SKILL.md` - `packages/context/skills/historic_sql_patterns/SKILL.md` - adapter `skillNames` change from `historic_sql_ingest` to the two new skills - `onPullSucceeded()` projection of table usage into `_schema/{shard}.yaml` - pattern wiki page projection and slug stability - one-time cleanup of legacy template wiki pages and PGSS baselines - deletion of `stage-pgss.ts`, old template staging exports, and old historic-SQL skill assets ## Self-Review Spec coverage: - Unified aggregate reader contracts: Task 1 and Task 4. - Trailing-window aggregate fetch shape: Task 4. - Batch SQL parse through `SqlAnalysisPort.analyzeBatch()`: Task 3. - Service-account, trivial query, failed-template, parse-failure, and zero-table filtering: Task 3. - Bucketed `tables/*.json`, `patterns-input.json`, and `manifest.json`: Task 2 and Task 3. - WorkUnits for one table file plus patterns input: Task 5. - Hard production cutover, LLM skills, projection, wiki pages, stale handling, and legacy deletion: explicitly excluded from this plan and listed as the next plan boundary. Placeholder scan: - No unresolved placeholders are left in task steps. - Every code-changing task includes concrete test code, implementation code, commands, and expected results. Type consistency: - `HistoricSqlUnifiedPullConfig`, `AggregatedTemplate`, `StagedTableInput`, `StagedPatternsInput`, and `StagedManifest` are defined in Task 1 and reused consistently by Tasks 3-5. - `PostgresPgssReader`, `fetchAggregated()`, `stageHistoricSqlAggregatedSnapshot()`, and `chunkHistoricSqlUnifiedStagedDir()` names match exports and test imports. Plan complete and saved to `docs/superpowers/plans/2026-05-11-historic-sql-unified-hot-path.md`. Two execution options: **1. Subagent-Driven (recommended)** - I dispatch a fresh subagent per task, review between tasks, fast iteration **2. Inline Execution** - Execute tasks in this session using executing-plans, batch execution with checkpoints Which approach?