feat(cli): add warehouse driver registry

This commit is contained in:
Andrey Avtomonov 2026-05-25 13:40:56 +02:00
parent 4e4adcc061
commit ff906a72fc
3 changed files with 349 additions and 51 deletions

View file

@ -0,0 +1,205 @@
import type { KtxConnectionDriver, KtxScanConnector } from '../scan/types.js';
type KtxScopeConfigKey = 'dataset_ids' | 'databases' | 'schemas' | 'schema_names';
interface KtxDriverConnectorModule {
isConnectionConfig(connection: unknown): boolean;
createScanConnector(args: {
connectionId: string;
connection: unknown;
projectDir: string;
}): KtxScanConnector;
}
export interface KtxDriverRegistration {
readonly driver: KtxConnectionDriver;
readonly scopeConfigKey: KtxScopeConfigKey | null;
readonly hasHistoricSqlReader: boolean;
readonly hasLocalQueryExecutor: boolean;
load(): Promise<KtxDriverConnectorModule>;
}
function invalidConnectionConfig(driver: KtxConnectionDriver): Error {
return new Error(`Connection config does not match warehouse driver "${driver}".`);
}
/** @internal */
export const driverRegistrations: Record<KtxConnectionDriver, KtxDriverRegistration> = {
bigquery: {
driver: 'bigquery',
scopeConfigKey: 'dataset_ids',
hasHistoricSqlReader: true,
hasLocalQueryExecutor: false,
load: async () => {
const m = await import('../../connectors/bigquery/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxBigQueryConnectionConfig>[0];
return m.isKtxBigQueryConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection }) => {
const typedConnection = connection as Parameters<typeof m.isKtxBigQueryConnectionConfig>[0];
if (!m.isKtxBigQueryConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('bigquery');
}
return new m.KtxBigQueryScanConnector({ connectionId, connection: typedConnection });
},
};
},
},
clickhouse: {
driver: 'clickhouse',
scopeConfigKey: 'databases',
hasHistoricSqlReader: false,
hasLocalQueryExecutor: false,
load: async () => {
const m = await import('../../connectors/clickhouse/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxClickHouseConnectionConfig>[0];
return m.isKtxClickHouseConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection }) => {
const typedConnection = connection as Parameters<typeof m.isKtxClickHouseConnectionConfig>[0];
if (!m.isKtxClickHouseConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('clickhouse');
}
return new m.KtxClickHouseScanConnector({ connectionId, connection: typedConnection });
},
};
},
},
mysql: {
driver: 'mysql',
scopeConfigKey: 'schemas',
hasHistoricSqlReader: false,
hasLocalQueryExecutor: false,
load: async () => {
const m = await import('../../connectors/mysql/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxMysqlConnectionConfig>[0];
return m.isKtxMysqlConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection }) => {
const typedConnection = connection as Parameters<typeof m.isKtxMysqlConnectionConfig>[0];
if (!m.isKtxMysqlConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('mysql');
}
return new m.KtxMysqlScanConnector({ connectionId, connection: typedConnection });
},
};
},
},
postgres: {
driver: 'postgres',
scopeConfigKey: 'schemas',
hasHistoricSqlReader: true,
hasLocalQueryExecutor: true,
load: async () => {
const m = await import('../../connectors/postgres/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxPostgresConnectionConfig>[0];
return m.isKtxPostgresConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection }) => {
const typedConnection = connection as Parameters<typeof m.isKtxPostgresConnectionConfig>[0];
if (!m.isKtxPostgresConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('postgres');
}
return new m.KtxPostgresScanConnector({ connectionId, connection: typedConnection });
},
};
},
},
sqlite: {
driver: 'sqlite',
scopeConfigKey: null,
hasHistoricSqlReader: false,
hasLocalQueryExecutor: true,
load: async () => {
const m = await import('../../connectors/sqlite/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxSqliteConnectionConfig>[0];
return m.isKtxSqliteConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection, projectDir }) => {
const typedConnection = connection as Parameters<typeof m.isKtxSqliteConnectionConfig>[0];
if (!m.isKtxSqliteConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('sqlite');
}
return new m.KtxSqliteScanConnector({ connectionId, connection: typedConnection, projectDir });
},
};
},
},
snowflake: {
driver: 'snowflake',
scopeConfigKey: 'schema_names',
hasHistoricSqlReader: true,
hasLocalQueryExecutor: false,
load: async () => {
const m = await import('../../connectors/snowflake/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxSnowflakeConnectionConfig>[0];
return m.isKtxSnowflakeConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection, projectDir }) => {
const typedConnection = connection as Parameters<typeof m.isKtxSnowflakeConnectionConfig>[0];
if (!m.isKtxSnowflakeConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('snowflake');
}
return new m.KtxSnowflakeScanConnector({ connectionId, connection: typedConnection, projectDir });
},
};
},
},
sqlserver: {
driver: 'sqlserver',
scopeConfigKey: 'schemas',
hasHistoricSqlReader: false,
hasLocalQueryExecutor: false,
load: async () => {
const m = await import('../../connectors/sqlserver/connector.js');
return {
isConnectionConfig: (connection) => {
const typedConnection = connection as Parameters<typeof m.isKtxSqlServerConnectionConfig>[0];
return m.isKtxSqlServerConnectionConfig(typedConnection);
},
createScanConnector: ({ connectionId, connection }) => {
const typedConnection = connection as Parameters<typeof m.isKtxSqlServerConnectionConfig>[0];
if (!m.isKtxSqlServerConnectionConfig(typedConnection)) {
throw invalidConnectionConfig('sqlserver');
}
return new m.KtxSqlServerScanConnector({ connectionId, connection: typedConnection });
},
};
},
},
};
const supportedDrivers: KtxConnectionDriver[] = [
'bigquery',
'clickhouse',
'mysql',
'postgres',
'sqlite',
'snowflake',
'sqlserver',
];
function isRegisteredDriver(driver: string): driver is KtxConnectionDriver {
return Object.prototype.hasOwnProperty.call(driverRegistrations, driver);
}
export function getDriverRegistration(driver: string): KtxDriverRegistration | undefined {
const normalized = driver.toLowerCase().trim();
return isRegisteredDriver(normalized) ? driverRegistrations[normalized] : undefined;
}
export function listSupportedDrivers(): KtxConnectionDriver[] {
return [...supportedDrivers];
}

