feat: project historic sql evidence

This commit is contained in:
Andrey Avtomonov 2026-05-11 18:53:45 +02:00
parent 954426289d
commit fb541cb9e5
8 changed files with 511 additions and 2 deletions

View file

@ -0,0 +1,174 @@
import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import YAML from 'yaml';
import { describe, expect, it } from 'vitest';
import { projectHistoricSqlEvidence } from './projection.js';
/** Creates a uniquely named scratch directory under the OS temp dir and resolves to its path. */
async function tempWorkdir(): Promise<string> {
  const prefix = join(tmpdir(), 'historic-sql-projection-');
  return mkdtemp(prefix);
}
/**
 * Writes `content` as UTF-8 to `relPath` resolved under `root`,
 * creating any missing parent directories first.
 */
async function writeText(root: string, relPath: string, content: string): Promise<void> {
  const destination = join(root, relPath);
  const parentDir = join(destination, '..');
  await mkdir(parentDir, { recursive: true });
  await writeFile(destination, content, 'utf-8');
}
/** Serializes `value` as 2-space-indented JSON with a trailing newline and writes it via `writeText`. */
async function writeJson(root: string, relPath: string, value: unknown): Promise<void> {
  const serialized = JSON.stringify(value, null, 2);
  await writeText(root, relPath, `${serialized}\n`);
}
// End-to-end tests for projectHistoricSqlEvidence. Each test builds a throwaway workdir containing
// a staged manifest, staged table files, and evidence envelopes, runs the projection, and then
// inspects the files it wrote.
describe('projectHistoricSqlEvidence', () => {
// Table-usage evidence must replace the generated usage fields of the matching shard entry while
// keeping keys the projection does not own (`ownerNote` here).
it('merges table usage into matching _schema shards and preserves external usage keys', async () => {
const workdir = await tempWorkdir();
// Existing shard: one table with previously generated usage plus an externally added `ownerNote`.
await writeText(
workdir,
'semantic-layer/warehouse/_schema/public.yaml',
YAML.stringify({
tables: {
orders: {
table: 'public.orders',
usage: {
narrative: 'Old generated usage.',
frequencyTier: 'low',
commonFilters: ['old_status'],
commonJoins: [],
ownerNote: 'keep me',
},
columns: [{ name: 'id', type: 'string' }],
},
},
}),
);
// Staged manifest for sync-1; the projection reads it from the raw-sources directory.
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 1,
touchedTableCount: 1,
parseFailures: 0,
warnings: [],
probeWarnings: [],
staleArchiveAfterDays: 90,
});
// Staged table file: marks public.orders as present in the current snapshot.
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' });
// Evidence envelope for run-1 carrying the fresh usage for public.orders.
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/orders.json', {
kind: 'table_usage',
connectionId: 'warehouse',
table: 'public.orders',
rawPath: 'tables/public.orders.json',
usage: {
narrative: 'Orders are repeatedly queried for lifecycle analysis.',
frequencyTier: 'high',
commonFilters: ['status', 'created_at'],
commonGroupBys: ['status'],
commonJoins: [{ table: 'public.customers', on: ['customer_id'] }],
staleSince: null,
},
});
const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' });
// The touched source is reported by its unqualified table name.
expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]);
const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8'));
// `toEqual` is order-insensitive for object keys: this pins the merged values, not key order.
expect(shard.tables.orders.usage).toEqual({
ownerNote: 'keep me',
narrative: 'Orders are repeatedly queried for lifecycle analysis.',
frequencyTier: 'high',
commonFilters: ['status', 'created_at'],
commonGroupBys: ['status'],
commonJoins: [{ table: 'public.customers', on: ['customer_id'] }],
staleSince: null,
});
});
// Pattern evidence should overwrite an existing page whose tables/fingerprints overlap enough
// (keeping that page's file name), and pattern pages with no matching evidence get `stale_since`.
it('writes pattern pages, reuses similar slugs, and marks missing old pattern pages stale', async () => {
const workdir = await tempWorkdir();
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', {
source: 'historic-sql',
connectionId: 'warehouse',
dialect: 'postgres',
fetchedAt: '2026-05-11T00:00:00.000Z',
windowStart: '2026-02-10T00:00:00.000Z',
windowEnd: '2026-05-11T00:00:00.000Z',
snapshotRowCount: 2,
touchedTableCount: 2,
parseFailures: 0,
warnings: [],
probeWarnings: [],
staleArchiveAfterDays: 90,
});
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' });
await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.customers.json', { table: 'public.customers' });
// Existing page sharing both tables and fingerprint pg:1 with the incoming pattern
// (3 of the 4 incoming signals), so it is expected to be reused.
await writeText(
workdir,
'knowledge/global/historic-sql/old-order-lifecycle.md',
[
'---',
YAML.stringify({
summary: 'Old order lifecycle page',
tags: ['historic-sql', 'pattern'],
refs: [],
sl_refs: ['orders'],
usage_mode: 'auto',
source: 'historic-sql',
tables: ['public.orders', 'public.customers'],
fingerprints: ['pg:1'],
}).trimEnd(),
'---',
'',
'Old body',
'',
].join('\n'),
);
// Existing page with no overlap with any incoming pattern; expected to be marked stale.
await writeText(
workdir,
'knowledge/global/historic-sql/retired-pattern.md',
[
'---',
YAML.stringify({
summary: 'Retired pattern',
tags: ['historic-sql', 'pattern'],
refs: [],
sl_refs: [],
usage_mode: 'auto',
source: 'historic-sql',
tables: ['public.tickets'],
fingerprints: ['pg:9'],
}).trimEnd(),
'---',
'',
'Retired body',
'',
].join('\n'),
);
// Single pattern envelope for run-1.
await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/pattern.json', {
kind: 'pattern',
connectionId: 'warehouse',
rawPath: 'patterns-input.json',
pattern: {
slug: 'order-lifecycle-analysis',
title: 'Order Lifecycle Analysis',
narrative: 'Analysts compare order status with customer segment.',
definitionSql: 'select * from public.orders join public.customers on customers.id = orders.customer_id',
tablesInvolved: ['public.orders', 'public.customers'],
slRefs: ['orders', 'customers'],
constituentTemplateIds: ['pg:1', 'pg:2'],
},
});
const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' });
expect(result.patternPagesWritten).toBe(1);
// The new title lands in the OLD file name, proving the page was reused rather than re-slugged.
await expect(readFile(join(workdir, 'knowledge/global/historic-sql/old-order-lifecycle.md'), 'utf-8')).resolves.toContain(
'Order Lifecycle Analysis',
);
// The unmatched page is stamped with the manifest's fetchedAt, written as a double-quoted scalar.
await expect(readFile(join(workdir, 'knowledge/global/historic-sql/retired-pattern.md'), 'utf-8')).resolves.toContain(
'stale_since: "2026-05-11T00:00:00.000Z"',
);
});
});

