From 3ada4601d1bb3911744c5fdf220d9d392f902087 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 17:55:42 +0200 Subject: [PATCH] docs(plans): add historic-sql unified hot-path plan Co-Authored-By: Claude Opus 4.7 (1M context) --- ...026-05-11-historic-sql-unified-hot-path.md | 1698 +++++++++++++++++ 1 file changed, 1698 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-11-historic-sql-unified-hot-path.md diff --git a/docs/superpowers/plans/2026-05-11-historic-sql-unified-hot-path.md b/docs/superpowers/plans/2026-05-11-historic-sql-unified-hot-path.md new file mode 100644 index 00000000..c3228e8a --- /dev/null +++ b/docs/superpowers/plans/2026-05-11-historic-sql-unified-hot-path.md @@ -0,0 +1,1698 @@ +# Historic SQL Unified Hot Path Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build the deterministic historic-SQL hot path that reads warehouse-aggregated query templates, batch-parses them once, and writes stable table-bucket and pattern-input staged artifacts. + +**Architecture:** This slice adds the unified reader/stager contracts from the historic-SQL redesign without doing the LLM cold path or projection work. Dialect-specific SQL lives in reader classes; shared TypeScript code filters, batch-parses, bucketizes, and writes `manifest.json`, `tables/*.json`, and `patterns-input.json`. The existing production adapter remains on the legacy path until the follow-up skills/projection cutover can switch it without loading missing skills. + +**Tech Stack:** TypeScript ESM/NodeNext, zod 4, Vitest, `SqlAnalysisPort.analyzeBatch()`, warehouse query clients. 
+ +--- + +## Starting Point + +Spec: `docs/superpowers/specs/2026-05-11-historic-sql-redesign-design.md` + +Plans found that are based on this spec: + +- `docs/superpowers/plans/2026-05-11-historic-sql-foundations.md` +- `docs/superpowers/plans/2026-05-11-historic-sql-search-enrichment.md` + +Implemented status from this worktree: + +- `2026-05-11-historic-sql-foundations.md` is implemented. Evidence: `packages/context/src/ingest/adapters/historic-sql/skill-schemas.ts`, `SemanticLayerSource.usage` in `packages/context/src/sl/types.ts`, `mergeUsagePreservingExternal()` in `packages/context/src/ingest/adapters/live-database/manifest.ts`, `SqlAnalysisPort.analyzeBatch()` in `packages/context/src/sql-analysis/ports.ts`, and `/sql/analyze-batch` in `python/ktx-daemon/src/ktx_daemon/app.py`. +- `2026-05-11-historic-sql-search-enrichment.md` is implemented. Evidence: `buildSemanticLayerSourceSearchText()` indexes `source.usage` in `packages/context/src/sl/sl-search.service.ts`, SQLite FTS returns `snippet()` in `packages/context/src/sl/sqlite-sl-sources-index.ts`, and local/MCP list results expose `frequencyTier` and `snippet` in `packages/context/src/sl/local-sl.ts` and `packages/context/src/mcp/local-project-ports.ts`. + +Still not implemented: + +- `packages/context/src/ingest/adapters/historic-sql/stage.ts` still calls `SqlAnalysisPort.analyzeForFingerprint()` per raw query and emits `templates/*/{metadata.json,page.md,usage.json}`. +- `packages/context/src/ingest/adapters/historic-sql/stage-pgss.ts` still owns Postgres baseline-diff state and writes `.ktx/cache/historic-sql/*/pgss-baseline.json`. +- `packages/context/src/ingest/adapters/historic-sql/chunk.ts` still emits one WorkUnit per template page for `historic_sql_ingest`. +- `packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts` still advertises `historic_sql_ingest` and `historic_sql_curator`. 
+- Old code strings still exist: `stagePgStatStatementsTemplates`, `expandCategoricalTemplates`, `classifySlot`, and `pgss-baseline`. + +This plan covers the deterministic hot path from the spec: unified aggregate contracts, aggregate readers, batch parsing, table bucketing, pattern input staging, and a new chunker for the new staged shape. It does not switch `HistoricSqlSourceAdapter` to the new WorkUnits; the cutover plan must create `historic_sql_table_digest`, `historic_sql_patterns`, and projection before changing production `skillNames`. + +## File Structure + +Create: + +- `packages/context/src/ingest/adapters/historic-sql/types.test.ts` + Locks the new public zod contracts and the one-release `minCalls` to `minExecutions` config alias. +- `packages/context/src/ingest/adapters/historic-sql/buckets.ts` + Owns deterministic bucket labels and frequency-tier helpers used by staging. +- `packages/context/src/ingest/adapters/historic-sql/buckets.test.ts` + Locks stable bucket boundaries so small numeric drift does not churn staged files. +- `packages/context/src/ingest/adapters/historic-sql/stage-unified.ts` + Implements the new deterministic stager behind `stageHistoricSqlAggregatedSnapshot()`. +- `packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts` + Tests batch parsing, parse failures, service-account filtering, per-table bucketing, and `patterns-input.json`. +- `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts` + Implements the new Postgres aggregate reader over `pg_stat_statements`. +- `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts` + Tests the aggregate PGSS query shape, probe warnings, and row mapping. +- `packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts` + Implements the new chunker for `tables/*.json` plus `patterns-input.json`. 
+- `packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts` + Tests table WorkUnits, the patterns WorkUnit, diff filtering, eviction, and scope detection. + +Modify: + +- `packages/context/src/ingest/adapters/historic-sql/types.ts` + Adds aggregate input, staged artifact, reader, and manifest schemas. Keeps legacy exported types until adapter cutover, but marks the new contracts as the target API for the next slice. +- `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts` + Adds `fetchAggregated()` while retaining the existing `fetch()` until the adapter cutover. +- `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts` + Adds aggregate-query tests. +- `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts` + Adds `fetchAggregated()` while retaining the existing `fetch()` until the adapter cutover. +- `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts` + Adds aggregate-query tests. +- `packages/context/src/ingest/index.ts` + Exports the new hot-path contracts and helpers. +- `packages/context/src/package-exports.test.ts` + Asserts the new exports exist without removing old exports in this slice. + +Do not modify in this plan: + +- `packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.ts` +- `packages/context/skills/historic_sql_ingest/SKILL.md` +- `packages/context/skills/historic_sql_curator/SKILL.md` +- `packages/context/src/ingest/ingest-runtime-assets.test.ts` + +Those files change in the cutover/projection plan after the replacement skills exist. 
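The stability rationale given for `buckets.ts` above (small numeric drift should not churn staged files) can be sketched as follows. This is an illustrative sketch, not the planned implementation: `executionsBucket` mirrors the execution boundaries this plan defines in Task 2, while `stageStats` and the sample counts are hypothetical names and values chosen for the example.

```typescript
// Illustrative only: mirrors the execution boundaries defined in Task 2 of this plan.
function executionsBucket(value: number): string {
  if (value < 10) return '<10';
  if (value < 100) return '10-100';
  if (value < 1000) return '100-1k';
  if (value < 5000) return '1k-5k';
  if (value < 50000) return '5k-50k';
  return '>50k';
}

// Hypothetical helper: serializes the bucket label the way a staged file might.
// The real staged shape is `StagedTableInput`; this keeps a single field for brevity.
function stageStats(executions: number): string {
  return JSON.stringify({ executionsBucket: executionsBucket(executions) }, null, 2);
}

// Two runs with slightly different raw counts land in the same bucket,
// so the serialized artifact is byte-identical and produces no diff.
const firstRun = stageStats(42);
const secondRun = stageStats(57);

// A run that crosses a boundary changes the label, and only then does the file change.
const laterRun = stageStats(140);

console.log(firstRun === secondRun); // true
console.log(firstRun === laterRun); // false
```

Writing labels instead of raw counts trades precision for diff stability: downstream consumers see a change only when usage moves to a different tier.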
+ +## Task 1: Add Unified Contracts + +**Files:** +- Create: `packages/context/src/ingest/adapters/historic-sql/types.test.ts` +- Modify: `packages/context/src/ingest/adapters/historic-sql/types.ts` +- Modify: `packages/context/src/ingest/index.ts` +- Modify: `packages/context/src/package-exports.test.ts` + +- [ ] **Step 1: Write failing contract tests** + +Create `packages/context/src/ingest/adapters/historic-sql/types.test.ts`: + +```typescript +import { describe, expect, it } from 'vitest'; +import { + aggregatedTemplateSchema, + historicSqlUnifiedPullConfigSchema, + stagedManifestSchema, + stagedPatternsInputSchema, + stagedTableInputSchema, +} from './types.js'; + +describe('historic-sql unified contracts', () => { + it('parses minExecutions and accepts minCalls as a one-release alias', () => { + expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).toMatchObject({ + dialect: 'postgres', + minExecutions: 9, + windowDays: 90, + concurrency: 12, + redactionPatterns: [], + staleArchiveAfterDays: 90, + }); + + expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minCalls: 7 }).minExecutions).toBe(7); + }); + + it('validates aggregate templates from warehouse readers', () => { + const parsed = aggregatedTemplateSchema.parse({ + templateId: 'pg:123', + canonicalSql: 'select status, count(*) from public.orders group by status', + dialect: 'postgres', + stats: { + executions: 42, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 12.5, + p95RuntimeMs: 40, + errorRate: 0, + rowsProduced: 100, + }, + topUsers: [{ user: 'analyst', executions: 40 }], + }); + + expect(parsed.templateId).toBe('pg:123'); + expect(parsed.topUsers).toEqual([{ user: 'analyst', executions: 40 }]); + }); + + it('validates staged table, patterns, and manifest artifacts', () => { + expect( + stagedTableInputSchema.parse({ + table: 'public.orders', + stats: { + executionsBucket: 
'10-100', + distinctUsersBucket: '2-5', + errorRateBucket: 'none', + p95RuntimeBucket: '<100ms', + recencyBucket: 'current', + }, + columnsByClause: { + select: [['status', 'high']], + where: [['created_at', 'mid']], + }, + observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }], + topTemplates: [{ id: 'pg:123', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }], + }).table, + ).toBe('public.orders'); + + expect( + stagedPatternsInputSchema.parse({ + templates: [ + { + id: 'pg:123', + canonicalSql: 'select * from public.orders', + tablesTouched: ['public.orders'], + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + dialect: 'postgres', + }, + ], + }).templates, + ).toHaveLength(1); + + expect( + stagedManifestSchema.parse({ + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 2, + touchedTableCount: 1, + parseFailures: 1, + warnings: ['parse_failed:bad'], + probeWarnings: [], + }).parseFailures, + ).toBe(1); + }); +}); +``` + +Add these assertions near the historic-SQL export assertions in `packages/context/src/package-exports.test.ts`: + +```typescript + expect(ingest.historicSqlUnifiedPullConfigSchema).toBeDefined(); + expect(ingest.aggregatedTemplateSchema).toBeDefined(); + expect(ingest.stagedTableInputSchema).toBeDefined(); +``` + +- [ ] **Step 2: Run the contract tests to verify they fail** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/types.test.ts src/package-exports.test.ts +``` + +Expected: FAIL with missing exports for `historicSqlUnifiedPullConfigSchema`, `aggregatedTemplateSchema`, and `stagedTableInputSchema`. 
+ +- [ ] **Step 3: Add the new schemas and reader contracts** + +Insert this block immediately after the existing `historicSqlPullConfigSchema` definition in `packages/context/src/ingest/adapters/historic-sql/types.ts`. Keep `historicSqlPullConfigSchema` and `HistoricSqlPullConfig` unchanged in this plan because the current production adapter still reads `lastSuccessfulCursor`, `maxTemplatesPerRun`, and `minCalls`. + +```typescript +const filterModeSchema = z.enum(['exclude', 'include', 'mark-only']); + +function isRecord(value: unknown): value is Record<string, unknown> { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +export const historicSqlUnifiedPullConfigSchema = z.preprocess((value) => { + if (!isRecord(value)) { + return value; + } + if (value.minExecutions === undefined && typeof value.minCalls === 'number') { + return { ...value, minExecutions: value.minCalls }; + } + return value; +}, z.object({ + dialect: historicSqlDialectSchema, + windowDays: z.number().int().positive().default(90), + minExecutions: z.number().int().nonnegative().default(5), + concurrency: z.number().int().positive().default(12), + // zod 4: `.default({})` returns `{}` without parsing it; `.prefault({})` parses the empty object so nested defaults apply. + filters: z.object({ + serviceAccounts: z.object({ + patterns: z.array(z.string()).default([]), + mode: filterModeSchema.default('exclude'), + }).optional(), + orchestrators: z.object({ + mode: filterModeSchema.default('mark-only'), + }).optional(), + dropTrivialProbes: z.boolean().default(true), + dropFailedBelow: z.object({ + errorRate: z.number().min(0).max(1), + executions: z.number().int().nonnegative(), + }).optional(), + }).prefault({}), + redactionPatterns: z.array(z.string()).default([]), + staleArchiveAfterDays: z.number().int().positive().default(90), +})); + +export type HistoricSqlUnifiedPullConfig = z.infer<typeof historicSqlUnifiedPullConfigSchema>; + +export const aggregatedTemplateSchema = z.object({ + templateId: z.string().min(1), + canonicalSql: z.string().min(1), + dialect: historicSqlDialectSchema, + stats: z.object({ + executions: 
 z.number().int().nonnegative(), + distinctUsers: z.number().int().nonnegative(), + firstSeen: z.iso.datetime(), + lastSeen: z.iso.datetime(), + p50RuntimeMs: z.number().nonnegative().nullable(), + p95RuntimeMs: z.number().nonnegative().nullable(), + errorRate: z.number().min(0).max(1), + rowsProduced: z.number().int().nonnegative().nullable(), + }), + topUsers: z.array(z.object({ + user: z.string().nullable(), + executions: z.number().int().nonnegative(), + })).default([]), +}); +export type AggregatedTemplate = z.infer<typeof aggregatedTemplateSchema>; + +export const stagedTableInputSchema = z.object({ + table: z.string().min(1), + stats: z.object({ + executionsBucket: z.string(), + distinctUsersBucket: z.string(), + errorRateBucket: z.string(), + p95RuntimeBucket: z.string(), + recencyBucket: z.string(), + }), + columnsByClause: z.record(z.string(), z.array(z.tuple([z.string(), z.string()]))), + observedJoins: z.array(z.object({ + withTable: z.string(), + on: z.array(z.string()), + freq: z.string(), + })), + topTemplates: z.array(z.object({ + id: z.string(), + canonicalSql: z.string(), + topUsers: z.array(z.object({ user: z.string().nullable() })), + })), +}); +export type StagedTableInput = z.infer<typeof stagedTableInputSchema>; + +export const stagedPatternsInputSchema = z.object({ + templates: z.array(z.object({ + id: z.string(), + canonicalSql: z.string(), + tablesTouched: z.array(z.string()), + executionsBucket: z.string(), + distinctUsersBucket: z.string(), + dialect: historicSqlDialectSchema, + })), +}); +export type StagedPatternsInput = z.infer<typeof stagedPatternsInputSchema>; + +export const stagedManifestSchema = z.object({ + source: z.literal(HISTORIC_SQL_SOURCE_KEY), + connectionId: z.string().min(1), + dialect: historicSqlDialectSchema, + fetchedAt: z.iso.datetime(), + windowStart: z.iso.datetime(), + windowEnd: z.iso.datetime(), + snapshotRowCount: z.number().int().nonnegative(), + touchedTableCount: z.number().int().nonnegative(), + parseFailures: z.number().int().nonnegative(), + warnings: z.array(z.string()), + probeWarnings: 
 z.array(z.string()), +}); +export type StagedManifest = z.infer<typeof stagedManifestSchema>; + +export interface HistoricSqlProbeResult { + warnings: string[]; +} + +export interface HistoricSqlReader { + probe(client: unknown): Promise<HistoricSqlProbeResult>; + fetchAggregated( + client: unknown, + window: HistoricSqlTimeWindow, + config: HistoricSqlUnifiedPullConfig, + ): AsyncIterable<AggregatedTemplate>; +} +``` + +- [ ] **Step 4: Export the new contracts** + +In `packages/context/src/ingest/index.ts`, add exports for the new types and schemas: + +```typescript +export type { + AggregatedTemplate, + HistoricSqlProbeResult, + HistoricSqlReader, + HistoricSqlUnifiedPullConfig, + StagedManifest, + StagedPatternsInput, + StagedTableInput, +} from './adapters/historic-sql/types.js'; +export { + aggregatedTemplateSchema, + historicSqlUnifiedPullConfigSchema, + stagedManifestSchema, + stagedPatternsInputSchema, + stagedTableInputSchema, +} from './adapters/historic-sql/types.js'; +``` + +- [ ] **Step 5: Run the contract tests** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/types.test.ts src/package-exports.test.ts +``` + +Expected: PASS. 
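The reader contract from Step 3 (a `probe()` that returns warnings plus a `fetchAggregated()` that yields rows as an async iterable) can be exercised with an in-memory test double. The sketch below is a minimal illustration under stated assumptions: `TemplateRow`, `TimeWindow`, `Reader`, `inMemoryReader`, and `collectIds` are simplified, hypothetical stand-ins rather than the real exports of `types.ts`, and applying `minExecutions` inside the reader is an assumption about where that threshold is enforced.

```typescript
// Simplified local stand-ins for the contracts in types.ts (assumptions for this sketch).
interface TemplateRow {
  templateId: string;
  canonicalSql: string;
  executions: number;
}

interface TimeWindow {
  start: Date;
  end: Date;
}

interface Reader {
  probe(client: unknown): Promise<{ warnings: string[] }>;
  fetchAggregated(
    client: unknown,
    window: TimeWindow,
    config: { minExecutions: number },
  ): AsyncIterable<TemplateRow>;
}

// Hypothetical in-memory reader: yields fixed rows instead of querying a warehouse.
function inMemoryReader(rows: TemplateRow[]): Reader {
  return {
    async probe() {
      return { warnings: [] };
    },
    async *fetchAggregated(_client, _window, config) {
      for (const row of rows) {
        // Assumption: the reader applies the execution threshold itself.
        if (row.executions >= config.minExecutions) {
          yield row;
        }
      }
    },
  };
}

// Drains the async iterable the same way a stager would, using `for await`.
async function collectIds(reader: Reader, minExecutions: number): Promise<string[]> {
  const ids: string[] = [];
  const window: TimeWindow = { start: new Date(0), end: new Date() };
  for await (const row of reader.fetchAggregated({}, window, { minExecutions })) {
    ids.push(row.templateId);
  }
  return ids;
}

const sampleReader = inMemoryReader([
  { templateId: 'orders-by-status', canonicalSql: 'select status from public.orders', executions: 42 },
  { templateId: 'rare-probe', canonicalSql: 'select id from public.orders', executions: 2 },
]);

collectIds(sampleReader, 5).then((ids) => console.log(ids));
```

Returning an `AsyncIterable` rather than an array lets a warehouse-backed reader stream pages of aggregate rows while the stager consumes them with the same `for await` loop used against this double.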
+ +- [ ] **Step 6: Commit** + +```bash +git add packages/context/src/ingest/adapters/historic-sql/types.ts packages/context/src/ingest/adapters/historic-sql/types.test.ts packages/context/src/ingest/index.ts packages/context/src/package-exports.test.ts +git commit -m "feat: add historic sql unified contracts" +``` + +## Task 2: Add Stable Bucket Helpers + +**Files:** +- Create: `packages/context/src/ingest/adapters/historic-sql/buckets.ts` +- Create: `packages/context/src/ingest/adapters/historic-sql/buckets.test.ts` +- Modify: `packages/context/src/ingest/index.ts` + +- [ ] **Step 1: Write failing bucket tests** + +Create `packages/context/src/ingest/adapters/historic-sql/buckets.test.ts`: + +```typescript +import { describe, expect, it } from 'vitest'; +import { + bucketDistinctUsers, + bucketErrorRate, + bucketExecutions, + bucketFrequency, + bucketP95Runtime, + bucketRecency, +} from './buckets.js'; + +describe('historic-sql bucket helpers', () => { + it('uses stable execution buckets', () => { + expect([0, 9, 10, 99, 100, 999, 1000, 4999, 5000, 49999, 50000].map(bucketExecutions)).toEqual([ + '<10', + '<10', + '10-100', + '10-100', + '100-1k', + '100-1k', + '1k-5k', + '1k-5k', + '5k-50k', + '5k-50k', + '>50k', + ]); + }); + + it('uses stable distinct-user, error-rate, runtime, and recency buckets', () => { + expect([0, 1, 2, 5, 6, 10, 11].map(bucketDistinctUsers)).toEqual([ + '0', + '1', + '2-5', + '2-5', + '5-10', + '5-10', + '>10', + ]); + expect([0, 0.01, 0.05, 0.2].map(bucketErrorRate)).toEqual(['none', 'low', 'low', 'high']); + expect([null, 99, 100, 999, 1000, 9999, 10000].map(bucketP95Runtime)).toEqual([ + 'unknown', + '<100ms', + '100ms-1s', + '100ms-1s', + '1s-10s', + '1s-10s', + '>10s', + ]); + expect(bucketRecency('2026-05-11T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('current'); + expect(bucketRecency('2026-04-20T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('recent'); + 
expect(bucketRecency('2026-01-01T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('stale'); + }); + + it('maps frequency counts to high, mid, and low labels', () => { + expect(bucketFrequency(80, 100)).toBe('high'); + expect(bucketFrequency(20, 100)).toBe('mid'); + expect(bucketFrequency(1, 100)).toBe('low'); + expect(bucketFrequency(0, 0)).toBe('low'); + }); +}); +``` + +- [ ] **Step 2: Run the bucket test to verify it fails** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/buckets.test.ts +``` + +Expected: FAIL because `buckets.js` does not exist. + +- [ ] **Step 3: Add the bucket helper implementation** + +Create `packages/context/src/ingest/adapters/historic-sql/buckets.ts`: + +```typescript +export function bucketExecutions(value: number): string { + if (value < 10) return '<10'; + if (value < 100) return '10-100'; + if (value < 1000) return '100-1k'; + if (value < 5000) return '1k-5k'; + if (value < 50000) return '5k-50k'; + return '>50k'; +} + +export function bucketDistinctUsers(value: number): string { + if (value <= 0) return '0'; + if (value === 1) return '1'; + if (value <= 5) return '2-5'; + if (value <= 10) return '5-10'; + return '>10'; +} + +export function bucketErrorRate(value: number): string { + if (value <= 0) return 'none'; + if (value < 0.1) return 'low'; + return 'high'; +} + +export function bucketP95Runtime(value: number | null): string { + if (value === null) return 'unknown'; + if (value < 100) return '<100ms'; + if (value < 1000) return '100ms-1s'; + if (value < 10000) return '1s-10s'; + return '>10s'; +} + +export function bucketRecency(lastSeen: string, now: Date): string { + const parsed = new Date(lastSeen); + if (Number.isNaN(parsed.getTime())) { + return 'unknown'; + } + const ageDays = (now.getTime() - parsed.getTime()) / (24 * 60 * 60 * 1000); + if (ageDays <= 7) return 'current'; + if (ageDays <= 45) return 'recent'; + return 'stale'; +} + +export function 
 bucketFrequency(count: number, total: number): 'high' | 'mid' | 'low' { + if (total <= 0 || count <= 0) return 'low'; + const ratio = count / total; + if (ratio >= 0.5) return 'high'; + if (ratio >= 0.1) return 'mid'; + return 'low'; +} +``` + +- [ ] **Step 4: Run the bucket test** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/buckets.test.ts +``` + +Expected: PASS. + +- [ ] **Step 5: Export bucket helpers** + +In `packages/context/src/ingest/index.ts`, add: + +```typescript +export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js'; +``` + +- [ ] **Step 6: Commit** + +```bash +git add packages/context/src/ingest/adapters/historic-sql/buckets.ts packages/context/src/ingest/adapters/historic-sql/buckets.test.ts packages/context/src/ingest/index.ts +git commit -m "feat: add historic sql bucket helpers" +``` + +## Task 3: Stage Aggregated Snapshots + +**Files:** +- Create: `packages/context/src/ingest/adapters/historic-sql/stage-unified.ts` +- Create: `packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts` +- Modify: `packages/context/src/ingest/index.ts` + +- [ ] **Step 1: Write failing staged-artifact tests** + +Create `packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts`: + +```typescript +import { mkdtemp, readFile, readdir } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it, vi } from 'vitest'; +import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; +import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js'; +import type { AggregatedTemplate, HistoricSqlReader } from './types.js'; + +async function tempDir(): Promise<string> { + return mkdtemp(join(tmpdir(), 'historic-sql-unified-stage-')); +} + +async function readJson<T>(root: string, relPath: string): Promise<T> { + return JSON.parse(await 
 readFile(join(root, relPath), 'utf-8')) as T; +} + +function aggregate(overrides: Partial<AggregatedTemplate> & { templateId: string; canonicalSql: string }): AggregatedTemplate { + return { + templateId: overrides.templateId, + canonicalSql: overrides.canonicalSql, + dialect: overrides.dialect ?? 'postgres', + stats: overrides.stats ?? { + executions: 42, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 20, + p95RuntimeMs: 80, + errorRate: 0, + rowsProduced: 100, + }, + topUsers: overrides.topUsers ?? [{ user: 'analyst', executions: 40 }], + }; +} + +describe('stageHistoricSqlAggregatedSnapshot', () => { + it('batch parses templates and writes stable table and patterns artifacts', async () => { + const stagedDir = await tempDir(); + const reader: HistoricSqlReader = { + async probe() { + return { warnings: ['pg_stat_statements.max is low; aggregation still proceeds'] }; + }, + async *fetchAggregated() { + yield aggregate({ + templateId: 'orders-by-status', + canonicalSql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status', + }); + yield aggregate({ + templateId: 'service-account-only', + canonicalSql: 'select * from public.orders where id = $1', + stats: { + executions: 20, + distinctUsers: 1, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 5, + p95RuntimeMs: 10, + errorRate: 0, + rowsProduced: 1, + }, + topUsers: [{ user: 'svc_loader', executions: 20 }], + }); + yield aggregate({ + templateId: 'bad-parse', + canonicalSql: 'select broken from', + }); + }, + }; + const sqlAnalysis: SqlAnalysisPort = { + analyzeForFingerprint: vi.fn(), + analyzeBatch: vi.fn(async () => new Map([ + [ + 'orders-by-status', + { + tablesTouched: ['public.orders', 'public.customers'], + columnsByClause: { + select: ['status'], + where: ['created_at'], + join: ['customer_id'], + groupBy: ['status'], + }, + 
 }, + ], + ['bad-parse', { tablesTouched: [], columnsByClause: {}, error: 'parse failed' }], + ])), + }; + + await stageHistoricSqlAggregatedSnapshot({ + stagedDir, + connectionId: 'warehouse', + queryClient: {}, + reader, + sqlAnalysis, + pullConfig: { + dialect: 'postgres', + filters: { + serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' }, + }, + }, + now: new Date('2026-05-11T12:00:00.000Z'), + }); + + expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledTimes(1); + expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith( + [ + { + id: 'orders-by-status', + sql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status', + }, + { id: 'bad-parse', sql: 'select broken from' }, + ], + 'postgres', + ); + + expect((await readdir(join(stagedDir, 'tables'))).sort()).toEqual(['public.customers.json', 'public.orders.json']); + + const manifest = await readJson<Record<string, unknown>>(stagedDir, 'manifest.json'); + expect(manifest).toMatchObject({ + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + snapshotRowCount: 3, + touchedTableCount: 2, + parseFailures: 1, + warnings: ['parse_failed:bad-parse'], + probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'], + }); + + const orders = await readJson<Record<string, any>>(stagedDir, 'tables/public.orders.json'); + expect(orders).toMatchObject({ + table: 'public.orders', + stats: { + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + errorRateBucket: 'none', + p95RuntimeBucket: '<100ms', + recencyBucket: 'current', + }, + columnsByClause: { + select: [['status', 'high']], + where: [['created_at', 'high']], + join: [['customer_id', 'high']], + groupBy: [['status', 'high']], + }, + observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }], + topTemplates: [ + { + id: 'orders-by-status', + topUsers: [{ user: 'analyst' }], + }, + ], + }); + expect(orders.topTemplates[0].canonicalSql).toContain('group by o.status'); 
+ + const patterns = await readJson<Record<string, unknown>>(stagedDir, 'patterns-input.json'); + expect(patterns.templates).toEqual([ + { + id: 'orders-by-status', + canonicalSql: expect.stringContaining('public.orders'), + tablesTouched: ['public.customers', 'public.orders'], + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + dialect: 'postgres', + }, + ]); + }); +}); +``` + +- [ ] **Step 2: Run the stage test to verify it fails** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/stage-unified.test.ts +``` + +Expected: FAIL because `stage-unified.js` does not exist. + +- [ ] **Step 3: Add the unified stager** + +Create `packages/context/src/ingest/adapters/historic-sql/stage-unified.ts` with these exported shapes and helpers: + +```typescript +import { mkdir, writeFile } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import type { SqlAnalysisPort } from '../../../sql-analysis/index.js'; +import { + bucketDistinctUsers, + bucketErrorRate, + bucketExecutions, + bucketFrequency, + bucketP95Runtime, + bucketRecency, +} from './buckets.js'; +import { + HISTORIC_SQL_SOURCE_KEY, + aggregatedTemplateSchema, + historicSqlUnifiedPullConfigSchema, + type AggregatedTemplate, + type HistoricSqlReader, + type HistoricSqlUnifiedPullConfig, + type StagedPatternsInput, + type StagedTableInput, +} from './types.js'; + +interface StageHistoricSqlAggregatedSnapshotInput { + stagedDir: string; + connectionId: string; + queryClient: unknown; + reader: HistoricSqlReader; + sqlAnalysis: SqlAnalysisPort; + pullConfig: unknown; + now?: Date; +} + +interface ParsedTemplate { + template: AggregatedTemplate; + tablesTouched: string[]; + columnsByClause: Record<string, string[]>; +} + +interface TableAccumulator { + table: string; + executions: number; + distinctUsers: number; + errorRateNumerator: number; + p95RuntimeMs: number | null; + lastSeen: string; + columnsByClause: Map<string, Map<string, number>>; + observedJoins: Map<string, Map<string, number>>; + topTemplates: AggregatedTemplate[]; +} + +const 
 TRIVIAL_SQL_RE = /^\s*SELECT\s+(1|NOW\(\)|CURRENT_TIMESTAMP|VERSION\(\))\s*;?\s*$/i; +const NOISE_PREFIX_RE = /^\s*(SHOW|DESCRIBE|DESC|EXPLAIN|USE|SET)\b/i; +const SYSTEM_TABLE_RE = /\b(INFORMATION_SCHEMA|SNOWFLAKE\.ACCOUNT_USAGE|pg_|system\.)/i; +const ORCHESTRATOR_RE = /\b(dbt|looker|metabase)\b/i; + +function writeJson(root: string, relPath: string, value: unknown): Promise<void> { + const target = join(root, relPath); + return mkdir(dirname(target), { recursive: true }).then(() => + writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'), + ); +} + +function compilePatterns(patterns: string[]): RegExp[] { + return patterns.map((pattern) => new RegExp(pattern)); +} + +function matchesAny(value: string | null, patterns: RegExp[]): boolean { + return !!value && patterns.some((pattern) => pattern.test(value)); +} + +function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean { + if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true; + if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true; + return false; +} + +function shouldDropByUsers(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { + const service = config.filters.serviceAccounts; + if (!service || service.mode === 'mark-only' || service.patterns.length === 0) return false; + const patterns = compilePatterns(service.patterns); + const matchingExecutions = template.topUsers + .filter((entry) => matchesAny(entry.user, patterns)) + .reduce((sum, entry) => sum + entry.executions, 0); + const allExecutions = template.topUsers.reduce((sum, entry) => sum + entry.executions, 0); + const serviceOnly = allExecutions > 0 && matchingExecutions >= allExecutions; + return service.mode === 'exclude' ? 
serviceOnly : !serviceOnly; +} + +function shouldDropByFailure(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { + const failed = config.filters.dropFailedBelow; + return !!failed && template.stats.errorRate > failed.errorRate && template.stats.executions < failed.executions; +} + +function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean { + if (shouldDropBySql(template.canonicalSql, config)) return true; + if (shouldDropByUsers(template, config)) return true; + if (shouldDropByFailure(template, config)) return true; + return false; +} + +function recordColumn(acc: TableAccumulator, clause: string, column: string, executions: number): void { + const byColumn = acc.columnsByClause.get(clause) ?? new Map(); + byColumn.set(column, (byColumn.get(column) ?? 0) + executions); + acc.columnsByClause.set(clause, byColumn); +} + +function recordJoin(acc: TableAccumulator, otherTable: string, columns: string[], executions: number): void { + const byColumns = acc.observedJoins.get(otherTable) ?? new Map(); + const key = [...new Set(columns)].sort().join(','); + if (key.length > 0) { + byColumns.set(key, (byColumns.get(key) ?? 0) + executions); + acc.observedJoins.set(otherTable, byColumns); + } +} + +function accumulatorFor(table: string): TableAccumulator { + return { + table, + executions: 0, + distinctUsers: 0, + errorRateNumerator: 0, + p95RuntimeMs: null, + lastSeen: '1970-01-01T00:00:00.000Z', + columnsByClause: new Map(), + observedJoins: new Map(), + topTemplates: [], + }; +} + +function addTemplate(acc: TableAccumulator, parsed: ParsedTemplate): void { + const executions = parsed.template.stats.executions; + acc.executions += executions; + acc.distinctUsers = Math.max(acc.distinctUsers, parsed.template.stats.distinctUsers); + acc.errorRateNumerator += parsed.template.stats.errorRate * executions; + acc.p95RuntimeMs = + acc.p95RuntimeMs === null + ? 
parsed.template.stats.p95RuntimeMs + : parsed.template.stats.p95RuntimeMs === null + ? acc.p95RuntimeMs + : Math.max(acc.p95RuntimeMs, parsed.template.stats.p95RuntimeMs); + acc.lastSeen = parsed.template.stats.lastSeen > acc.lastSeen ? parsed.template.stats.lastSeen : acc.lastSeen; + for (const [clause, columns] of Object.entries(parsed.columnsByClause)) { + for (const column of columns) { + recordColumn(acc, clause, column, executions); + } + } + const joinColumns = parsed.columnsByClause.join ?? []; + for (const otherTable of parsed.tablesTouched.filter((table) => table !== acc.table)) { + recordJoin(acc, otherTable, joinColumns, executions); + } + acc.topTemplates.push(parsed.template); +} +``` + +In the same file, add the staging function: + +```typescript +function toStagedTable(acc: TableAccumulator, now: Date): StagedTableInput { + const errorRate = acc.executions > 0 ? acc.errorRateNumerator / acc.executions : 0; + const columnsByClause = Object.fromEntries( + [...acc.columnsByClause.entries()] + .sort(([left], [right]) => left.localeCompare(right)) + .map(([clause, counts]) => [ + clause, + [...counts.entries()] + .sort((left, right) => right[1] - left[1] || left[0].localeCompare(right[0])) + .map(([column, count]) => [column, bucketFrequency(count, acc.executions)]), + ]), + ); + const observedJoins = [...acc.observedJoins.entries()] + .flatMap(([withTable, byColumns]) => + [...byColumns.entries()].map(([columns, count]) => ({ + withTable, + on: columns.split(',').filter(Boolean), + freq: bucketFrequency(count, acc.executions), + })), + ) + .sort((left, right) => left.withTable.localeCompare(right.withTable) || left.on.join(',').localeCompare(right.on.join(','))); + const topTemplates = [...acc.topTemplates] + .sort((left, right) => right.stats.executions - left.stats.executions || left.templateId.localeCompare(right.templateId)) + .slice(0, 5) + .map((template) => ({ + id: template.templateId, + canonicalSql: template.canonicalSql, + topUsers: 
template.topUsers.slice(0, 5).map((entry) => ({ user: entry.user })), + })); + + return { + table: acc.table, + stats: { + executionsBucket: bucketExecutions(acc.executions), + distinctUsersBucket: bucketDistinctUsers(acc.distinctUsers), + errorRateBucket: bucketErrorRate(errorRate), + p95RuntimeBucket: bucketP95Runtime(acc.p95RuntimeMs), + recencyBucket: bucketRecency(acc.lastSeen, now), + }, + columnsByClause, + observedJoins, + topTemplates, + }; +} + +function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput { + return { + templates: parsedTemplates + .map(({ template, tablesTouched }) => ({ + id: template.templateId, + canonicalSql: template.canonicalSql, + tablesTouched: [...tablesTouched].sort(), + executionsBucket: bucketExecutions(template.stats.executions), + distinctUsersBucket: bucketDistinctUsers(template.stats.distinctUsers), + dialect: template.dialect, + })) + .sort((left, right) => left.id.localeCompare(right.id)), + }; +} + +export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise<void> { + const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig); + const now = input.now ?? 
new Date(); + const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000); + const probe = await input.reader.probe(input.queryClient); + const snapshot: AggregatedTemplate[] = []; + + for await (const row of input.reader.fetchAggregated(input.queryClient, { start: windowStart, end: now }, config)) { + const parsed = aggregatedTemplateSchema.parse(row); + if (!shouldDropTemplate(parsed, config)) { + snapshot.push(parsed); + } + } + + const analysis = await input.sqlAnalysis.analyzeBatch( + snapshot.map((template) => ({ id: template.templateId, sql: template.canonicalSql })), + config.dialect, + ); + const warnings: string[] = []; + const parsedTemplates: ParsedTemplate[] = []; + for (const template of snapshot) { + const parsed = analysis.get(template.templateId); + if (!parsed || parsed.error) { + warnings.push(`parse_failed:${template.templateId}`); + continue; + } + const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort(); + if (tablesTouched.length === 0) { + continue; + } + parsedTemplates.push({ + template, + tablesTouched, + columnsByClause: Object.fromEntries( + Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]), + ), + }); + } + + const byTable = new Map(); + for (const parsed of parsedTemplates) { + for (const table of parsed.tablesTouched) { + const acc = byTable.get(table) ?? 
accumulatorFor(table); + addTemplate(acc, parsed); + byTable.set(table, acc); + } + } + + await mkdir(input.stagedDir, { recursive: true }); + for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) { + await writeJson(input.stagedDir, `tables/${table}.json`, toStagedTable(acc, now)); + } + await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates)); + await writeJson(input.stagedDir, 'manifest.json', { + source: HISTORIC_SQL_SOURCE_KEY, + connectionId: input.connectionId, + dialect: config.dialect, + fetchedAt: now.toISOString(), + windowStart: windowStart.toISOString(), + windowEnd: now.toISOString(), + snapshotRowCount: snapshot.length, + touchedTableCount: byTable.size, + parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length, + warnings, + probeWarnings: probe.warnings, + }); +} +``` + +- [ ] **Step 4: Run the staged-artifact test** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/stage-unified.test.ts +``` + +Expected: PASS. 
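The per-table accumulation in `addTemplate()` and `toStagedTable()` combines template stats in three different ways, which is easy to misread. The sketch below is a standalone illustration, not the stager itself: `TemplateStats`, `MergedStats`, and `mergeStats` are hypothetical names introduced here, and only the arithmetic mirrors the code above (execution-weighted error rate, maximum of the non-null p95 values, latest ISO timestamp).

```typescript
// Hypothetical, trimmed-down stats shape used only for this illustration.
interface TemplateStats {
  executions: number;
  errorRate: number; // fraction in [0, 1]
  p95RuntimeMs: number | null;
  lastSeen: string; // ISO-8601 UTC timestamp
}

interface MergedStats {
  executions: number;
  errorRate: number;
  p95RuntimeMs: number | null;
  lastSeen: string;
}

// Merge template-level stats with the same arithmetic as the accumulator above.
function mergeStats(templates: TemplateStats[]): MergedStats {
  let executions = 0;
  let errorRateNumerator = 0;
  let p95RuntimeMs: number | null = null;
  let lastSeen = '1970-01-01T00:00:00.000Z';

  for (const stats of templates) {
    executions += stats.executions;
    // Weight each template's error rate by how often it ran.
    errorRateNumerator += stats.errorRate * stats.executions;
    // Keep the largest non-null p95; null means "no runtime data".
    if (stats.p95RuntimeMs !== null) {
      p95RuntimeMs = p95RuntimeMs === null ? stats.p95RuntimeMs : Math.max(p95RuntimeMs, stats.p95RuntimeMs);
    }
    // Same-format ISO-8601 UTC strings sort chronologically as plain strings.
    if (stats.lastSeen > lastSeen) lastSeen = stats.lastSeen;
  }

  return {
    executions,
    errorRate: executions > 0 ? errorRateNumerator / executions : 0,
    p95RuntimeMs,
    lastSeen,
  };
}

const merged = mergeStats([
  { executions: 90, errorRate: 0, p95RuntimeMs: 120, lastSeen: '2026-05-10T00:00:00.000Z' },
  { executions: 10, errorRate: 0.5, p95RuntimeMs: null, lastSeen: '2026-05-11T00:00:00.000Z' },
]);
console.log(merged);
```

Taking the maximum of per-template values gives a bound rather than an exact table-level figure: the largest template p95 is an upper bound on the p95 of the combined workload, and `Math.max` over `distinctUsers` is a lower bound on the true distinct count. Because the staged output only carries buckets, that approximation is probably acceptable, but it is worth keeping in mind when choosing bucket edges.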
+ +- [ ] **Step 5: Export the unified stager** + +In `packages/context/src/ingest/index.ts`, add: + +```typescript +export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js'; +``` + +- [ ] **Step 6: Commit** + +```bash +git add packages/context/src/ingest/adapters/historic-sql/stage-unified.ts packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts packages/context/src/ingest/index.ts +git commit -m "feat: stage historic sql aggregate snapshots" +``` + +## Task 4: Add Aggregate Readers + +**Files:** +- Create: `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts` +- Create: `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts` +- Modify: `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts` +- Modify: `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts` +- Modify: `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts` +- Modify: `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts` +- Modify: `packages/context/src/ingest/index.ts` + +- [ ] **Step 1: Write failing Postgres aggregate reader tests** + +Create `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts`: + +```typescript +import { describe, expect, it, vi } from 'vitest'; +import { PostgresPgssReader } from './postgres-pgss-reader.js'; + +describe('PostgresPgssReader aggregate path', () => { + it('aggregates pg_stat_statements rows by queryid and query', async () => { + const executeQuery = vi.fn(async (sql: string, params?: unknown[]) => { + if (sql.includes('pg_stat_statements_info')) { + return { headers: ['stats_reset', 'dealloc'], rows: [['2026-05-01T00:00:00.000Z', 1]] }; + } + expect(sql).toContain('GROUP BY queryid, query'); + expect(sql).toContain('HAVING SUM(calls) >= $1'); + expect(params).toEqual([5]); + return { + headers: 
['template_id', 'canonical_sql', 'executions', 'distinct_users', 'mean_ms', 'rows_produced', 'top_users'], + rows: [ + [ + '123', + 'select status from public.orders', + '42', + '3', + '11.5', + '100', + JSON.stringify([{ user: 'analyst', executions: 40 }]), + ], + ], + }; + }); + + const reader = new PostgresPgssReader(); + const rows = []; + for await (const row of reader.fetchAggregated( + { executeQuery }, + { start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') }, + { dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, filters: {}, redactionPatterns: [], staleArchiveAfterDays: 90 }, + )) { + rows.push(row); + } + + expect(rows).toEqual([ + { + templateId: '123', + canonicalSql: 'select status from public.orders', + dialect: 'postgres', + stats: { + executions: 42, + distinctUsers: 3, + firstSeen: '2026-05-01T00:00:00.000Z', + lastSeen: '2026-05-11T00:00:00.000Z', + p50RuntimeMs: 11.5, + p95RuntimeMs: 11.5, + errorRate: 0, + rowsProduced: 100, + }, + topUsers: [{ user: 'analyst', executions: 40 }], + }, + ]); + }); +}); +``` + +- [ ] **Step 2: Add failing BigQuery and Snowflake aggregate assertions** + +In `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts`, add a test that constructs `new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'demo', region: 'us' })`, calls `fetchAggregated()`, and asserts the SQL contains: + +```typescript +expect(sql).toContain('COUNT(*) AS executions'); +expect(sql).toContain('COUNT(DISTINCT user_email) AS distinct_users'); +expect(sql).toContain('GROUP BY query_hash'); +expect(sql).toContain('HAVING COUNT(*) >= 5'); +``` + +Map one returned row with headers: + +```typescript +[ + 'template_id', + 'canonical_sql', + 'executions', + 'distinct_users', + 'first_seen', + 'last_seen', + 'p50_ms', + 'p95_ms', + 'error_rate', + 'rows_produced', + 'top_users', +] +``` + +and assert `templateId`, `stats.executions`, `stats.errorRate`, and 
`topUsers` match the row. + +In `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts`, add the same shape but assert the SQL contains: + +```typescript +expect(sql).toContain('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'); +expect(sql).toContain('COUNT(*) AS executions'); +expect(sql).toContain('GROUP BY query_hash'); +expect(sql).toContain('HAVING COUNT(*) >= 5'); +``` + +- [ ] **Step 3: Run aggregate reader tests to verify they fail** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts +``` + +Expected: FAIL because `fetchAggregated()` and `postgres-pgss-reader.js` do not exist. + +- [ ] **Step 4: Implement the aggregate reader methods** + +Create `packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts` with the same probe behavior currently implemented in `postgres-pgss-query-history-reader.ts`: `queryClient`, `execute`, `indexByHeader`, `value`, `nullableString`, `requiredString`, `requiredFiniteNumber`, `nullableInteger`, `nullableIsoTimestamp`, `firstRow`, `extensionMissingError`, and `grantsMissingError` keep their current behavior. 
Add this aggregate query and row mapper: + +```typescript +const AGGREGATE_SQL = ` +SELECT queryid::text AS template_id, + query AS canonical_sql, + SUM(calls)::bigint AS executions, + COUNT(DISTINCT userid) AS distinct_users, + SUM(total_exec_time) / NULLIF(SUM(calls), 0) AS mean_ms, + SUM(rows)::bigint AS rows_produced, + COALESCE( + json_agg(json_build_object('user', rolname, 'executions', calls) ORDER BY calls DESC) + FILTER (WHERE userid IS NOT NULL), + '[]'::json + )::text AS top_users +FROM pg_stat_statements +LEFT JOIN pg_roles ON pg_roles.oid = pg_stat_statements.userid +WHERE toplevel = true +GROUP BY queryid, query +HAVING SUM(calls) >= $1 +ORDER BY SUM(total_exec_time) DESC +`.trim(); +``` + +The `fetchAggregated()` method must be implemented as: + +```typescript + async *fetchAggregated( + client: unknown, + window: HistoricSqlTimeWindow, + config: HistoricSqlUnifiedPullConfig, + ): AsyncIterable<AggregatedTemplate> { + const pgClient = queryClient(client); + const statsResult = await execute(pgClient, STATS_INFO_SQL); + const { row: statsRow, headers: statsHeaders } = firstRow(statsResult, 'stats-info'); + const firstSeen = nullableIsoTimestamp(value(statsRow, statsHeaders, 'stats_reset')) ?? 
window.start.toISOString(); + const result = await execute(pgClient, AGGREGATE_SQL, [config.minExecutions]); + const indexes = indexByHeader(result.headers); + for (const row of result.rows) { + yield aggregatedTemplateSchema.parse({ + templateId: requiredString(value(row, indexes, 'template_id'), 'template_id'), + canonicalSql: requiredString(value(row, indexes, 'canonical_sql'), 'canonical_sql'), + dialect: 'postgres', + stats: { + executions: requiredInteger(value(row, indexes, 'executions'), 'executions'), + distinctUsers: requiredInteger(value(row, indexes, 'distinct_users'), 'distinct_users'), + firstSeen, + lastSeen: window.end.toISOString(), + p50RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')), + p95RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')), + errorRate: 0, + rowsProduced: nullableInteger(value(row, indexes, 'rows_produced')), + }, + topUsers: parseTopUsers(value(row, indexes, 'top_users')), + }); + } + } +``` + +In `packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts`, add this aggregate query inside `fetchAggregated()`: + +```typescript +const sql = ` +SELECT + query_hash AS template_id, + MIN(query) AS canonical_sql, + COUNT(*) AS executions, + COUNT(DISTINCT user_email) AS distinct_users, + MIN(creation_time) AS first_seen, + MAX(creation_time) AS last_seen, + APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(50)] AS p50_ms, + APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(95)] AS p95_ms, + SAFE_DIVIDE(COUNTIF(error_result IS NOT NULL), COUNT(*)) AS error_rate, + CAST(NULL AS INT64) AS rows_produced, + TO_JSON_STRING(ARRAY_AGG(STRUCT(user_email AS user, 1 AS executions) ORDER BY creation_time DESC LIMIT 5)) AS top_users +FROM ${this.viewPath} +WHERE job_type = 'QUERY' + AND statement_type IN ('SELECT', 'MERGE') + AND creation_time >= ${timestampExpression(window.start)} + AND creation_time < ${timestampExpression(window.end)} + 
AND query IS NOT NULL +GROUP BY query_hash +HAVING COUNT(*) >= ${config.minExecutions} +ORDER BY executions DESC`.trim(); +``` + +Map each result row into `aggregatedTemplateSchema.parse({ templateId, canonicalSql, dialect: 'bigquery', stats: { executions, distinctUsers, firstSeen, lastSeen, p50RuntimeMs, p95RuntimeMs, errorRate, rowsProduced }, topUsers })`, where `topUsers` is parsed from the `top_users` JSON string and invalid JSON becomes `[]`. + +In `packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts`, add this aggregate query inside `fetchAggregated()`: + +```typescript +const sql = ` +SELECT + query_hash AS template_id, + MIN(query_text) AS canonical_sql, + COUNT(*) AS executions, + COUNT(DISTINCT user_name) AS distinct_users, + MIN(start_time) AS first_seen, + MAX(start_time) AS last_seen, + APPROX_PERCENTILE(total_elapsed_time, 0.50) AS p50_ms, + APPROX_PERCENTILE(total_elapsed_time, 0.95) AS p95_ms, + DIV0(COUNT_IF(execution_status != 'SUCCESS'), COUNT(*)) AS error_rate, + SUM(rows_produced) AS rows_produced, + ARRAY_AGG(OBJECT_CONSTRUCT('user', user_name, 'executions', 1)) WITHIN GROUP (ORDER BY start_time DESC)::string AS top_users +FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY +WHERE query_text IS NOT NULL + AND query_type IN ('SELECT', 'MERGE') + AND start_time >= ${timestampLiteral(window.start)} + AND start_time < ${timestampLiteral(window.end)} +GROUP BY query_hash +HAVING COUNT(*) >= ${config.minExecutions} +ORDER BY executions DESC`.trim(); +``` + +Map each result row into `aggregatedTemplateSchema.parse({ templateId, canonicalSql, dialect: 'snowflake', stats: { executions, distinctUsers, firstSeen, lastSeen, p50RuntimeMs, p95RuntimeMs, errorRate, rowsProduced }, topUsers })`, where `topUsers` is parsed from the `top_users` JSON string and invalid JSON becomes `[]`. Keep the existing `fetch()` methods unchanged in this plan so current adapter behavior does not move before the skill/projection cutover. 
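All three readers rely on a `top_users` parser whose body is not spelled out in this step. The sketch below is one possible shape for it, under stated assumptions: the `TopUser` interface is a local stand-in for the Task 1 schema type, the `limit` parameter is hypothetical, and collapsing duplicate users by summing their `executions` is a choice made for this illustration (the BigQuery and Snowflake queries above emit one element per execution with `executions` fixed at 1). The only behavior taken directly from the plan is that invalid JSON becomes `[]`.

```typescript
// Hypothetical shape; the plan's real type lives in the Task 1 schemas.
interface TopUser {
  user: string;
  executions: number;
}

// Parse the `top_users` column, tolerating bad input instead of throwing.
function parseTopUsers(raw: unknown, limit = 5): TopUser[] {
  let parsed: unknown = raw;
  if (typeof raw === 'string') {
    try {
      parsed = JSON.parse(raw);
    } catch {
      return []; // invalid JSON becomes an empty list
    }
  }
  if (!Array.isArray(parsed)) return [];

  // Assumption: sum executions per user so one-element-per-execution input still ranks correctly.
  const totals = new Map<string, number>();
  for (const entry of parsed) {
    if (typeof entry !== 'object' || entry === null) continue;
    const { user, executions } = entry as { user?: unknown; executions?: unknown };
    if (typeof user !== 'string' || user.length === 0) continue;
    // Some drivers return bigint-like counts as strings, so accept numeric strings too.
    const count = typeof executions === 'string' ? Number(executions) : executions;
    if (typeof count !== 'number' || !Number.isFinite(count) || count <= 0) continue;
    totals.set(user, (totals.get(user) ?? 0) + count);
  }

  // Highest execution count first; ties broken by user name for stable output.
  return [...totals.entries()]
    .map(([user, executions]) => ({ user, executions }))
    .sort((left, right) => right.executions - left.executions || left.user.localeCompare(right.user))
    .slice(0, limit);
}

console.log(parseTopUsers('[{"user":"analyst","executions":40}]'));
console.log(parseTopUsers('not json'));
```

One thing to check when wiring this in: the Snowflake `ARRAY_AGG` above has no `LIMIT`, so the serialized array grows with the number of executions in the window. Capping it in SQL, or aggregating per user before `ARRAY_AGG`, may be worth considering if that column turns out to be large.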
+ +- [ ] **Step 5: Export the new Postgres reader** + +In `packages/context/src/ingest/index.ts`, add: + +```typescript +export { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js'; +``` + +- [ ] **Step 6: Run aggregate reader tests** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts +``` + +Expected: PASS. + +- [ ] **Step 7: Commit** + +```bash +git add packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts packages/context/src/ingest/index.ts +git commit -m "feat: add historic sql aggregate readers" +``` + +## Task 5: Add Unified Chunking + +**Files:** +- Create: `packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts` +- Create: `packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts` +- Modify: `packages/context/src/ingest/index.ts` + +- [ ] **Step 1: Write failing unified chunk tests** + +Create `packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts`: + +```typescript +import { mkdir, mkdtemp, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { describe, expect, it } from 'vitest'; +import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js'; + +async function tempDir(): Promise<string> { + return mkdtemp(join(tmpdir(), 
'historic-sql-unified-chunk-')); +} + +async function writeJson(root: string, relPath: string, value: unknown): Promise<void> { + const target = join(root, relPath); + await mkdir(join(target, '..'), { recursive: true }); + await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); +} + +async function writeUnifiedStagedDir(root: string): Promise<void> { + await writeJson(root, 'manifest.json', { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 1, + touchedTableCount: 1, + parseFailures: 0, + warnings: [], + probeWarnings: [], + }); + await writeJson(root, 'tables/public.orders.json', { + table: 'public.orders', + stats: { + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + errorRateBucket: 'none', + p95RuntimeBucket: '<100ms', + recencyBucket: 'current', + }, + columnsByClause: { select: [['status', 'high']] }, + observedJoins: [], + topTemplates: [{ id: 'orders', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }], + }); + await writeJson(root, 'patterns-input.json', { + templates: [ + { + id: 'orders', + canonicalSql: 'select * from public.orders', + tablesTouched: ['public.orders'], + executionsBucket: '10-100', + distinctUsersBucket: '2-5', + dialect: 'postgres', + }, + ], + }); +} + +describe('chunkHistoricSqlUnifiedStagedDir', () => { + it('emits one table WorkUnit plus one patterns WorkUnit', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + + const result = await chunkHistoricSqlUnifiedStagedDir(stagedDir); + + expect(result.workUnits).toEqual([ + expect.objectContaining({ + unitKey: 'historic-sql-table-public-orders', + displayLabel: 'Historic SQL usage: public.orders', + rawFiles: ['tables/public.orders.json'], + dependencyPaths: ['manifest.json'], + notes: 
expect.stringContaining('historic_sql_table_digest'), + }), + expect.objectContaining({ + unitKey: 'historic-sql-patterns', + displayLabel: 'Historic SQL cross-table patterns', + rawFiles: ['patterns-input.json'], + dependencyPaths: ['manifest.json'], + notes: expect.stringContaining('historic_sql_patterns'), + }), + ]); + expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']); + }); + + it('respects diff sets for unchanged table and patterns files', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + + await expect( + chunkHistoricSqlUnifiedStagedDir(stagedDir, { + added: [], + modified: ['tables/public.orders.json'], + deleted: [], + unchanged: ['manifest.json', 'patterns-input.json'], + }), + ).resolves.toMatchObject({ + workUnits: [expect.objectContaining({ unitKey: 'historic-sql-table-public-orders' })], + }); + + await expect( + chunkHistoricSqlUnifiedStagedDir(stagedDir, { + added: [], + modified: ['patterns-input.json'], + deleted: [], + unchanged: ['manifest.json', 'tables/public.orders.json'], + }), + ).resolves.toMatchObject({ + workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns' })], + }); + }); + + it('describes unified staged scope', async () => { + const stagedDir = await tempDir(); + await writeUnifiedStagedDir(stagedDir); + + const scope = await describeHistoricSqlUnifiedScope(stagedDir); + + expect(scope.isPathInScope('manifest.json')).toBe(true); + expect(scope.isPathInScope('patterns-input.json')).toBe(true); + expect(scope.isPathInScope('tables/public.orders.json')).toBe(true); + expect(scope.isPathInScope('templates/old/page.md')).toBe(false); + }); +}); +``` + +- [ ] **Step 2: Run the unified chunk tests to verify they fail** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/chunk-unified.test.ts +``` + +Expected: FAIL because `chunk-unified.js` does not exist. 
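The diff-set test above pins down which staged files should produce WorkUnits: only added or modified inputs are processed, unchanged inputs are skipped, and `manifest.json` never becomes a unit of its own. A minimal sketch of that selection rule follows. It assumes a local `DiffSet` interface whose field names mirror the object literals in the test (the real type is imported from `../../types.js`), and `isStagedInput` and `planWork` are hypothetical helpers introduced only for illustration.

```typescript
// Local stand-in for the DiffSet type; field names mirror the test literals above.
interface DiffSet {
  added: string[];
  modified: string[];
  deleted: string[];
  unchanged: string[];
}

// Only table files and the patterns input are chunkable; manifest.json is a dependency.
const isStagedInput = (path: string): boolean =>
  path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path);

// Hypothetical helper: split staged files into "needs a WorkUnit" and "evict".
function planWork(files: string[], diffSet?: DiffSet): { toProcess: string[]; toEvict: string[] } {
  // Without a diff set every staged input is processed (a full run).
  const touched = diffSet ? new Set([...diffSet.added, ...diffSet.modified]) : null;
  const toProcess = files.filter((path) => isStagedInput(path) && (!touched || touched.has(path))).sort();
  // Deleted inputs are reported for eviction; unrelated deleted paths are ignored.
  const toEvict = (diffSet?.deleted ?? []).filter(isStagedInput).sort();
  return { toProcess, toEvict };
}

const files = ['manifest.json', 'patterns-input.json', 'tables/public.orders.json'];
console.log(planWork(files));
console.log(
  planWork(files, {
    added: [],
    modified: ['tables/public.orders.json'],
    deleted: ['tables/public.old.json', 'templates/old/page.md'],
    unchanged: ['manifest.json', 'patterns-input.json'],
  }),
);
```

With no diff set the sketch selects both the patterns input and the table file; with the diff set shown it selects only `tables/public.orders.json` and reports `tables/public.old.json` for eviction.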
+ +- [ ] **Step 3: Add the unified chunker** + +Create `packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts`: + +```typescript +import { createHash } from 'node:crypto'; +import { readFile, readdir } from 'node:fs/promises'; +import { join, relative } from 'node:path'; +import type { ChunkResult, DiffSet, ScopeDescriptor, WorkUnit } from '../../types.js'; +import { stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema } from './types.js'; + +async function walk(root: string): Promise<string[]> { + const entries = await readdir(root, { withFileTypes: true, recursive: true }); + return entries + .filter((entry) => entry.isFile()) + .map((entry) => relative(root, join(entry.parentPath, entry.name)).replace(/\\/g, '/')) + .sort(); +} + +async function readJson<T>(stagedDir: string, relPath: string): Promise<T> { + return JSON.parse(await readFile(join(stagedDir, relPath), 'utf-8')) as T; +} + +function safeUnitKey(value: string): string { + return value.replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, ''); +} + +function touchedPath(path: string, touched: Set<string> | null): boolean { + return !touched || touched.has(path); +} + +export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult> { + const files = await walk(stagedDir); + const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json')); + const touched = diffSet ? 
new Set([...diffSet.added, ...diffSet.modified]) : null; + const workUnits: WorkUnit[] = []; + + for (const path of files.filter((file) => /^tables\/.+\.json$/.test(file))) { + if (!touchedPath(path, touched)) { + continue; + } + const table = stagedTableInputSchema.parse(await readJson(stagedDir, path)); + workUnits.push({ + unitKey: `historic-sql-table-${safeUnitKey(table.table)}`, + displayLabel: `Historic SQL usage: ${table.table}`, + rawFiles: [path], + dependencyPaths: ['manifest.json'], + peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(), + notes: + 'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.', + }); + } + + if (files.includes('patterns-input.json') && touchedPath('patterns-input.json', touched)) { + stagedPatternsInputSchema.parse(await readJson(stagedDir, 'patterns-input.json')); + workUnits.push({ + unitKey: 'historic-sql-patterns', + displayLabel: 'Historic SQL cross-table patterns', + rawFiles: ['patterns-input.json'], + dependencyPaths: ['manifest.json'], + peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(), + notes: + 'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.', + }); + } + + const deleted = diffSet?.deleted.filter((path) => path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path)).sort(); + return { + workUnits, + eviction: deleted && deleted.length > 0 ? 
{ deletedRawPaths: deleted } : undefined, + reconcileNotes: [`Historic-SQL touched tables=${manifest.touchedTableCount} parseFailures=${manifest.parseFailures}`], + contextReport: { + capped: false, + warnings: [...manifest.probeWarnings, ...manifest.warnings], + }, + }; +} + +export async function describeHistoricSqlUnifiedScope(stagedDir: string): Promise<ScopeDescriptor> { + const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json')); + const fingerprint = createHash('sha256') + .update(JSON.stringify({ + connectionId: manifest.connectionId, + dialect: manifest.dialect, + windowStart: manifest.windowStart, + windowEnd: manifest.windowEnd, + })) + .digest('hex'); + return { + fingerprint, + isPathInScope: (rawPath) => + rawPath === 'manifest.json' || rawPath === 'patterns-input.json' || /^tables\/.+\.json$/.test(rawPath), + }; +} +``` + +- [ ] **Step 4: Run the unified chunk tests** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/chunk-unified.test.ts +``` + +Expected: PASS. 
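The `safeUnitKey()` normalization in the chunker above is what turns `public.orders` into the `historic-sql-table-public-orders` key the test expects. Because every run of non-alphanumeric characters collapses to a single `-`, distinct table names can map to the same key. The sketch below copies `safeUnitKey()` verbatim and adds a hypothetical `findUnitKeyCollisions` helper (not part of the plan) that could be used in a test or a warning path to surface such cases.

```typescript
// Same normalization as `safeUnitKey()` in chunk-unified.ts above.
function safeUnitKey(value: string): string {
  return value.replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '');
}

// Hypothetical helper: group table names by unit key and keep only the collisions.
function findUnitKeyCollisions(tables: string[]): Map<string, string[]> {
  const byKey = new Map<string, string[]>();
  for (const table of tables) {
    const key = `historic-sql-table-${safeUnitKey(table)}`;
    byKey.set(key, [...(byKey.get(key) ?? []), table]);
  }
  return new Map([...byKey.entries()].filter(([, names]) => names.length > 1));
}

console.log(safeUnitKey('public.orders'));
console.log(findUnitKeyCollisions(['public.orders', 'public_orders', 'public.users']));
```

Here `public.orders` and `public_orders` both normalize to `public-orders`. Whether that matters depends on how `unitKey` uniqueness is enforced downstream, which this plan does not specify. If it does matter, appending a short hash of the original table name would be one way to disambiguate.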
+ +- [ ] **Step 5: Export the unified chunker** + +In `packages/context/src/ingest/index.ts`, add: + +```typescript +export { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './adapters/historic-sql/chunk-unified.js'; +``` + +- [ ] **Step 6: Commit** + +```bash +git add packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts packages/context/src/ingest/index.ts +git commit -m "feat: chunk historic sql unified staging" +``` + +## Task 6: Verify the Hot Path Slice + +**Files:** +- Modify: files changed in Tasks 1-5 + +- [ ] **Step 1: Run focused historic-SQL tests** + +Run: + +```bash +pnpm --filter @ktx/context exec vitest run \ + src/ingest/adapters/historic-sql/types.test.ts \ + src/ingest/adapters/historic-sql/buckets.test.ts \ + src/ingest/adapters/historic-sql/stage-unified.test.ts \ + src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts \ + src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts \ + src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts \ + src/ingest/adapters/historic-sql/chunk-unified.test.ts \ + src/package-exports.test.ts +``` + +Expected: PASS. + +- [ ] **Step 2: Run type-check for the context package** + +Run: + +```bash +pnpm --filter @ktx/context run type-check +``` + +Expected: PASS. + +- [ ] **Step 3: Confirm legacy production adapter was not switched** + +Run: + +```bash +rg -n "historic_sql_ingest|historic_sql_curator|stagePgStatStatementsTemplates" packages/context/src/ingest/adapters/historic-sql packages/context/skills packages/context/src/ingest/ingest-runtime-assets.test.ts +``` + +Expected: Results still include `historic-sql.adapter.ts`, the old skill files, and runtime-asset tests. This is correct for this plan because the replacement skills and projection are not present yet. 
+ +- [ ] **Step 4: Confirm new hot-path exports exist** + +Run: + +```bash +rg -n "stageHistoricSqlAggregatedSnapshot|chunkHistoricSqlUnifiedStagedDir|PostgresPgssReader|aggregatedTemplateSchema" packages/context/src/ingest/index.ts packages/context/src/ingest/adapters/historic-sql +``` + +Expected: Results include the new stager, chunker, reader, and schemas. + +- [ ] **Step 5: Commit verification fixes only when verification changed files** + +```bash +git status --short +``` + +Expected: no output. If verification forced a fix, run: + +```bash +git add packages/context/src/ingest/adapters/historic-sql packages/context/src/ingest/index.ts packages/context/src/package-exports.test.ts +git commit -m "test: verify historic sql unified hot path" +``` + +## Follow-Up Plan Boundary + +The next plan after this one should switch the production adapter only after it also creates the cold-path pieces: + +- `packages/context/skills/historic_sql_table_digest/SKILL.md` +- `packages/context/skills/historic_sql_patterns/SKILL.md` +- adapter `skillNames` change from `historic_sql_ingest` to the two new skills +- `onPullSucceeded()` projection of table usage into `_schema/{shard}.yaml` +- pattern wiki page projection and slug stability +- one-time cleanup of legacy template wiki pages and PGSS baselines +- deletion of `stage-pgss.ts`, old template staging exports, and old historic-SQL skill assets + +## Self-Review + +Spec coverage: + +- Unified aggregate reader contracts: Task 1 and Task 4. +- Trailing-window aggregate fetch shape: Task 4. +- Batch SQL parse through `SqlAnalysisPort.analyzeBatch()`: Task 3. +- Service-account, trivial query, failed-template, parse-failure, and zero-table filtering: Task 3. +- Bucketed `tables/*.json`, `patterns-input.json`, and `manifest.json`: Task 2 and Task 3. +- WorkUnits for one table file plus patterns input: Task 5. 
+- Hard production cutover, LLM skills, projection, wiki pages, stale handling, and legacy deletion: explicitly excluded from this plan and listed as the next plan boundary. + +Placeholder scan: + +- No unresolved placeholders are left in task steps. +- Every code-changing task includes concrete test code, implementation code, commands, and expected results. + +Type consistency: + +- `HistoricSqlUnifiedPullConfig`, `AggregatedTemplate`, `StagedTableInput`, `StagedPatternsInput`, and `StagedManifest` are defined in Task 1 and reused consistently by Tasks 3-5. +- `PostgresPgssReader`, `fetchAggregated()`, `stageHistoricSqlAggregatedSnapshot()`, and `chunkHistoricSqlUnifiedStagedDir()` names match exports and test imports.