diff --git a/packages/cli/src/context/scan/local-enrichment.test.ts b/packages/cli/src/context/scan/local-enrichment.test.ts index 66b66fc2..90ddae78 100644 --- a/packages/cli/src/context/scan/local-enrichment.test.ts +++ b/packages/cli/src/context/scan/local-enrichment.test.ts @@ -299,6 +299,38 @@ describe('local scan enrichment', () => { ]); }); + it('uses the supplied snapshot without calling connector.introspect', async () => { + const scanConnector = connector(); + const introspect = vi.mocked(scanConnector.introspect); + + const result = await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'structural', + connector: scanConnector, + snapshot, + context: { runId: 'scan-run-snapshot' }, + providers: null, + }); + + expect(result.snapshot).toEqual(snapshot); + expect(introspect).not.toHaveBeenCalled(); + }); + + it('falls back to connector.introspect when no snapshot is supplied', async () => { + const scanConnector = connector(); + + const result = await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'structural', + connector: scanConnector, + context: { runId: 'scan-run-introspect' }, + providers: null, + }); + + expect(result.snapshot).toEqual(snapshot); + expect(scanConnector.introspect).toHaveBeenCalledTimes(1); + }); + it('runs deterministic relationship detection for relationship scans', async () => { const result = await runLocalScanEnrichment({ connectionId: 'warehouse', diff --git a/packages/cli/src/context/scan/local-enrichment.ts b/packages/cli/src/context/scan/local-enrichment.ts index 680f8f60..32807532 100644 --- a/packages/cli/src/context/scan/local-enrichment.ts +++ b/packages/cli/src/context/scan/local-enrichment.ts @@ -53,6 +53,7 @@ export interface KtxLocalScanEnrichmentInput { mode: KtxScanMode; detectRelationships?: boolean; connector: KtxScanConnector; + snapshot?: KtxSchemaSnapshot; context: KtxScanContext; providers: KtxLocalScanEnrichmentProviders | null; stateStore?: KtxScanEnrichmentStateStore | null; @@ -472,15 +473,17 @@ export async function runLocalScanEnrichment( ): Promise { const progress = input.context.progress; await progress?.update(0, 'Loading enrichment schema snapshot'); - const snapshot = await input.connector.introspect( - { - connectionId: input.connectionId, - driver: input.connector.driver, - mode: input.mode, - detectRelationships: input.detectRelationships, - }, - input.context, - ); + const snapshot = + input.snapshot ?? + (await input.connector.introspect( + { + connectionId: input.connectionId, + driver: input.connector.driver, + mode: input.mode, + detectRelationships: input.detectRelationships, + }, + input.context, + )); await progress?.update(0.05, `Loaded schema snapshot with ${snapshot.tables.length} tables`); const now = input.now ?? (() => new Date()); diff --git a/packages/cli/src/context/scan/local-scan.test.ts b/packages/cli/src/context/scan/local-scan.test.ts index 081fa055..7a9cc9f7 100644 --- a/packages/cli/src/context/scan/local-scan.test.ts +++ b/packages/cli/src/context/scan/local-scan.test.ts @@ -8,7 +8,13 @@ import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js'; import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js'; import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js'; -import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js'; +import type { + KtxQueryResult, + KtxReadOnlyQueryInput, + KtxScanConnector, + KtxSchemaSnapshot, + KtxSchemaTable, +} from './types.js'; function relationshipSqlResult( input: KtxReadOnlyQueryInput, @@ -120,7 +126,43 @@ async function writeDatabaseConfigWithoutIngestAdapters(projectDir: string): Pro ); } -function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceAdapter { +function defaultFetchSnapshot(options: { extractedAt?: () => string } = {}): KtxSchemaSnapshot { + return { + connectionId: 'warehouse', + driver: 'postgres', + extractedAt: options.extractedAt?.() ?? '2026-04-29T09:00:00.000Z', + scope: { schemas: ['public'] }, + metadata: {}, + tables: [ + { + name: 'orders', + catalog: null, + db: 'public', + kind: 'table', + comment: null, + estimatedRows: null, + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: null, + }, + ], + foreignKeys: [], + }, + ], + }; +} + +function fetchOnlyAdapter(options: { extractedAt?: () => string; snapshot?: KtxSchemaSnapshot } = {}): SourceAdapter { + const scanSnapshot = options.snapshot + ? { ...options.snapshot, ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}) } + : defaultFetchSnapshot(options); + return { source: 'live-database', skillNames: ['live_database_ingest'], @@ -129,39 +171,89 @@ function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceA await writeFile( join(stagedDir, 'connection.json'), `${JSON.stringify({ - connectionId: 'warehouse', - driver: 'postgres', - ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}), - scope: { schemas: ['public'] }, - metadata: {}, + connectionId: scanSnapshot.connectionId, + driver: scanSnapshot.driver, + extractedAt: scanSnapshot.extractedAt, + scope: scanSnapshot.scope, + metadata: scanSnapshot.metadata, })}\n`, 'utf-8', ); await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8'); - await writeFile( - join(stagedDir, 'tables', 'orders.json'), - '{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":null,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n', - 'utf-8', - ); + for (const table of scanSnapshot.tables) { + await writeFile(join(stagedDir, 'tables', `${table.name}.json`), `${JSON.stringify(table)}\n`, 'utf-8'); + } }, async detect() { return true; }, async chunk() { return { - workUnits: [ - { - unitKey: 'live-database-public-orders', - rawFiles: ['tables/orders.json'], - dependencyPaths: ['connection.json', 'foreign-keys.json'], - peerFileIndex: [], - }, - ], + workUnits: scanSnapshot.tables.map((table) => ({ + unitKey: `live-database-${table.db ?? 'default'}-${table.name}`, + rawFiles: [`tables/${table.name}.json`], + dependencyPaths: ['connection.json', 'foreign-keys.json'], + peerFileIndex: [], + })), }; }, }; } +function nativeScanSnapshot(): KtxSchemaSnapshot { + return { + connectionId: 'warehouse', + driver: 'postgres', + extractedAt: '2026-04-29T09:00:00.000Z', + scope: { schemas: ['public'] }, + metadata: {}, + tables: [ + { + catalog: null, + db: 'public', + name: 'orders', + kind: 'table', + comment: 'Orders', + estimatedRows: 1, + foreignKeys: [], + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: 'Order id', + }, + ], + }, + ], + }; +} + +function nativeScanConnector(options: { cleanup?: () => Promise } = {}): KtxScanConnector { + return { + id: 'test:warehouse', + driver: 'postgres', + capabilities: { + structuralIntrospection: true, + tableSampling: true, + columnSampling: true, + columnStats: false, + readOnlySql: false, + nestedAnalysis: false, + eventStreamDiscovery: false, + formalForeignKeys: false, + estimatedRowCounts: false, + }, + introspect: vi.fn(async () => nativeScanSnapshot()), + sampleTable: vi.fn(async () => ({ headers: ['id'], rows: [[1]], totalRows: 1 })), + sampleColumn: vi.fn(async () => ({ values: ['1'], nullCount: 0, distinctCount: 1 })), + ...(options.cleanup ? { cleanup: options.cleanup } : {}), + }; +} + describe('local scan', () => { let tempDir: string; let project: KtxLocalProject; @@ -265,6 +357,59 @@ describe('local scan', () => { }); }); + it('threads the structural snapshot into enrichment without connector re-introspection', async () => { + project.config.scan.enrichment = { mode: 'deterministic' }; + const connector = nativeScanConnector(); + const introspect = vi.mocked(connector.introspect); + + const result = await runLocalScan({ + project, + adapters: [fetchOnlyAdapter()], + connectionId: 'warehouse', + mode: 'enriched', + connector, + jobId: 'scan-enrichment-snapshot-threading', + now: () => new Date('2026-04-29T09:11:00.000Z'), + }); + + expect(result.report.enrichment.tableDescriptions).toBe('completed'); + expect(introspect).not.toHaveBeenCalled(); + }); + + it('cleans up a scan connector constructed by local scan', async () => { + const cleanup = vi.fn(async () => undefined); + + await runLocalScan({ + project, + adapters: [fetchOnlyAdapter()], + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + createConnector: vi.fn(async () => nativeScanConnector({ cleanup })), + jobId: 'scan-owned-connector-cleanup', + now: () => new Date('2026-04-29T09:13:00.000Z'), + }); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it('does not clean up a caller-supplied scan connector', async () => { + const cleanup = vi.fn(async () => undefined); + + await runLocalScan({ + project, + adapters: [fetchOnlyAdapter()], + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + connector: nativeScanConnector({ cleanup }), + jobId: 'scan-supplied-connector-cleanup', + now: () => new Date('2026-04-29T09:13:30.000Z'), + }); + + expect(cleanup).not.toHaveBeenCalled(); + }); + it('reuses scan report and raw-source paths when the same local scan run id is retried', async () => { const first = await runLocalScan({ project, @@ -447,10 +592,11 @@ describe('local scan', () => { }; }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -534,10 +680,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -628,10 +775,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -737,10 +885,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -863,10 +1012,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'enriched', connector, @@ -993,10 +1143,11 @@ describe('local scan', () => { return relationshipSqlResult(input, { throwOnCoverage: true }); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -1301,10 +1452,11 @@ describe('local scan', () => { }, }; const llmRuntime = deterministicLlmRuntime(); + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const first = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'enriched', connector, @@ -1333,7 +1485,7 @@ describe('local scan', () => { const generateObject = vi.spyOn(llmRuntime, 'generateObject'); const retry = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'enriched', connector, diff --git a/packages/cli/src/context/scan/local-scan.ts b/packages/cli/src/context/scan/local-scan.ts index 35333f79..d6515448 100644 --- a/packages/cli/src/context/scan/local-scan.ts +++ b/packages/cli/src/context/scan/local-scan.ts @@ -402,6 +402,9 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise const isKtxSnowflakeConnectionConfig = vi.hoisted(() => vi.fn((connection: { driver?: string } | undefined) => connection?.driver === 'snowflake'), ); +const snowflakeConnectorInstances = vi.hoisted(() => [] as Array<{ cleanup: ReturnType }>); const KtxSnowflakeScanConnector = vi.hoisted( () => class { readonly id: string; readonly driver = 'snowflake'; + readonly cleanup = vi.fn(async () => undefined); constructor(options: { connectionId: string }) { this.id = `snowflake:${options.connectionId}`; + snowflakeConnectorInstances.push(this); } }, ); @@ -1008,6 +1011,95 @@ describe('runKtxScan', () => { await rm(tempProject, { recursive: true, force: true }); }); + it('cleans up a constructed scan connector after an enriched scan succeeds', async () => { + await initKtxProject({ projectDir: tempDir }); + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'connections:', + ' warehouse:', + ' driver: snowflake', + ' account: acct', + ' warehouse: WH', + ' database: ANALYTICS', + ' schema_name: PUBLIC', + ' username: reader', + ' password: env:SNOWFLAKE_PASSWORD', + '', + ].join('\n'), + 'utf-8', + ); + snowflakeConnectorInstances.length = 0; + const runLocalScan = vi.fn(async (): Promise => ({ + runId: 'scan-run-cleanup', + status: 'done', + done: true, + connectionId: 'warehouse', + mode: 'enriched', + dryRun: false, + syncId: 'sync-1', + report: { ...report, mode: 'enriched' }, + })); + + await expect( + runKtxScan( + { + command: 'run', + projectDir: tempDir, + connectionId: 'warehouse', + mode: 'enriched', + detectRelationships: false, + dryRun: false, + }, + makeIo().io, + { runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters }, + ), + ).resolves.toBe(0); + + expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1); + }); + + it('cleans up a constructed scan connector after runLocalScan throws', async () => { + await initKtxProject({ projectDir: tempDir }); + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'connections:', + ' warehouse:', + ' driver: snowflake', + ' account: acct', + ' warehouse: WH', + ' database: ANALYTICS', + ' schema_name: PUBLIC', + ' username: reader', + ' password: env:SNOWFLAKE_PASSWORD', + '', + ].join('\n'), + 'utf-8', + ); + snowflakeConnectorInstances.length = 0; + const runLocalScan = vi.fn(async () => { + throw new Error('scan failed'); + }); + + await expect( + runKtxScan( + { + command: 'run', + projectDir: tempDir, + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + dryRun: false, + }, + makeIo().io, + { runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters }, + ), + ).resolves.toBe(1); + + expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1); + }); + it('routes standalone postgres scans through the native connector before daemon fallback', async () => { const tempProject = await mkdtemp(join(tmpdir(), 'ktx-scan-cli-native-postgres-')); await initKtxProject({ projectDir: tempProject }); diff --git a/packages/cli/src/scan.ts b/packages/cli/src/scan.ts index a92aaa62..beb0770d 100644 --- a/packages/cli/src/scan.ts +++ b/packages/cli/src/scan.ts @@ -350,6 +350,7 @@ export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps writeRunSummary(result.report, args.projectDir, io); } finally { cliProgress?.flush(); + await connector?.cleanup?.(); } return 0; } catch (error) {