View file

@ -0,0 +1,328 @@
import { access, mkdir, readdir, readFile, rename, rm, writeFile } from 'node:fs/promises';
import { dirname, join, relative } from 'node:path';
import YAML from 'yaml';
import { rawSourcesDirForSync } from '../../raw-sources-paths.js';
import { mergeUsagePreservingExternal } from '../live-database/manifest.js';
import { historicSqlEvidenceEnvelopeSchema, type HistoricSqlEvidenceEnvelope } from './evidence.js';
import type { TableUsageOutput } from './skill-schemas.js';
import { stagedManifestSchema } from './types.js';
/** Arguments for `projectHistoricSqlEvidence`. */
export interface HistoricSqlProjectionInput {
/** Root directory under which the raw-sources, `.ktx/ingest-evidence`, `semantic-layer`, and `knowledge` trees are resolved. */
workdir: string;
/** Connection whose staged raw-sources directory and `semantic-layer/<connectionId>/_schema` shards are processed. */
connectionId: string;
/** Sync identifier used to locate the staged historic-sql raw-sources directory (manifest and `tables/`). */
syncId: string;
/** Run identifier; evidence envelopes are read from `.ktx/ingest-evidence/historic-sql/<runId>`. */
runId: string;
}
/** Counters and bookkeeping returned by `projectHistoricSqlEvidence`. */
export interface HistoricSqlProjectionResult {
/** Number of shard table entries whose `usage` changed after merging table-usage evidence. */
tableUsageMerged: number;
/** Number of shard table entries whose `usage` changed after being replaced by the stale placeholder. */
staleTablesMarked: number;
/** Number of pattern pages written from pattern evidence. */
patternPagesWritten: number;
/** Number of existing pattern pages rewritten with the `stale` tag and a `stale_since` value. */
stalePatternPagesMarked: number;
/** Number of pattern pages moved under `_archived/`. */
archivedPatternPages: number;
/** Number of legacy pages (tagged `query-pattern` but not `pattern`) removed. */
legacyPagesDeleted: number;
/** Unique (connection, unqualified table name) pairs whose shard usage was changed. */
touchedSources: Array<{ connectionId: string; sourceName: string }>;
/** Diagnostic messages for the caller; starts out empty. */
warnings: string[];
}
/**
 * Shape assumed for a parsed `_schema` YAML shard. Only `tables` is read here; each entry's
 * optional `table` (falling back to the map key) identifies the table and `usage` is the
 * section this module rewrites. Other keys are carried through untouched.
 */
