fix(scan): reuse structural snapshots and cleanup connectors

This commit is contained in:
Andrey Avtomonov 2026-05-22 16:58:32 +02:00
parent e56eabb22b
commit 21188c7f51
6 changed files with 328 additions and 37 deletions

View file

@ -299,6 +299,38 @@ describe('local scan enrichment', () => {
]);
});
it('uses the supplied snapshot without calling connector.introspect', async () => {
const scanConnector = connector();
const introspect = vi.mocked(scanConnector.introspect);
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'structural',
connector: scanConnector,
snapshot,
context: { runId: 'scan-run-snapshot' },
providers: null,
});
expect(result.snapshot).toEqual(snapshot);
expect(introspect).not.toHaveBeenCalled();
});
it('falls back to connector.introspect when no snapshot is supplied', async () => {
const scanConnector = connector();
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'structural',
connector: scanConnector,
context: { runId: 'scan-run-introspect' },
providers: null,
});
expect(result.snapshot).toEqual(snapshot);
expect(scanConnector.introspect).toHaveBeenCalledTimes(1);
});
it('runs deterministic relationship detection for relationship scans', async () => {
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',

View file

@ -53,6 +53,7 @@ export interface KtxLocalScanEnrichmentInput {
mode: KtxScanMode;
detectRelationships?: boolean;
connector: KtxScanConnector;
snapshot?: KtxSchemaSnapshot;
context: KtxScanContext;
providers: KtxLocalScanEnrichmentProviders | null;
stateStore?: KtxScanEnrichmentStateStore | null;
@ -472,15 +473,17 @@ export async function runLocalScanEnrichment(
): Promise<KtxLocalScanEnrichmentResult> {
const progress = input.context.progress;
await progress?.update(0, 'Loading enrichment schema snapshot');
const snapshot = await input.connector.introspect(
{
connectionId: input.connectionId,
driver: input.connector.driver,
mode: input.mode,
detectRelationships: input.detectRelationships,
},
input.context,
);
const snapshot =
input.snapshot ??
(await input.connector.introspect(
{
connectionId: input.connectionId,
driver: input.connector.driver,
mode: input.mode,
detectRelationships: input.detectRelationships,
},
input.context,
));
await progress?.update(0.05, `Loaded schema snapshot with ${snapshot.tables.length} tables`);
const now = input.now ?? (() => new Date());

View file

@ -8,7 +8,13 @@ import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js';
import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js';
import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js';
import type {
KtxQueryResult,
KtxReadOnlyQueryInput,
KtxScanConnector,
KtxSchemaSnapshot,
KtxSchemaTable,
} from './types.js';
function relationshipSqlResult(
input: KtxReadOnlyQueryInput,
@ -120,7 +126,43 @@ async function writeDatabaseConfigWithoutIngestAdapters(projectDir: string): Pro
);
}
function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceAdapter {
function defaultFetchSnapshot(options: { extractedAt?: () => string } = {}): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: options.extractedAt?.() ?? '2026-04-29T09:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
name: 'orders',
catalog: null,
db: 'public',
kind: 'table',
comment: null,
estimatedRows: null,
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: null,
},
],
foreignKeys: [],
},
],
};
}
function fetchOnlyAdapter(options: { extractedAt?: () => string; snapshot?: KtxSchemaSnapshot } = {}): SourceAdapter {
const scanSnapshot = options.snapshot
? { ...options.snapshot, ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}) }
: defaultFetchSnapshot(options);
return {
source: 'live-database',
skillNames: ['live_database_ingest'],
@ -129,39 +171,89 @@ function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceA
await writeFile(
join(stagedDir, 'connection.json'),
`${JSON.stringify({
connectionId: 'warehouse',
driver: 'postgres',
...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}),
scope: { schemas: ['public'] },
metadata: {},
connectionId: scanSnapshot.connectionId,
driver: scanSnapshot.driver,
extractedAt: scanSnapshot.extractedAt,
scope: scanSnapshot.scope,
metadata: scanSnapshot.metadata,
})}\n`,
'utf-8',
);
await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
await writeFile(
join(stagedDir, 'tables', 'orders.json'),
'{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":null,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
for (const table of scanSnapshot.tables) {
await writeFile(join(stagedDir, 'tables', `${table.name}.json`), `${JSON.stringify(table)}\n`, 'utf-8');
}
},
async detect() {
return true;
},
async chunk() {
return {
workUnits: [
{
unitKey: 'live-database-public-orders',
rawFiles: ['tables/orders.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
],
workUnits: scanSnapshot.tables.map((table) => ({
unitKey: `live-database-${table.db ?? 'default'}-${table.name}`,
rawFiles: [`tables/${table.name}.json`],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
})),
};
},
};
}
function nativeScanSnapshot(): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: '2026-04-29T09:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
catalog: null,
db: 'public',
name: 'orders',
kind: 'table',
comment: 'Orders',
estimatedRows: 1,
foreignKeys: [],
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: 'Order id',
},
],
},
],
};
}
function nativeScanConnector(options: { cleanup?: () => Promise<void> } = {}): KtxScanConnector {
return {
id: 'test:warehouse',
driver: 'postgres',
capabilities: {
structuralIntrospection: true,
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: false,
nestedAnalysis: false,
eventStreamDiscovery: false,
formalForeignKeys: false,
estimatedRowCounts: false,
},
introspect: vi.fn(async () => nativeScanSnapshot()),
sampleTable: vi.fn(async () => ({ headers: ['id'], rows: [[1]], totalRows: 1 })),
sampleColumn: vi.fn(async () => ({ values: ['1'], nullCount: 0, distinctCount: 1 })),
...(options.cleanup ? { cleanup: options.cleanup } : {}),
};
}
describe('local scan', () => {
let tempDir: string;
let project: KtxLocalProject;
@ -265,6 +357,59 @@ describe('local scan', () => {
});
});
it('threads the structural snapshot into enrichment without connector re-introspection', async () => {
project.config.scan.enrichment = { mode: 'deterministic' };
const connector = nativeScanConnector();
const introspect = vi.mocked(connector.introspect);
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'enriched',
connector,
jobId: 'scan-enrichment-snapshot-threading',
now: () => new Date('2026-04-29T09:11:00.000Z'),
});
expect(result.report.enrichment.tableDescriptions).toBe('completed');
expect(introspect).not.toHaveBeenCalled();
});
it('cleans up a scan connector constructed by local scan', async () => {
const cleanup = vi.fn(async () => undefined);
await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
createConnector: vi.fn(async () => nativeScanConnector({ cleanup })),
jobId: 'scan-owned-connector-cleanup',
now: () => new Date('2026-04-29T09:13:00.000Z'),
});
expect(cleanup).toHaveBeenCalledTimes(1);
});
it('does not clean up a caller-supplied scan connector', async () => {
const cleanup = vi.fn(async () => undefined);
await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
connector: nativeScanConnector({ cleanup }),
jobId: 'scan-supplied-connector-cleanup',
now: () => new Date('2026-04-29T09:13:30.000Z'),
});
expect(cleanup).not.toHaveBeenCalled();
});
it('reuses scan report and raw-source paths when the same local scan run id is retried', async () => {
const first = await runLocalScan({
project,
@ -447,10 +592,11 @@ describe('local scan', () => {
};
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -534,10 +680,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -628,10 +775,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -737,10 +885,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -863,10 +1012,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -993,10 +1143,11 @@ describe('local scan', () => {
return relationshipSqlResult(input, { throwOnCoverage: true });
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -1301,10 +1452,11 @@ describe('local scan', () => {
},
};
const llmRuntime = deterministicLlmRuntime();
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const first = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -1333,7 +1485,7 @@ describe('local scan', () => {
const generateObject = vi.spyOn(llmRuntime, 'generateObject');
const retry = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,

View file

@ -402,6 +402,9 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
assertSupportedMode(mode);
await options.progress?.update(0.05, 'Preparing scan');
const rawConnector = await resolveScanConnector(options, mode);
const ownsConnector = !!rawConnector && !options.connector;
try {
const connection = options.project.config.connections[options.connectionId];
if (!connection) {
@ -467,6 +470,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
}
const enrichmentStateStore = connector ? createLocalScanEnrichmentStateStore(options) : null;
let enrichmentState: KtxScanEnrichmentStateSummary = completedKtxScanEnrichmentStateSummary();
let enrichmentSnapshot: KtxSchemaSnapshot | null = null;
if (!reusedExistingScanArtifacts && !report.dryRun && report.artifactPaths.rawSourcesDir) {
await options.progress?.update(0.7, 'Writing schema artifacts');
const rawSnapshot = await readLocalScanStructuralSnapshot({
@ -491,6 +495,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
subFrom('tablesModified');
await options.progress?.update(0.6, scanChangeSummary(report.diffSummary));
}
enrichmentSnapshot = structuralSnapshot;
const manifestArtifacts = await writeLocalScanManifestShards({
project: options.project,
connectionId: options.connectionId,
@ -515,6 +520,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
mode,
detectRelationships: options.detectRelationships,
connector,
...(enrichmentSnapshot ? { snapshot: enrichmentSnapshot } : {}),
context: { runId: record.runId, progress: options.progress?.startPhase(0.18) },
providers: enrichmentProviders,
stateStore: enrichmentStateStore,
@ -585,6 +591,11 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
syncId: record.syncId,
report,
};
} finally {
if (ownsConnector) {
await rawConnector?.cleanup?.();
}
}
}
/** @internal */

View file

@ -96,14 +96,17 @@ const createSnowflakeLiveDatabaseIntrospection = vi.hoisted(() =>
const isKtxSnowflakeConnectionConfig = vi.hoisted(() =>
vi.fn((connection: { driver?: string } | undefined) => connection?.driver === 'snowflake'),
);
const snowflakeConnectorInstances = vi.hoisted(() => [] as Array<{ cleanup: ReturnType<typeof vi.fn> }>);
const KtxSnowflakeScanConnector = vi.hoisted(
() =>
class {
readonly id: string;
readonly driver = 'snowflake';
readonly cleanup = vi.fn(async () => undefined);
constructor(options: { connectionId: string }) {
this.id = `snowflake:${options.connectionId}`;
snowflakeConnectorInstances.push(this);
}
},
);
@ -1008,6 +1011,95 @@ describe('runKtxScan', () => {
await rm(tempProject, { recursive: true, force: true });
});
it('cleans up a constructed scan connector after an enriched scan succeeds', async () => {
await initKtxProject({ projectDir: tempDir });
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' warehouse:',
' driver: snowflake',
' account: acct',
' warehouse: WH',
' database: ANALYTICS',
' schema_name: PUBLIC',
' username: reader',
' password: env:SNOWFLAKE_PASSWORD',
'',
].join('\n'),
'utf-8',
);
snowflakeConnectorInstances.length = 0;
const runLocalScan = vi.fn(async (): Promise<LocalScanRunResult> => ({
runId: 'scan-run-cleanup',
status: 'done',
done: true,
connectionId: 'warehouse',
mode: 'enriched',
dryRun: false,
syncId: 'sync-1',
report: { ...report, mode: 'enriched' },
}));
await expect(
runKtxScan(
{
command: 'run',
projectDir: tempDir,
connectionId: 'warehouse',
mode: 'enriched',
detectRelationships: false,
dryRun: false,
},
makeIo().io,
{ runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters },
),
).resolves.toBe(0);
expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1);
});
it('cleans up a constructed scan connector after runLocalScan throws', async () => {
await initKtxProject({ projectDir: tempDir });
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' warehouse:',
' driver: snowflake',
' account: acct',
' warehouse: WH',
' database: ANALYTICS',
' schema_name: PUBLIC',
' username: reader',
' password: env:SNOWFLAKE_PASSWORD',
'',
].join('\n'),
'utf-8',
);
snowflakeConnectorInstances.length = 0;
const runLocalScan = vi.fn(async () => {
throw new Error('scan failed');
});
await expect(
runKtxScan(
{
command: 'run',
projectDir: tempDir,
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
dryRun: false,
},
makeIo().io,
{ runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters },
),
).resolves.toBe(1);
expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1);
});
it('routes standalone postgres scans through the native connector before daemon fallback', async () => {
const tempProject = await mkdtemp(join(tmpdir(), 'ktx-scan-cli-native-postgres-'));
await initKtxProject({ projectDir: tempProject });

View file

@ -350,6 +350,7 @@ export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps
writeRunSummary(result.report, args.projectDir, io);
} finally {
cliProgress?.flush();
await connector?.cleanup?.();
}
return 0;
} catch (error) {