import Database from 'better-sqlite3'; import { describe, expect, it, vi } from 'vitest'; import { buildDefaultKtxProjectConfig } from '../../../src/context/project/config.js'; import type { KtxScanEnrichmentCompletedStage, KtxScanEnrichmentFailedStage, KtxScanEnrichmentStageLookup, KtxScanEnrichmentStateStore, } from '../../../src/context/scan/enrichment-state.js'; import { createDeterministicLocalScanEnrichmentProviders, runLocalScanEnrichment, snapshotToKtxEnrichedSchema, } from '../../../src/context/scan/local-enrichment.js'; import { createKtxConnectorCapabilities, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxEmbeddingPort, type KtxScanConnector, type KtxScanContext, type KtxSchemaSnapshot, } from '../../../src/context/scan/types.js'; function fakeScanEmbedding(options: { dimensions: number; maxBatchSize?: number }): KtxEmbeddingPort { return { dimensions: options.dimensions, maxBatchSize: options.maxBatchSize ?? 64, async embedBatch(texts) { return texts.map((_, textIndex) => Array.from({ length: options.dimensions }, (__, dimensionIndex) => textIndex + dimensionIndex), ); }, }; } const snapshot: KtxSchemaSnapshot = { connectionId: 'warehouse', driver: 'postgres', extractedAt: '2026-04-29T12:00:00.000Z', scope: { schemas: ['public'] }, metadata: {}, tables: [ { catalog: null, db: 'public', name: 'customers', kind: 'table', comment: 'Customer accounts', estimatedRows: 2, foreignKeys: [], columns: [ { name: 'id', nativeType: 'integer', normalizedType: 'integer', dimensionType: 'number', nullable: false, primaryKey: true, comment: 'Customer id', }, ], }, { catalog: null, db: 'public', name: 'orders', kind: 'table', comment: 'Customer orders', estimatedRows: 3, foreignKeys: [], columns: [ { name: 'id', nativeType: 'integer', normalizedType: 'integer', dimensionType: 'number', nullable: false, primaryKey: true, comment: 'Order id', }, { name: 'customer_id', nativeType: 'integer', normalizedType: 'integer', dimensionType: 'number', nullable: false, primaryKey: false, comment: 'Customer id', }, ], }, ], }; function connector(): KtxScanConnector { return { id: 'test:warehouse', driver: 'postgres', capabilities: createKtxConnectorCapabilities({ tableSampling: true, columnSampling: true, readOnlySql: true, columnStats: true, }), introspect: vi.fn(async () => snapshot), listSchemas: vi.fn(async () => []), listTables: vi.fn(async () => []), sampleTable: vi.fn(async () => ({ headers: ['id', 'customer_id'], rows: [[1, 10]], totalRows: 1, })), sampleColumn: vi.fn(async () => ({ values: ['10', '11'], nullCount: 0, distinctCount: 2, })), }; } class InMemorySqliteExecutor { readonly db = new Database(':memory:'); executeReadOnly(input: KtxReadOnlyQueryInput, _ctx: KtxScanContext): Promise { const rows = this.db.prepare(input.sql).all() as Record[]; const headers = Object.keys(rows[0] ?? {}); return Promise.resolve({ headers, rows: rows.map((row) => headers.map((header) => row[header])), totalRows: rows.length, rowCount: rows.length, }); } close(): void { this.db.close(); } } function noDeclaredRelationshipSnapshot(): KtxSchemaSnapshot { return { connectionId: 'warehouse', driver: 'sqlite', extractedAt: '2026-05-07T00:00:00.000Z', scope: {}, metadata: {}, tables: [ { catalog: null, db: null, name: 'accounts', kind: 'table', comment: null, estimatedRows: 2, foreignKeys: [], columns: [ { name: 'id', nativeType: 'INTEGER', normalizedType: 'integer', dimensionType: 'number', nullable: false, primaryKey: false, comment: null, }, ], }, { catalog: null, db: null, name: 'orders', kind: 'table', comment: null, estimatedRows: 3, foreignKeys: [], columns: [ { name: 'id', nativeType: 'INTEGER', normalizedType: 'integer', dimensionType: 'number', nullable: false, primaryKey: false, comment: null, }, { name: 'account_id', nativeType: 'INTEGER', normalizedType: 'integer', dimensionType: 'number', nullable: false, primaryKey: false, comment: null, }, ], }, ], }; } function memoryEnrichmentStateStore(): KtxScanEnrichmentStateStore { const records = new Map(); const key = (input: Pick) => `${input.runId}:${input.stage}`; return { async findCompletedStage(input: KtxScanEnrichmentStageLookup) { const record = records.get(key(input)); if (!record || record.status !== 'completed' || record.inputHash !== input.inputHash) { return null; } return record as KtxScanEnrichmentCompletedStage; }, async saveCompletedStage(input) { records.set(key(input), { ...input, status: 'completed', errorMessage: null, }); }, async saveFailedStage(input) { records.set(key(input), { ...input, status: 'failed', output: null, }); }, async listRunStages(runId) { return [...records.values()].filter((record) => record.runId === runId); }, }; } describe('local scan enrichment', () => { it('maps a scan snapshot into relationship detector schema', () => { const schema = snapshotToKtxEnrichedSchema(snapshot); expect(schema.connectionId).toBe('warehouse'); expect(schema.tables).toHaveLength(2); expect(schema.tables[1]?.columns.map((column) => column.name)).toEqual(['id', 'customer_id']); expect(schema.tables[1]?.columns[1]).toMatchObject({ id: 'public.orders.customer_id', tableId: 'public.orders', primaryKey: false, sampleValues: null, embedding: null, }); }); it('maps snapshot foreign keys into formal schema relationships', () => { const source = noDeclaredRelationshipSnapshot(); const snapshotWithForeignKey = { ...source, tables: source.tables.map((table) => table.name === 'orders' ? { ...table, foreignKeys: [ { fromColumn: 'account_id', toCatalog: null, toDb: null, toTable: 'accounts', toColumn: 'id', constraintName: 'orders_account_id_fkey', }, ], } : table.name === 'accounts' ? { ...table, columns: table.columns.map((column) => column.name === 'id' ? { ...column, primaryKey: true } : column, ), } : table, ), }; const schema = snapshotToKtxEnrichedSchema(snapshotWithForeignKey); expect(schema.relationships).toEqual([ { id: 'orders:(orders.account_id)->accounts:(accounts.id)', source: 'formal', from: { tableId: 'orders', columnIds: ['orders.account_id'], table: { catalog: null, db: null, name: 'orders' }, columns: ['account_id'], }, to: { tableId: 'accounts', columnIds: ['accounts.id'], table: { catalog: null, db: null, name: 'accounts' }, columns: ['id'], }, relationshipType: 'many_to_one', confidence: 1, isPrimaryKeyReference: true, }, ]); }); it('uses the supplied snapshot without calling connector.introspect', async () => { const scanConnector = connector(); const introspect = vi.mocked(scanConnector.introspect); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'structural', connector: scanConnector, snapshot, context: { runId: 'scan-run-snapshot' }, providers: null, }); expect(result.snapshot).toEqual(snapshot); expect(introspect).not.toHaveBeenCalled(); }); it('falls back to connector.introspect when no snapshot is supplied', async () => { const scanConnector = connector(); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'structural', connector: scanConnector, context: { runId: 'scan-run-introspect' }, providers: null, }); expect(result.snapshot).toEqual(snapshot); expect(scanConnector.introspect).toHaveBeenCalledTimes(1); }); it('fails when connector driver and snapshot driver differ', async () => { const mismatchedConnector: KtxScanConnector = { ...connector(), driver: 'mysql', }; await expect( runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, connector: mismatchedConnector, snapshot, context: { runId: 'scan-run-driver-mismatch' }, providers: null, }), ).rejects.toThrow( 'ktx scan connector driver "mysql" does not match snapshot driver "postgres" for connection "warehouse"', ); }); it('runs deterministic relationship detection for relationship scans', async () => { const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, connector: connector(), context: { runId: 'scan-run-1' }, providers: null, }); expect(result.summary).toMatchObject({ deterministicRelationships: 'completed', llmRelationshipValidation: 'skipped', embeddings: 'skipped', }); expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 }); expect(result.summary.statisticalValidation).toBe('skipped'); expect(result.warnings).toContainEqual({ code: 'relationship_validation_failed', message: 'KTX scan connector advertises readOnlySql but does not expose executeReadOnly', recoverable: true, metadata: { capability: 'readOnlySql' }, }); }); it('runs relationship discovery with connector SQL evidence', async () => { const executor = new InMemorySqliteExecutor(); try { executor.db.exec(` CREATE TABLE accounts (id INTEGER NOT NULL); CREATE TABLE orders (id INTEGER NOT NULL, account_id INTEGER NOT NULL); INSERT INTO accounts (id) VALUES (1), (2); INSERT INTO orders (id, account_id) VALUES (10, 1), (11, 1), (12, 2); `); const scanConnector = { ...connector(), driver: 'sqlite' as const, capabilities: createKtxConnectorCapabilities({ readOnlySql: true, columnStats: true }), introspect: vi.fn(async () => noDeclaredRelationshipSnapshot()), executeReadOnly: executor.executeReadOnly.bind(executor), }; const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, connector: scanConnector, context: { runId: 'scan-run-relationship-discovery' }, providers: null, }); expect(result.relationships).toEqual({ accepted: 1, review: 0, rejected: 0, skipped: 0 }); expect(result.summary.statisticalValidation).toBe('completed'); expect(result.relationshipProfile).toMatchObject({ sqlAvailable: true }); expect(result.resolvedRelationships).toEqual([ expect.objectContaining({ status: 'accepted', from: expect.objectContaining({ table: expect.objectContaining({ name: 'orders' }), columns: ['account_id'] }), to: expect.objectContaining({ table: expect.objectContaining({ name: 'accounts' }), columns: ['id'] }), }), ]); expect(result.relationshipUpdate?.accepted).toHaveLength(1); } finally { executor.close(); } }); it('honors scan relationship config when LLM proposals are disabled', async () => { const providers = createDeterministicLocalScanEnrichmentProviders(); const generateObject = vi.fn(); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, connector: connector(), context: { runId: 'scan-run-llm-disabled' }, providers: { ...providers, llmRuntime: { ...providers.llmRuntime, generateObject: generateObject as never, }, }, relationshipSettings: { ...buildDefaultKtxProjectConfig().scan.relationships, llmProposals: false, maxLlmTablesPerBatch: 40, }, }); expect(result.summary.llmRelationshipValidation).toBe('skipped'); expect(generateObject).not.toHaveBeenCalled(); }); it('skips relationship detection when scan relationships are disabled', async () => { const settings = { ...buildDefaultKtxProjectConfig().scan.relationships, enabled: false, }; const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', connector: connector(), context: { runId: 'disabled-relationships' }, providers: createDeterministicLocalScanEnrichmentProviders(), relationshipSettings: settings, }); expect(result.summary.deterministicRelationships).toBe('skipped'); expect(result.summary.statisticalValidation).toBe('skipped'); expect(result.summary.llmRelationshipValidation).toBe('skipped'); expect(result.relationships).toEqual({ accepted: 0, review: 0, rejected: 0, skipped: 0 }); expect(result.relationshipUpdate).toBeNull(); expect(result.relationshipProfile).toBeNull(); expect(result.resolvedRelationships).toBeNull(); }); it('forwards context.logger and emits warnings when sampleTable fails repeatedly', async () => { const failingConnector: KtxScanConnector = { ...connector(), sampleTable: vi.fn(async () => { throw new Error('pool: ECONNRESET'); }), }; const logger = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), }; const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: false, connector: failingConnector, context: { runId: 'scan-run-warnings', logger }, providers: createDeterministicLocalScanEnrichmentProviders(), }); const codes = result.warnings.map((warning) => warning.code); expect(codes).toContain('sampling_failed'); expect(codes).toContain('description_fallback_used'); expect(result.warnings.some((warning) => warning.table === 'customers')).toBe(true); expect(logger.warn).toHaveBeenCalled(); expect(logger.error).toHaveBeenCalled(); // Each of the two tables produced sampling_failed + description_fallback_used, so 2 + 2 = 4 warnings minimum. expect(result.warnings.length).toBeGreaterThanOrEqual(4); // Sampling was retried 3× for each of the 2 tables = 6 calls expect(failingConnector.sampleTable).toHaveBeenCalledTimes(6); }); it('runs configured deterministic enrichment with descriptions and no embeddings', async () => { const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: true, connector: connector(), context: { runId: 'scan-run-2' }, providers: createDeterministicLocalScanEnrichmentProviders(), }); expect(result.summary).toMatchObject({ dataDictionary: 'completed', tableDescriptions: 'completed', columnDescriptions: 'completed', embeddings: 'skipped', deterministicRelationships: 'completed', }); expect(result.embeddingUpdates).toEqual([]); expect(result.snapshot).toEqual(snapshot); expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 }); }); it('generates batched table descriptions with bounded table-level concurrency', async () => { const concurrentSnapshot: KtxSchemaSnapshot = { ...snapshot, tables: Array.from({ length: 8 }, (_, index) => ({ catalog: null, db: 'public', name: `table_${index + 1}`, kind: 'table' as const, comment: null, estimatedRows: 2, foreignKeys: [], columns: [ { name: 'id', nativeType: 'integer', normalizedType: 'integer', dimensionType: 'number' as const, nullable: false, primaryKey: true, comment: null, }, ], })), }; let activeTableSamples = 0; let maxActiveTableSamples = 0; const scanConnector = { ...connector(), introspect: vi.fn(async () => concurrentSnapshot), sampleColumn: vi.fn(async () => ({ values: ['1'], nullCount: 0, distinctCount: 1, })), sampleTable: vi.fn(async () => { activeTableSamples += 1; maxActiveTableSamples = Math.max(maxActiveTableSamples, activeTableSamples); await new Promise((resolve) => setTimeout(resolve, 10)); activeTableSamples -= 1; return { headers: ['id'], rows: [[1]], totalRows: 1, }; }), }; const settings = { ...buildDefaultKtxProjectConfig().scan.relationships, enabled: false, }; await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', connector: scanConnector, context: { runId: 'scan-run-concurrent-descriptions' }, providers: createDeterministicLocalScanEnrichmentProviders(), relationshipSettings: settings, }); expect(maxActiveTableSamples).toBe(4); expect(scanConnector.sampleColumn).not.toHaveBeenCalled(); }); it('reports enrichment progress for countable stages', async () => { const events: Array<{ progress: number; message?: string; transient?: boolean }> = []; const progress = { async update(progressValue: number, message?: string, options?: { transient?: boolean }) { events.push({ progress: progressValue, message, transient: options?.transient }); }, startPhase() { return progress; }, }; await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: true, connector: connector(), context: { runId: 'scan-run-progress', progress }, providers: { ...createDeterministicLocalScanEnrichmentProviders(), embedding: fakeScanEmbedding({ dimensions: 6 }), }, }); expect(events).toEqual( expect.arrayContaining([ expect.objectContaining({ message: 'Generating descriptions 1/2 tables', transient: true }), expect.objectContaining({ message: 'Generating descriptions 2/2 tables', transient: true }), expect.objectContaining({ message: 'Building embeddings 1/1 batches', transient: true }), expect.objectContaining({ message: 'Detecting relationships' }), ]), ); }); it('reports progress before enrichment connector introspection starts', async () => { const events: Array<{ progress: number; message?: string; transient?: boolean }> = []; const progress = { async update(progressValue: number, message?: string, options?: { transient?: boolean }) { events.push({ progress: progressValue, message, transient: options?.transient }); }, startPhase() { return progress; }, }; const scanConnector = { ...connector(), introspect: vi.fn(async () => { expect(events).toContainEqual(expect.objectContaining({ message: 'Loading enrichment schema snapshot' })); return snapshot; }), }; await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, connector: scanConnector, context: { runId: 'scan-run-progress-before-introspection', progress }, providers: null, }); expect(scanConnector.introspect).toHaveBeenCalled(); }); it('splits enrichment embedding requests by provider batch size', async () => { const manyColumnSnapshot: KtxSchemaSnapshot = { ...snapshot, tables: [ { catalog: null, db: 'public', name: 'wide_orders', kind: 'table', comment: 'Wide order facts', estimatedRows: 3, foreignKeys: [], columns: Array.from({ length: 5 }, (_, index) => ({ name: `metric_${index + 1}`, nativeType: 'integer', normalizedType: 'integer', dimensionType: 'number' as const, nullable: false, primaryKey: false, comment: `Metric ${index + 1}`, })), }, ], }; const scanConnector = { ...connector(), introspect: vi.fn(async () => manyColumnSnapshot), }; const deterministicProviders = createDeterministicLocalScanEnrichmentProviders(); const embedBatch = vi.fn(async (texts: string[]) => { if (texts.length > 2) { throw new Error(`Embedding batch size ${texts.length} exceeds maximum 2`); } return texts.map((_, index) => [index, index + 1, index + 2]); }); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: false, connector: scanConnector, context: { runId: 'scan-run-batched-embeddings' }, providers: { llmRuntime: deterministicProviders.llmRuntime, embedding: { dimensions: 3, maxBatchSize: 2, embedBatch, }, }, }); expect(result.embeddingUpdates).toHaveLength(5); expect(embedBatch.mock.calls.map(([texts]) => texts).map((texts) => texts.length)).toEqual([2, 2, 1]); }); it('reuses completed description and embedding stages for the same run id and snapshot hash', async () => { const stateStore = memoryEnrichmentStateStore(); const scanConnector = connector(); const providers = { ...createDeterministicLocalScanEnrichmentProviders(), embedding: fakeScanEmbedding({ dimensions: 6 }), }; const first = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: true, connector: scanConnector, context: { runId: 'scan-run-resume-1' }, providers, stateStore, syncId: 'sync-resume-1', providerIdentity: { provider: 'fake', embeddingDimensions: 6 }, }); const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject'); const embedBatch = vi.spyOn(providers.embedding, 'embedBatch'); const second = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: true, connector: scanConnector, context: { runId: 'scan-run-resume-1' }, providers, stateStore, syncId: 'sync-resume-1', providerIdentity: { provider: 'fake', embeddingDimensions: 6 }, }); expect(first.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); expect(first.state.resumedStages).toEqual([]); expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']); expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); expect(generateObject).not.toHaveBeenCalled(); expect(embedBatch).not.toHaveBeenCalled(); expect(second.descriptionUpdates).toEqual(first.descriptionUpdates); expect(second.embeddingUpdates).toEqual(first.embeddingUpdates); expect(second.relationships).toEqual(first.relationships); }); it('does not reuse completed stages when the snapshot changes', async () => { const stateStore = memoryEnrichmentStateStore(); const providers = { ...createDeterministicLocalScanEnrichmentProviders(), embedding: fakeScanEmbedding({ dimensions: 6 }), }; const scanConnector = connector(); await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: false, connector: scanConnector, context: { runId: 'scan-run-resume-hash' }, providers, stateStore, syncId: 'sync-resume-hash', providerIdentity: { provider: 'fake', embeddingDimensions: 6 }, }); const firstTable = snapshot.tables[0]; if (!firstTable) { throw new Error('Expected test snapshot table'); } const changedConnector = { ...connector(), introspect: vi.fn(async () => ({ ...snapshot, tables: [{ ...firstTable, name: 'customers' }], })), }; const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject'); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: false, connector: changedConnector, context: { runId: 'scan-run-resume-hash' }, providers, stateStore, syncId: 'sync-resume-hash', providerIdentity: { provider: 'fake', embeddingDimensions: 6 }, }); expect(result.state.resumedStages).toEqual([]); expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); expect(generateObject).toHaveBeenCalled(); }); it('runs providerless enriched scans as relationship-only discovery enrichment', async () => { const executor = new InMemorySqliteExecutor(); try { executor.db.exec(` CREATE TABLE accounts (id INTEGER NOT NULL); CREATE TABLE orders (id INTEGER NOT NULL, account_id INTEGER NOT NULL); INSERT INTO accounts (id) VALUES (1), (2); INSERT INTO orders (id, account_id) VALUES (10, 1), (11, 1), (12, 2); `); const scanConnector = { ...connector(), driver: 'sqlite' as const, capabilities: createKtxConnectorCapabilities({ readOnlySql: true, columnStats: true }), introspect: vi.fn(async () => noDeclaredRelationshipSnapshot()), executeReadOnly: executor.executeReadOnly.bind(executor), }; const result = await runLocalScanEnrichment({ connectionId: 'warehouse', mode: 'enriched', detectRelationships: false, connector: scanConnector, context: { runId: 'scan-run-providerless-enriched' }, providers: null, }); expect(result.summary).toEqual({ dataDictionary: 'skipped', tableDescriptions: 'skipped', columnDescriptions: 'skipped', embeddings: 'skipped', deterministicRelationships: 'completed', llmRelationshipValidation: 'skipped', statisticalValidation: 'completed', }); expect(result.descriptionUpdates).toEqual([]); expect(result.embeddingUpdates).toEqual([]); expect(result.relationships).toEqual({ accepted: 1, review: 0, rejected: 0, skipped: 0 }); expect(result.relationshipUpdate?.accepted).toHaveLength(1); expect(result.relationshipProfile).toMatchObject({ sqlAvailable: true }); expect(result.resolvedRelationships).toEqual([ expect.objectContaining({ status: 'accepted', from: expect.objectContaining({ table: expect.objectContaining({ name: 'orders' }), columns: ['account_id'] }), to: expect.objectContaining({ table: expect.objectContaining({ name: 'accounts' }), columns: ['id'] }), }), ]); expect(result.warnings).toContainEqual({ code: 'scan_enrichment_backend_not_configured', message: 'Skipping description and embedding enrichment because scan.enrichment.mode is not configured; relationship discovery still ran.', recoverable: true, metadata: { skippedStages: ['descriptions', 'embeddings'], relationshipDetection: true, }, }); } finally { executor.close(); } }); });