interface ManifestShard {
tables?: Record<string, { table?: string; usage?: Record<string, unknown>; columns?: unknown[]; [key: string]: unknown }>;
}
/** A markdown page with YAML frontmatter, as loaded from the historic-sql knowledge directory. */
interface HistoricSqlPatternPage {
/** Page path relative to the knowledge root, `/`-separated, without the `.md` extension. */
key: string;
/** Location of the file on disk (knowledge root joined with the relative file name). */
path: string;
/** Parsed YAML frontmatter. */
frontmatter: Record<string, unknown>;
/** Everything after the closing `---` of the frontmatter. */
content: string;
}
/**
 * Normalizes a pattern slug into a filesystem-safe, `/`-separated knowledge key.
 *
 * The value is lower-cased and split on `/`. Within each segment, every run of characters
 * outside `[a-z0-9-]` collapses to a single `-`, and leading/trailing hyphens are trimmed.
 * Segments that end up empty are dropped, so a leading, trailing, or doubled `/` (and `.`/`..`
 * segments, which reduce to hyphens) can never yield a hidden `.md` file or a key that differs
 * from the one derived from the file's path on reload.
 *
 * @param value Raw slug, e.g. `'Order Lifecycle Analysis'` or `'teams/Sales Ops'`.
 * @returns The normalized key; an empty string when nothing usable remains.
 */
function safeKnowledgeSlug(value: string): string {
  return value
    .toLowerCase()
    .split('/')
    .map((segment) => segment.replace(/[^a-z0-9-]+/g, '-').replace(/^-+|-+$/g, ''))
    .filter((segment) => segment.length > 0)
    .join('/');
}
/** Resolves to `true` when `access` succeeds for `path`, and to `false` when it rejects for any reason. */
async function pathExists(path: string): Promise<boolean> {
  return access(path).then(
    () => true,
    () => false,
  );
}
/**
 * Lists every regular file beneath `root`, recursively.
 *
 * @returns Paths relative to `root`, `/`-separated and sorted; an empty list when `root`
 * does not exist. Entries that are neither directories nor regular files are ignored.
 */
async function walkFiles(root: string): Promise<string[]> {
  const rootPresent = await pathExists(root);
  if (!rootPresent) {
    return [];
  }
  const collected: string[] = [];
  // Explicit work list instead of recursion; visit order is irrelevant because the output is sorted.
  const pending: string[] = [root];
  for (let dir = pending.pop(); dir !== undefined; dir = pending.pop()) {
    const children = await readdir(dir, { withFileTypes: true });
    for (const child of children) {
      const fullPath = join(dir, child.name);
      if (child.isDirectory()) {
        pending.push(fullPath);
      } else if (child.isFile()) {
        // Normalize Windows separators so keys are stable across platforms.
        collected.push(relative(root, fullPath).replace(/\\/g, '/'));
      }
    }
  }
  collected.sort();
  return collected;
}
/** Reads `path` as UTF-8 and parses it as JSON; the result is left untyped for callers to validate. */
async function readJson(path: string): Promise<unknown> {
  const text = await readFile(path, 'utf-8');
  const parsed: unknown = JSON.parse(text);
  return parsed;
}
/**
 * Serializes `value` as YAML and writes it to `path` via a sibling temp file plus rename, so
 * readers never observe a half-written file.
 *
 * Parent directories are created as needed. If the write or the rename fails, the temp file is
 * removed (best effort) and the original error is rethrown, so a failed run does not leave
 * `<path>.tmp` behind.
 *
 * @param path Destination file.
 * @param value Value to serialize (2-space indent, no line folding).
 */
