From fb541cb9e5527f582e695fc4d9ef0c4b171fbc58 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 18:53:45 +0200 Subject: [PATCH] feat: project historic sql evidence --- .../adapters/historic-sql/projection.test.ts | 174 ++++++++++ .../adapters/historic-sql/projection.ts | 328 ++++++++++++++++++ .../historic-sql/stage-unified.test.ts | 1 + .../adapters/historic-sql/stage-unified.ts | 1 + .../adapters/historic-sql/types.test.ts | 5 +- .../src/ingest/adapters/historic-sql/types.ts | 1 + packages/context/src/ingest/index.ts | 2 + packages/context/src/wiki/types.ts | 1 + 8 files changed, 511 insertions(+), 2 deletions(-) create mode 100644 packages/context/src/ingest/adapters/historic-sql/projection.test.ts create mode 100644 packages/context/src/ingest/adapters/historic-sql/projection.ts diff --git a/packages/context/src/ingest/adapters/historic-sql/projection.test.ts b/packages/context/src/ingest/adapters/historic-sql/projection.test.ts new file mode 100644 index 00000000..abe93170 --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/projection.test.ts @@ -0,0 +1,174 @@ +import { mkdir, mkdtemp, readFile, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import YAML from 'yaml'; +import { describe, expect, it } from 'vitest'; +import { projectHistoricSqlEvidence } from './projection.js'; + +async function tempWorkdir(): Promise { + return mkdtemp(join(tmpdir(), 'historic-sql-projection-')); +} + +async function writeText(root: string, relPath: string, content: string): Promise { + const target = join(root, relPath); + await mkdir(join(target, '..'), { recursive: true }); + await writeFile(target, content, 'utf-8'); +} + +async function writeJson(root: string, relPath: string, value: unknown): Promise { + await writeText(root, relPath, `${JSON.stringify(value, null, 2)}\n`); +} + +describe('projectHistoricSqlEvidence', () => { + it('merges table usage into matching _schema shards and preserves external usage keys', async () => { + const workdir = await tempWorkdir(); + await writeText( + workdir, + 'semantic-layer/warehouse/_schema/public.yaml', + YAML.stringify({ + tables: { + orders: { + table: 'public.orders', + usage: { + narrative: 'Old generated usage.', + frequencyTier: 'low', + commonFilters: ['old_status'], + commonJoins: [], + ownerNote: 'keep me', + }, + columns: [{ name: 'id', type: 'string' }], + }, + }, + }), + ); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 1, + touchedTableCount: 1, + parseFailures: 0, + warnings: [], + probeWarnings: [], + staleArchiveAfterDays: 90, + }); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' }); + await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/orders.json', { + kind: 'table_usage', + connectionId: 'warehouse', + table: 'public.orders', + rawPath: 'tables/public.orders.json', + usage: { + narrative: 'Orders are repeatedly queried for lifecycle analysis.', + frequencyTier: 'high', + commonFilters: ['status', 'created_at'], + commonGroupBys: ['status'], + commonJoins: [{ table: 'public.customers', on: ['customer_id'] }], + staleSince: null, + }, + }); + + const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' }); + + expect(result.touchedSources).toEqual([{ connectionId: 'warehouse', sourceName: 'orders' }]); + const shard = YAML.parse(await readFile(join(workdir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8')); + expect(shard.tables.orders.usage).toEqual({ + ownerNote: 'keep me', + narrative: 'Orders are repeatedly queried for lifecycle analysis.', + frequencyTier: 'high', + commonFilters: ['status', 'created_at'], + commonGroupBys: ['status'], + commonJoins: [{ table: 'public.customers', on: ['customer_id'] }], + staleSince: null, + }); + }); + + it('writes pattern pages, reuses similar slugs, and marks missing old pattern pages stale', async () => { + const workdir = await tempWorkdir(); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/manifest.json', { + source: 'historic-sql', + connectionId: 'warehouse', + dialect: 'postgres', + fetchedAt: '2026-05-11T00:00:00.000Z', + windowStart: '2026-02-10T00:00:00.000Z', + windowEnd: '2026-05-11T00:00:00.000Z', + snapshotRowCount: 2, + touchedTableCount: 2, + parseFailures: 0, + warnings: [], + probeWarnings: [], + staleArchiveAfterDays: 90, + }); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.orders.json', { table: 'public.orders' }); + await writeJson(workdir, 'raw-sources/warehouse/historic-sql/sync-1/tables/public.customers.json', { table: 'public.customers' }); + await writeText( + workdir, + 'knowledge/global/historic-sql/old-order-lifecycle.md', + [ + '---', + YAML.stringify({ + summary: 'Old order lifecycle page', + tags: ['historic-sql', 'pattern'], + refs: [], + sl_refs: ['orders'], + usage_mode: 'auto', + source: 'historic-sql', + tables: ['public.orders', 'public.customers'], + fingerprints: ['pg:1'], + }).trimEnd(), + '---', + '', + 'Old body', + '', + ].join('\n'), + ); + await writeText( + workdir, + 'knowledge/global/historic-sql/retired-pattern.md', + [ + '---', + YAML.stringify({ + summary: 'Retired pattern', + tags: ['historic-sql', 'pattern'], + refs: [], + sl_refs: [], + usage_mode: 'auto', + source: 'historic-sql', + tables: ['public.tickets'], + fingerprints: ['pg:9'], + }).trimEnd(), + '---', + '', + 'Retired body', + '', + ].join('\n'), + ); + await writeJson(workdir, '.ktx/ingest-evidence/historic-sql/run-1/pattern.json', { + kind: 'pattern', + connectionId: 'warehouse', + rawPath: 'patterns-input.json', + pattern: { + slug: 'order-lifecycle-analysis', + title: 'Order Lifecycle Analysis', + narrative: 'Analysts compare order status with customer segment.', + definitionSql: 'select * from public.orders join public.customers on customers.id = orders.customer_id', + tablesInvolved: ['public.orders', 'public.customers'], + slRefs: ['orders', 'customers'], + constituentTemplateIds: ['pg:1', 'pg:2'], + }, + }); + + const result = await projectHistoricSqlEvidence({ workdir, connectionId: 'warehouse', syncId: 'sync-1', runId: 'run-1' }); + + expect(result.patternPagesWritten).toBe(1); + await expect(readFile(join(workdir, 'knowledge/global/historic-sql/old-order-lifecycle.md'), 'utf-8')).resolves.toContain( + 'Order Lifecycle Analysis', + ); + await expect(readFile(join(workdir, 'knowledge/global/historic-sql/retired-pattern.md'), 'utf-8')).resolves.toContain( + 'stale_since: "2026-05-11T00:00:00.000Z"', + ); + }); +}); diff --git a/packages/context/src/ingest/adapters/historic-sql/projection.ts b/packages/context/src/ingest/adapters/historic-sql/projection.ts new file mode 100644 index 00000000..37ea3dd4 --- /dev/null +++ b/packages/context/src/ingest/adapters/historic-sql/projection.ts @@ -0,0 +1,328 @@ +import { access, mkdir, readdir, readFile, rename, rm, writeFile } from 'node:fs/promises'; +import { dirname, join, relative } from 'node:path'; +import YAML from 'yaml'; +import { rawSourcesDirForSync } from '../../raw-sources-paths.js'; +import { mergeUsagePreservingExternal } from '../live-database/manifest.js'; +import { historicSqlEvidenceEnvelopeSchema, type HistoricSqlEvidenceEnvelope } from './evidence.js'; +import type { TableUsageOutput } from './skill-schemas.js'; +import { stagedManifestSchema } from './types.js'; + +export interface HistoricSqlProjectionInput { + workdir: string; + connectionId: string; + syncId: string; + runId: string; +} + +export interface HistoricSqlProjectionResult { + tableUsageMerged: number; + staleTablesMarked: number; + patternPagesWritten: number; + stalePatternPagesMarked: number; + archivedPatternPages: number; + legacyPagesDeleted: number; + touchedSources: Array<{ connectionId: string; sourceName: string }>; + warnings: string[]; +} + +interface ManifestShard { + tables?: Record; columns?: unknown[]; [key: string]: unknown }>; +} + +interface HistoricSqlPatternPage { + key: string; + path: string; + frontmatter: Record; + content: string; +} + +function safeKnowledgeSlug(value: string): string { + return value.toLowerCase().replace(/[^a-z0-9/-]+/g, '-').replace(/^-+|-+$/g, ''); +} + +async function pathExists(path: string): Promise { + try { + await access(path); + return true; + } catch { + return false; + } +} + +async function walkFiles(root: string): Promise { + if (!(await pathExists(root))) return []; + const result: string[] = []; + async function visit(dir: string): Promise { + const entries = await readdir(dir, { withFileTypes: true }); + for (const entry of entries) { + const absolute = join(dir, entry.name); + if (entry.isDirectory()) { + await visit(absolute); + } else if (entry.isFile()) { + result.push(relative(root, absolute).replace(/\\/g, '/')); + } + } + } + await visit(root); + return result.sort(); +} + +async function readJson(path: string): Promise { + return JSON.parse(await readFile(path, 'utf-8')) as unknown; +} + +async function writeYamlAtomic(path: string, value: unknown): Promise { + await mkdir(dirname(path), { recursive: true }); + const tmp = `${path}.tmp`; + await writeFile(tmp, YAML.stringify(value, { indent: 2, lineWidth: 0 }), 'utf-8'); + await rename(tmp, path); +} + +function tableSourceName(tableRef: string): string { + return tableRef.split('.').filter(Boolean).at(-1) ?? tableRef; +} + +function staleUsage(fetchedAt: string) { + return { + narrative: 'No recent historic SQL usage was observed in the latest snapshot.', + frequencyTier: 'unused' as const, + commonFilters: [], + commonGroupBys: [], + commonJoins: [], + staleSince: fetchedAt, + }; +} + +async function loadEvidence(workdir: string, runId: string): Promise { + const root = join(workdir, '.ktx/ingest-evidence/historic-sql', runId); + const files = await walkFiles(root); + const evidence: HistoricSqlEvidenceEnvelope[] = []; + for (const file of files.filter((candidate) => candidate.endsWith('.json'))) { + evidence.push(historicSqlEvidenceEnvelopeSchema.parse(await readJson(join(root, file)))); + } + return evidence; +} + +function renderPatternMarkdown(pattern: HistoricSqlEvidenceEnvelope & { kind: 'pattern' }): string { + return [ + `# ${pattern.pattern.title}`, + '', + pattern.pattern.narrative, + '', + '## Representative SQL', + '', + '```sql', + pattern.pattern.definitionSql, + '```', + '', + '## Tables', + '', + ...pattern.pattern.tablesInvolved.map((table) => `- ${table}`), + '', + '## Constituent Templates', + '', + ...pattern.pattern.constituentTemplateIds.map((id) => `- ${id}`), + '', + ].join('\n'); +} + +function overlapRatio(left: string[], right: string[]): number { + const rightSet = new Set(right); + const intersection = left.filter((value) => rightSet.has(value)).length; + return left.length === 0 ? 0 : intersection / left.length; +} + +function parseMarkdownPage(key: string, path: string, raw: string): HistoricSqlPatternPage | null { + const match = raw.match(/^---\n([\s\S]*?)\n---\n?([\s\S]*)$/); + if (!match) return null; + return { + key, + path, + frontmatter: (YAML.parse(match[1] ?? '') ?? {}) as Record, + content: match[2] ?? '', + }; +} + +function isHistoricPatternPage(page: HistoricSqlPatternPage): boolean { + const tags = Array.isArray(page.frontmatter.tags) ? page.frontmatter.tags : []; + return ( + page.frontmatter.source === 'historic-sql' && + tags.includes('historic-sql') && + tags.includes('pattern') + ); +} + +function isLegacyQueryPage(page: HistoricSqlPatternPage): boolean { + const tags = Array.isArray(page.frontmatter.tags) ? page.frontmatter.tags : []; + return page.frontmatter.source === 'historic-sql' && tags.includes('query-pattern') && !tags.includes('pattern'); +} + +function stringArray(value: unknown): string[] { + return Array.isArray(value) ? value.filter((entry): entry is string => typeof entry === 'string') : []; +} + +function renderMarkdownPage(frontmatter: Record, content: string): string { + let yaml = YAML.stringify(frontmatter, { indent: 2, lineWidth: 0 }).trimEnd(); + const staleSince = frontmatter.stale_since; + if (typeof staleSince === 'string') { + yaml = yaml.replace(`stale_since: ${staleSince}`, `stale_since: "${staleSince}"`); + } + return `---\n${yaml}\n---\n\n${content.trim()}\n`; +} + +function existingPageSignals(page: HistoricSqlPatternPage): string[] { + return [...stringArray(page.frontmatter.tables), ...stringArray(page.frontmatter.fingerprints)]; +} + +function shouldArchive(staleSince: unknown, fetchedAt: string, days: number): boolean { + if (typeof staleSince !== 'string') return false; + const staleTime = Date.parse(staleSince); + const fetchedTime = Date.parse(fetchedAt); + if (!Number.isFinite(staleTime) || !Number.isFinite(fetchedTime)) return false; + return fetchedTime - staleTime > days * 24 * 60 * 60 * 1000; +} + +async function loadPatternPages(root: string): Promise { + const files = await walkFiles(root); + const pages: HistoricSqlPatternPage[] = []; + for (const file of files.filter((candidate) => candidate.endsWith('.md'))) { + const key = file.replace(/\.md$/, ''); + const path = join(root, file); + const page = parseMarkdownPage(key, path, await readFile(path, 'utf-8')); + if (page) { + pages.push(page); + } + } + return pages; +} + +async function currentStagedTables(rawDir: string): Promise> { + const tablesRoot = join(rawDir, 'tables'); + const files = await walkFiles(tablesRoot); + const tables = new Set(); + for (const file of files.filter((candidate) => candidate.endsWith('.json'))) { + const value = await readJson(join(tablesRoot, file)); + if (typeof value === 'object' && value !== null && 'table' in value && typeof value.table === 'string') { + tables.add(value.table); + } + } + return tables; +} + +export async function projectHistoricSqlEvidence(input: HistoricSqlProjectionInput): Promise { + const result: HistoricSqlProjectionResult = { + tableUsageMerged: 0, + staleTablesMarked: 0, + patternPagesWritten: 0, + stalePatternPagesMarked: 0, + archivedPatternPages: 0, + legacyPagesDeleted: 0, + touchedSources: [], + warnings: [], + }; + const touchedKeys = new Set(); + const rawDir = join(input.workdir, rawSourcesDirForSync(input.connectionId, 'historic-sql', input.syncId)); + const manifest = stagedManifestSchema.parse(await readJson(join(rawDir, 'manifest.json'))); + const currentTables = await currentStagedTables(rawDir); + const evidence = await loadEvidence(input.workdir, input.runId); + const tableEvidence = evidence.filter((entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'table_usage' } => entry.kind === 'table_usage'); + const patternEvidence = evidence.filter((entry): entry is HistoricSqlEvidenceEnvelope & { kind: 'pattern' } => entry.kind === 'pattern'); + + const schemaRoot = join(input.workdir, 'semantic-layer', input.connectionId, '_schema'); + for (const file of (await walkFiles(schemaRoot)).filter((candidate) => candidate.endsWith('.yaml') || candidate.endsWith('.yml'))) { + const path = join(schemaRoot, file); + const before = await readFile(path, 'utf-8'); + const shard = (YAML.parse(before) ?? {}) as ManifestShard; + if (!shard.tables) continue; + for (const [tableName, entry] of Object.entries(shard.tables)) { + const tableRef = entry.table ?? tableName; + const matchingEvidence = tableEvidence.find( + (candidate) => candidate.table === tableRef || tableSourceName(candidate.table) === tableName, + ); + if (matchingEvidence) { + const merged = mergeUsagePreservingExternal(entry.usage as TableUsageOutput | undefined, matchingEvidence.usage); + if (JSON.stringify(entry.usage ?? null) !== JSON.stringify(merged ?? null)) { + entry.usage = merged as Record; + result.tableUsageMerged += 1; + const sourceName = tableSourceName(matchingEvidence.table); + const key = `${input.connectionId}:${sourceName}`; + if (!touchedKeys.has(key)) { + touchedKeys.add(key); + result.touchedSources.push({ connectionId: input.connectionId, sourceName }); + } + } + } else if (entry.usage && !currentTables.has(tableRef)) { + const merged = mergeUsagePreservingExternal(entry.usage as TableUsageOutput | undefined, staleUsage(manifest.fetchedAt)); + if (JSON.stringify(entry.usage ?? null) !== JSON.stringify(merged ?? null)) { + entry.usage = merged as Record; + result.staleTablesMarked += 1; + const sourceName = tableSourceName(tableRef); + const key = `${input.connectionId}:${sourceName}`; + if (!touchedKeys.has(key)) { + touchedKeys.add(key); + result.touchedSources.push({ connectionId: input.connectionId, sourceName }); + } + } + } + } + const after = YAML.stringify(shard, { indent: 2, lineWidth: 0 }); + if (after !== before) { + await writeYamlAtomic(path, shard); + } + } + + const wikiRoot = join(input.workdir, 'knowledge/global/historic-sql'); + await mkdir(wikiRoot, { recursive: true }); + const allPages = await loadPatternPages(wikiRoot); + const patternPages = allPages.filter(isHistoricPatternPage); + const writtenKeys = new Set(); + + for (const pattern of patternEvidence) { + const incomingSignals = [...pattern.pattern.tablesInvolved, ...pattern.pattern.constituentTemplateIds]; + const reusable = patternPages.find((page) => overlapRatio(incomingSignals, existingPageSignals(page)) >= 0.6); + const key = reusable?.key ?? safeKnowledgeSlug(pattern.pattern.slug); + const pagePath = join(wikiRoot, `${key}.md`); + const frontmatter = { + summary: pattern.pattern.title, + tags: ['historic-sql', 'pattern'], + refs: [], + sl_refs: pattern.pattern.slRefs, + usage_mode: 'auto', + source: 'historic-sql', + tables: pattern.pattern.tablesInvolved, + representative_sql: pattern.pattern.definitionSql, + fingerprints: pattern.pattern.constituentTemplateIds, + }; + await mkdir(dirname(pagePath), { recursive: true }); + await writeFile(pagePath, renderMarkdownPage(frontmatter, renderPatternMarkdown(pattern)), 'utf-8'); + writtenKeys.add(key); + result.patternPagesWritten += 1; + } + + for (const page of patternPages) { + if (writtenKeys.has(page.key)) continue; + if (shouldArchive(page.frontmatter.stale_since, manifest.fetchedAt, manifest.staleArchiveAfterDays)) { + const archivePath = join(wikiRoot, '_archived', `${page.key}.md`); + const tags = [...new Set([...stringArray(page.frontmatter.tags), 'archived'])]; + await mkdir(dirname(archivePath), { recursive: true }); + await writeFile(archivePath, renderMarkdownPage({ ...page.frontmatter, tags }, page.content), 'utf-8'); + await rm(page.path, { force: true }); + result.archivedPatternPages += 1; + continue; + } + const tags = [...new Set([...stringArray(page.frontmatter.tags), 'stale'])]; + await writeFile( + page.path, + renderMarkdownPage({ ...page.frontmatter, tags, stale_since: manifest.fetchedAt }, page.content), + 'utf-8', + ); + result.stalePatternPagesMarked += 1; + } + + for (const page of allPages.filter(isLegacyQueryPage)) { + await rm(page.path, { force: true }); + result.legacyPagesDeleted += 1; + } + + return result; +} diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts index ad11bace..f9d60fce 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.test.ts @@ -124,6 +124,7 @@ describe('stageHistoricSqlAggregatedSnapshot', () => { parseFailures: 1, warnings: ['parse_failed:bad-parse'], probeWarnings: ['pg_stat_statements.max is low; aggregation still proceeds'], + staleArchiveAfterDays: 90, }); const orders = await readJson>(stagedDir, 'tables/public.orders.json'); diff --git a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts index c07fb10d..406c3016 100644 --- a/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts +++ b/packages/context/src/ingest/adapters/historic-sql/stage-unified.ts @@ -277,5 +277,6 @@ export async function stageHistoricSqlAggregatedSnapshot(input: StageHistoricSql parseFailures: warnings.filter((warning) => warning.startsWith('parse_failed:')).length, warnings, probeWarnings: probe.warnings, + staleArchiveAfterDays: config.staleArchiveAfterDays, }); } diff --git a/packages/context/src/ingest/adapters/historic-sql/types.test.ts b/packages/context/src/ingest/adapters/historic-sql/types.test.ts index 9d5aeea8..076e5d8e 100644 --- a/packages/context/src/ingest/adapters/historic-sql/types.test.ts +++ b/packages/context/src/ingest/adapters/historic-sql/types.test.ts @@ -91,7 +91,8 @@ describe('historic-sql unified contracts', () => { parseFailures: 1, warnings: ['parse_failed:bad'], probeWarnings: [], - }).parseFailures, - ).toBe(1); + staleArchiveAfterDays: 90, + }).staleArchiveAfterDays, + ).toBe(90); }); }); diff --git a/packages/context/src/ingest/adapters/historic-sql/types.ts b/packages/context/src/ingest/adapters/historic-sql/types.ts index 8dec732a..6ed75459 100644 --- a/packages/context/src/ingest/adapters/historic-sql/types.ts +++ b/packages/context/src/ingest/adapters/historic-sql/types.ts @@ -125,6 +125,7 @@ export const stagedManifestSchema = z.object({ parseFailures: z.number().int().nonnegative(), warnings: z.array(z.string()), probeWarnings: z.array(z.string()), + staleArchiveAfterDays: z.number().int().positive().default(90), }); export type StagedManifest = z.infer; diff --git a/packages/context/src/ingest/index.ts b/packages/context/src/ingest/index.ts index e3dc3899..f5a1ab0e 100644 --- a/packages/context/src/ingest/index.ts +++ b/packages/context/src/ingest/index.ts @@ -347,6 +347,8 @@ export type { HistoricSqlTableUsageEvidence, } from './adapters/historic-sql/evidence.js'; export { createEmitHistoricSqlEvidenceTool } from './adapters/historic-sql/evidence-tool.js'; +export { projectHistoricSqlEvidence } from './adapters/historic-sql/projection.js'; +export type { HistoricSqlProjectionInput, HistoricSqlProjectionResult } from './adapters/historic-sql/projection.js'; export { patternOutputSchema, patternsArraySchema, diff --git a/packages/context/src/wiki/types.ts b/packages/context/src/wiki/types.ts index cd11d49b..317b17ab 100644 --- a/packages/context/src/wiki/types.ts +++ b/packages/context/src/wiki/types.ts @@ -24,6 +24,7 @@ export interface WikiFrontmatter { representative_sql?: string; usage?: HistoricSqlWikiUsageFrontmatter; fingerprints?: string[]; + stale_since?: string; } export interface WikiPage {