feat: wire schema scope discovery for all relational setup drivers

This commit is contained in:
Andrey Avtomonov 2026-05-21 19:48:13 +02:00
parent 633d53a2cb
commit 4efa061c0e
3 changed files with 219 additions and 34 deletions

View file

@ -38,6 +38,11 @@ export type DatabaseScopePickResult =
| { kind: 'selected'; activeSchemas: string[]; enabledTables: string[] }
| { kind: 'back' };
interface ScopeSuggestion {
excluded: Set<string>;
suggested: Set<string>;
}
export interface PickDatabaseScopeArgs {
connectionId: string;
schemaNoun: string;
@ -45,6 +50,7 @@ export interface PickDatabaseScopeArgs {
discovered: readonly KtxTableListEntry[];
existing: { enabledTables: readonly string[] };
defaultSchemas: readonly string[];
schemaSuggestion?: ScopeSuggestion;
supportsSchemaScope: boolean;
}

View file

@ -1,7 +1,7 @@
import { mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join, resolve } from 'node:path';
import { initKtxProject } from './context/project/project.js';
import { initKtxProject, loadKtxProject } from './context/project/project.js';
import { parseKtxProjectConfig } from './context/project/config.js';
import { readKtxSetupState, writeKtxSetupState } from './context/project/setup-config.js';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
@ -435,16 +435,13 @@ describe('setup databases step', () => {
{
driver: 'bigquery',
selectValues: ['no'],
textValues: ['', 'analytics', '/path/to/service-account.json', ''],
textValues: ['', '/path/to/service-account.json', ''],
expectedTextPrompts: [
{
message: connectionNamePrompt('BigQuery'),
placeholder: 'bigquery-warehouse',
initialValue: 'bigquery-warehouse',
},
{
message: 'BigQuery dataset\nFor example analytics.',
},
{
message: 'Path to service account JSON file',
},
@ -1457,6 +1454,88 @@ describe('setup databases step', () => {
});
});
it('offers schema scope discovery for MySQL and writes selected schemas', async () => {
const prompts = makePromptAdapter({
multiselectValues: [['mysql']],
selectValues: ['url', 'continue'],
textValues: ['mysql-warehouse', 'mysql://reader@localhost/analytics'],
});
const listSchemas = vi.fn(async () => ['analytics', 'mart']);
const listTables = vi.fn(async (_projectDir: string, _connectionId: string, schemas?: string[]) =>
(schemas ?? []).map((schema) => ({ schema, name: 'orders', kind: 'table' as const })),
);
const pickDatabaseScope = vi.fn(async (args: PickDatabaseScopeArgs) => {
const scopedArgs = args as PickDatabaseScopeArgs & {
schemaSuggestion: { suggested: Set<string> };
};
expect(args.schemaNoun).toBe('database');
expect(args.discovered.map((table) => table.schema)).toEqual(['analytics', 'mart']);
expect(scopedArgs.schemaSuggestion.suggested).toEqual(new Set(['analytics', 'mart']));
return { kind: 'selected' as const, activeSchemas: ['mart'], enabledTables: ['mart.orders'] };
});
await runKtxSetupDatabasesStep(
{ projectDir: tempDir, inputMode: 'auto', skipDatabases: false, databaseSchemas: [] },
makeIo().io,
{ prompts, testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), listSchemas, listTables, pickDatabaseScope },
);
const project = await loadKtxProject({ projectDir: tempDir });
expect(project.config.connections['mysql-warehouse']).toMatchObject({
driver: 'mysql',
schemas: ['mart'],
enabled_tables: ['mart.orders'],
});
});
it('maps ClickHouse scripted database schema input to databases and preserves database', async () => {
await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
skipDatabases: false,
databaseDrivers: ['clickhouse'],
databaseConnectionId: 'clickhouse-warehouse',
databaseUrl: 'clickhouse://reader@localhost/analytics',
databaseSchemas: ['analytics', 'mart'],
},
makeIo().io,
{ testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0) },
);
const project = await loadKtxProject({ projectDir: tempDir });
expect(project.config.connections['clickhouse-warehouse']).toMatchObject({
driver: 'clickhouse',
database: 'analytics',
databases: ['analytics', 'mart'],
});
expect(project.config.connections['clickhouse-warehouse']).not.toHaveProperty('schemas');
});
it('does not prompt for a bootstrap BigQuery dataset before scope discovery', async () => {
const prompts = makePromptAdapter({
multiselectValues: [['bigquery']],
selectValues: ['no', 'continue'],
textValues: ['bigquery-warehouse', '/tmp/service-account.json', 'US'],
});
const listSchemas = vi.fn(async () => ['analytics']);
const listTables = vi.fn(async () => [{ schema: 'analytics', name: 'orders', kind: 'table' as const }]);
const pickDatabaseScope = vi.fn(async () => ({
kind: 'selected' as const,
activeSchemas: ['analytics'],
enabledTables: ['analytics.orders'],
}));
await runKtxSetupDatabasesStep(
{ projectDir: tempDir, inputMode: 'auto', skipDatabases: false, databaseSchemas: [] },
makeIo().io,
{ prompts, testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), listSchemas, listTables, pickDatabaseScope },
);
const textMessages = vi.mocked(prompts.text).mock.calls.map(([options]) => options.message);
expect(textMessages).not.toContain(textInputPrompt('BigQuery dataset\nFor example analytics.'));
});
it('prompts for discovered Postgres schemas before the first scan', async () => {
const io = makeIo();
const prompts = makePromptAdapter({

View file

@ -134,8 +134,39 @@ interface ScopeDiscoverySpec {
nounPlural: string;
promptLabel: string;
configArrayField: string;
configSingleField: string;
defaultSelection: (values: string[]) => string[];
configSingleField?: string;
suggest: ScopeSuggest;
}
interface ScopeSuggestion {
excluded: Set<string>;
suggested: Set<string>;
}
type ScopeSuggest = (values: string[]) => ScopeSuggestion;
const SUGGESTED_SCOPE_PATTERN = /^(mart|prod|analytics|core|dim|fact|gold)(_|$)/i;
const EXCLUDED_SCOPE_PATTERN = /^(information_schema|pg_catalog|pg_toast|_airbyte_|mysql$|performance_schema$|sys$)/i;
function defaultSuggest(values: string[]): ScopeSuggestion {
const excluded = new Set(values.filter((value) => EXCLUDED_SCOPE_PATTERN.test(value)));
const suggested = new Set(
values.filter((value) => !excluded.has(value) && SUGGESTED_SCOPE_PATTERN.test(value)),
);
return { excluded, suggested };
}
function legacyDefaultSchemasForPicker(
schemas: string[],
suggestion: ScopeSuggestion,
): string[] {
const suggested = schemas.filter((schema) => suggestion.suggested.has(schema));
if (suggested.length > 0) {
return suggested;
}
const visible = schemas.filter((schema) => !suggestion.excluded.has(schema));
const nonPublic = visible.filter((schema) => schema !== 'public' && schema !== 'PUBLIC');
return nonPublic.length > 0 ? nonPublic : visible;
}
const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscoverySpec>> = {
@ -145,10 +176,22 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
promptLabel: 'PostgreSQL schemas',
configArrayField: 'schemas',
configSingleField: 'schema',
defaultSelection(schemas) {
const nonPublic = schemas.filter((s) => s !== 'public');
return nonPublic.length > 0 ? nonPublic : schemas;
},
suggest: defaultSuggest,
},
mysql: {
noun: 'database',
nounPlural: 'databases',
promptLabel: 'MySQL databases',
configArrayField: 'schemas',
configSingleField: 'schema',
suggest: defaultSuggest,
},
clickhouse: {
noun: 'database',
nounPlural: 'databases',
promptLabel: 'ClickHouse databases',
configArrayField: 'databases',
suggest: defaultSuggest,
},
sqlserver: {
noun: 'schema',
@ -156,7 +199,7 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
promptLabel: 'SQL Server schemas',
configArrayField: 'schemas',
configSingleField: 'schema',
defaultSelection: (schemas) => schemas,
suggest: defaultSuggest,
},
bigquery: {
noun: 'dataset',
@ -164,7 +207,7 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
promptLabel: 'BigQuery datasets',
configArrayField: 'dataset_ids',
configSingleField: 'dataset_id',
defaultSelection: (datasets) => datasets,
suggest: defaultSuggest,
},
snowflake: {
noun: 'schema',
@ -172,10 +215,7 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
promptLabel: 'Snowflake schemas',
configArrayField: 'schema_names',
configSingleField: 'schema_name',
defaultSelection(schemas) {
const nonPublic = schemas.filter((s) => s !== 'PUBLIC');
return nonPublic.length > 0 ? nonPublic : schemas;
},
suggest: defaultSuggest,
},
};
@ -386,6 +426,28 @@ async function defaultListSchemas(projectDir: string, connectionId: string): Pro
}
}
if (driver === 'mysql') {
const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('./connectors/mysql/connector.js');;
if (!isKtxMysqlConnectionConfig(connection)) return [];
const connector = new KtxMysqlScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'clickhouse') {
const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('./connectors/clickhouse/connector.js');;
if (!isKtxClickHouseConnectionConfig(connection)) return [];
const connector = new KtxClickHouseScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('./connectors/bigquery/connector.js');;
if (!isKtxBigQueryConnectionConfig(connection)) return [];
@ -606,6 +668,27 @@ function normalizeFileReference(value: string): string {
return `file:${normalized}`;
}
function scriptedScopeConfigForDriver(
driver: KtxSetupDatabaseDriver,
databaseSchemas: string[],
): Record<string, unknown> {
if (databaseSchemas.length === 0) return {};
if (driver === 'bigquery') return { dataset_ids: databaseSchemas };
if (driver === 'clickhouse') return { databases: databaseSchemas };
return { schemas: databaseSchemas };
}
function databaseNameFromLiteralUrl(url: string): string | undefined {
if (url.startsWith('env:') || url.startsWith('file:')) {
return undefined;
}
try {
return new URL(url).pathname.replace(/^\/+/, '') || undefined;
} catch {
return undefined;
}
}
async function promptCredential(input: {
prompts: KtxSetupDatabasesPromptAdapter;
message: string;
@ -694,7 +777,7 @@ async function buildFieldsConnectionConfig(input: {
database,
username,
...(passwordRef ? { password: passwordRef } : {}),
...(input.args.databaseSchemas.length > 0 ? { schemas: input.args.databaseSchemas } : {}),
...scriptedScopeConfigForDriver(input.driver, input.args.databaseSchemas),
};
}
@ -720,10 +803,11 @@ async function buildPastedUrlConnectionConfig(input: {
return {
driver: input.driver,
url,
...(input.args.databaseSchemas.length > 0 ? { schemas: input.args.databaseSchemas } : {}),
...scriptedScopeConfigForDriver(input.driver, input.args.databaseSchemas),
};
}
const database = input.driver === 'clickhouse' ? databaseNameFromLiteralUrl(url) : undefined;
if (urlHasCredentials(url)) {
const ref = await writeProjectLocalSecretReference({
projectDir: input.args.projectDir,
@ -733,14 +817,16 @@ async function buildPastedUrlConnectionConfig(input: {
return {
driver: input.driver,
url: ref,
...(input.args.databaseSchemas.length > 0 ? { schemas: input.args.databaseSchemas } : {}),
...(database ? { database } : {}),
...scriptedScopeConfigForDriver(input.driver, input.args.databaseSchemas),
};
}
return {
driver: input.driver,
url,
...(input.args.databaseSchemas.length > 0 ? { schemas: input.args.databaseSchemas } : {}),
...(database ? { database } : {}),
...scriptedScopeConfigForDriver(input.driver, input.args.databaseSchemas),
};
}
@ -756,6 +842,7 @@ async function buildUrlConnectionConfig(input: {
if (input.args.databaseUrl) {
const url = normalizeInputReference(input.args.databaseUrl);
if (urlHasCredentials(url)) {
const database = input.driver === 'clickhouse' ? databaseNameFromLiteralUrl(url) : undefined;
const ref = await writeProjectLocalSecretReference({
projectDir: input.args.projectDir,
fileName: `${input.connectionId}-url`,
@ -764,13 +851,16 @@ async function buildUrlConnectionConfig(input: {
return {
driver: input.driver,
url: ref,
...(input.args.databaseSchemas.length > 0 ? { schemas: input.args.databaseSchemas } : {}),
...(database ? { database } : {}),
...scriptedScopeConfigForDriver(input.driver, input.args.databaseSchemas),
};
}
const database = input.driver === 'clickhouse' ? databaseNameFromLiteralUrl(url) : undefined;
return {
driver: input.driver,
url,
...(input.args.databaseSchemas.length > 0 ? { schemas: input.args.databaseSchemas } : {}),
...(database ? { database } : {}),
...scriptedScopeConfigForDriver(input.driver, input.args.databaseSchemas),
};
}
@ -822,12 +912,6 @@ async function buildConnectionConfig(input: {
});
}
if (driver === 'bigquery') {
const datasetId = await promptText(
prompts,
'BigQuery dataset\nFor example analytics.',
stringConfigField(input.existingConnection, 'dataset_id'),
);
if (datasetId === undefined) return 'back';
const credentialsPath = await promptText(
prompts,
'Path to service account JSON file',
@ -840,12 +924,12 @@ async function buildConnectionConfig(input: {
stringConfigField(input.existingConnection, 'location') ?? 'US',
);
if (location === undefined) return 'back';
if (!datasetId || !credentialsPath) return null;
if (!credentialsPath) return null;
return {
driver: 'bigquery',
dataset_id: datasetId,
credentials_json: normalizeFileReference(credentialsPath),
...(location ? { location } : {}),
...scriptedScopeConfigForDriver('bigquery', args.databaseSchemas),
};
}
if (driver === 'snowflake') {
@ -1260,9 +1344,17 @@ function withExistingPrimaryEditPromptDefaults(input: {
Array.isArray(previousArray) &&
previousArray.length > 0
) {
delete merged[spec.configSingleField];
if (spec.configSingleField) {
delete merged[spec.configSingleField];
}
merged[spec.configArrayField] = previousArray;
} else if (!Object.hasOwn(input.next, spec.configArrayField) && !Object.hasOwn(input.next, spec.configSingleField)) {
} else if (
!Object.hasOwn(input.next, spec.configArrayField) &&
(!spec.configSingleField || !Object.hasOwn(input.next, spec.configSingleField))
) {
if (!spec.configSingleField) {
return merged;
}
const previousSingle = input.previous[spec.configSingleField];
if (typeof previousSingle === 'string' && previousSingle.trim().length > 0) {
merged[spec.configSingleField] = previousSingle;
@ -1286,6 +1378,9 @@ function configuredScopeValues(
.filter((v): v is string => typeof v === 'string' && v.trim().length > 0)
.map((v) => v.trim());
}
if (!spec.configSingleField) {
return [];
}
const singleVal = connection[spec.configSingleField];
return typeof singleVal === 'string' && singleVal.trim().length > 0 ? [singleVal.trim()] : [];
}
@ -1428,8 +1523,12 @@ async function maybeConfigureDatabaseScope(input: {
const defaultSchemas = (() => {
if (cliSchemas.length > 0) return cliSchemas;
if (!spec) return schemasInDiscovery;
return spec.defaultSelection(schemasInDiscovery);
const suggestion = spec.suggest(schemasInDiscovery);
return legacyDefaultSchemasForPicker(schemasInDiscovery, suggestion);
})();
const schemaSuggestion = cliSchemas.length > 0
? { excluded: new Set<string>(), suggested: new Set(cliSchemas) }
: spec?.suggest(schemasInDiscovery);
const existingEnabled =
hasExistingTables && input.forcePrompt === true
@ -1453,6 +1552,7 @@ async function maybeConfigureDatabaseScope(input: {
discovered,
existing: { enabledTables: existingEnabled },
defaultSchemas,
...(schemaSuggestion ? { schemaSuggestion } : {}),
supportsSchemaScope: spec !== undefined,
},
input.io,