async function writeYamlAtomic(path: string, value: unknown): Promise<void> {
  await mkdir(dirname(path), { recursive: true });
  const tmp = `${path}.tmp`;
  try {
    await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0 }), 'utf-8');
    await rename(tmp, path);
  } catch (error: unknown) {
    // Cleanup must never mask the real failure, hence the swallowed rejection here.
    await rm(tmp, { force: true }).catch(() => undefined);
    throw error;
  }
}
/**
 * Returns the last non-empty dot-separated segment of a table reference
 * (`'public.orders'` -> `'orders'`), or the input itself when no such segment exists.
 */
function tableSourceName(tableRef: string): string {
  const segments = tableRef.split('.').filter((segment) => segment.length > 0);
  const last: string | undefined = segments[segments.length - 1];
  return last ?? tableRef;
}
/**
 * Builds the placeholder usage block for a table with no usage in the latest snapshot.
 * Key order is kept fixed because callers compare usage objects through `JSON.stringify`.
 */
function staleUsage(fetchedAt: string) {
  const frequencyTier = 'unused' as const;
  const narrative = 'No recent historic SQL usage was observed in the latest snapshot.';
  return {
    narrative,
    frequencyTier,
    commonFilters: [],
    commonGroupBys: [],
    commonJoins: [],
    staleSince: fetchedAt,
  };
}
/**
 * Loads and validates every `.json` evidence envelope stored for a run under
 * `.ktx/ingest-evidence/historic-sql/<runId>`, in sorted file order.
 * A file that fails schema validation makes the whole call reject.
 */
async function loadEvidence(workdir: string, runId: string): Promise<HistoricSqlEvidenceEnvelope[]> {
  const evidenceDir = join(workdir, '.ktx/ingest-evidence/historic-sql', runId);
  const jsonFiles = (await walkFiles(evidenceDir)).filter((name) => name.endsWith('.json'));
  const envelopes: HistoricSqlEvidenceEnvelope[] = [];
  for (const name of jsonFiles) {
    const raw = await readJson(join(evidenceDir, name));
    envelopes.push(historicSqlEvidenceEnvelopeSchema.parse(raw));
  }
  return envelopes;
}
/**
 * Renders the markdown body of a pattern page: title heading, narrative, the representative SQL
 * in a fenced `sql` block, then bullet lists of the involved tables and constituent template ids.
 * The result ends with a newline.
 */
function renderPatternMarkdown(pattern: HistoricSqlEvidenceEnvelope & { kind: 'pattern' }): string {
  const { title, narrative, definitionSql, tablesInvolved, constituentTemplateIds } = pattern.pattern;
  const lines: string[] = [
    `# ${title}`,
    '',
    narrative,
    '',
    '## Representative SQL',
    '',
    '```sql',
    definitionSql,
    '```',
    '',
    '## Tables',
    '',
  ];
  for (const table of tablesInvolved) {
    lines.push(`- ${table}`);
  }
  lines.push('', '## Constituent Templates', '');
  for (const id of constituentTemplateIds) {
    lines.push(`- ${id}`);
  }
  lines.push('');
  return lines.join('\n');
}
/**
 * Fraction of `left` entries that also occur in `right` (0 when `left` is empty).
 * The measure is asymmetric and counts duplicate `left` entries individually.
 */
