63 KiB
Historic SQL Unified Hot Path Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Build the deterministic historic-SQL hot path that reads warehouse-aggregated query templates, batch-parses them once, and writes stable table-bucket and pattern-input staged artifacts.
Architecture: This slice adds the unified reader/stager contracts from the historic-SQL redesign without doing the LLM cold path or projection work. Dialect-specific SQL lives in reader classes; shared TypeScript code filters, batch-parses, bucketizes, and writes manifest.json, tables/*.json, and patterns-input.json. The existing production adapter remains on the legacy path until the follow-up skills/projection cutover can switch it without loading missing skills.
Tech Stack: TypeScript ESM/NodeNext, zod 4, Vitest, SqlAnalysisPort.analyzeBatch(), warehouse query clients.
Starting Point
Spec: docs/superpowers/specs/2026-05-11-historic-sql-redesign-design.md
Plans found that are based on this spec:
docs/superpowers/plans/2026-05-11-historic-sql-foundations.mddocs/superpowers/plans/2026-05-11-historic-sql-search-enrichment.md
Implemented status from this worktree:
2026-05-11-historic-sql-foundations.mdis implemented. Evidence:packages/context/src/ingest/adapters/historic-sql/skill-schemas.ts,SemanticLayerSource.usageinpackages/context/src/sl/types.ts,mergeUsagePreservingExternal()inpackages/context/src/ingest/adapters/live-database/manifest.ts,SqlAnalysisPort.analyzeBatch()inpackages/context/src/sql-analysis/ports.ts, and/sql/analyze-batchinpython/ktx-daemon/src/ktx_daemon/app.py.2026-05-11-historic-sql-search-enrichment.mdis implemented. Evidence:buildSemanticLayerSourceSearchText()indexessource.usageinpackages/context/src/sl/sl-search.service.ts, SQLite FTS returnssnippet()inpackages/context/src/sl/sqlite-sl-sources-index.ts, and local/MCP list results exposefrequencyTierandsnippetinpackages/context/src/sl/local-sl.tsandpackages/context/src/mcp/local-project-ports.ts.
Still not implemented:
packages/context/src/ingest/adapters/historic-sql/stage.tsstill callsSqlAnalysisPort.analyzeForFingerprint()per raw query and emitstemplates/*/{metadata.json,page.md,usage.json}.packages/context/src/ingest/adapters/historic-sql/stage-pgss.tsstill owns Postgres baseline-diff state and writes.ktx/cache/historic-sql/*/pgss-baseline.json.packages/context/src/ingest/adapters/historic-sql/chunk.tsstill emits one WorkUnit per template page forhistoric_sql_ingest.packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.tsstill advertiseshistoric_sql_ingestandhistoric_sql_curator.- Old code strings still exist:
stagePgStatStatementsTemplates,expandCategoricalTemplates,classifySlot, andpgss-baseline.
This plan covers the deterministic hot path from the spec: unified aggregate contracts, aggregate readers, batch parsing, table bucketing, pattern input staging, and a new chunker for the new staged shape. It does not switch HistoricSqlSourceAdapter to the new WorkUnits; the cutover plan must create historic_sql_table_digest, historic_sql_patterns, and projection before changing production skillNames.
File Structure
Create:
packages/context/src/ingest/adapters/historic-sql/types.test.tsLocks the new public zod contracts and the one-releaseminCallstominExecutionsconfig alias.packages/context/src/ingest/adapters/historic-sql/buckets.tsOwns deterministic bucket labels and frequency-tier helpers used by staging.packages/context/src/ingest/adapters/historic-sql/buckets.test.tsLocks stable bucket boundaries so small numeric drift does not churn staged files.packages/context/src/ingest/adapters/historic-sql/stage-unified.tsImplements the new deterministic stager behindstageHistoricSqlAggregatedSnapshot().packages/context/src/ingest/adapters/historic-sql/stage-unified.test.tsTests batch parsing, parse failures, service-account filtering, per-table bucketing, andpatterns-input.json.packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.tsImplements the new Postgres aggregate reader overpg_stat_statements.packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.tsTests the aggregate PGSS query shape, probe warnings, and row mapping.packages/context/src/ingest/adapters/historic-sql/chunk-unified.tsImplements the new chunker fortables/*.jsonpluspatterns-input.json.packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.tsTests table WorkUnits, the patterns WorkUnit, diff filtering, eviction, and scope detection.
Modify:
packages/context/src/ingest/adapters/historic-sql/types.tsAdds aggregate input, staged artifact, reader, and manifest schemas. Keeps legacy exported types until adapter cutover, but marks the new contracts as the target API for the next slice.packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.tsAddsfetchAggregated()while retaining the existingfetch()until the adapter cutover.packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.tsAdds aggregate-query tests.packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.tsAddsfetchAggregated()while retaining the existingfetch()until the adapter cutover.packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.tsAdds aggregate-query tests.packages/context/src/ingest/index.tsExports the new hot-path contracts and helpers.packages/context/src/package-exports.test.tsAsserts the new exports exist without removing old exports in this slice.
Do not modify in this plan:
packages/context/src/ingest/adapters/historic-sql/historic-sql.adapter.tspackages/context/skills/historic_sql_ingest/SKILL.mdpackages/context/skills/historic_sql_curator/SKILL.mdpackages/context/src/ingest/ingest-runtime-assets.test.ts
Those files change in the cutover/projection plan after the replacement skills exist.
Task 1: Add Unified Contracts
Files:
-
Create:
packages/context/src/ingest/adapters/historic-sql/types.test.ts -
Modify:
packages/context/src/ingest/adapters/historic-sql/types.ts -
Modify:
packages/context/src/ingest/index.ts -
Modify:
packages/context/src/package-exports.test.ts -
Step 1: Write failing contract tests
Create packages/context/src/ingest/adapters/historic-sql/types.test.ts:
import { describe, expect, it } from 'vitest';
import {
aggregatedTemplateSchema,
historicSqlUnifiedPullConfigSchema,
stagedManifestSchema,
stagedPatternsInputSchema,
stagedTableInputSchema,
} from './types.js';
describe('historic-sql unified contracts', () => {
it('parses minExecutions and accepts minCalls as a one-release alias', () => {
expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minExecutions: 9 })).toMatchObject({
dialect: 'postgres',
minExecutions: 9,
windowDays: 90,
concurrency: 12,
redactionPatterns: [],
staleArchiveAfterDays: 90,
});
expect(historicSqlUnifiedPullConfigSchema.parse({ dialect: 'postgres', minCalls: 7 }).minExecutions).toBe(7);
});
it('validates aggregate templates from warehouse readers', () => {
const parsed = aggregatedTemplateSchema.parse({
templateId: 'pg:123',
canonicalSql: 'select status, count(*) from public.orders group by status',
dialect: 'postgres',
stats: {
executions: 42,
distinctUsers: 3,
firstSeen: '2026-05-01T00:00:00.000Z',
lastSeen: '2026-05-11T00:00:00.000Z',
p50RuntimeMs: 12.5,
p95RuntimeMs: 40,
errorRate: 0,
rowsProduced: 100,
},
topUsers: [{ user: 'analyst', executions: 40 }],
});
expect(parsed.templateId).toBe('pg:123');
expect(parsed.topUsers).toEqual([{ user: 'analyst', executions: 40 }]);
});
it('validates staged table, patterns, and manifest artifacts', () => {
expect(
stagedTableInputSchema.parse({
table: 'public.orders',
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
errorRateBucket: 'none',
p95RuntimeBucket: '<100ms',
recencyBucket: 'current',
},
columnsByClause: {
select: [['status', 'high']],
where: [['created_at', 'mid']],
},
observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }],
topTemplates: [{ id: 'pg:123', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }],
}).table,
).toBe('public.orders');
expect(
stagedPatternsInputSchema.parse({
templates: [
{
id: 'pg:123',
canonicalSql: 'select * from public.orders',
tablesTouched: ['public.orders'],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
},
],
}).templates,
).toHaveLength(1);
expect(
stagedManifestSchema.parse({
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 2,
touchedTableCount: 1,
parseFailures: 1,
warnings: ['parse_failed:bad'],
probeWarnings: [],
}).parseFailures,
).toBe(1);
});
});
Add these assertions near the historic-SQL export assertions in packages/context/src/package-exports.test.ts:
expect(ingest.historicSqlUnifiedPullConfigSchema).toBeDefined();
expect(ingest.aggregatedTemplateSchema).toBeDefined();
expect(ingest.stagedTableInputSchema).toBeDefined();
- Step 2: Run the contract tests to verify they fail
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/types.test.ts src/package-exports.test.ts
Expected: FAIL with missing exports for historicSqlUnifiedPullConfigSchema, aggregatedTemplateSchema, and stagedTableInputSchema.
- Step 3: Add the new schemas and reader contracts
Insert this block immediately after the existing historicSqlPullConfigSchema definition in packages/context/src/ingest/adapters/historic-sql/types.ts. Keep historicSqlPullConfigSchema and HistoricSqlPullConfig unchanged in this plan because the current production adapter still reads lastSuccessfulCursor, maxTemplatesPerRun, and minCalls.
const filterModeSchema = z.enum(['exclude', 'include', 'mark-only']);
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
export const historicSqlUnifiedPullConfigSchema = z.preprocess((value) => {
if (!isRecord(value)) {
return value;
}
if (value.minExecutions === undefined && typeof value.minCalls === 'number') {
return { ...value, minExecutions: value.minCalls };
}
return value;
}, z.object({
dialect: historicSqlDialectSchema,
windowDays: z.number().int().positive().default(90),
minExecutions: z.number().int().nonnegative().default(5),
concurrency: z.number().int().positive().default(12),
filters: z.object({
serviceAccounts: z.object({
patterns: z.array(z.string()).default([]),
mode: filterModeSchema.default('exclude'),
}).optional(),
orchestrators: z.object({
mode: filterModeSchema.default('mark-only'),
}).optional(),
dropTrivialProbes: z.boolean().default(true),
dropFailedBelow: z.object({
errorRate: z.number().min(0).max(1),
executions: z.number().int().nonnegative(),
}).optional(),
}).default({}),
redactionPatterns: z.array(z.string()).default([]),
staleArchiveAfterDays: z.number().int().positive().default(90),
}));
export type HistoricSqlUnifiedPullConfig = z.infer<typeof historicSqlUnifiedPullConfigSchema>;
export const aggregatedTemplateSchema = z.object({
templateId: z.string().min(1),
canonicalSql: z.string().min(1),
dialect: historicSqlDialectSchema,
stats: z.object({
executions: z.number().int().nonnegative(),
distinctUsers: z.number().int().nonnegative(),
firstSeen: z.iso.datetime(),
lastSeen: z.iso.datetime(),
p50RuntimeMs: z.number().nonnegative().nullable(),
p95RuntimeMs: z.number().nonnegative().nullable(),
errorRate: z.number().min(0).max(1),
rowsProduced: z.number().int().nonnegative().nullable(),
}),
topUsers: z.array(z.object({
user: z.string().nullable(),
executions: z.number().int().nonnegative(),
})).default([]),
});
export type AggregatedTemplate = z.infer<typeof aggregatedTemplateSchema>;
export const stagedTableInputSchema = z.object({
table: z.string().min(1),
stats: z.object({
executionsBucket: z.string(),
distinctUsersBucket: z.string(),
errorRateBucket: z.string(),
p95RuntimeBucket: z.string(),
recencyBucket: z.string(),
}),
columnsByClause: z.record(z.string(), z.array(z.tuple([z.string(), z.string()]))),
observedJoins: z.array(z.object({
withTable: z.string(),
on: z.array(z.string()),
freq: z.string(),
})),
topTemplates: z.array(z.object({
id: z.string(),
canonicalSql: z.string(),
topUsers: z.array(z.object({ user: z.string().nullable() })),
})),
});
export type StagedTableInput = z.infer<typeof stagedTableInputSchema>;
export const stagedPatternsInputSchema = z.object({
templates: z.array(z.object({
id: z.string(),
canonicalSql: z.string(),
tablesTouched: z.array(z.string()),
executionsBucket: z.string(),
distinctUsersBucket: z.string(),
dialect: historicSqlDialectSchema,
})),
});
export type StagedPatternsInput = z.infer<typeof stagedPatternsInputSchema>;
export const stagedManifestSchema = z.object({
source: z.literal(HISTORIC_SQL_SOURCE_KEY),
connectionId: z.string().min(1),
dialect: historicSqlDialectSchema,
fetchedAt: z.iso.datetime(),
windowStart: z.iso.datetime(),
windowEnd: z.iso.datetime(),
snapshotRowCount: z.number().int().nonnegative(),
touchedTableCount: z.number().int().nonnegative(),
parseFailures: z.number().int().nonnegative(),
warnings: z.array(z.string()),
probeWarnings: z.array(z.string()),
});
export type StagedManifest = z.infer<typeof stagedManifestSchema>;
export interface HistoricSqlProbeResult {
warnings: string[];
}
export interface HistoricSqlReader {
probe(client: unknown): Promise<HistoricSqlProbeResult>;
fetchAggregated(
client: unknown,
window: HistoricSqlTimeWindow,
config: HistoricSqlUnifiedPullConfig,
): AsyncIterable<AggregatedTemplate>;
}
- Step 4: Export the new contracts
In packages/context/src/ingest/index.ts, add exports for the new types and schemas:
export type {
AggregatedTemplate,
HistoricSqlProbeResult,
HistoricSqlReader,
HistoricSqlUnifiedPullConfig,
StagedManifest,
StagedPatternsInput,
StagedTableInput,
} from './adapters/historic-sql/types.js';
export {
aggregatedTemplateSchema,
historicSqlUnifiedPullConfigSchema,
stagedManifestSchema,
stagedPatternsInputSchema,
stagedTableInputSchema,
} from './adapters/historic-sql/types.js';
- Step 5: Run the contract tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/types.test.ts src/package-exports.test.ts
Expected: PASS.
- Step 6: Commit
git add packages/context/src/ingest/adapters/historic-sql/types.ts packages/context/src/ingest/adapters/historic-sql/types.test.ts packages/context/src/ingest/index.ts packages/context/src/package-exports.test.ts
git commit -m "feat: add historic sql unified contracts"
Task 2: Add Stable Bucket Helpers
Files:
-
Create:
packages/context/src/ingest/adapters/historic-sql/buckets.ts -
Create:
packages/context/src/ingest/adapters/historic-sql/buckets.test.ts -
Modify:
packages/context/src/ingest/index.ts -
Step 1: Write failing bucket tests
Create packages/context/src/ingest/adapters/historic-sql/buckets.test.ts:
import { describe, expect, it } from 'vitest';
import {
bucketDistinctUsers,
bucketErrorRate,
bucketExecutions,
bucketFrequency,
bucketP95Runtime,
bucketRecency,
} from './buckets.js';
describe('historic-sql bucket helpers', () => {
it('uses stable execution buckets', () => {
expect([0, 9, 10, 99, 100, 999, 1000, 4999, 5000, 49999, 50000].map(bucketExecutions)).toEqual([
'<10',
'<10',
'10-100',
'10-100',
'100-1k',
'100-1k',
'1k-5k',
'1k-5k',
'5k-50k',
'5k-50k',
'>50k',
]);
});
it('uses stable distinct-user, error-rate, runtime, and recency buckets', () => {
expect([0, 1, 2, 5, 6, 10, 11].map(bucketDistinctUsers)).toEqual([
'0',
'1',
'2-5',
'2-5',
'5-10',
'5-10',
'>10',
]);
expect([0, 0.01, 0.05, 0.2].map(bucketErrorRate)).toEqual(['none', 'low', 'low', 'high']);
expect([null, 99, 100, 999, 1000, 9999, 10000].map(bucketP95Runtime)).toEqual([
'unknown',
'<100ms',
'100ms-1s',
'100ms-1s',
'1s-10s',
'1s-10s',
'>10s',
]);
expect(bucketRecency('2026-05-11T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('current');
expect(bucketRecency('2026-04-20T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('recent');
expect(bucketRecency('2026-01-01T00:00:00.000Z', new Date('2026-05-11T12:00:00.000Z'))).toBe('stale');
});
it('maps frequency counts to high, mid, and low labels', () => {
expect(bucketFrequency(80, 100)).toBe('high');
expect(bucketFrequency(20, 100)).toBe('mid');
expect(bucketFrequency(1, 100)).toBe('low');
expect(bucketFrequency(0, 0)).toBe('low');
});
});
- Step 2: Run the bucket test to verify it fails
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/buckets.test.ts
Expected: FAIL because buckets.js does not exist.
- Step 3: Add the bucket helper implementation
Create packages/context/src/ingest/adapters/historic-sql/buckets.ts:
export function bucketExecutions(value: number): string {
if (value < 10) return '<10';
if (value < 100) return '10-100';
if (value < 1000) return '100-1k';
if (value < 5000) return '1k-5k';
if (value < 50000) return '5k-50k';
return '>50k';
}
export function bucketDistinctUsers(value: number): string {
if (value <= 0) return '0';
if (value === 1) return '1';
if (value <= 5) return '2-5';
if (value <= 10) return '5-10';
return '>10';
}
export function bucketErrorRate(value: number): string {
if (value <= 0) return 'none';
if (value < 0.1) return 'low';
return 'high';
}
export function bucketP95Runtime(value: number | null): string {
if (value === null) return 'unknown';
if (value < 100) return '<100ms';
if (value < 1000) return '100ms-1s';
if (value < 10000) return '1s-10s';
return '>10s';
}
export function bucketRecency(lastSeen: string, now: Date): string {
const parsed = new Date(lastSeen);
if (Number.isNaN(parsed.getTime())) {
return 'unknown';
}
const ageDays = (now.getTime() - parsed.getTime()) / (24 * 60 * 60 * 1000);
if (ageDays <= 7) return 'current';
if (ageDays <= 45) return 'recent';
return 'stale';
}
export function bucketFrequency(count: number, total: number): 'high' | 'mid' | 'low' {
if (total <= 0 || count <= 0) return 'low';
const ratio = count / total;
if (ratio >= 0.5) return 'high';
if (ratio >= 0.1) return 'mid';
return 'low';
}
- Step 4: Run the bucket test
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/buckets.test.ts
Expected: PASS.
- Step 5: Export bucket helpers
In packages/context/src/ingest/index.ts, add:
export { bucketDistinctUsers, bucketErrorRate, bucketExecutions, bucketP95Runtime, bucketRecency } from './adapters/historic-sql/buckets.js';
- Step 6: Commit
git add packages/context/src/ingest/adapters/historic-sql/buckets.ts packages/context/src/ingest/adapters/historic-sql/buckets.test.ts packages/context/src/ingest/index.ts
git commit -m "feat: add historic sql bucket helpers"
Task 3: Stage Aggregated Snapshots
Files:
-
Create:
packages/context/src/ingest/adapters/historic-sql/stage-unified.ts -
Create:
packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts -
Modify:
packages/context/src/ingest/index.ts -
Step 1: Write failing staged-artifact tests
Create packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts:
import { mkdtemp, readFile, readdir } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import type { SqlAnalysisPort } from '../../../sql-analysis/index.js';
import { stageHistoricSqlAggregatedSnapshot } from './stage-unified.js';
import type { AggregatedTemplate, HistoricSqlReader } from './types.js';
async function tempDir(): Promise<string> {
return mkdtemp(join(tmpdir(), 'historic-sql-unified-stage-'));
}
async function readJson<T>(root: string, relPath: string): Promise<T> {
return JSON.parse(await readFile(join(root, relPath), 'utf-8')) as T;
}
function aggregate(overrides: Partial<AggregatedTemplate> & { templateId: string; canonicalSql: string }): AggregatedTemplate {
return {
templateId: overrides.templateId,
canonicalSql: overrides.canonicalSql,
dialect: overrides.dialect ?? 'postgres',
stats: overrides.stats ?? {
executions: 42,
distinctUsers: 3,
firstSeen: '2026-05-01T00:00:00.000Z',
lastSeen: '2026-05-11T00:00:00.000Z',
p50RuntimeMs: 20,
p95RuntimeMs: 80,
errorRate: 0,
rowsProduced: 100,
},
topUsers: overrides.topUsers ?? [{ user: 'analyst', executions: 40 }],
};
}
describe('stageHistoricSqlAggregatedSnapshot', () => {
it('batch parses templates and writes stable table and patterns artifacts', async () => {
const stagedDir = await tempDir();
const reader: HistoricSqlReader = {
async probe() {
return { warnings: ['pg_stat_statements.max is low; aggregation still proceeds'] };
},
async *fetchAggregated() {
yield aggregate({
templateId: 'orders-by-status',
canonicalSql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status',
});
yield aggregate({
templateId: 'service-account-only',
canonicalSql: 'select * from public.orders where id = $1',
stats: {
executions: 20,
distinctUsers: 1,
firstSeen: '2026-05-01T00:00:00.000Z',
lastSeen: '2026-05-11T00:00:00.000Z',
p50RuntimeMs: 5,
p95RuntimeMs: 10,
errorRate: 0,
rowsProduced: 1,
},
topUsers: [{ user: 'svc_loader', executions: 20 }],
});
yield aggregate({
templateId: 'bad-parse',
canonicalSql: 'select broken from',
});
},
};
const sqlAnalysis: SqlAnalysisPort = {
analyzeForFingerprint: vi.fn(),
analyzeBatch: vi.fn(async () => new Map([
[
'orders-by-status',
{
tablesTouched: ['public.orders', 'public.customers'],
columnsByClause: {
select: ['status'],
where: ['created_at'],
join: ['customer_id'],
groupBy: ['status'],
},
},
],
['bad-parse', { tablesTouched: [], columnsByClause: {}, error: 'parse failed' }],
])),
};
await stageHistoricSqlAggregatedSnapshot({
stagedDir,
connectionId: 'warehouse',
queryClient: {},
reader,
sqlAnalysis,
pullConfig: {
dialect: 'postgres',
filters: {
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
},
},
now: new Date('2026-05-11T12:00:00.000Z'),
});
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledTimes(1);
expect(sqlAnalysis.analyzeBatch).toHaveBeenCalledWith(
[
{
id: 'orders-by-status',
sql: 'select o.status, count(*) from public.orders o join public.customers c on c.id = o.customer_id where o.created_at >= $1 group by o.status',
},
{ id: 'bad-parse', sql: 'select broken from' },
],
'postgres',
);
expect(await readdir(join(stagedDir, 'tables'))).toEqual(['public.customers.json', 'public.orders.json']);
const manifest = await readJson<Record<string, unknown>>(stagedDir, 'manifest.json');
expect(manifest).toMatchObject({
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
snapshotRowCount: 3,
touchedTableCount: 2,
parseFailures: 1,
warnings: ['parse_failed:bad-parse'],
probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'],
});
const orders = await readJson<Record<string, any>>(stagedDir, 'tables/public.orders.json');
expect(orders).toMatchObject({
table: 'public.orders',
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
errorRateBucket: 'none',
p95RuntimeBucket: '<100ms',
recencyBucket: 'current',
},
columnsByClause: {
select: [['status', 'high']],
where: [['created_at', 'high']],
join: [['customer_id', 'high']],
groupBy: [['status', 'high']],
},
observedJoins: [{ withTable: 'public.customers', on: ['customer_id'], freq: 'high' }],
topTemplates: [
{
id: 'orders-by-status',
topUsers: [{ user: 'analyst' }],
},
],
});
expect(orders.topTemplates[0].canonicalSql).toContain('group by o.status');
const patterns = await readJson<Record<string, any>>(stagedDir, 'patterns-input.json');
expect(patterns.templates).toEqual([
{
id: 'orders-by-status',
canonicalSql: expect.stringContaining('public.orders'),
tablesTouched: ['public.customers', 'public.orders'],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
},
]);
});
});
- Step 2: Run the stage test to verify it fails
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/stage-unified.test.ts
Expected: FAIL because stage-unified.js does not exist.
- Step 3: Add the unified stager
Create packages/context/src/ingest/adapters/historic-sql/stage-unified.ts with these exported shapes and helpers:
import { mkdir, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import type { SqlAnalysisPort } from '../../../sql-analysis/index.js';
import {
bucketDistinctUsers,
bucketErrorRate,
bucketExecutions,
bucketFrequency,
bucketP95Runtime,
bucketRecency,
} from './buckets.js';
import {
HISTORIC_SQL_SOURCE_KEY,
aggregatedTemplateSchema,
historicSqlUnifiedPullConfigSchema,
type AggregatedTemplate,
type HistoricSqlReader,
type HistoricSqlUnifiedPullConfig,
type StagedPatternsInput,
type StagedTableInput,
} from './types.js';
interface StageHistoricSqlAggregatedSnapshotInput {
stagedDir: string;
connectionId: string;
queryClient: unknown;
reader: HistoricSqlReader;
sqlAnalysis: SqlAnalysisPort;
pullConfig: unknown;
now?: Date;
}
interface ParsedTemplate {
template: AggregatedTemplate;
tablesTouched: string[];
columnsByClause: Record<string, string[]>;
}
interface TableAccumulator {
table: string;
executions: number;
distinctUsers: number;
errorRateNumerator: number;
p95RuntimeMs: number | null;
lastSeen: string;
columnsByClause: Map<string, Map<string, number>>;
observedJoins: Map<string, Map<string, number>>;
topTemplates: AggregatedTemplate[];
}
const TRIVIAL_SQL_RE = /^\s*SELECT\s+(1|NOW\(\)|CURRENT_TIMESTAMP|VERSION\(\))\s*;?\s*$/i;
const NOISE_PREFIX_RE = /^\s*(SHOW|DESCRIBE|DESC|EXPLAIN|USE|SET)\b/i;
const SYSTEM_TABLE_RE = /\b(INFORMATION_SCHEMA|SNOWFLAKE\.ACCOUNT_USAGE|pg_|system\.)/i;
const ORCHESTRATOR_RE = /\b(dbt|looker|metabase)\b/i;
function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
const target = join(root, relPath);
return mkdir(dirname(target), { recursive: true }).then(() =>
writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'),
);
}
function compilePatterns(patterns: string[]): RegExp[] {
return patterns.map((pattern) => new RegExp(pattern));
}
function matchesAny(value: string | null, patterns: RegExp[]): boolean {
return !!value && patterns.some((pattern) => pattern.test(value));
}
function shouldDropBySql(sql: string, config: HistoricSqlUnifiedPullConfig): boolean {
if (NOISE_PREFIX_RE.test(sql) || SYSTEM_TABLE_RE.test(sql)) return true;
if (config.filters.dropTrivialProbes !== false && TRIVIAL_SQL_RE.test(sql)) return true;
return false;
}
function shouldDropByUsers(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean {
const service = config.filters.serviceAccounts;
if (!service || service.mode === 'mark-only' || service.patterns.length === 0) return false;
const patterns = compilePatterns(service.patterns);
const matchingExecutions = template.topUsers
.filter((entry) => matchesAny(entry.user, patterns))
.reduce((sum, entry) => sum + entry.executions, 0);
const allExecutions = template.topUsers.reduce((sum, entry) => sum + entry.executions, 0);
const serviceOnly = allExecutions > 0 && matchingExecutions >= allExecutions;
return service.mode === 'exclude' ? serviceOnly : !serviceOnly;
}
function shouldDropByFailure(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean {
const failed = config.filters.dropFailedBelow;
return !!failed && template.stats.errorRate > failed.errorRate && template.stats.executions < failed.executions;
}
function shouldDropTemplate(template: AggregatedTemplate, config: HistoricSqlUnifiedPullConfig): boolean {
if (shouldDropBySql(template.canonicalSql, config)) return true;
if (shouldDropByUsers(template, config)) return true;
if (shouldDropByFailure(template, config)) return true;
return false;
}
function recordColumn(acc: TableAccumulator, clause: string, column: string, executions: number): void {
const byColumn = acc.columnsByClause.get(clause) ?? new Map<string, number>();
byColumn.set(column, (byColumn.get(column) ?? 0) + executions);
acc.columnsByClause.set(clause, byColumn);
}
function recordJoin(acc: TableAccumulator, otherTable: string, columns: string[], executions: number): void {
const byColumns = acc.observedJoins.get(otherTable) ?? new Map<string, number>();
const key = [...new Set(columns)].sort().join(',');
if (key.length > 0) {
byColumns.set(key, (byColumns.get(key) ?? 0) + executions);
acc.observedJoins.set(otherTable, byColumns);
}
}
function accumulatorFor(table: string): TableAccumulator {
return {
table,
executions: 0,
distinctUsers: 0,
errorRateNumerator: 0,
p95RuntimeMs: null,
lastSeen: '1970-01-01T00:00:00.000Z',
columnsByClause: new Map(),
observedJoins: new Map(),
topTemplates: [],
};
}
function addTemplate(acc: TableAccumulator, parsed: ParsedTemplate): void {
const executions = parsed.template.stats.executions;
acc.executions += executions;
acc.distinctUsers = Math.max(acc.distinctUsers, parsed.template.stats.distinctUsers);
acc.errorRateNumerator += parsed.template.stats.errorRate * executions;
acc.p95RuntimeMs =
acc.p95RuntimeMs === null
? parsed.template.stats.p95RuntimeMs
: parsed.template.stats.p95RuntimeMs === null
? acc.p95RuntimeMs
: Math.max(acc.p95RuntimeMs, parsed.template.stats.p95RuntimeMs);
acc.lastSeen = parsed.template.stats.lastSeen > acc.lastSeen ? parsed.template.stats.lastSeen : acc.lastSeen;
for (const [clause, columns] of Object.entries(parsed.columnsByClause)) {
for (const column of columns) {
recordColumn(acc, clause, column, executions);
}
}
const joinColumns = parsed.columnsByClause.join ?? [];
for (const otherTable of parsed.tablesTouched.filter((table) => table !== acc.table)) {
recordJoin(acc, otherTable, joinColumns, executions);
}
acc.topTemplates.push(parsed.template);
}
In the same file, add the staging function:
function toStagedTable(acc: TableAccumulator, now: Date): StagedTableInput {
const errorRate = acc.executions > 0 ? acc.errorRateNumerator / acc.executions : 0;
const columnsByClause = Object.fromEntries(
[...acc.columnsByClause.entries()]
.sort(([left], [right]) => left.localeCompare(right))
.map(([clause, counts]) => [
clause,
[...counts.entries()]
.sort((left, right) => right[1] - left[1] || left[0].localeCompare(right[0]))
.map(([column, count]) => [column, bucketFrequency(count, acc.executions)]),
]),
);
const observedJoins = [...acc.observedJoins.entries()]
.flatMap(([withTable, byColumns]) =>
[...byColumns.entries()].map(([columns, count]) => ({
withTable,
on: columns.split(',').filter(Boolean),
freq: bucketFrequency(count, acc.executions),
})),
)
.sort((left, right) => left.withTable.localeCompare(right.withTable) || left.on.join(',').localeCompare(right.on.join(',')));
const topTemplates = [...acc.topTemplates]
.sort((left, right) => right.stats.executions - left.stats.executions || left.templateId.localeCompare(right.templateId))
.slice(0, 5)
.map((template) => ({
id: template.templateId,
canonicalSql: template.canonicalSql,
topUsers: template.topUsers.slice(0, 5).map((entry) => ({ user: entry.user })),
}));
return {
table: acc.table,
stats: {
executionsBucket: bucketExecutions(acc.executions),
distinctUsersBucket: bucketDistinctUsers(acc.distinctUsers),
errorRateBucket: bucketErrorRate(errorRate),
p95RuntimeBucket: bucketP95Runtime(acc.p95RuntimeMs),
recencyBucket: bucketRecency(acc.lastSeen, now),
},
columnsByClause,
observedJoins,
topTemplates,
};
}
function toPatternsInput(parsedTemplates: ParsedTemplate[]): StagedPatternsInput {
return {
templates: parsedTemplates
.map(({ template, tablesTouched }) => ({
id: template.templateId,
canonicalSql: template.canonicalSql,
tablesTouched: [...tablesTouched].sort(),
executionsBucket: bucketExecutions(template.stats.executions),
distinctUsersBucket: bucketDistinctUsers(template.stats.distinctUsers),
dialect: template.dialect,
}))
.sort((left, right) => left.id.localeCompare(right.id)),
};
}
export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSqlAggregatedSnapshotInput): Promise<void> {
const config = historicSqlUnifiedPullConfigSchema.parse(input.pullConfig);
const now = input.now ?? new Date();
const windowStart = new Date(now.getTime() - config.windowDays * 24 * 60 * 60 * 1000);
const probe = await input.reader.probe(input.queryClient);
const snapshot: AggregatedTemplate[] = [];
for await (const row of input.reader.fetchAggregated(input.queryClient, { start: windowStart, end: now }, config)) {
const parsed = aggregatedTemplateSchema.parse(row);
if (!shouldDropTemplate(parsed, config)) {
snapshot.push(parsed);
}
}
const analysis = await input.sqlAnalysis.analyzeBatch(
snapshot.map((template) => ({ id: template.templateId, sql: template.canonicalSql })),
config.dialect,
);
const warnings: string[] = [];
const parsedTemplates: ParsedTemplate[] = [];
for (const template of snapshot) {
const parsed = analysis.get(template.templateId);
if (!parsed || parsed.error) {
warnings.push(`parse_failed:${template.templateId}`);
continue;
}
const tablesTouched = [...new Set(parsed.tablesTouched)].filter((table) => table.length > 0).sort();
if (tablesTouched.length === 0) {
continue;
}
parsedTemplates.push({
template,
tablesTouched,
columnsByClause: Object.fromEntries(
Object.entries(parsed.columnsByClause).map(([clause, columns]) => [clause, [...new Set(columns)].sort()]),
),
});
}
const byTable = new Map<string, TableAccumulator>();
for (const parsed of parsedTemplates) {
for (const table of parsed.tablesTouched) {
const acc = byTable.get(table) ?? accumulatorFor(table);
addTemplate(acc, parsed);
byTable.set(table, acc);
}
}
await mkdir(input.stagedDir, { recursive: true });
for (const [table, acc] of [...byTable.entries()].sort(([left], [right]) => left.localeCompare(right))) {
await writeJson(input.stagedDir, `tables/${table}.json`, toStagedTable(acc, now));
}
await writeJson(input.stagedDir, 'patterns-input.json', toPatternsInput(parsedTemplates));
await writeJson(input.stagedDir, 'manifest.json', {
source: HISTORIC_SQL_SOURCE_KEY,
connectionId: input.connectionId,
dialect: config.dialect,
fetchedAt: now.toISOString(),
windowStart: windowStart.toISOString(),
windowEnd: now.toISOString(),
snapshotRowCount: snapshot.length,
touchedTableCount: byTable.size,
parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length,
warnings,
probeWarnings: probe.warnings,
});
}
- Step 4: Run the staged-artifact test
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/stage-unified.test.ts
Expected: PASS.
- Step 5: Export the unified stager
In packages/context/src/ingest/index.ts, add:
export { stageHistoricSqlAggregatedSnapshot } from './adapters/historic-sql/stage-unified.js';
- Step 6: Commit
git add packages/context/src/ingest/adapters/historic-sql/stage-unified.ts packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts packages/context/src/ingest/index.ts
git commit -m "feat: stage historic sql aggregate snapshots"
Task 4: Add Aggregate Readers
Files:
-
Create:
packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts -
Create:
packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts -
Modify:
packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts -
Modify:
packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts -
Modify:
packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts -
Modify:
packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts -
Modify:
packages/context/src/ingest/index.ts -
Step 1: Write failing Postgres aggregate reader tests
Create packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts:
import { describe, expect, it, vi } from 'vitest';
import { PostgresPgssReader } from './postgres-pgss-reader.js';
describe('PostgresPgssReader aggregate path', () => {
it('aggregates pg_stat_statements rows by queryid and query', async () => {
const executeQuery = vi.fn(async (sql: string, params?: unknown[]) => {
if (sql.includes('pg_stat_statements_info')) {
return { headers: ['stats_reset', 'dealloc'], rows: [['2026-05-01T00:00:00.000Z', 1]] };
}
expect(sql).toContain('GROUP BY queryid, query');
expect(sql).toContain('HAVING SUM(calls) >= $1');
expect(params).toEqual([5]);
return {
headers: ['template_id', 'canonical_sql', 'executions', 'distinct_users', 'mean_ms', 'rows_produced', 'top_users'],
rows: [
[
'123',
'select status from public.orders',
'42',
'3',
'11.5',
'100',
JSON.stringify([{ user: 'analyst', executions: 40 }]),
],
],
};
});
const reader = new PostgresPgssReader();
const rows = [];
for await (const row of reader.fetchAggregated(
{ executeQuery },
{ start: new Date('2026-02-10T00:00:00.000Z'), end: new Date('2026-05-11T00:00:00.000Z') },
{ dialect: 'postgres', minExecutions: 5, windowDays: 90, concurrency: 12, filters: {}, redactionPatterns: [], staleArchiveAfterDays: 90 },
)) {
rows.push(row);
}
expect(rows).toEqual([
{
templateId: '123',
canonicalSql: 'select status from public.orders',
dialect: 'postgres',
stats: {
executions: 42,
distinctUsers: 3,
firstSeen: '2026-05-01T00:00:00.000Z',
lastSeen: '2026-05-11T00:00:00.000Z',
p50RuntimeMs: 11.5,
p95RuntimeMs: 11.5,
errorRate: 0,
rowsProduced: 100,
},
topUsers: [{ user: 'analyst', executions: 40 }],
},
]);
});
});
- Step 2: Add failing BigQuery and Snowflake aggregate assertions
In packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts, add a test that constructs new BigQueryHistoricSqlQueryHistoryReader({ projectId: 'demo', region: 'us' }), calls fetchAggregated(), and asserts the SQL contains:
expect(sql).toContain('COUNT(*) AS executions');
expect(sql).toContain('COUNT(DISTINCT user_email) AS distinct_users');
expect(sql).toContain('GROUP BY query_hash');
expect(sql).toContain('HAVING COUNT(*) >= 5');
Map one returned row with headers:
[
'template_id',
'canonical_sql',
'executions',
'distinct_users',
'first_seen',
'last_seen',
'p50_ms',
'p95_ms',
'error_rate',
'rows_produced',
'top_users',
]
and assert templateId, stats.executions, stats.errorRate, and topUsers match the row.
In packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts, add the same shape but assert the SQL contains:
expect(sql).toContain('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY');
expect(sql).toContain('COUNT(*) AS executions');
expect(sql).toContain('GROUP BY query_hash');
expect(sql).toContain('HAVING COUNT(*) >= 5');
- Step 3: Run aggregate reader tests to verify they fail
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts
Expected: FAIL because fetchAggregated() and postgres-pgss-reader.js do not exist.
- Step 4: Implement the aggregate reader methods
Create packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts with the same probe behavior currently implemented in postgres-pgss-query-history-reader.ts: queryClient, execute, indexByHeader, value, nullableString, requiredString, requiredFiniteNumber, nullableInteger, nullableIsoTimestamp, firstRow, extensionMissingError, and grantsMissingError keep their current behavior. Add this aggregate query and row mapper:
const AGGREGATE_SQL = `
SELECT queryid::text AS template_id,
query AS canonical_sql,
SUM(calls)::bigint AS executions,
COUNT(DISTINCT userid) AS distinct_users,
SUM(total_exec_time) / NULLIF(SUM(calls), 0) AS mean_ms,
SUM(rows)::bigint AS rows_produced,
COALESCE(
json_agg(json_build_object('user', rolname, 'executions', calls) ORDER BY calls DESC)
FILTER (WHERE userid IS NOT NULL),
'[]'::json
)::text AS top_users
FROM pg_stat_statements
LEFT JOIN pg_roles ON pg_roles.oid = pg_stat_statements.userid
WHERE toplevel = true
GROUP BY queryid, query
HAVING SUM(calls) >= $1
ORDER BY SUM(total_exec_time) DESC
`.trim();
The fetchAggregated() method must:
async *fetchAggregated(
client: unknown,
window: HistoricSqlTimeWindow,
config: HistoricSqlUnifiedPullConfig,
): AsyncIterable<AggregatedTemplate> {
const pgClient = queryClient(client);
const statsResult = await execute(pgClient, STATS_INFO_SQL);
const { row: statsRow, headers: statsHeaders } = firstRow(statsResult, 'stats-info');
const firstSeen = nullableIsoTimestamp(value(statsRow, statsHeaders, 'stats_reset')) ?? window.start.toISOString();
const result = await execute(pgClient, AGGREGATE_SQL, [config.minExecutions]);
const indexes = indexByHeader(result.headers);
for (const row of result.rows) {
yield aggregatedTemplateSchema.parse({
templateId: requiredString(value(row, indexes, 'template_id'), 'template_id'),
canonicalSql: requiredString(value(row, indexes, 'canonical_sql'), 'canonical_sql'),
dialect: 'postgres',
stats: {
executions: requiredInteger(value(row, indexes, 'executions'), 'executions'),
distinctUsers: requiredInteger(value(row, indexes, 'distinct_users'), 'distinct_users'),
firstSeen,
lastSeen: window.end.toISOString(),
p50RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')),
p95RuntimeMs: nullableNumber(value(row, indexes, 'mean_ms')),
errorRate: 0,
rowsProduced: nullableInteger(value(row, indexes, 'rows_produced')),
},
topUsers: parseTopUsers(value(row, indexes, 'top_users')),
});
}
}
In packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts, add this aggregate query inside fetchAggregated():
const sql = `
SELECT
query_hash AS template_id,
MIN(query) AS canonical_sql,
COUNT(*) AS executions,
COUNT(DISTINCT user_email) AS distinct_users,
MIN(creation_time) AS first_seen,
MAX(creation_time) AS last_seen,
APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(50)] AS p50_ms,
APPROX_QUANTILES(TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND), 100)[OFFSET(95)] AS p95_ms,
SAFE_DIVIDE(COUNTIF(error_result IS NOT NULL), COUNT(*)) AS error_rate,
CAST(NULL AS INT64) AS rows_produced,
TO_JSON_STRING(ARRAY_AGG(STRUCT(user_email AS user, 1 AS executions) ORDER BY creation_time DESC LIMIT 5)) AS top_users
FROM ${this.viewPath}
WHERE job_type = 'QUERY'
AND statement_type IN ('SELECT', 'MERGE')
AND creation_time >= ${timestampExpression(window.start)}
AND creation_time < ${timestampExpression(window.end)}
AND query IS NOT NULL
GROUP BY query_hash
HAVING COUNT(*) >= ${config.minExecutions}
ORDER BY executions DESC`.trim();
Map each result row into aggregatedTemplateSchema.parse({ templateId, canonicalSql, dialect: 'bigquery', stats: { executions, distinctUsers, firstSeen, lastSeen, p50RuntimeMs, p95RuntimeMs, errorRate, rowsProduced }, topUsers }), where topUsers is parsed from the top_users JSON string and invalid JSON becomes [].
In packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts, add this aggregate query inside fetchAggregated():
const sql = `
SELECT
query_hash AS template_id,
MIN(query_text) AS canonical_sql,
COUNT(*) AS executions,
COUNT(DISTINCT user_name) AS distinct_users,
MIN(start_time) AS first_seen,
MAX(start_time) AS last_seen,
APPROX_PERCENTILE(total_elapsed_time, 0.50) AS p50_ms,
APPROX_PERCENTILE(total_elapsed_time, 0.95) AS p95_ms,
DIV0(COUNT_IF(execution_status != 'SUCCESS'), COUNT(*)) AS error_rate,
SUM(rows_produced) AS rows_produced,
ARRAY_AGG(OBJECT_CONSTRUCT('user', user_name, 'executions', 1)) WITHIN GROUP (ORDER BY start_time DESC)::string AS top_users
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text IS NOT NULL
AND query_type IN ('SELECT', 'MERGE')
AND start_time >= ${timestampLiteral(window.start)}
AND start_time < ${timestampLiteral(window.end)}
GROUP BY query_hash
HAVING COUNT(*) >= ${config.minExecutions}
ORDER BY executions DESC`.trim();
Map each result row into aggregatedTemplateSchema.parse({ templateId, canonicalSql, dialect: 'snowflake', stats: { executions, distinctUsers, firstSeen, lastSeen, p50RuntimeMs, p95RuntimeMs, errorRate, rowsProduced }, topUsers }), where topUsers is parsed from the top_users JSON string and invalid JSON becomes []. Keep the existing fetch() methods unchanged in this plan so current adapter behavior does not move before the skill/projection cutover.
- Step 5: Export the new Postgres reader
In packages/context/src/ingest/index.ts, add:
export { PostgresPgssReader } from './adapters/historic-sql/postgres-pgss-reader.js';
- Step 6: Run aggregate reader tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts
Expected: PASS.
- Step 7: Commit
git add packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.ts packages/context/src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.ts packages/context/src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.ts packages/context/src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts packages/context/src/ingest/index.ts
git commit -m "feat: add historic sql aggregate readers"
Task 5: Add Unified Chunking
Files:
-
Create:
packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts -
Create:
packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts -
Modify:
packages/context/src/ingest/index.ts -
Step 1: Write failing unified chunk tests
Create packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts:
import { mkdir, mkdtemp, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it } from 'vitest';
import { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './chunk-unified.js';
async function tempDir(): Promise<string> {
return mkdtemp(join(tmpdir(), 'historic-sql-unified-chunk-'));
}
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
const target = join(root, relPath);
await mkdir(join(target, '..'), { recursive: true });
await writeFile(target, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
}
async function writeUnifiedStagedDir(root: string): Promise<void> {
await writeJson(root, 'manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 1,
touchedTableCount: 1,
parseFailures: 0,
warnings: [],
probeWarnings: [],
});
await writeJson(root, 'tables/public.orders.json', {
table: 'public.orders',
stats: {
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
errorRateBucket: 'none',
p95RuntimeBucket: '<100ms',
recencyBucket: 'current',
},
columnsByClause: { select: [['status', 'high']] },
observedJoins: [],
topTemplates: [{ id: 'orders', canonicalSql: 'select * from public.orders', topUsers: [{ user: 'analyst' }] }],
});
await writeJson(root, 'patterns-input.json', {
templates: [
{
id: 'orders',
canonicalSql: 'select * from public.orders',
tablesTouched: ['public.orders'],
executionsBucket: '10-100',
distinctUsersBucket: '2-5',
dialect: 'postgres',
},
],
});
}
describe('chunkHistoricSqlUnifiedStagedDir', () => {
it('emits one table WorkUnit plus one patterns WorkUnit', async () => {
const stagedDir = await tempDir();
await writeUnifiedStagedDir(stagedDir);
const result = await chunkHistoricSqlUnifiedStagedDir(stagedDir);
expect(result.workUnits).toEqual([
expect.objectContaining({
unitKey: 'historic-sql-table-public-orders',
displayLabel: 'Historic SQL usage: public.orders',
rawFiles: ['tables/public.orders.json'],
dependencyPaths: ['manifest.json'],
notes: expect.stringContaining('historic_sql_table_digest'),
}),
expect.objectContaining({
unitKey: 'historic-sql-patterns',
displayLabel: 'Historic SQL cross-table patterns',
rawFiles: ['patterns-input.json'],
dependencyPaths: ['manifest.json'],
notes: expect.stringContaining('historic_sql_patterns'),
}),
]);
expect(result.reconcileNotes).toEqual(['Historic-SQL touched tables=1 parseFailures=0']);
});
it('respects diff sets for unchanged table and patterns files', async () => {
const stagedDir = await tempDir();
await writeUnifiedStagedDir(stagedDir);
await expect(
chunkHistoricSqlUnifiedStagedDir(stagedDir, {
added: [],
modified: ['tables/public.orders.json'],
deleted: [],
unchanged: ['manifest.json', 'patterns-input.json'],
}),
).resolves.toMatchObject({
workUnits: [expect.objectContaining({ unitKey: 'historic-sql-table-public-orders' })],
});
await expect(
chunkHistoricSqlUnifiedStagedDir(stagedDir, {
added: [],
modified: ['patterns-input.json'],
deleted: [],
unchanged: ['manifest.json', 'tables/public.orders.json'],
}),
).resolves.toMatchObject({
workUnits: [expect.objectContaining({ unitKey: 'historic-sql-patterns' })],
});
});
it('describes unified staged scope', async () => {
const stagedDir = await tempDir();
await writeUnifiedStagedDir(stagedDir);
const scope = await describeHistoricSqlUnifiedScope(stagedDir);
expect(scope.isPathInScope('manifest.json')).toBe(true);
expect(scope.isPathInScope('patterns-input.json')).toBe(true);
expect(scope.isPathInScope('tables/public.orders.json')).toBe(true);
expect(scope.isPathInScope('templates/old/page.md')).toBe(false);
});
});
- Step 2: Run the unified chunk tests to verify they fail
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/chunk-unified.test.ts
Expected: FAIL because chunk-unified.js does not exist.
- Step 3: Add the unified chunker
Create packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts:
import { createHash } from 'node:crypto';
import { readFile, readdir } from 'node:fs/promises';
import { join, relative } from 'node:path';
import type { ChunkResult, DiffSet, ScopeDescriptor, WorkUnit } from '../../types.js';
import { stagedManifestSchema, stagedPatternsInputSchema, stagedTableInputSchema } from './types.js';
async function walk(root: string): Promise<string[]> {
const entries = await readdir(root, { withFileTypes: true, recursive: true });
return entries
.filter((entry) => entry.isFile())
.map((entry) => relative(root, join(entry.parentPath, entry.name)).replace(/\\/g, '/'))
.sort();
}
async function readJson<T>(stagedDir: string, relPath: string): Promise<T> {
return JSON.parse(await readFile(join(stagedDir, relPath), 'utf-8')) as T;
}
function safeUnitKey(value: string): string {
return value.replace(/[^a-zA-Z0-9]+/g, '-').replace(/^-+|-+$/g, '');
}
function touchedPath(path: string, touched: Set<string> | null): boolean {
return !touched || touched.has(path);
}
export async function chunkHistoricSqlUnifiedStagedDir(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult> {
const files = await walk(stagedDir);
const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json'));
const touched = diffSet ? new Set([...diffSet.added, ...diffSet.modified]) : null;
const workUnits: WorkUnit[] = [];
for (const path of files.filter((file) => /^tables\/.+\.json$/.test(file))) {
if (!touchedPath(path, touched)) {
continue;
}
const table = stagedTableInputSchema.parse(await readJson(stagedDir, path));
workUnits.push({
unitKey: `historic-sql-table-${safeUnitKey(table.table)}`,
displayLabel: `Historic SQL usage: ${table.table}`,
rawFiles: [path],
dependencyPaths: ['manifest.json'],
peerFileIndex: files.filter((file) => file !== path && file !== 'manifest.json').sort(),
notes:
'Use historic_sql_table_digest. Read this table usage JSON and the existing semantic-layer source for the table; output only table usage evidence shaped like tableUsageOutputSchema.',
});
}
if (files.includes('patterns-input.json') && touchedPath('patterns-input.json', touched)) {
stagedPatternsInputSchema.parse(await readJson(stagedDir, 'patterns-input.json'));
workUnits.push({
unitKey: 'historic-sql-patterns',
displayLabel: 'Historic SQL cross-table patterns',
rawFiles: ['patterns-input.json'],
dependencyPaths: ['manifest.json'],
peerFileIndex: files.filter((file) => file !== 'patterns-input.json' && file !== 'manifest.json').sort(),
notes:
'Use historic_sql_patterns. Read patterns-input.json and produce cross-table pattern evidence shaped like patternsArraySchema.',
});
}
const deleted = diffSet?.deleted.filter((path) => path === 'patterns-input.json' || /^tables\/.+\.json$/.test(path)).sort();
return {
workUnits,
eviction: deleted && deleted.length > 0 ? { deletedRawPaths: deleted } : undefined,
reconcileNotes: [`Historic-SQL touched tables=${manifest.touchedTableCount} parseFailures=${manifest.parseFailures}`],
contextReport: {
capped: false,
warnings: [...manifest.probeWarnings, ...manifest.warnings],
},
};
}
export async function describeHistoricSqlUnifiedScope(stagedDir: string): Promise<ScopeDescriptor> {
const manifest = stagedManifestSchema.parse(await readJson(stagedDir, 'manifest.json'));
const fingerprint = createHash('sha256')
.update(JSON.stringify({
connectionId: manifest.connectionId,
dialect: manifest.dialect,
windowStart: manifest.windowStart,
windowEnd: manifest.windowEnd,
}))
.digest('hex');
return {
fingerprint,
isPathInScope: (rawPath) =>
rawPath === 'manifest.json' || rawPath === 'patterns-input.json' || /^tables\/.+\.json$/.test(rawPath),
};
}
- Step 4: Run the unified chunk tests
Run:
pnpm --filter @ktx/context exec vitest run src/ingest/adapters/historic-sql/chunk-unified.test.ts
Expected: PASS.
- Step 5: Export the unified chunker
In packages/context/src/ingest/index.ts, add:
export { chunkHistoricSqlUnifiedStagedDir, describeHistoricSqlUnifiedScope } from './adapters/historic-sql/chunk-unified.js';
- Step 6: Commit
git add packages/context/src/ingest/adapters/historic-sql/chunk-unified.ts packages/context/src/ingest/adapters/historic-sql/chunk-unified.test.ts packages/context/src/ingest/index.ts
git commit -m "feat: chunk historic sql unified staging"
Task 6: Verify the Hot Path Slice
Files:
-
Modify: files changed in Tasks 1-5
-
Step 1: Run focused historic-SQL tests
Run:
pnpm --filter @ktx/context exec vitest run \
src/ingest/adapters/historic-sql/types.test.ts \
src/ingest/adapters/historic-sql/buckets.test.ts \
src/ingest/adapters/historic-sql/stage-unified.test.ts \
src/ingest/adapters/historic-sql/postgres-pgss-reader.test.ts \
src/ingest/adapters/historic-sql/bigquery-query-history-reader.test.ts \
src/ingest/adapters/historic-sql/snowflake-query-history-reader.test.ts \
src/ingest/adapters/historic-sql/chunk-unified.test.ts \
src/package-exports.test.ts
Expected: PASS.
- Step 2: Run type-check for the context package
Run:
pnpm --filter @ktx/context run type-check
Expected: PASS.
- Step 3: Confirm legacy production adapter was not switched
Run:
rg -n "historic_sql_ingest|historic_sql_curator|stagePgStatStatementsTemplates" packages/context/src/ingest/adapters/historic-sql packages/context/skills packages/context/src/ingest/ingest-runtime-assets.test.ts
Expected: Results still include historic-sql.adapter.ts, the old skill files, and runtime-asset tests. This is correct for this plan because the replacement skills and projection are not present yet.
- Step 4: Confirm new hot-path exports exist
Run:
rg -n "stageHistoricSqlAggregatedSnapshot|chunkHistoricSqlUnifiedStagedDir|PostgresPgssReader|aggregatedTemplateSchema" packages/context/src/ingest/index.ts packages/context/src/ingest/adapters/historic-sql
Expected: Results include the new stager, chunker, reader, and schemas.
- Step 5: Commit verification fixes only when verification changed files
git status --short
Expected: no output. If verification forced a fix, run:
git add packages/context/src/ingest/adapters/historic-sql packages/context/src/ingest/index.ts packages/context/src/package-exports.test.ts
git commit -m "test: verify historic sql unified hot path"
Follow-Up Plan Boundary
The next plan after this one should switch the production adapter only after it also creates the cold-path pieces:
packages/context/skills/historic_sql_table_digest/SKILL.mdpackages/context/skills/historic_sql_patterns/SKILL.md- adapter
skillNameschange fromhistoric_sql_ingestto the two new skills onPullSucceeded()projection of table usage into_schema/{shard}.yaml- pattern wiki page projection and slug stability
- one-time cleanup of legacy template wiki pages and PGSS baselines
- deletion of
stage-pgss.ts, old template staging exports, and old historic-SQL skill assets
Self-Review
Spec coverage:
- Unified aggregate reader contracts: Task 1 and Task 4.
- Trailing-window aggregate fetch shape: Task 4.
- Batch SQL parse through
SqlAnalysisPort.analyzeBatch(): Task 3. - Service-account, trivial query, failed-template, parse-failure, and zero-table filtering: Task 3.
- Bucketed
tables/*.json,patterns-input.json, andmanifest.json: Task 2 and Task 3. - WorkUnits for one table file plus patterns input: Task 5.
- Hard production cutover, LLM skills, projection, wiki pages, stale handling, and legacy deletion: explicitly excluded from this plan and listed as the next plan boundary.
Placeholder scan:
- No unresolved placeholders are left in task steps.
- Every code-changing task includes concrete test code, implementation code, commands, and expected results.
Type consistency:
HistoricSqlUnifiedPullConfig,AggregatedTemplate,StagedTableInput,StagedPatternsInput, andStagedManifestare defined in Task 1 and reused consistently by Tasks 3-5.PostgresPgssReader,fetchAggregated(),stageHistoricSqlAggregatedSnapshot(), andchunkHistoricSqlUnifiedStagedDir()names match exports and test imports.
Plan complete and saved to docs/superpowers/plans/2026-05-11-historic-sql-unified-hot-path.md. Two execution options:
1. Subagent-Driven (recommended) - I dispatch a fresh subagent per task, review between tasks, fast iteration
2. Inline Execution - Execute tasks in this session using executing-plans, batch execution with checkpoints
Which approach?