refactor(setup): route scope discovery through driver registry

This commit is contained in:
Andrey Avtomonov 2026-05-25 13:45:40 +02:00
parent ff906a72fc
commit d0fa77970c

View file

@ -3,6 +3,7 @@ import { readFile, writeFile } from 'node:fs/promises';
import { delimiter, dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { promisify } from 'node:util';
import { getDriverRegistration } from './context/connections/drivers.js';
import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js';
import type { HistoricSqlDialect } from './context/ingest/adapters/historic-sql/types.js';
import {
@ -361,74 +362,18 @@ async function defaultListSchemas(projectDir: string, connectionId: string): Pro
const project = await loadKtxProject({ projectDir });
const connection = project.config.connections[connectionId];
const driver = normalizeDriver(connection?.driver);
const registration = driver ? getDriverRegistration(driver) : undefined;
if (!registration) return [];
if (driver === 'postgres') {
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('./connectors/postgres/connector.js');;
if (!isKtxPostgresConnectionConfig(connection)) return [];
const connector = new KtxPostgresScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
const connectorModule = await registration.load();
if (!connectorModule.isConnectionConfig(connection)) return [];
const connector = connectorModule.createScanConnector({ connectionId, connection, projectDir });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup?.();
}
if (driver === 'sqlserver') {
const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('./connectors/sqlserver/connector.js');;
if (!isKtxSqlServerConnectionConfig(connection)) return [];
const connector = new KtxSqlServerScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'mysql') {
const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('./connectors/mysql/connector.js');;
if (!isKtxMysqlConnectionConfig(connection)) return [];
const connector = new KtxMysqlScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'clickhouse') {
const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('./connectors/clickhouse/connector.js');;
if (!isKtxClickHouseConnectionConfig(connection)) return [];
const connector = new KtxClickHouseScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('./connectors/bigquery/connector.js');;
if (!isKtxBigQueryConnectionConfig(connection)) return [];
const connector = new KtxBigQueryScanConnector({ connectionId, connection });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
if (driver === 'snowflake') {
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;
if (!isKtxSnowflakeConnectionConfig(connection)) return [];
const connector = new KtxSnowflakeScanConnector({ connectionId, connection, projectDir });
try {
return await connector.listSchemas();
} finally {
await connector.cleanup();
}
}
return [];
}
function configuredSchemas(connection: KtxProjectConnectionConfig | undefined, driver: KtxSetupDatabaseDriver): string[] | undefined {
@ -448,74 +393,18 @@ async function defaultListTables(
const connection = project.config.connections[connectionId];
const driver = normalizeDriver(connection?.driver);
const schemas = schemasOverride ?? (driver ? configuredSchemas(connection, driver) : undefined);
const registration = driver ? getDriverRegistration(driver) : undefined;
if (!registration) return [];
if (driver === 'postgres') {
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('./connectors/postgres/connector.js');;
if (!isKtxPostgresConnectionConfig(connection)) return [];
const connector = new KtxPostgresScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
const connectorModule = await registration.load();
if (!connectorModule.isConnectionConfig(connection)) return [];
const connector = connectorModule.createScanConnector({ connectionId, connection, projectDir });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup?.();
}
if (driver === 'mysql') {
const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('./connectors/mysql/connector.js');;
if (!isKtxMysqlConnectionConfig(connection)) return [];
const connector = new KtxMysqlScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'sqlserver') {
const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('./connectors/sqlserver/connector.js');;
if (!isKtxSqlServerConnectionConfig(connection)) return [];
const connector = new KtxSqlServerScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('./connectors/bigquery/connector.js');;
if (!isKtxBigQueryConnectionConfig(connection)) return [];
const connector = new KtxBigQueryScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'snowflake') {
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;
if (!isKtxSnowflakeConnectionConfig(connection)) return [];
const connector = new KtxSnowflakeScanConnector({ connectionId, connection, projectDir });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
if (driver === 'clickhouse') {
const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('./connectors/clickhouse/connector.js');;
if (!isKtxClickHouseConnectionConfig(connection)) return [];
const connector = new KtxClickHouseScanConnector({ connectionId, connection });
try {
return await connector.listTables(schemas);
} finally {
await connector.cleanup();
}
}
return [];
}
function existingConnectionIdsByDriver(
@ -638,9 +527,9 @@ function scriptedScopeConfigForDriver(
databaseSchemas: string[],
): Record<string, unknown> {
if (databaseSchemas.length === 0) return {};
if (driver === 'bigquery') return { dataset_ids: databaseSchemas };
if (driver === 'clickhouse') return { databases: databaseSchemas };
return { schemas: databaseSchemas };
const registration = getDriverRegistration(driver);
if (!registration?.scopeConfigKey) return {};
return { [registration.scopeConfigKey]: databaseSchemas };
}
function databaseNameFromLiteralUrl(url: string): string | undefined {