function overlapRatio(left: string[], right: string[]): number {
  if (left.length === 0) {
    return 0;
  }
  const known = new Set(right);
  let shared = 0;
  for (const value of left) {
    if (known.has(value)) {
      shared += 1;
    }
  }
  return shared / left.length;
}
/**
 * Splits a markdown document into YAML frontmatter and body.
 *
 * Accepts LF and CRLF line endings and an optional leading byte-order mark, so pages saved by
 * Windows editors are still recognized instead of being silently skipped.
 *
 * @param key Page key to record on the result.
 * @param path File location to record on the result.
 * @param raw Full file contents.
 * @returns The parsed page, or `null` when `raw` does not start with a `---` frontmatter block.
 * Frontmatter that parses to anything other than a mapping (scalar, list, empty) is treated as
 * an empty mapping.
 * @throws Whatever `YAML.parse` throws for syntactically invalid frontmatter.
 */
function parseMarkdownPage(key: string, path: string, raw: string): HistoricSqlPatternPage | null {
  const match = raw.replace(/^\uFEFF/, '').match(/^---\r?\n([\s\S]*?)\r?\n---(?:\r?\n)?([\s\S]*)$/);
  if (!match) return null;
  const parsed: unknown = YAML.parse(match[1] ?? '');
  const isMapping = typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed);
  return {
    key,
    path,
    frontmatter: isMapping ? (parsed as Record<string, unknown>) : {},
    content: match[2] ?? '',
  };
}
/** True when the page's frontmatter has `source: historic-sql` and its tags include both `historic-sql` and `pattern`. */
function isHistoricPatternPage(page: HistoricSqlPatternPage): boolean {
  const { source, tags } = page.frontmatter;
  if (source !== 'historic-sql') {
    return false;
  }
  if (!Array.isArray(tags)) {
    return false;
  }
  return tags.includes('historic-sql') && tags.includes('pattern');
}
/** True for historic-sql pages tagged `query-pattern` that do not also carry the `pattern` tag. */
function isLegacyQueryPage(page: HistoricSqlPatternPage): boolean {
  const { source, tags } = page.frontmatter;
  if (source !== 'historic-sql' || !Array.isArray(tags)) {
    return false;
  }
  if (!tags.includes('query-pattern')) {
    return false;
  }
  return !tags.includes('pattern');
}
/** Returns the string members of `value` in order; anything that is not an array yields `[]`. */
function stringArray(value: unknown): string[] {
  if (!Array.isArray(value)) {
    return [];
  }
  const strings: string[] = [];
  for (const entry of value) {
    if (typeof entry === 'string') {
      strings.push(entry);
    }
  }
  return strings;
}
/**
 * Renders a markdown page: `---`-delimited YAML frontmatter, a blank line, then the trimmed
 * body followed by a single newline.
 *
 * When `stale_since` is a string, the first `stale_since: <value>` occurrence in the serialized
 * YAML is rewritten to use double quotes around the value.
 */
function renderMarkdownPage(frontmatter: Record<string, unknown>, content: string): string {
  const serialized = YAML.stringify(frontmatter, { indent: 2, lineWidth: 0 }).trimEnd();
  const { stale_since: staleSince } = frontmatter;
  const yamlBlock =
    typeof staleSince === 'string'
      ? serialized.replace(`stale_since: ${staleSince}`, `stale_since: "${staleSince}"`)
      : serialized;
  return ['---', yamlBlock, '---', '', content.trim(), ''].join('\n');
}
/** Collects a page's matching signals: its frontmatter `tables` followed by its `fingerprints` (string entries only). */
function existingPageSignals(page: HistoricSqlPatternPage): string[] {
  const { tables, fingerprints } = page.frontmatter;
  return stringArray(tables).concat(stringArray(fingerprints));
}
/**
 * Decides whether a stale page is old enough to archive.
 *
 * @returns `true` only when `staleSince` is a parseable date string, `fetchedAt` is parseable,
 * and strictly more than `days` days lie between them.
 */
function shouldArchive(staleSince: unknown, fetchedAt: string, days: number): boolean {
  if (typeof staleSince !== 'string') {
    return false;
  }
  const staleMs = Date.parse(staleSince);
  const fetchedMs = Date.parse(fetchedAt);
  const bothParseable = Number.isFinite(staleMs) && Number.isFinite(fetchedMs);
  if (!bothParseable) {
    return false;
  }
  const thresholdMs = days * 24 * 60 * 60 * 1000;
  return fetchedMs - staleMs > thresholdMs;
}
/**
 * Reads every `.md` file beneath `root` and returns those that parse as frontmatter pages.
 * Each page's key is its `/`-separated path relative to `root` without the `.md` suffix;
 * files without a frontmatter block are skipped.
 */
