diff --git a/packages/cli/src/context/connections/drivers.ts b/packages/cli/src/context/connections/drivers.ts new file mode 100644 index 00000000..be4f30f3 --- /dev/null +++ b/packages/cli/src/context/connections/drivers.ts @@ -0,0 +1,205 @@ +import type { KtxConnectionDriver, KtxScanConnector } from '../scan/types.js'; + +type KtxScopeConfigKey = 'dataset_ids' | 'databases' | 'schemas' | 'schema_names'; + +interface KtxDriverConnectorModule { + isConnectionConfig(connection: unknown): boolean; + createScanConnector(args: { + connectionId: string; + connection: unknown; + projectDir: string; + }): KtxScanConnector; +} + +export interface KtxDriverRegistration { + readonly driver: KtxConnectionDriver; + readonly scopeConfigKey: KtxScopeConfigKey | null; + readonly hasHistoricSqlReader: boolean; + readonly hasLocalQueryExecutor: boolean; + load(): Promise; +} + +function invalidConnectionConfig(driver: KtxConnectionDriver): Error { + return new Error(`Connection config does not match warehouse driver "${driver}".`); +} + +/** @internal */ +export const driverRegistrations: Record = { + bigquery: { + driver: 'bigquery', + scopeConfigKey: 'dataset_ids', + hasHistoricSqlReader: true, + hasLocalQueryExecutor: false, + load: async () => { + const m = await import('../../connectors/bigquery/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxBigQueryConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxBigQueryConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('bigquery'); + } + return new m.KtxBigQueryScanConnector({ connectionId, connection: typedConnection }); + }, + }; + }, + }, + clickhouse: { + driver: 'clickhouse', + scopeConfigKey: 'databases', + hasHistoricSqlReader: false, + hasLocalQueryExecutor: false, + load: async () => { + const m = await import('../../connectors/clickhouse/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxClickHouseConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxClickHouseConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('clickhouse'); + } + return new m.KtxClickHouseScanConnector({ connectionId, connection: typedConnection }); + }, + }; + }, + }, + mysql: { + driver: 'mysql', + scopeConfigKey: 'schemas', + hasHistoricSqlReader: false, + hasLocalQueryExecutor: false, + load: async () => { + const m = await import('../../connectors/mysql/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxMysqlConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxMysqlConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('mysql'); + } + return new m.KtxMysqlScanConnector({ connectionId, connection: typedConnection }); + }, + }; + }, + }, + postgres: { + driver: 'postgres', + scopeConfigKey: 'schemas', + hasHistoricSqlReader: true, + hasLocalQueryExecutor: true, + load: async () => { + const m = await import('../../connectors/postgres/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxPostgresConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxPostgresConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('postgres'); + } + return new m.KtxPostgresScanConnector({ connectionId, connection: typedConnection }); + }, + }; + }, + }, + sqlite: { + driver: 'sqlite', + scopeConfigKey: null, + hasHistoricSqlReader: false, + hasLocalQueryExecutor: true, + load: async () => { + const m = await import('../../connectors/sqlite/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxSqliteConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection, projectDir }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxSqliteConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('sqlite'); + } + return new m.KtxSqliteScanConnector({ connectionId, connection: typedConnection, projectDir }); + }, + }; + }, + }, + snowflake: { + driver: 'snowflake', + scopeConfigKey: 'schema_names', + hasHistoricSqlReader: true, + hasLocalQueryExecutor: false, + load: async () => { + const m = await import('../../connectors/snowflake/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxSnowflakeConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection, projectDir }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxSnowflakeConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('snowflake'); + } + return new m.KtxSnowflakeScanConnector({ connectionId, connection: typedConnection, projectDir }); + }, + }; + }, + }, + sqlserver: { + driver: 'sqlserver', + scopeConfigKey: 'schemas', + hasHistoricSqlReader: false, + hasLocalQueryExecutor: false, + load: async () => { + const m = await import('../../connectors/sqlserver/connector.js'); + return { + isConnectionConfig: (connection) => { + const typedConnection = connection as Parameters[0]; + return m.isKtxSqlServerConnectionConfig(typedConnection); + }, + createScanConnector: ({ connectionId, connection }) => { + const typedConnection = connection as Parameters[0]; + if (!m.isKtxSqlServerConnectionConfig(typedConnection)) { + throw invalidConnectionConfig('sqlserver'); + } + return new m.KtxSqlServerScanConnector({ connectionId, connection: typedConnection }); + }, + }; + }, + }, +}; + +const supportedDrivers: KtxConnectionDriver[] = [ + 'bigquery', + 'clickhouse', + 'mysql', + 'postgres', + 'sqlite', + 'snowflake', + 'sqlserver', +]; + +function isRegisteredDriver(driver: string): driver is KtxConnectionDriver { + return Object.prototype.hasOwnProperty.call(driverRegistrations, driver); +} + +export function getDriverRegistration(driver: string): KtxDriverRegistration | undefined { + const normalized = driver.toLowerCase().trim(); + return isRegisteredDriver(normalized) ? driverRegistrations[normalized] : undefined; +} + +export function listSupportedDrivers(): KtxConnectionDriver[] { + return [...supportedDrivers]; +} diff --git a/packages/cli/src/local-scan-connectors.ts b/packages/cli/src/local-scan-connectors.ts index 31fc158e..4d98bc0c 100644 --- a/packages/cli/src/local-scan-connectors.ts +++ b/packages/cli/src/local-scan-connectors.ts @@ -1,7 +1,11 @@ +import { + getDriverRegistration, + listSupportedDrivers, +} from './context/connections/drivers.js'; import type { KtxLocalProject } from './context/project/project.js'; import type { KtxScanConnector } from './context/scan/types.js'; -const SUPPORTED_DRIVERS = 'sqlite, postgres, mysql, clickhouse, sqlserver, bigquery, snowflake'; +const SUPPORTED_DRIVERS = listSupportedDrivers().join(', '); export async function createKtxCliScanConnector( project: KtxLocalProject, @@ -17,58 +21,23 @@ export async function createKtxCliScanConnector( `Connection "${connectionId}" has no \`driver\` field in ktx.yaml. Supported drivers: ${SUPPORTED_DRIVERS}.`, ); } - if (driver === 'sqlite') { - const { KtxSqliteScanConnector, isKtxSqliteConnectionConfig } = await import('./connectors/sqlite/connector.js');; - if (!isKtxSqliteConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxSqliteScanConnector({ connectionId, connection, projectDir: project.projectDir }); + + const registration = getDriverRegistration(driver); + if (!registration) { + throw new Error( + `Connection "${connectionId}" uses driver "${driver}", which has no native standalone KTX scan connector. Supported drivers: ${SUPPORTED_DRIVERS}.`, + ); } - if (driver === 'postgres') { - const { KtxPostgresScanConnector, isKtxPostgresConnectionConfig } = await import('./connectors/postgres/connector.js');; - if (!isKtxPostgresConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxPostgresScanConnector({ connectionId, connection }); + + const connectorModule = await registration.load(); + if (!connectorModule.isConnectionConfig(connection)) { + throw invalidConnectionConfigError(connectionId, driver); } - if (driver === 'mysql') { - const { KtxMysqlScanConnector, isKtxMysqlConnectionConfig } = await import('./connectors/mysql/connector.js');; - if (!isKtxMysqlConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxMysqlScanConnector({ connectionId, connection }); - } - if (driver === 'clickhouse') { - const { KtxClickHouseScanConnector, isKtxClickHouseConnectionConfig } = await import('./connectors/clickhouse/connector.js');; - if (!isKtxClickHouseConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxClickHouseScanConnector({ connectionId, connection }); - } - if (driver === 'sqlserver') { - const { KtxSqlServerScanConnector, isKtxSqlServerConnectionConfig } = await import('./connectors/sqlserver/connector.js');; - if (!isKtxSqlServerConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxSqlServerScanConnector({ connectionId, connection }); - } - if (driver === 'bigquery') { - const { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig } = await import('./connectors/bigquery/connector.js');; - if (!isKtxBigQueryConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxBigQueryScanConnector({ connectionId, connection }); - } - if (driver === 'snowflake') { - const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');; - if (!isKtxSnowflakeConnectionConfig(connection)) { - throw invalidConnectionConfigError(connectionId, driver); - } - return new KtxSnowflakeScanConnector({ connectionId, connection, projectDir: project.projectDir }); - } - throw new Error( - `Connection "${connectionId}" uses driver "${driver}", which has no native standalone KTX scan connector. Supported drivers: ${SUPPORTED_DRIVERS}.`, - ); + return connectorModule.createScanConnector({ + connectionId, + connection, + projectDir: project.projectDir, + }); } function invalidConnectionConfigError(connectionId: string, driver: string): Error { diff --git a/packages/cli/test/context/connections/drivers.test.ts b/packages/cli/test/context/connections/drivers.test.ts new file mode 100644 index 00000000..06ccc11b --- /dev/null +++ b/packages/cli/test/context/connections/drivers.test.ts @@ -0,0 +1,124 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + driverRegistrations, + getDriverRegistration, + listSupportedDrivers, +} from '../../../src/context/connections/drivers.js'; +import type { KtxConnectionDriver } from '../../../src/context/scan/types.js'; + +type FixtureFactory = (projectDir: string) => Record; + +const connectionFixtures: Record = { + postgres: () => ({ + driver: 'postgres', + url: 'postgresql://reader:secret@localhost:5432/analytics', // pragma: allowlist secret + schemas: ['public'], + }), + sqlite: () => ({ driver: 'sqlite', path: 'warehouse.db' }), + mysql: () => ({ + driver: 'mysql', + host: 'localhost', + database: 'analytics', + username: 'reader', + password: 'secret', // pragma: allowlist secret + schemas: ['analytics'], + }), + clickhouse: () => ({ + driver: 'clickhouse', + url: 'http://localhost:8123', + database: 'analytics', + username: 'reader', + password: 'secret', // pragma: allowlist secret + }), + sqlserver: () => ({ + driver: 'sqlserver', + host: 'localhost', + database: 'analytics', + username: 'reader', + password: 'secret', // pragma: allowlist secret + schemas: ['dbo'], + }), + bigquery: () => ({ + driver: 'bigquery', + dataset_id: 'analytics', + credentials_json: JSON.stringify({ + project_id: 'project-1', + client_email: 'reader@example.test', + private_key: '-----BEGIN PRIVATE KEY-----\nsecret\n-----END PRIVATE KEY-----\n', // pragma: allowlist secret + }), + location: 'US', + }), + snowflake: () => ({ + driver: 'snowflake', + account: 'example-account', + username: 'reader', + password: 'secret', // pragma: allowlist secret + warehouse: 'COMPUTE_WH', + database: 'ANALYTICS', + schema: 'PUBLIC', + }), +}; + +const allowedScopeKeys = new Set(['dataset_ids', 'databases', 'schemas', 'schema_names']); +const historicSqlReaderDrivers = new Set(['postgres', 'bigquery', 'snowflake']); +const localExecutorDrivers = new Set(['postgres', 'sqlite']); + +describe('driverRegistrations', () => { + let projectDir: string; + + beforeEach(async () => { + projectDir = await mkdtemp(join(tmpdir(), 'ktx-driver-registry-')); + }); + + afterEach(async () => { + await rm(projectDir, { recursive: true, force: true }); + }); + + it('lists every supported warehouse driver', () => { + expect(listSupportedDrivers()).toEqual([ + 'bigquery', + 'clickhouse', + 'mysql', + 'postgres', + 'sqlite', + 'snowflake', + 'sqlserver', + ]); + }); + + it('resolves registered drivers case-insensitively', () => { + expect(getDriverRegistration(' Postgres ')?.driver).toBe('postgres'); + expect(getDriverRegistration('unknown')).toBeUndefined(); + }); + + it.each(Object.values(driverRegistrations))('adapts $driver connector exports', async (registration) => { + const connectorModule = await registration.load(); + const connection = connectionFixtures[registration.driver](projectDir); + + expect(connectorModule.isConnectionConfig(connection)).toBe(true); + expect(connectorModule.isConnectionConfig({})).toBe(false); + + const connector = connectorModule.createScanConnector({ + connectionId: 'warehouse', + connection, + projectDir, + }); + + expect(connector.driver).toBe(registration.driver); + expect(connector.listSchemas).toEqual(expect.any(Function)); + expect(connector.listTables).toEqual(expect.any(Function)); + await connector.cleanup?.(); + + if (registration.driver === 'sqlite') { + expect(registration.scopeConfigKey).toBeNull(); + } else { + expect(registration.scopeConfigKey).not.toBeNull(); + expect(allowedScopeKeys.has(registration.scopeConfigKey ?? '')).toBe(true); + } + expect(registration.hasHistoricSqlReader).toBe(historicSqlReaderDrivers.has(registration.driver)); + expect(registration.hasLocalQueryExecutor).toBe(localExecutorDrivers.has(registration.driver)); + }); +});