Merge pull request #52 from Kaelio/select-tables-on-connect

Select tables during database setup
This commit is contained in:
Luca Martial 2026-05-13 00:34:19 -04:00 committed by GitHub
commit 703ecd427e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 538 additions and 21 deletions

View file

@ -144,7 +144,7 @@ describe('setup agents', () => {
await expect(readKtxAgentInstallManifest(tempDir)).resolves.toEqual(null);
});
it('uses prompts in interactive mode and supports Back', async () => {
it('treats cancel as skip in interactive mode', async () => {
const io = makeIo();
const prompts = {
select: vi.fn(async () => 'back'),
@ -166,7 +166,7 @@ describe('setup agents', () => {
io.io,
{ prompts },
),
).resolves.toEqual({ status: 'back', projectDir: tempDir });
).resolves.toEqual({ status: 'skipped', projectDir: tempDir });
});
it('explains how to select multiple agent targets in interactive mode', async () => {

View file

@ -402,10 +402,9 @@ export async function runKtxSetupAgentsStep(
options: [
{ value: 'cli', label: 'CLI tools and skills' },
{ value: 'skip', label: 'Skip' },
{ value: 'back', label: 'Back' },
],
})) as KtxAgentInstallMode | 'skip' | 'back');
if (mode === 'back') return { status: 'back', projectDir: args.projectDir };
if (mode === 'back') return { status: 'skipped', projectDir: args.projectDir };
if (mode === 'skip') return { status: 'skipped', projectDir: args.projectDir };
const targets =

View file

@ -571,7 +571,6 @@ describe('setup databases step', () => {
options: [
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'add', label: 'Add another primary source' },
{ value: 'back', label: 'Back' },
],
});
expect(testConnection).not.toHaveBeenCalled();
@ -622,7 +621,6 @@ describe('setup databases step', () => {
options: [
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'add', label: 'Add another primary source' },
{ value: 'back', label: 'Back' },
],
});
expect(testConnection).toHaveBeenCalledTimes(1);
@ -657,7 +655,6 @@ describe('setup databases step', () => {
options: [
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'add', label: 'Add another primary source' },
{ value: 'back', label: 'Back' },
],
});
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
@ -692,7 +689,6 @@ describe('setup databases step', () => {
options: [
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'add', label: 'Add another primary source' },
{ value: 'back', label: 'Back' },
],
});
});
@ -735,7 +731,6 @@ describe('setup databases step', () => {
options: [
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'add', label: 'Add another primary source' },
{ value: 'back', label: 'Back' },
],
});
});

View file