async function loadPatternPages(root: string): Promise<HistoricSqlPatternPage[]> {
  const markdownFiles = (await walkFiles(root)).filter((name) => name.endsWith('.md'));
  const pages: HistoricSqlPatternPage[] = [];
  for (const name of markdownFiles) {
    const absolute = join(root, name);
    const text = await readFile(absolute, 'utf-8');
    const parsed = parseMarkdownPage(name.replace(/\.md$/, ''), absolute, text);
    if (parsed !== null) {
      pages.push(parsed);
    }
  }
  return pages;
}
/**
 * Collects the `table` value of every `.json` file under `<rawDir>/tables`.
 * Files whose JSON is not an object with a string `table` property are ignored.
 */
async function currentStagedTables(rawDir: string): Promise<Set<string>> {
  const tablesDir = join(rawDir, 'tables');
  const jsonFiles = (await walkFiles(tablesDir)).filter((name) => name.endsWith('.json'));
  const staged = new Set<string>();
  for (const name of jsonFiles) {
    const payload = await readJson(join(tablesDir, name));
    if (typeof payload !== 'object' || payload === null) continue;
    if (!('table' in payload)) continue;
    if (typeof payload.table === 'string') {
      staged.add(payload.table);
    }
  }
  return staged;
}
/**
 * Projects one run's historic-SQL evidence onto the workspace.
 *
 * 1. Table usage: for every table in the connection's `_schema` shards, merges matching
 *    `table_usage` evidence into `usage` through `mergeUsagePreservingExternal`. Tables that
 *    have usage but neither evidence nor a staged table file get the stale placeholder instead.
 *    A shard is rewritten only when one of its usage blocks actually changed, so untouched
 *    shards keep their formatting and comments.
 * 2. Pattern pages: writes one page per `pattern` evidence under `knowledge/global/historic-sql`,
 *    reusing the key of an existing page whose tables/fingerprints cover at least 60% of the
 *    incoming signals. A page is reused by at most one pattern per run.
 * 3. Ageing: live pattern pages that were not rewritten are tagged `stale` and stamped with
 *    `stale_since`. The first stale timestamp is preserved on later runs so the page's age can
 *    accumulate; once it exceeds `manifest.staleArchiveAfterDays` the page moves to `_archived/`.
 *    Pages already under `_archived/` are never reused, re-staled, or re-archived.
 * 4. Legacy cleanup: deletes historic-sql pages tagged `query-pattern` but not `pattern`.
 *
 * @param input Workspace root plus the connection, sync, and run to project.
 * @returns Counters for each kind of change, the unique touched sources, and warnings
 * (currently only `pattern_slug_invalid:<slug>` for slugs that normalize to an empty key).
 * @throws When the staged manifest or an evidence file is missing or fails schema validation,
 * when a shard or page contains invalid YAML, or on filesystem errors.
 */
