From 700c0ba5d70cdd1e6fb7ed2c947f5e351981365f Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 14 May 2026 18:09:16 +0200 Subject: [PATCH] feat(context): add scan-backed entity details service --- .../context/src/scan/entity-details.test.ts | 276 +++++++++++++++ packages/context/src/scan/entity-details.ts | 315 ++++++++++++++++++ packages/context/src/scan/index.ts | 11 + 3 files changed, 602 insertions(+) create mode 100644 packages/context/src/scan/entity-details.test.ts create mode 100644 packages/context/src/scan/entity-details.ts diff --git a/packages/context/src/scan/entity-details.test.ts b/packages/context/src/scan/entity-details.test.ts new file mode 100644 index 00000000..71bcb419 --- /dev/null +++ b/packages/context/src/scan/entity-details.test.ts @@ -0,0 +1,276 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { initKtxProject, type KtxLocalProject } from '../project/index.js'; +import { createKtxEntityDetailsService } from './entity-details.js'; +import type { KtxConnectionDriver, KtxScanReport, KtxSchemaTable } from './types.js'; + +describe('createKtxEntityDetailsService', () => { + let tempDir: string; + let project: KtxLocalProject; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-entity-details-service-')); + project = await initKtxProject({ projectDir: join(tempDir, 'project'), projectName: 'warehouse' }); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + function scanReport(input: { + connectionId: string; + syncId: string; + runId: string; + driver?: KtxConnectionDriver; + createdAt?: string; + }): KtxScanReport { + const rawSourcesDir = `raw-sources/${input.connectionId}/live-database/${input.syncId}`; + return { + connectionId: input.connectionId, + driver: input.driver ?? 'postgres', + syncId: input.syncId, + runId: input.runId, + trigger: 'mcp', + mode: 'structural', + dryRun: false, + artifactPaths: { + rawSourcesDir, + reportPath: `${rawSourcesDir}/scan-report.json`, + manifestShards: [], + enrichmentArtifacts: [], + }, + diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 1 }, + manifestShardsWritten: 0, + structuralSyncStats: { tablesWritten: 1, tablesDeleted: 0, foreignKeysWritten: 0 }, + enrichment: { + dataDictionary: 'skipped', + tableDescriptions: 'skipped', + columnDescriptions: 'skipped', + embeddings: 'skipped', + deterministicRelationships: 'skipped', + llmRelationshipValidation: 'skipped', + statisticalValidation: 'skipped', + }, + capabilityGaps: [], + warnings: [], + relationships: { accepted: 0, review: 0, rejected: 0, skipped: 0 }, + enrichmentState: { resumedStages: [], completedStages: [], failedStages: [] }, + createdAt: input.createdAt ?? '2026-05-14T09:00:00.000Z', + }; + } + + function ordersTable(input: { db?: string | null; estimatedRows?: number | null } = {}): KtxSchemaTable { + return { + catalog: null, + db: input.db ?? 'public', + name: 'orders', + kind: 'table', + comment: 'Customer orders', + estimatedRows: input.estimatedRows ?? 12, + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: 'Order id', + }, + { + name: 'status', + nativeType: 'text', + normalizedType: 'text', + dimensionType: 'string', + nullable: false, + primaryKey: false, + comment: 'Order status', + }, + ], + foreignKeys: [ + { + fromColumn: 'customer_id', + toCatalog: null, + toDb: 'public', + toTable: 'customers', + toColumn: 'id', + constraintName: 'orders_customer_id_fkey', + }, + ], + }; + } + + async function seedScan(input: { + connectionId?: string; + syncId: string; + runId: string; + driver?: KtxConnectionDriver; + extractedAt?: string; + tables?: KtxSchemaTable[]; + }): Promise { + const connectionId = input.connectionId ?? 'warehouse'; + const report = scanReport({ + connectionId, + syncId: input.syncId, + runId: input.runId, + driver: input.driver, + createdAt: input.extractedAt, + }); + const root = report.artifactPaths.rawSourcesDir; + await project.fileStore.writeFile( + `${root}/connection.json`, + JSON.stringify( + { + connectionId, + driver: report.driver, + extractedAt: input.extractedAt ?? report.createdAt, + scope: { schemas: ['public'] }, + }, + null, + 2, + ), + 'ktx', + 'ktx@example.com', + 'seed connection', + ); + for (const table of input.tables ?? [ordersTable()]) { + await project.fileStore.writeFile( + `${root}/tables/${table.db ?? 'default'}-${table.name}.json`, + JSON.stringify(table, null, 2), + 'ktx', + 'ktx@example.com', + `seed ${table.name}`, + ); + } + await project.fileStore.writeFile( + `${root}/scan-report.json`, + JSON.stringify(report, null, 2), + 'ktx', + 'ktx@example.com', + 'seed scan report', + ); + } + + it('returns the latest scan snapshot table details for a display string', async () => { + await seedScan({ syncId: 'sync-1', runId: 'scan-old', extractedAt: '2026-05-14T08:00:00.000Z' }); + await seedScan({ + syncId: 'sync-2', + runId: 'scan-new', + extractedAt: '2026-05-14T09:00:00.000Z', + tables: [ordersTable({ estimatedRows: 99 })], + }); + const service = createKtxEntityDetailsService(project); + + const result = await service.read({ + connectionId: 'warehouse', + entities: [{ table: 'public.orders' }], + }); + + expect(result.results).toHaveLength(1); + expect(result.results[0]).toMatchObject({ + ok: true, + connectionId: 'warehouse', + display: 'public.orders', + estimatedRows: 99, + snapshot: { + syncId: 'sync-2', + scanRunId: 'scan-new', + extractedAt: '2026-05-14T09:00:00.000Z', + }, + columns: [ + { name: 'id', nativeType: 'integer', primaryKey: true }, + { name: 'status', nativeType: 'text', nullable: false }, + ], + }); + }); + + it('filters requested columns while keeping full-table foreign keys', async () => { + await seedScan({ syncId: 'sync-1', runId: 'scan-1' }); + const service = createKtxEntityDetailsService(project); + + const result = await service.read({ + connectionId: 'warehouse', + entities: [{ table: { catalog: null, db: 'public', name: 'orders' }, columns: ['status'] }], + }); + + expect(result.results[0]).toMatchObject({ + ok: true, + columns: [{ name: 'status' }], + foreignKeys: [ + { + fromColumn: 'customer_id', + toDb: 'public', + toTable: 'customers', + toColumn: 'id', + }, + ], + }); + }); + + it('returns a structured missing-scan error', async () => { + const service = createKtxEntityDetailsService(project); + + const result = await service.read({ + connectionId: 'warehouse', + entities: [{ table: 'public.orders' }], + }); + + expect(result.results).toEqual([ + { + ok: false, + connectionId: 'warehouse', + table: 'public.orders', + error: { + code: 'scan_missing', + message: 'No live-database scan found for connection "warehouse"; run `ktx ingest warehouse` or `ktx scan warehouse`.', + }, + }, + ]); + }); + + it('reports ambiguous bare table names across schemas', async () => { + await seedScan({ + syncId: 'sync-1', + runId: 'scan-1', + tables: [ordersTable({ db: 'public' }), ordersTable({ db: 'archive' })], + }); + const service = createKtxEntityDetailsService(project); + + const result = await service.read({ + connectionId: 'warehouse', + entities: [{ table: 'orders' }], + }); + + expect(result.results[0]).toMatchObject({ + ok: false, + error: { + code: 'ambiguous_table', + candidates: [ + { tableRef: { catalog: null, db: 'archive', name: 'orders' }, display: 'archive.orders' }, + { tableRef: { catalog: null, db: 'public', name: 'orders' }, display: 'public.orders' }, + ], + }, + }); + }); + + it('reports missing requested columns with available column candidates', async () => { + await seedScan({ syncId: 'sync-1', runId: 'scan-1' }); + const service = createKtxEntityDetailsService(project); + + const result = await service.read({ + connectionId: 'warehouse', + entities: [{ table: 'public.orders', columns: ['status', 'plan_tier'] }], + }); + + expect(result.results[0]).toMatchObject({ + ok: false, + error: { + code: 'column_not_found', + message: 'Column(s) not found on public.orders: plan_tier', + candidates: ['id', 'status'], + }, + }); + }); +}); diff --git a/packages/context/src/scan/entity-details.ts b/packages/context/src/scan/entity-details.ts new file mode 100644 index 00000000..6e95690e --- /dev/null +++ b/packages/context/src/scan/entity-details.ts @@ -0,0 +1,315 @@ +import type { KtxLocalProject } from '../project/index.js'; +import { readLocalScanStructuralSnapshot } from './local-structural-artifacts.js'; +import type { + KtxConnectionDriver, + KtxScanReport, + KtxSchemaColumn, + KtxSchemaSnapshot, + KtxSchemaTable, + KtxTableRef, +} from './types.js'; + +export type KtxEntityDetailsTableInput = string | KtxTableRef; + +export interface KtxEntityDetailsInput { + connectionId: string; + entities: Array<{ + table: KtxEntityDetailsTableInput; + columns?: string[]; + }>; +} + +export interface KtxEntityDetailsSnapshotInfo { + syncId: string; + extractedAt: string; + scanRunId: string | null; +} + +export interface KtxEntityDetailsColumn { + name: string; + nativeType: string; + normalizedType: string; + dimensionType: KtxSchemaColumn['dimensionType']; + nullable: boolean; + primaryKey: boolean; + comment: string | null; +} + +export interface KtxEntityDetailsRecord { + ok: true; + connectionId: string; + tableRef: KtxTableRef; + display: string; + kind: KtxSchemaTable['kind']; + comment: string | null; + estimatedRows: number | null; + columns: KtxEntityDetailsColumn[]; + foreignKeys: KtxSchemaTable['foreignKeys']; + snapshot: KtxEntityDetailsSnapshotInfo; +} + +export type KtxEntityDetailsErrorCode = 'scan_missing' | 'table_not_found' | 'ambiguous_table' | 'column_not_found'; + +export interface KtxEntityDetailsErrorResult { + ok: false; + connectionId: string; + table: KtxEntityDetailsTableInput; + snapshot?: KtxEntityDetailsSnapshotInfo; + error: { + code: KtxEntityDetailsErrorCode; + message: string; + candidates?: Array<{ tableRef: KtxTableRef; display: string }> | string[]; + }; +} + +export interface KtxEntityDetailsResponse { + results: Array; +} + +interface LatestScan { + report: KtxScanReport; + snapshot: KtxSchemaSnapshot; +} + +interface ResolveResult { + table: KtxSchemaTable | null; + error?: Omit & { message: string }; +} + +function normalize(value: string | null | undefined): string { + return (value ?? '').toLowerCase(); +} + +function refsEqual(left: KtxTableRef, right: KtxTableRef): boolean { + return ( + normalize(left.catalog) === normalize(right.catalog) && + normalize(left.db) === normalize(right.db) && + normalize(left.name) === normalize(right.name) + ); +} + +function cleanIdentifierPart(part: string): string { + return part.trim().replace(/^["'`\[]|["'`\]]$/g, ''); +} + +function splitDisplay(display: string): string[] { + return display + .trim() + .split('.') + .map(cleanIdentifierPart) + .filter(Boolean); +} + +function displayForTable(driver: KtxConnectionDriver, table: KtxTableRef): string { + if (driver === 'sqlite') { + return table.name; + } + return [table.catalog, table.db, table.name].filter((part): part is string => Boolean(part)).join('.'); +} + +function tableRef(table: KtxSchemaTable): KtxTableRef { + return { catalog: table.catalog, db: table.db, name: table.name }; +} + +function candidateList( + driver: KtxConnectionDriver, + tables: KtxSchemaTable[], +): Array<{ tableRef: KtxTableRef; display: string }> { + return tables + .map((table) => ({ + tableRef: tableRef(table), + display: displayForTable(driver, table), + })) + .sort((left, right) => left.display.localeCompare(right.display)); +} + +function parseDisplayRef(driver: KtxConnectionDriver, display: string): KtxTableRef | null { + const parts = splitDisplay(display); + if (driver === 'sqlite') { + return parts.length === 1 ? { catalog: null, db: null, name: parts[0]! } : null; + } + if (driver === 'bigquery' || driver === 'snowflake' || driver === 'sqlserver') { + return parts.length === 3 ? { catalog: parts[0]!, db: parts[1]!, name: parts[2]! } : null; + } + if (parts.length === 2) { + return { catalog: null, db: parts[0]!, name: parts[1]! }; + } + if (parts.length === 3) { + return { catalog: parts[0]!, db: parts[1]!, name: parts[2]! }; + } + return null; +} + +function resolveTable(snapshot: KtxSchemaSnapshot, input: KtxEntityDetailsTableInput): ResolveResult { + if (typeof input !== 'string') { + const table = snapshot.tables.find((candidate) => refsEqual(candidate, input)) ?? null; + return table + ? { table } + : { + table: null, + error: { + code: 'table_not_found', + message: `Table not found in latest scan: ${displayForTable(snapshot.driver, input)}`, + candidates: candidateList(snapshot.driver, snapshot.tables), + }, + }; + } + + const parsed = parseDisplayRef(snapshot.driver, input); + if (parsed) { + const table = snapshot.tables.find((candidate) => refsEqual(candidate, parsed)) ?? null; + return table + ? { table } + : { + table: null, + error: { + code: 'table_not_found', + message: `Table not found in latest scan: ${input}`, + candidates: candidateList(snapshot.driver, snapshot.tables), + }, + }; + } + + const byName = snapshot.tables.filter((candidate) => normalize(candidate.name) === normalize(input)); + if (byName.length === 1) { + return { table: byName[0]! }; + } + if (byName.length > 1) { + return { + table: null, + error: { + code: 'ambiguous_table', + message: `Table name "${input}" is ambiguous across schemas/catalogs; pass a structured table ref.`, + candidates: candidateList(snapshot.driver, byName), + }, + }; + } + return { + table: null, + error: { + code: 'table_not_found', + message: `Table not found in latest scan: ${input}`, + candidates: candidateList(snapshot.driver, snapshot.tables), + }, + }; +} + +function toColumn(column: KtxSchemaColumn): KtxEntityDetailsColumn { + return { + name: column.name, + nativeType: column.nativeType, + normalizedType: column.normalizedType, + dimensionType: column.dimensionType, + nullable: column.nullable, + primaryKey: column.primaryKey, + comment: column.comment, + }; +} + +function snapshotInfo(report: KtxScanReport, snapshot: KtxSchemaSnapshot): KtxEntityDetailsSnapshotInfo { + return { + syncId: report.syncId, + extractedAt: snapshot.extractedAt, + scanRunId: report.runId ?? null, + }; +} + +async function readJson(project: KtxLocalProject, path: string): Promise { + return JSON.parse((await project.fileStore.readFile(path)).content) as T; +} + +async function latestScan(project: KtxLocalProject, connectionId: string): Promise { + const root = `raw-sources/${connectionId}/live-database`; + let listed; + try { + listed = await project.fileStore.listFiles(root); + } catch { + return null; + } + const reportPath = listed.files.filter((path) => path.endsWith('/scan-report.json')).sort().at(-1); + if (!reportPath) { + return null; + } + const report = await readJson(project, reportPath); + const rawSourcesDir = report.artifactPaths.rawSourcesDir ?? reportPath.slice(0, -'/scan-report.json'.length); + const snapshot = await readLocalScanStructuralSnapshot({ + project, + connectionId, + driver: report.driver, + rawSourcesDir, + extractedAtFallback: report.createdAt, + }); + return { report, snapshot }; +} + +export function createKtxEntityDetailsService(project: KtxLocalProject) { + return { + async read(input: KtxEntityDetailsInput): Promise { + const scan = await latestScan(project, input.connectionId); + if (!scan) { + return { + results: input.entities.map((entity) => ({ + ok: false, + connectionId: input.connectionId, + table: entity.table, + error: { + code: 'scan_missing', + message: `No live-database scan found for connection "${input.connectionId}"; run \`ktx ingest ${input.connectionId}\` or \`ktx scan ${input.connectionId}\`.`, + }, + })), + }; + } + + const info = snapshotInfo(scan.report, scan.snapshot); + const results: KtxEntityDetailsResponse['results'] = []; + for (const entity of input.entities) { + const resolved = resolveTable(scan.snapshot, entity.table); + if (!resolved.table) { + results.push({ + ok: false, + connectionId: input.connectionId, + table: entity.table, + snapshot: info, + error: resolved.error!, + }); + continue; + } + + const requested = new Set((entity.columns ?? []).map((column) => normalize(column))); + const columns = requested.size + ? resolved.table.columns.filter((column) => requested.has(normalize(column.name))) + : resolved.table.columns; + if (requested.size && columns.length !== requested.size) { + const found = new Set(columns.map((column) => normalize(column.name))); + const missing = [...requested].filter((column) => !found.has(column)); + results.push({ + ok: false, + connectionId: input.connectionId, + table: entity.table, + snapshot: info, + error: { + code: 'column_not_found', + message: `Column(s) not found on ${displayForTable(scan.snapshot.driver, resolved.table)}: ${missing.join(', ')}`, + candidates: resolved.table.columns.map((column) => column.name), + }, + }); + continue; + } + + results.push({ + ok: true, + connectionId: input.connectionId, + tableRef: tableRef(resolved.table), + display: displayForTable(scan.snapshot.driver, resolved.table), + kind: resolved.table.kind, + comment: resolved.table.comment, + estimatedRows: resolved.table.estimatedRows, + columns: columns.map(toColumn), + foreignKeys: resolved.table.foreignKeys, + snapshot: info, + }); + } + return { results }; + }, + }; +} diff --git a/packages/context/src/scan/index.ts b/packages/context/src/scan/index.ts index 94450891..de1ef4ce 100644 --- a/packages/context/src/scan/index.ts +++ b/packages/context/src/scan/index.ts @@ -61,6 +61,17 @@ export { ktxScanErrorMessage, skippedKtxScanEnrichmentSummary, } from './enrichment-summary.js'; +export type { + KtxEntityDetailsColumn, + KtxEntityDetailsErrorCode, + KtxEntityDetailsErrorResult, + KtxEntityDetailsInput, + KtxEntityDetailsRecord, + KtxEntityDetailsResponse, + KtxEntityDetailsSnapshotInfo, + KtxEntityDetailsTableInput, +} from './entity-details.js'; +export { createKtxEntityDetailsService } from './entity-details.js'; export type { KtxColumnSampleUpdate, KtxDescriptionSource,