View file

@ -1,7 +1,11 @@
import {
getDriverRegistration,
listSupportedDrivers,
} from './context/connections/drivers.js';
import type { KtxLocalProject } from './context/project/project.js';
import type { KtxScanConnector } from './context/scan/types.js';
const SUPPORTED_DRIVERS = 'sqlite, postgres, mysql, clickhouse, sqlserver, bigquery, snowflake';
const SUPPORTED_DRIVERS = listSupportedDrivers().join(', ');
export async function createKtxCliScanConnector(
project: KtxLocalProject,
@ -17,58 +21,23 @@ export async function createKtxCliScanConnector(
`Connection "${connectionId}" has no \`driver\` field in ktx.yaml. Supported drivers: ${SUPPORTED_DRIVERS}.`,
);
}
if (driver === 'sqlite') {
const { KtxSqliteScanConnector, isKtxSqliteConnectionConfig } = await import('./connectors/sqlite/connector.js');;
if (!isKtxSqliteConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxSqliteScanConnector({ connectionId, connection, projectDir: project.projectDir });
const registration = getDriverRegistration(driver);
if (!registration) {
throw new Error(
`Connection "${connectionId}" uses driver "${driver}", which has no native standalone KTX scan connector. Supported drivers: ${SUPPORTED_DRIVERS}.`,
);
}
if (driver === 'postgres') {
const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('./connectors/postgres/connector.js');;
if (!isKtxPostgresConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxPostgresScanConnector({ connectionId, connection });
const connectorModule = await registration.load();
if (!connectorModule.isConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
if (driver === 'mysql') {
const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('./connectors/mysql/connector.js');;
if (!isKtxMysqlConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxMysqlScanConnector({ connectionId, connection });
}
if (driver === 'clickhouse') {
const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('./connectors/clickhouse/connector.js');;
if (!isKtxClickHouseConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxClickHouseScanConnector({ connectionId, connection });
}
if (driver === 'sqlserver') {
const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('./connectors/sqlserver/connector.js');;
if (!isKtxSqlServerConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxSqlServerScanConnector({ connectionId, connection });
}
if (driver === 'bigquery') {
const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('./connectors/bigquery/connector.js');;
if (!isKtxBigQueryConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxBigQueryScanConnector({ connectionId, connection });
}
if (driver === 'snowflake') {
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;
if (!isKtxSnowflakeConnectionConfig(connection)) {
throw invalidConnectionConfigError(connectionId, driver);
}
return new KtxSnowflakeScanConnector({ connectionId, connection, projectDir: project.projectDir });
}
throw new Error(
`Connection "${connectionId}" uses driver "${driver}", which has no native standalone KTX scan connector. Supported drivers: ${SUPPORTED_DRIVERS}.`,
);
return connectorModule.createScanConnector({
connectionId,
connection,
projectDir: project.projectDir,
});
}
function invalidConnectionConfigError(connectionId: string, driver: string): Error {

View file

@ -0,0 +1,124 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import {
driverRegistrations,
getDriverRegistration,
listSupportedDrivers,
} from '../../../src/context/connections/drivers.js';
import type { KtxConnectionDriver } from '../../../src/context/scan/types.js';
type FixtureFactory = (projectDir: string) => Record<string, unknown>;
const connectionFixtures: Record<KtxConnectionDriver, FixtureFactory> = {
postgres: () => ({
driver: 'postgres',
url: 'postgresql://reader:secret@localhost:5432/analytics', // pragma: allowlist secret
schemas: ['public'],
}),
sqlite: () => ({ driver: 'sqlite', path: 'warehouse.db' }),
mysql: () => ({
driver: 'mysql',
host: 'localhost',
database: 'analytics',
username: 'reader',
password: 'secret', // pragma: allowlist secret
schemas: ['analytics'],
}),
clickhouse: () => ({
driver: 'clickhouse',
url: 'http://localhost:8123',
database: 'analytics',
username: 'reader',
password: 'secret', // pragma: allowlist secret
}),
sqlserver: () => ({
driver: 'sqlserver',
host: 'localhost',
database: 'analytics',
username: 'reader',
password: 'secret', // pragma: allowlist secret
schemas: ['dbo'],
}),
bigquery: () => ({
driver: 'bigquery',
dataset_id: 'analytics',
credentials_json: JSON.stringify({
project_id: 'project-1',
client_email: 'reader@example.test',
private_key: '-----BEGIN PRIVATE KEY-----\nsecret\n-----END PRIVATE KEY-----\n', // pragma: allowlist secret
}),
location: 'US',
}),
snowflake: () => ({
driver: 'snowflake',
account: 'example-account',
username: 'reader',
password: 'secret', // pragma: allowlist secret
warehouse: 'COMPUTE_WH',
database: 'ANALYTICS',
schema: 'PUBLIC',
}),
};
const allowedScopeKeys = new Set(['dataset_ids', 'databases', 'schemas', 'schema_names']);
const historicSqlReaderDrivers = new Set<KtxConnectionDriver>(['postgres', 'bigquery', 'snowflake']);
const localExecutorDrivers = new Set<KtxConnectionDriver>(['postgres', 'sqlite']);
describe('driverRegistrations', () => {
let projectDir: string;
beforeEach(async () => {
projectDir = await mkdtemp(join(tmpdir(), 'ktx-driver-registry-'));
});
afterEach(async () => {
await rm(projectDir, { recursive: true, force: true });
});
it('lists every supported warehouse driver', () => {
expect(listSupportedDrivers()).toEqual([
'bigquery',
'clickhouse',
'mysql',
'postgres',
'sqlite',
'snowflake',
'sqlserver',
]);
});
it('resolves registered drivers case-insensitively', () => {
expect(getDriverRegistration(' Postgres ')?.driver).toBe('postgres');
expect(getDriverRegistration('unknown')).toBeUndefined();
});
it.each(Object.values(driverRegistrations))('adapts $driver connector exports', async (registration) => {
const connectorModule = await registration.load();
const connection = connectionFixtures[registration.driver](projectDir);
expect(connectorModule.isConnectionConfig(connection)).toBe(true);
expect(connectorModule.isConnectionConfig({})).toBe(false);
const connector = connectorModule.createScanConnector({
connectionId: 'warehouse',
connection,
projectDir,
});
expect(connector.driver).toBe(registration.driver);
expect(connector.listSchemas).toEqual(expect.any(Function));
expect(connector.listTables).toEqual(expect.any(Function));
await connector.cleanup?.();
if (registration.driver === 'sqlite') {
expect(registration.scopeConfigKey).toBeNull();
} else {
expect(registration.scopeConfigKey).not.toBeNull();
expect(allowedScopeKeys.has(registration.scopeConfigKey ?? '')).toBe(true);
}
expect(registration.hasHistoricSqlReader).toBe(historicSqlReaderDrivers.has(registration.driver));
expect(registration.hasLocalQueryExecutor).toBe(localExecutorDrivers.has(registration.driver));
});
});