export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInput): Promise<HistoricSqlProjectionResult> {
  const result: HistoricSqlProjectionResult = {
    tableUsageMerged: 0,
    staleTablesMarked: 0,
    patternPagesWritten: 0,
    stalePatternPagesMarked: 0,
    archivedPatternPages: 0,
    legacyPagesDeleted: 0,
    touchedSources: [],
    warnings: [],
  };

  const touchedKeys = new Set<string>();
  // Records a source once, however many shards mention it.
  const markTouched = (sourceName: string): void => {
    const key = `${input.connectionId}:${sourceName}`;
    if (touchedKeys.has(key)) return;
    touchedKeys.add(key);
    result.touchedSources.push({ connectionId: input.connectionId, sourceName });
  };
  const hasTimestamp = (value: unknown): value is string =>
    typeof value === 'string' && Number.isFinite(Date.parse(value));

  const rawDir = join(input.workdir, rawSourcesDirForSync(input.connectionId, 'historic-sql', input.syncId));
  const manifest = stagedManifestSchema.parse(await readJson(join(rawDir, 'manifest.json')));
  const currentTables = await currentStagedTables(rawDir);
  const evidence = await loadEvidence(input.workdir, input.runId);
  const tableEvidence = evidence.filter(
    (entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'table_usage' } => entry.kind === 'table_usage',
  );
  const patternEvidence = evidence.filter(
    (entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'pattern' } => entry.kind === 'pattern',
  );

  // ---- 1. Table usage in _schema shards ----
  const schemaRoot = join(input.workdir, 'semantic-layer', input.connectionId, '_schema');
  const shardFiles = (await walkFiles(schemaRoot)).filter(
    (candidate) => candidate.endsWith('.yaml') || candidate.endsWith('.yml'),
  );
  for (const file of shardFiles) {
    const path = join(schemaRoot, file);
    const shard = (YAML.parse(await readFile(path, 'utf-8')) ?? {}) as ManifestShard;
    if (!shard.tables) continue;
    let shardChanged = false;
    for (const [tableName, entry] of Object.entries(shard.tables)) {
      // A bare `orders:` key parses to null; nothing to merge into.
      if (!entry) continue;
      const tableRef = entry.table ?? tableName;
      // Prefer the exact qualified match so `sales.orders` evidence cannot shadow `public.orders`.
      const matchingEvidence =
        tableEvidence.find((candidate) => candidate.table === tableRef) ??
        tableEvidence.find((candidate) => tableSourceName(candidate.table) === tableName);
      if (matchingEvidence) {
        const merged = mergeUsagePreservingExternal(entry.usage as TableUsageOutput | undefined, matchingEvidence.usage);
        if (JSON.stringify(entry.usage ?? null) !== JSON.stringify(merged ?? null)) {
          entry.usage = merged as Record<string, unknown>;
          shardChanged = true;
          result.tableUsageMerged += 1;
          markTouched(tableSourceName(matchingEvidence.table));
        }
      } else if (entry.usage && !currentTables.has(tableRef)) {
        // Keep the first stale timestamp; resetting it every run would turn "stale since"
        // into "last checked".
        const priorStaleSince = entry.usage.staleSince;
        const staleSince = hasTimestamp(priorStaleSince) ? priorStaleSince : manifest.fetchedAt;
        const merged = mergeUsagePreservingExternal(entry.usage as TableUsageOutput | undefined, staleUsage(staleSince));
        if (JSON.stringify(entry.usage ?? null) !== JSON.stringify(merged ?? null)) {
          entry.usage = merged as Record<string, unknown>;
          shardChanged = true;
          result.staleTablesMarked += 1;
          markTouched(tableSourceName(tableRef));
        }
      }
    }
    if (shardChanged) {
      await writeYamlAtomic(path, shard);
    }
  }

  // ---- 2. Pattern pages ----
  const archivePrefix = '_archived/';
  const wikiRoot = join(input.workdir, 'knowledge/global/historic-sql');
  await mkdir(wikiRoot, { recursive: true });
  const allPages = await loadPatternPages(wikiRoot);
  // Archived pages live under the same root; exclude them so they are not nested into
  // `_archived/_archived/...` on every run or resurrected as reuse targets.
  const patternPages = allPages.filter((page) => isHistoricPatternPage(page) && !page.key.startsWith(archivePrefix));
  const writtenKeys = new Set<string>();
  for (const pattern of patternEvidence) {
    const incomingSignals = [...pattern.pattern.tablesInvolved, ...pattern.pattern.constituentTemplateIds];
    // A page already rewritten in this run is not a reuse candidate, otherwise two patterns
    // would overwrite each other in the same file.
    const reusable = patternPages.find(
      (page) => !writtenKeys.has(page.key) && overlapRatio(incomingSignals, existingPageSignals(page)) >= 0.6,
    );
    const key = reusable?.key ?? safeKnowledgeSlug(pattern.pattern.slug);
    if (key.length === 0) {
      // An empty key would produce a hidden `.md` file directly under the wiki root.
      result.warnings.push(`pattern_slug_invalid:${pattern.pattern.slug}`);
      continue;
    }
    const pagePath = join(wikiRoot, `${key}.md`);
    const frontmatter = {
      summary: pattern.pattern.title,
      tags: ['historic-sql', 'pattern'],
      refs: [],
      sl_refs: pattern.pattern.slRefs,
      usage_mode: 'auto',
      source: 'historic-sql',
      tables: pattern.pattern.tablesInvolved,
      representative_sql: pattern.pattern.definitionSql,
      fingerprints: pattern.pattern.constituentTemplateIds,
    };
    await mkdir(dirname(pagePath), { recursive: true });
    await writeFile(pagePath, renderMarkdownPage(frontmatter, renderPatternMarkdown(pattern)), 'utf-8');
    writtenKeys.add(key);
    result.patternPagesWritten += 1;
  }

  // ---- 3. Stale marking and archiving ----
  for (const page of patternPages) {
    if (writtenKeys.has(page.key)) continue;
    const priorStaleSince = page.frontmatter.stale_since;
    const existingTags = stringArray(page.frontmatter.tags);
    if (shouldArchive(priorStaleSince, manifest.fetchedAt, manifest.staleArchiveAfterDays)) {
      const archivePath = join(wikiRoot, '_archived', `${page.key}.md`);
      const tags = [...new Set([...existingTags, 'archived'])];
      await mkdir(dirname(archivePath), { recursive: true });
      await writeFile(archivePath, renderMarkdownPage({ ...page.frontmatter, tags }, page.content), 'utf-8');
      await rm(page.path, { force: true });
      result.archivedPatternPages += 1;
      continue;
    }
    // Preserve a valid earlier timestamp so the archive threshold can eventually be reached;
    // an unparseable value is replaced because it could never trigger archiving.
    const staleSince = hasTimestamp(priorStaleSince) ? priorStaleSince : manifest.fetchedAt;
    if (staleSince === priorStaleSince && existingTags.includes('stale')) {
      // Already marked stale on an earlier run; nothing new to write or count.
      continue;
    }
    const tags = [...new Set([...existingTags, 'stale'])];
    await writeFile(
      page.path,
      renderMarkdownPage({ ...page.frontmatter, tags, stale_since: staleSince }, page.content),
      'utf-8',
    );
    result.stalePatternPagesMarked += 1;
  }

  // ---- 4. Legacy cleanup ----
  for (const page of allPages.filter(isLegacyQueryPage)) {
    await rm(page.path, { force: true });
    result.legacyPagesDeleted += 1;
  }
  return result;
}