@ -9,6 +9,7 @@ import {
setKtxSetupDatabaseConnectionIds,
stripKtxSetupCompletedSteps,
} from '@ktx/context/project';
import type { KtxTableListEntry } from '@ktx/context/scan';
import type { KtxCliIo } from './cli-runtime.js';
import { runKtxConnection } from './connection.js';
import { withMenuOptionsSpacing, withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
@ -83,6 +84,7 @@ export interface KtxSetupDatabasesDeps {
testConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
scanConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
listSchemas?: (projectDir: string, connectionId: string) => Promise<string[]>;
listTables?: (projectDir: string, connectionId: string) => Promise<KtxTableListEntry[]>;
historicSqlProbe?: KtxSetupHistoricSqlProbe;
}
@ -375,6 +377,89 @@ async function defaultListSchemas(projectDir: string, connectionId: string): Pro
return [];
}
function configuredSchemas(connection: KtxProjectConnectionConfig | undefined, driver: KtxSetupDatabaseDriver): string[] | undefined {
if (!connection) return undefined;
const spec = SCOPE_DISCOVERY_SPECS[driver];
if (!spec) return undefined;
const values = configuredScopeValues(connection, spec);
return values.length > 0 ? values : undefined;
}
async function defaultListTables(projectDir: string, connectionId: string): Promise<KtxTableListEntry[]> {
const project = await loadKtxProject({ projectDir });
const connection = project.config.connections[connectionId];
const driver = normalizeDriver(connection?.driver);
const schemas = driver ? configuredSchemas(connection, driver) : undefined;
if (driver === 'postgres') {
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('@ktx/connector-postgres');
if (!isKtxPostgresConnectionConfig(connection)) return [];
const connector = new KtxPostgresScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'mysql') {
const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('@ktx/connector-mysql');
if (!isKtxMysqlConnectionConfig(connection)) return [];
const connector = new KtxMysqlScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'sqlserver') {
const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('@ktx/connector-sqlserver');
if (!isKtxSqlServerConnectionConfig(connection)) return [];
const connector = new KtxSqlServerScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('@ktx/connector-bigquery');
if (!isKtxBigQueryConnectionConfig(connection)) return [];
const connector = new KtxBigQueryScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'snowflake') {
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('@ktx/connector-snowflake');
if (!isKtxSnowflakeConnectionConfig(connection)) return [];
const connector = new KtxSnowflakeScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'clickhouse') {
const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('@ktx/connector-clickhouse');
if (!isKtxClickHouseConnectionConfig(connection)) return [];
const connector = new KtxClickHouseScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
return [];
}
function existingConnectionIdsByDriver(
connections: Record<string, KtxProjectConnectionConfig>,
driver: KtxSetupDatabaseDriver,
@ -411,7 +496,6 @@ function configuredPrimarySourcesPrompt(connectionIds: string[]): {
options: [
{ value: 'continue', label: 'Continue to knowledge sources' },
{ value: 'add', label: 'Add another primary source' },
{ value: 'back', label: 'Back' },
],
};
}
@ -986,6 +1070,22 @@ async function writeScopeConfig(input: {
});
}
async function clearScopeConfig(projectDir: string, connectionId: string): Promise<void> {
const project = await loadKtxProject({ projectDir });
const connection = project.config.connections[connectionId];
if (!connection) return;
const driver = normalizeDriver(connection.driver);
if (!driver) return;
const spec = SCOPE_DISCOVERY_SPECS[driver];
if (!spec) return;
const cleaned = Object.fromEntries(
Object.entries(connection).filter(
([key]) => key !== spec.configArrayField && key !== spec.configSingleField && key !== 'enabled_tables',
),
) as KtxProjectConnectionConfig;
await writeConnectionConfig({ projectDir, connectionId, connection: cleaned });
}
async function maybeConfigureSchemaScope(input: {
projectDir: string;
connectionId: string;
@ -1072,6 +1172,130 @@ async function maybeConfigureSchemaScope(input: {
return true;
}
async function maybeConfigureTableScope(input: {
projectDir: string;
connectionId: string;
args: KtxSetupDatabasesArgs;
prompts: KtxSetupDatabasesPromptAdapter;
io: KtxCliIo;
deps: KtxSetupDatabasesDeps;
}): Promise<boolean> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
const driver = normalizeDriver(connection?.driver);
if (!driver || driver === 'sqlite') return true;
const existingTables = connection?.enabled_tables;
if (Array.isArray(existingTables) && existingTables.length > 0) {
return true;
}
if (input.args.inputMode === 'disabled') {
return true;
}
writeSetupSection(input.io, 'Discovering tables', [
`Connecting to ${input.connectionId}`,
]);
let discovered: KtxTableListEntry[];
try {
discovered = await (input.deps.listTables ?? defaultListTables)(
input.projectDir,
input.connectionId,
);
} catch (error) {
input.io.stderr.write(
`Could not discover tables for ${input.connectionId}; continuing without table filter. ` +
`${error instanceof Error ? error.message : String(error)}\n`,
);
return true;
}
if (discovered.length === 0) {
return true;
}
const allQualified = discovered.map((t) => `${t.schema}.${t.name}`);
if (discovered.length === 1) {
await writeConnectionConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
connection: { ...connection!, enabled_tables: allQualified },
});
writeSetupSection(input.io, `Tables enabled for ${input.connectionId}`, [
`${allQualified[0]}`,
]);
return true;
}
const bySchema = new Map<string, KtxTableListEntry[]>();
for (const entry of discovered) {
const existing = bySchema.get(entry.schema) ?? [];
existing.push(entry);
bySchema.set(entry.schema, existing);
}
const schemaList = [...bySchema.keys()].sort();
const schemaSummary = schemaList.map((s) => `${s} (${bySchema.get(s)!.length})`).join(', ');
let selected: string[] | null = null;
while (selected === null) {
const action = await input.prompts.select({
message: `Tables found in selected schemas\n` +
`${discovered.length} tables across ${schemaList.length} ${schemaList.length === 1 ? 'schema' : 'schemas'}: ${schemaSummary}`,
options: [
{ value: 'all', label: 'Enable all tables' },
{ value: 'customize', label: 'Customize which tables to enable' },
{ value: 'back', label: 'Back' },
],
});
if (action === 'back') {
return false;
}
if (action === 'all') {
selected = allQualified;
} else {
const choices = await input.prompts.multiselect({
message: withMultiselectNavigation(
`Tables to enable for ${input.connectionId}\n` +
`Deselect any tables agents should not use.`,
),
options: discovered.map((t) => {
const qualified = `${t.schema}.${t.name}`;
const suffix = t.kind === 'view' ? ' (view)' : '';
return { value: qualified, label: `${qualified}${suffix}` };
}),
initialValues: allQualified,
required: true,
});
if (choices.includes('back')) {
continue;
}
if (choices.length === 0) {
input.io.stdout.write('│ KTX needs at least one table enabled. Select a table or press Escape to go back.\n');
continue;
}
selected = choices;
}
}
await writeConnectionConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
connection: { ...connection!, enabled_tables: selected },
});
writeSetupSection(input.io, `Tables enabled for ${input.connectionId}`, [
`${selected.length}/${discovered.length} tables enabled`,
]);
return true;
}
async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise<void> {
const project = await loadKtxProject({ projectDir });
const adapters = project.config.ingest.adapters.includes('historic-sql')
@ -1198,8 +1422,16 @@ async function validateAndScanConnection(input: {
const testLines = ['✓ Connection test passed', `Driver: ${driverDisplay}`];
writeSetupSection(input.io, `Testing ${input.connectionId}`, testLines);
if (!(await maybeConfigureSchemaScope(input))) {
return false;
while (true) {
if (!(await maybeConfigureSchemaScope(input))) {
return false;
}
if (await maybeConfigureTableScope(input)) {
break;
}
await clearScopeConfig(input.projectDir, input.connectionId);
}
await maybeRunHistoricSqlSetupProbe({
@ -1370,13 +1602,10 @@ export async function runKtxSetupDatabasesStep(
while (true) {
if (showConfiguredPrimaryMenu) {
const action = await prompts.select(configuredPrimarySourcesPrompt(selectedConnectionIds));
if (action === 'continue') {
if (action === 'continue' || action === 'back') {
await markDatabasesComplete(args.projectDir, selectedConnectionIds);
return { status: 'ready', projectDir: args.projectDir, connectionIds: selectedConnectionIds };
}
if (action === 'back') {
return { status: 'back', projectDir: args.projectDir };
}
}
showConfiguredPrimaryMenu = false;

View file

@ -14,6 +14,7 @@ import {
type KtxSchemaColumn,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
@ -63,6 +64,7 @@ export interface KtxBigQueryQueryJob {
export interface KtxBigQueryTableRef {
id?: string;
metadata?: { type?: string };
get(): Promise<
[
{
@ -369,6 +371,25 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
return datasets.map((dataset) => dataset.id).filter((id): id is string => Boolean(id));
}
async listTables(datasetIds?: string[]): Promise<KtxTableListEntry[]> {
const filterDatasets = datasetIds ?? (await this.listDatasets());
const entries: KtxTableListEntry[] = [];
for (const datasetId of filterDatasets) {
const dataset = this.getClient().dataset(datasetId);
const [tables] = await dataset.getTables();
for (const table of tables) {
if (!table.id) continue;
entries.push({
schema: datasetId,
name: table.id,
kind: table.metadata?.type === 'VIEW' ? 'view' : 'table',
});
}
}
entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name));
return entries;
}
async cleanup(): Promise<void> {
this.client = null;
}

View file

@ -16,6 +16,7 @@ import {
type KtxSchemaTable,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableListEntry,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import { readFileSync } from 'node:fs';
@ -128,6 +129,12 @@ interface ClickHouseDatabaseRow {
name: string;
}
interface ClickHouseTableListRow {
database: string;
name: string;
engine: string;
}
interface ClickHouseCompactResponse {
meta?: Array<{ name: string; type: string }>;
data?: unknown[][];
@ -417,6 +424,25 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
return rows.map((row) => row.name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const rows = await this.queryEachRow<ClickHouseTableListRow>(
`
SELECT database, name, engine
FROM system.tables
WHERE database IN ({schemas:Array(String)})
ORDER BY database, name
`,
{ schemas: filterSchemas },
);
return rows.map((row) => ({
schema: row.database,
name: row.name,
kind: row.engine === 'View' || row.engine === 'MaterializedView' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.client) {
await this.client.close();

View file

@ -15,6 +15,7 @@ import {
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxTableListEntry,
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
@ -129,6 +130,12 @@ interface MysqlSchemaRow extends RowDataPacket {
SCHEMA_NAME: string;
}
interface MysqlTableListRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
TABLE_TYPE: string;
}
interface MysqlCountRow extends RowDataPacket {
count?: unknown;
cardinality?: unknown;
@ -466,6 +473,27 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
return rows.map((row) => row.SCHEMA_NAME);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const placeholders = filterSchemas.map(() => '?').join(', ');
const rows = await this.queryRaw<MysqlTableListRow>(
`
SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA IN (${placeholders})
AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
ORDER BY TABLE_SCHEMA, TABLE_NAME
`,
filterSchemas,
);
return rows.map((row) => ({
schema: row.TABLE_SCHEMA,
name: row.TABLE_NAME,
kind: row.TABLE_TYPE === 'VIEW' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.end();

View file

@ -17,6 +17,7 @@ import {
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
@ -179,6 +180,12 @@ interface PostgresSchemaRow {
schema_name: string;
}
interface PostgresTableListRow {
schema_name: string;
table_name: string;
table_kind: string;
}
interface PostgresCountRow {
count?: unknown;
cardinality?: unknown;
@ -523,6 +530,27 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
return rows.map((row) => row.schema_name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const rows = await this.queryRaw<PostgresTableListRow>(
`
SELECT n.nspname AS schema_name, c.relname AS table_name, c.relkind AS table_kind
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ANY($1)
AND c.relkind IN ('r', 'v')
ORDER BY n.nspname, c.relname
`,
[filterSchemas],
);
return rows.map((row) => ({
schema: row.schema_name,
name: row.table_name,
kind: row.table_kind === 'v' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.end();

View file

@ -60,6 +60,10 @@ function fakeDriverFactory(): KtxSnowflakeDriverFactory {
},
]),
listSchemas: vi.fn(async () => ['PUBLIC', 'MART']),
listTables: vi.fn(async () => [
{ schema: 'PUBLIC', name: 'ORDERS', kind: 'table' as const },
{ schema: 'PUBLIC', name: 'ORDER_SUMMARY', kind: 'view' as const },
]),
cleanup: vi.fn(async () => undefined),
};
return { createDriver: vi.fn(() => driver) };

View file

@ -19,6 +19,7 @@ import {
type KtxSchemaTable,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableListEntry,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import * as snowflake from 'snowflake-sdk';
@ -75,6 +76,7 @@ export interface KtxSnowflakeDriver {
query(sql: string, params?: unknown): Promise<KtxQueryResult>;
getSchemaMetadata(schemaName?: string): Promise<KtxSnowflakeRawTableMetadata[]>;
listSchemas(): Promise<string[]>;
listTables(schemas?: string[]): Promise<KtxTableListEntry[]>;
cleanup(): Promise<void>;
}
@ -344,6 +346,31 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
return result.rows.map((row) => String(row[1])).filter((name) => name !== 'INFORMATION_SCHEMA');
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const entries: KtxTableListEntry[] = [];
for (const schemaName of filterSchemas) {
const result = await this.query(
`
SELECT TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
ORDER BY TABLE_NAME
`,
[schemaName, this.resolved.database],
);
for (const row of result.rows) {
entries.push({
schema: schemaName,
name: String(row[0]),
kind: String(row[1]) === 'VIEW' ? 'view' : 'table',
});
}
}
return entries;
}
async cleanup(): Promise<void> {
const closers = this.closeSdkOptions;
this.closeSdkOptions = [];
@ -594,6 +621,10 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
return this.getDriver().listSchemas();
}
listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
return this.getDriver().listTables(schemas);
}
async cleanup(): Promise<void> {
if (this.driverInstance) {
await this.driverInstance.cleanup();

View file

@ -14,6 +14,7 @@ import {
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
@ -441,6 +442,32 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
return rows.map((row) => row.schema_name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const params: Record<string, unknown> = {};
const placeholders = filterSchemas.map((s, i) => {
params[`schema${i}`] = s;
return `@schema${i}`;
});
const rows = await this.queryRaw<{ schema_name: string; table_name: string; table_type: string }>(
`
SELECT s.name AS schema_name, o.name AS table_name, o.type_desc AS table_type
FROM sys.objects o
JOIN sys.schemas s ON o.schema_id = s.schema_id
WHERE o.type IN ('U', 'V')
AND s.name IN (${placeholders.join(', ')})
ORDER BY s.name, o.name
`,
params,
);
return rows.map((row) => ({
schema: row.schema_name,
name: row.table_name,
kind: row.table_type === 'VIEW' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.close();

View file

@ -105,7 +105,7 @@ export type {
LocalScanStatusResponse,
RunLocalScanOptions,
} from './local-scan.js';
export { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
export { filterSnapshotTables, getLocalScanReport, getLocalScanStatus, resolveEnabledTables, runLocalScan } from './local-scan.js';
export type { ReadLocalScanStructuralSnapshotInput } from './local-structural-artifacts.js';
export { readLocalScanStructuralSnapshot } from './local-structural-artifacts.js';
export type {
@ -393,6 +393,7 @@ export type {
KtxSchemaTable,
KtxSchemaTableKind,
KtxStructuralSyncStats,
KtxTableListEntry,
KtxTableRef,
KtxTableSampleInput,
KtxTableSampleResult,

View file

@ -6,8 +6,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import YAML from 'yaml';
import type { SourceAdapter } from '../ingest/index.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js';
import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
import type { KtxQueryResult, KtxReadOnlyQueryInput } from './types.js';
import { filterSnapshotTables, getLocalScanReport, getLocalScanStatus, resolveEnabledTables, runLocalScan } from './local-scan.js';
import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js';
function relationshipSqlResult(
input: KtxReadOnlyQueryInput,
@ -1492,3 +1492,79 @@ describe('local scan', () => {
);
});
});
describe('resolveEnabledTables', () => {
it('returns null when no enabled_tables field', () => {
expect(resolveEnabledTables({ driver: 'postgres' })).toBeNull();
});
it('returns null for empty array', () => {
expect(resolveEnabledTables({ driver: 'postgres', enabled_tables: [] })).toBeNull();
});
it('returns Set of enabled table names', () => {
const result = resolveEnabledTables({
driver: 'postgres',
enabled_tables: ['public.users', 'public.orders'],
});
expect(result).toBeInstanceOf(Set);
expect(result!.size).toBe(2);
expect(result!.has('public.users')).toBe(true);
expect(result!.has('public.orders')).toBe(true);
});
it('returns null for undefined connection', () => {
expect(resolveEnabledTables(undefined)).toBeNull();
});
});
describe('filterSnapshotTables', () => {
function makeSnapshot(tables: Array<{ db: string; name: string }>): KtxSchemaSnapshot {
return {
connectionId: 'test',
driver: 'postgres',
extractedAt: '2026-01-01T00:00:00Z',
scope: {},
metadata: {},
tables: tables.map(
(t): KtxSchemaTable => ({
catalog: null,
db: t.db,
name: t.name,
kind: 'table',
comment: null,
estimatedRows: null,
columns: [],
foreignKeys: [],
}),
),
};
}
it('keeps only enabled tables', () => {
const snapshot = makeSnapshot([
{ db: 'public', name: 'users' },
{ db: 'public', name: 'orders' },
{ db: 'public', name: 'logs' },
]);
const enabled = new Set(['public.users', 'public.orders']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.tables).toHaveLength(2);
expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']);
});
it('returns empty tables when none match', () => {
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
const enabled = new Set(['public.orders']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.tables).toHaveLength(0);
});
it('preserves other snapshot fields', () => {
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
const enabled = new Set(['public.users']);
const filtered = filterSnapshotTables(snapshot, enabled);
expect(filtered.connectionId).toBe('test');
expect(filtered.driver).toBe('postgres');
});
});

View file

@ -29,10 +29,13 @@ import type {
KtxConnectionDriver,
KtxProgressPort,
KtxScanConnector,
KtxScanContext,
KtxScanEnrichmentStateSummary,
KtxScanInput,
KtxScanMode,
KtxScanReport,
KtxScanTrigger,
KtxSchemaSnapshot,
} from './types.js';
export interface RunLocalScanOptions {
@ -313,17 +316,45 @@ async function readScanReport(
}
}
export function resolveEnabledTables(connection: Record<string, unknown> | undefined): Set<string> | null {
const raw = connection?.enabled_tables;
if (!Array.isArray(raw) || raw.length === 0) return null;
return new Set(raw.filter((v): v is string => typeof v === 'string'));
}
export function filterSnapshotTables(snapshot: KtxSchemaSnapshot, enabledTables: Set<string>): KtxSchemaSnapshot {
return {
...snapshot,
tables: snapshot.tables.filter((table) => {
const key = table.db ? `${table.db}.${table.name}` : table.name;
return enabledTables.has(key);
}),
};
}
function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set<string>): KtxScanConnector {
return {
...connector,
async introspect(input: KtxScanInput, ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
const snapshot = await connector.introspect(input, ctx);
return filterSnapshotTables(snapshot, enabledTables);
},
};
}
export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalScanRunResult> {
const mode = options.mode ?? 'structural';
assertSupportedMode(mode);
await options.progress?.update(0.05, 'Preparing scan');
const connector = await resolveScanConnector(options, mode);
const rawConnector = await resolveScanConnector(options, mode);
const connection = options.project.config.connections[options.connectionId];
if (!connection) {
throw new Error(`Connection "${options.connectionId}" is not configured in ktx.yaml`);
}
const driver = normalizeDriver(connection.driver);
const enabledTables = resolveEnabledTables(connection);
const connector = rawConnector && enabledTables ? createFilteredConnector(rawConnector, enabledTables) : rawConnector;
const adapters =
options.adapters ??
createDefaultLocalIngestAdapters(options.project, { databaseIntrospectionUrl: options.databaseIntrospectionUrl });
@ -372,13 +403,28 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
let enrichmentState: KtxScanEnrichmentStateSummary = completedKtxScanEnrichmentStateSummary();
if (!reusedExistingScanArtifacts && !report.dryRun && report.artifactPaths.rawSourcesDir) {
await options.progress?.update(0.7, 'Writing schema artifacts');
const structuralSnapshot = await readLocalScanStructuralSnapshot({
const rawSnapshot = await readLocalScanStructuralSnapshot({
project: options.project,
connectionId: options.connectionId,
driver,
rawSourcesDir: report.artifactPaths.rawSourcesDir,
extractedAtFallback: report.createdAt,
});
const structuralSnapshot = enabledTables ? filterSnapshotTables(rawSnapshot, enabledTables) : rawSnapshot;
if (enabledTables && structuralSnapshot.tables.length < rawSnapshot.tables.length) {
const excluded = rawSnapshot.tables.length - structuralSnapshot.tables.length;
let remaining = excluded;
const ds = report.diffSummary;
const subFrom = (field: 'tablesAdded' | 'tablesUnchanged' | 'tablesModified') => {
const take = Math.min(remaining, ds[field]);
ds[field] -= take;
remaining -= take;
};
subFrom('tablesAdded');
subFrom('tablesUnchanged');
subFrom('tablesModified');
await options.progress?.update(0.6, scanChangeSummary(report.diffSummary));
}
const manifestArtifacts = await writeLocalScanManifestShards({
project: options.project,
connectionId: options.connectionId,

View file

@ -277,6 +277,12 @@ export interface KtxQueryResult {
rowCount: number | null;
}
export interface KtxTableListEntry {
schema: string;
name: string;
kind: 'table' | 'view';
}
export interface KtxScanConnector {
id: string;
driver: KtxConnectionDriver;