Add scan table filtering

This commit is contained in:
Luca Martial 2026-05-12 18:22:03 -07:00
parent fcdf5234c6
commit 52ddb061a4
4 changed files with 120 additions and 5 deletions

View file

@ -105,7 +105,7 @@ export type {
LocalScanStatusResponse,
RunLocalScanOptions,
} from './local-scan.js';
export { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
export { filterSnapshotTables, getLocalScanReport, getLocalScanStatus, resolveEnabledTables, runLocalScan } from './local-scan.js';
export type { ReadLocalScanStructuralSnapshotInput } from './local-structural-artifacts.js';
export { readLocalScanStructuralSnapshot } from './local-structural-artifacts.js';
export type {
@ -393,6 +393,7 @@ export type {
KtxSchemaTable,
KtxSchemaTableKind,
KtxStructuralSyncStats,
KtxTableListEntry,
KtxTableRef,
KtxTableSampleInput,
KtxTableSampleResult,

View file

@ -6,8 +6,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import YAML from 'yaml';
import type { SourceAdapter } from '../ingest/index.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js';
import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
import type { KtxQueryResult, KtxReadOnlyQueryInput } from './types.js';
import { filterSnapshotTables, getLocalScanReport, getLocalScanStatus, resolveEnabledTables, runLocalScan } from './local-scan.js';
import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js';
function relationshipSqlResult(
input: KtxReadOnlyQueryInput,
@ -1492,3 +1492,79 @@ describe('local scan', () => {
);
});
});
describe('resolveEnabledTables', () => {
it('returns null when no enabled_tables field', () => {
expect(resolveEnabledTables({ driver: 'postgres' })).toBeNull();
});
it('returns null for empty array', () => {
expect(resolveEnabledTables({ driver: 'postgres', enabled_tables: [] })).toBeNull();
});
it('returns Set of enabled table names', () => {
const result = resolveEnabledTables({
driver: 'postgres',
enabled_tables: ['public.users', 'public.orders'],
});
expect(result).toBeInstanceOf(Set);
expect(result!.size).toBe(2);
expect(result!.has('public.users')).toBe(true);
expect(result!.has('public.orders')).toBe(true);
});
it('returns null for undefined connection', () => {
expect(resolveEnabledTables(undefined)).toBeNull();
});
});
describe('filterSnapshotTables', () => {
function makeSnapshot(tables: Array<{ db: string; name: string }>): KtxSchemaSnapshot {
return {
connectionId: 'test',
driver: 'postgres',
extractedAt: '2026-01-01T00:00:00Z',
scope: {},
metadata: {},
tables: tables.map(
(t): KtxSchemaTable => ({
catalog: null,
db: t.db,
name: t.name,
kind: 'table',
comment: null,
estimatedRows: null,
columns: [],
foreignKeys: [],
}),
),
};
}
it('keeps only enabled tables', () => {
const snapshot = makeSnapshot([
{ db: 'public', name: 'users' },
{ db: 'public', name: 'orders' },
{ db: 'public', name: 'logs' },
]);
const enabled = new Set(['public.users', 'public.orders']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.tables).toHaveLength(2);
expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']);
});
it('returns empty tables when none match', () => {
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
const enabled = new Set(['public.orders']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.tables).toHaveLength(0);
});
it('preserves other snapshot fields', () => {
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
const enabled = new Set(['public.users']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.connectionId).toBe('test');
expect(filtered.driver).toBe('postgres');
});
});

View file

@ -29,10 +29,13 @@ import type {
KtxConnectionDriver,
KtxProgressPort,
KtxScanConnector,
KtxScanContext,
KtxScanEnrichmentStateSummary,
KtxScanInput,
KtxScanMode,
KtxScanReport,
KtxScanTrigger,
KtxSchemaSnapshot,
} from './types.js';
export interface RunLocalScanOptions {
@ -313,17 +316,45 @@ async function readScanReport(
}
}
export function resolveEnabledTables(connection: Record<string, unknown> | undefined): Set<string> | null {
const raw = connection?.enabled_tables;
if (!Array.isArray(raw) || raw.length === 0) return null;
return new Set(raw.filter((v): v is string => typeof v === 'string'));
}
export function filterSnapshotTables(snapshot: KtxSchemaSnapshot, enabledTables: Set<string>): KtxSchemaSnapshot {
return {
...snapshot,
tables: snapshot.tables.filter((table) => {
const key = table.db ? `${table.db}.${table.name}` : table.name;
return enabledTables.has(key);
}),
};
}
function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set<string>): KtxScanConnector {
return {
...connector,
async introspect(input: KtxScanInput, ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
const snapshot = await connector.introspect(input, ctx);
return filterSnapshotTables(snapshot, enabledTables);
},
};
}
export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalScanRunResult> {
const mode = options.mode ?? 'structural';
assertSupportedMode(mode);
await options.progress?.update(0.05, 'Preparing scan');
const connector = await resolveScanConnector(options, mode);
const rawConnector = await resolveScanConnector(options, mode);
const connection = options.project.config.connections[options.connectionId];
if (!connection) {
throw new Error(`Connection "${options.connectionId}" is not configured in ktx.yaml`);
}
const driver = normalizeDriver(connection.driver);
const enabledTables = resolveEnabledTables(connection);
const connector = rawConnector && enabledTables ? createFilteredConnector(rawConnector, enabledTables) : rawConnector;
const adapters =
options.adapters ??
createDefaultLocalIngestAdapters(options.project, { databaseIntrospectionUrl: options.databaseIntrospectionUrl });
@ -372,13 +403,14 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
let enrichmentState: KtxScanEnrichmentStateSummary = completedKtxScanEnrichmentStateSummary();
if (!reusedExistingScanArtifacts && !report.dryRun && report.artifactPaths.rawSourcesDir) {
await options.progress?.update(0.7, 'Writing schema artifacts');
const structuralSnapshot = await readLocalScanStructuralSnapshot({
const rawSnapshot = await readLocalScanStructuralSnapshot({
project: options.project,
connectionId: options.connectionId,
driver,
rawSourcesDir: report.artifactPaths.rawSourcesDir,
extractedAtFallback: report.createdAt,
});
const structuralSnapshot = enabledTables ? filterSnapshotTables(rawSnapshot, enabledTables) : rawSnapshot;
const manifestArtifacts = await writeLocalScanManifestShards({
project: options.project,
connectionId: options.connectionId,

View file

@ -277,6 +277,12 @@ export interface KtxQueryResult {
rowCount: number | null;
}
export interface KtxTableListEntry {
schema: string;
name: string;
kind: 'table' | 'view';
}
export interface KtxScanConnector {
id: string;
driver: KtxConnectionDriver;