View file

@ -124,6 +124,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => {
parseFailures: 1,
warnings: ['parse_failed:bad-parse'],
probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'],
staleArchiveAfterDays: 90,
});
const orders = await readJson<Record<string, any>>(stagedDir, 'tables/public.orders.json');

View file

@ -277,5 +277,6 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql
parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length,
warnings,
probeWarnings: probe.warnings,
staleArchiveAfterDays: config.staleArchiveAfterDays,
});
}

View file

@ -91,7 +91,8 @@ describe('historic-sql unified contracts', () => {
parseFailures: 1,
warnings: ['parse_failed:bad'],
probeWarnings: [],
}).parseFailures,
).toBe(1);
staleArchiveAfterDays: 90,
}).staleArchiveAfterDays,
).toBe(90);
});
});

View file

@ -125,6 +125,7 @@ export const stagedManifestSchema = z.object({
parseFailures: z.number().int().nonnegative(),
warnings: z.array(z.string()),
probeWarnings: z.array(z.string()),
staleArchiveAfterDays: z.number().int().positive().default(90),
});
export type StagedManifest = z.infer<typeof stagedManifestSchema>;

View file

@ -347,6 +347,8 @@ export type {
HistoricSqlTableUsageEvidence,
} from './adapters/historic-sql/evidence.js';
export { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js';
export { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js';
export type { HistoricSqlProjectionInput, HistoricSqlProjectionResult } from './adapters/historic-sql/projection.js';
export {
patternOutputSchema,
patternsArraySchema,

View file

@ -24,6 +24,7 @@ export interface WikiFrontmatter {
representative_sql?: string;
usage?: HistoricSqlWikiUsageFrontmatter;
fingerprints?: string[];
stale_since?: string;
}
export interface WikiPage {