From 633d53a2cb3c26520d176e74b76095941c711953 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 21 May 2026 19:37:32 +0200 Subject: [PATCH] fix: honor mysql and clickhouse database scope --- .../connectors/clickhouse/connector.test.ts | 74 +++++++++++- .../src/connectors/clickhouse/connector.ts | 79 +++++++++---- .../src/connectors/mysql/connector.test.ts | 106 ++++++++++++++++++ .../cli/src/connectors/mysql/connector.ts | 106 ++++++++++++------ 4 files changed, 309 insertions(+), 56 deletions(-) diff --git a/packages/cli/src/connectors/clickhouse/connector.test.ts b/packages/cli/src/connectors/clickhouse/connector.test.ts index 6ff60299..a3ab11f6 100644 --- a/packages/cli/src/connectors/clickhouse/connector.test.ts +++ b/packages/cli/src/connectors/clickhouse/connector.test.ts @@ -25,7 +25,7 @@ function fakeClientFactory(): KtxClickHouseClientFactory { { table: 'event_summary', name: 'event_name', type: 'String', comment: '', is_in_primary_key: 0 }, ]); } - if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY table')) { + if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY')) { return result([{ table: 'events', row_count: '2' }]); } if (input.query.includes('SELECT `id`, `event_name` FROM `analytics`.`events` LIMIT 1')) { @@ -90,6 +90,50 @@ function fakeClientFactory(): KtxClickHouseClientFactory { }; } +function multiDatabaseClickHouseClientFactory(): KtxClickHouseClientFactory { + const query = vi.fn(async (input: { query: string; format: string; query_params?: Record }) => { + if (input.query.includes('FROM system.tables')) { + expect(input.query_params).toEqual({ databases: ['analytics', 'mart'] }); + return result([ + { database: 'analytics', name: 'events', engine: 'MergeTree', comment: 'Event stream' }, + { database: 'mart', name: 'order_events', engine: 'MergeTree', comment: '' }, + ]); + } + if (input.query.includes('FROM system.columns')) { + expect(input.query_params).toEqual({ databases: ['analytics', 'mart'] }); + return result([ + { + database: 'analytics', + table: 'events', + name: 'id', + type: 'UInt64', + comment: '', + is_in_primary_key: 1, + }, + { + database: 'mart', + table: 'order_events', + name: 'id', + type: 'UInt64', + comment: '', + is_in_primary_key: 1, + }, + ]); + } + if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY')) { + expect(input.query_params).toEqual({ databases: ['analytics', 'mart'] }); + return result([ + { database: 'analytics', table: 'events', row_count: '2' }, + { database: 'mart', table: 'order_events', row_count: '5' }, + ]); + } + throw new Error(`Unexpected SQL: ${input.query}`); + }); + return { + createClient: vi.fn(() => ({ query, close: vi.fn(async () => undefined) })), + }; +} + describe('KtxClickHouseScanConnector', () => { it('resolves ClickHouse connection configuration safely', () => { expect(isKtxClickHouseConnectionConfig({ driver: 'clickhouse', host: 'localhost', database: 'analytics' })).toBe( @@ -166,6 +210,34 @@ describe('KtxClickHouseScanConnector', () => { expect(snapshot.tables.find((table) => table.name === 'events')?.foreignKeys).toEqual([]); }); + it('introspects every configured ClickHouse database scope while preserving the default database', async () => { + const connector = new KtxClickHouseScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'clickhouse', + host: 'ch.example.test', + database: 'analytics', + databases: ['analytics', 'mart'], + username: 'reader', + password: 'test-pass', // pragma: allowlist secret + }, + clientFactory: multiDatabaseClickHouseClientFactory(), + now: () => new Date('2026-05-21T10:00:00.000Z'), + }); + + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'clickhouse' }, + { runId: 'scan-run-1' }, + ); + + expect(snapshot.scope).toEqual({ schemas: ['analytics', 'mart'] }); + expect(snapshot.metadata).toMatchObject({ database: 'analytics', databases: ['analytics', 'mart'] }); + expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual([ + 'analytics.events', + 'mart.order_events', + ]); + }); + it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => { const clientFactory = fakeClientFactory(); const connector = new KtxClickHouseScanConnector({ diff --git a/packages/cli/src/connectors/clickhouse/connector.ts b/packages/cli/src/connectors/clickhouse/connector.ts index 714ccfb1..1d851001 100644 --- a/packages/cli/src/connectors/clickhouse/connector.ts +++ b/packages/cli/src/connectors/clickhouse/connector.ts @@ -12,6 +12,7 @@ export interface KtxClickHouseConnectionConfig { host?: string; port?: number; database?: string; + databases?: string[]; username?: string; user?: string; password?: string; @@ -87,12 +88,14 @@ export interface KtxClickHouseColumnDistinctValuesResult { } interface ClickHouseTableRow { + database?: string; name: string; engine: string; comment: string; } interface ClickHouseColumnRow { + database?: string; table: string; name: string; type: string; @@ -101,6 +104,7 @@ interface ClickHouseColumnRow { } interface ClickHouseRowCountRow { + database?: string; table?: string; row_count?: string | number; count?: string | number; @@ -174,6 +178,25 @@ function isNullableClickHouseType(type: string): boolean { return type.startsWith('Nullable(') || type.startsWith('LowCardinality(Nullable('); } +function configuredClickHouseDatabases( + connection: KtxClickHouseConnectionConfig, + fallbackDatabase: string, +): string[] { + if (Array.isArray(connection.databases) && connection.databases.length > 0) { + const selected = connection.databases + .filter((database): database is string => typeof database === 'string' && database.trim().length > 0) + .map((database) => database.trim()); + if (selected.length > 0) { + return [...new Set(selected)]; + } + } + return [fallbackDatabase]; +} + +function clickHouseTableKey(database: string, table: string): string { + return `${database}.${table}`; +} + export function isKtxClickHouseConnectionConfig( connection: KtxClickHouseConnectionConfig | undefined, ): connection is KtxClickHouseConnectionConfig { @@ -261,52 +284,61 @@ export class KtxClickHouseScanConnector implements KtxScanConnector { async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); - const database = this.clientConfig.database; + const databases = configuredClickHouseDatabases(this.connection, this.clientConfig.database); const tables = await this.queryEachRow( ` - SELECT name, engine, comment + SELECT database, name, engine, comment FROM system.tables - WHERE database = {database:String} + WHERE database IN {databases:Array(String)} AND engine NOT IN ('Dictionary') - ORDER BY name + ORDER BY database, name `, - { database }, + { databases }, ); const columns = await this.queryEachRow( ` - SELECT table, name, type, comment, is_in_primary_key + SELECT database, table, name, type, comment, is_in_primary_key FROM system.columns - WHERE database = {database:String} - ORDER BY table, position + WHERE database IN {databases:Array(String)} + ORDER BY database, table, position `, - { database }, + { databases }, ); const rowCounts = await this.queryEachRow( ` - SELECT table, sum(rows) AS row_count + SELECT database, table, sum(rows) AS row_count FROM system.parts - WHERE database = {database:String} + WHERE database IN {databases:Array(String)} AND active = 1 - GROUP BY table + GROUP BY database, table `, - { database }, + { databases }, ); const columnsByTable = new Map(); for (const column of columns) { - columnsByTable.set(column.table, [...(columnsByTable.get(column.table) ?? []), column]); + const key = clickHouseTableKey(column.database ?? this.clientConfig.database, column.table); + columnsByTable.set(key, [...(columnsByTable.get(key) ?? []), column]); } - const rowCountByTable = new Map(rowCounts.map((row) => [String(row.table), Number(row.row_count ?? 0)])); - const schemaTables = tables.map((table) => - this.toSchemaTable(table, columnsByTable.get(table.name) ?? [], rowCountByTable.get(table.name) ?? 0), + const rowCountByTable = new Map( + rowCounts.map((row) => [ + clickHouseTableKey(row.database ?? this.clientConfig.database, String(row.table)), + Number(row.row_count ?? 0), + ]), ); + const schemaTables = tables.map((table) => { + const database = table.database ?? this.clientConfig.database; + const key = clickHouseTableKey(database, table.name); + return this.toSchemaTable(database, table, columnsByTable.get(key) ?? [], rowCountByTable.get(key) ?? 0); + }); return { connectionId: this.connectionId, driver: 'clickhouse', extractedAt: this.now().toISOString(), - scope: { schemas: [database] }, + scope: { schemas: databases }, metadata: { - database, + database: this.clientConfig.database, + databases, host: this.clientConfig.host, table_count: schemaTables.length, total_columns: schemaTables.reduce((sum, table) => sum + table.columns.length, 0), @@ -436,11 +468,16 @@ export class KtxClickHouseScanConnector implements KtxScanConnector { } } - private toSchemaTable(table: ClickHouseTableRow, columns: ClickHouseColumnRow[], estimatedRows: number): KtxSchemaTable { + private toSchemaTable( + database: string, + table: ClickHouseTableRow, + columns: ClickHouseColumnRow[], + estimatedRows: number, + ): KtxSchemaTable { const kind = tableKind(table.engine); return { catalog: null, - db: this.clientConfig.database, + db: database, name: table.name, kind, comment: table.comment || null, diff --git a/packages/cli/src/connectors/mysql/connector.test.ts b/packages/cli/src/connectors/mysql/connector.test.ts index 64b576e2..f9f2d0ad 100644 --- a/packages/cli/src/connectors/mysql/connector.test.ts +++ b/packages/cli/src/connectors/mysql/connector.test.ts @@ -85,6 +85,84 @@ function fakePoolFactory(): KtxMysqlPoolFactory { }; } +function multiSchemaMysqlPoolFactory(): KtxMysqlPoolFactory { + const query = vi.fn(async (sql: string, params?: unknown): Promise<[RowDataPacket[], FieldPacket[]]> => { + if (sql.includes('INFORMATION_SCHEMA.TABLES')) { + expect(params).toEqual(['analytics', 'mart']); + return mysqlResult( + [ + { + TABLE_SCHEMA: 'analytics', + TABLE_NAME: 'customers', + TABLE_TYPE: 'BASE TABLE', + TABLE_COMMENT: '', + TABLE_ROWS: 2, + }, + { + TABLE_SCHEMA: 'mart', + TABLE_NAME: 'orders', + TABLE_TYPE: 'BASE TABLE', + TABLE_COMMENT: '', + TABLE_ROWS: 3, + }, + ], + [ + { name: 'TABLE_SCHEMA' }, + { name: 'TABLE_NAME' }, + { name: 'TABLE_TYPE' }, + { name: 'TABLE_COMMENT' }, + { name: 'TABLE_ROWS' }, + ], + ); + } + if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) { + expect(params).toEqual(['analytics', 'mart']); + return mysqlResult( + [ + { + TABLE_SCHEMA: 'analytics', + TABLE_NAME: 'customers', + COLUMN_NAME: 'id', + DATA_TYPE: 'int', + IS_NULLABLE: 'NO', + COLUMN_COMMENT: '', + }, + { + TABLE_SCHEMA: 'mart', + TABLE_NAME: 'orders', + COLUMN_NAME: 'id', + DATA_TYPE: 'int', + IS_NULLABLE: 'NO', + COLUMN_COMMENT: '', + }, + ], + [], + ); + } + if (sql.includes('INFORMATION_SCHEMA.KEY_COLUMN_USAGE') && sql.includes("CONSTRAINT_NAME = 'PRIMARY'")) { + expect(params).toEqual(['analytics', 'mart']); + return mysqlResult( + [ + { TABLE_SCHEMA: 'analytics', TABLE_NAME: 'customers', COLUMN_NAME: 'id' }, + { TABLE_SCHEMA: 'mart', TABLE_NAME: 'orders', COLUMN_NAME: 'id' }, + ], + [], + ); + } + if (sql.includes('INFORMATION_SCHEMA.KEY_COLUMN_USAGE') && sql.includes('REFERENCED_TABLE_NAME IS NOT NULL')) { + expect(params).toEqual(['analytics', 'mart']); + return mysqlResult([], []); + } + throw new Error(`Unexpected SQL: ${sql} params=${JSON.stringify(params)}`); + }); + return { + createPool: vi.fn(() => ({ + getConnection: vi.fn(async () => ({ query, release: vi.fn() })), + end: vi.fn(async () => undefined), + })), + }; +} + describe('KtxMysqlScanConnector', () => { it('resolves MySQL connection configuration safely', () => { expect(isKtxMysqlConnectionConfig({ driver: 'mysql', host: 'localhost', database: 'analytics' })).toBe(true); @@ -169,6 +247,34 @@ describe('KtxMysqlScanConnector', () => { ]); }); + it('introspects every configured MySQL schema scope', async () => { + const connector = new KtxMysqlScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'mysql', + host: 'db.example.test', + database: 'analytics', + schemas: ['analytics', 'mart'], + username: 'reader', + password: 'secret', // pragma: allowlist secret + }, + poolFactory: multiSchemaMysqlPoolFactory(), + now: () => new Date('2026-05-21T10:00:00.000Z'), + }); + + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'mysql' }, + { runId: 'scan-run-1' }, + ); + + expect(snapshot.scope).toEqual({ schemas: ['analytics', 'mart'] }); + expect(snapshot.metadata).toMatchObject({ database: 'analytics', schemas: ['analytics', 'mart'] }); + expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual([ + 'analytics.customers', + 'mart.orders', + ]); + }); + it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => { const poolFactory = fakePoolFactory(); const connector = new KtxMysqlScanConnector({ diff --git a/packages/cli/src/connectors/mysql/connector.ts b/packages/cli/src/connectors/mysql/connector.ts index 9cf242f5..9d92c2e0 100644 --- a/packages/cli/src/connectors/mysql/connector.ts +++ b/packages/cli/src/connectors/mysql/connector.ts @@ -11,6 +11,7 @@ export interface KtxMysqlConnectionConfig { host?: string; port?: number; database?: string; + schemas?: string[]; username?: string; user?: string; password?: string; @@ -79,6 +80,7 @@ export interface KtxMysqlColumnDistinctValuesResult { } interface MysqlTableRow extends RowDataPacket { + TABLE_SCHEMA: string; TABLE_NAME: string; TABLE_TYPE: string; TABLE_COMMENT: string | null; @@ -86,6 +88,7 @@ interface MysqlTableRow extends RowDataPacket { } interface MysqlColumnRow extends RowDataPacket { + TABLE_SCHEMA: string; TABLE_NAME: string; COLUMN_NAME: string; DATA_TYPE: string; @@ -94,11 +97,13 @@ interface MysqlColumnRow extends RowDataPacket { } interface MysqlPrimaryKeyRow extends RowDataPacket { + TABLE_SCHEMA: string; TABLE_NAME: string; COLUMN_NAME: string; } interface MysqlForeignKeyRow extends RowDataPacket { + TABLE_SCHEMA: string; TABLE_NAME: string; COLUMN_NAME: string; REFERENCED_TABLE_NAME: string; @@ -185,22 +190,42 @@ function cleanMySqlTableComment(comment: string | null): string | null { return comment; } -function groupByTable(rows: T[]): Map { +function configuredMysqlSchemas(connection: KtxMysqlConnectionConfig, fallbackDatabase: string): string[] { + if (Array.isArray(connection.schemas) && connection.schemas.length > 0) { + const selected = connection.schemas + .filter((schema): schema is string => typeof schema === 'string' && schema.trim().length > 0) + .map((schema) => schema.trim()); + if (selected.length > 0) { + return [...new Set(selected)]; + } + } + return [fallbackDatabase]; +} + +function mysqlTableKey(schema: string, table: string): string { + return `${schema}.${table}`; +} + +function groupByTable( + rows: T[], + fallbackDatabase: string, +): Map { const grouped = new Map(); for (const row of rows) { - const tableRows = grouped.get(row.TABLE_NAME) ?? []; + const tableRows = grouped.get(mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME)) ?? []; tableRows.push(row); - grouped.set(row.TABLE_NAME, tableRows); + grouped.set(mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME), tableRows); } return grouped; } -function primaryKeyMap(rows: MysqlPrimaryKeyRow[]): Map> { +function primaryKeyMap(rows: MysqlPrimaryKeyRow[], fallbackDatabase: string): Map> { const grouped = new Map>(); for (const row of rows) { - const columns = grouped.get(row.TABLE_NAME) ?? new Set(); + const key = mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME); + const columns = grouped.get(key) ?? new Set(); columns.add(row.COLUMN_NAME); - grouped.set(row.TABLE_NAME, columns); + grouped.set(key, columns); } return grouped; } @@ -308,60 +333,68 @@ export class KtxMysqlScanConnector implements KtxScanConnector { async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); - const database = this.poolConfig.database; + const databases = configuredMysqlSchemas(this.connection, this.poolConfig.database); + const placeholders = databases.map(() => '?').join(', '); const tables = await this.queryRaw( ` - SELECT TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS + SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS FROM INFORMATION_SCHEMA.TABLES - WHERE TABLE_SCHEMA = ? AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') - ORDER BY TABLE_NAME + WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') + ORDER BY TABLE_SCHEMA, TABLE_NAME `, - [database], + databases, ); const columns = await this.queryRaw( ` - SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT + SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = ? - ORDER BY TABLE_NAME, ORDINAL_POSITION + WHERE TABLE_SCHEMA IN (${placeholders}) + ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION `, - [database], + databases, ); const primaryKeys = await this.queryRaw( ` - SELECT TABLE_NAME, COLUMN_NAME + SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE - WHERE TABLE_SCHEMA = ? + WHERE TABLE_SCHEMA IN (${placeholders}) AND CONSTRAINT_NAME = 'PRIMARY' - ORDER BY TABLE_NAME, ORDINAL_POSITION + ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION `, - [database], + databases, ); const foreignKeys = await this.queryRaw( ` - SELECT TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, CONSTRAINT_NAME + SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, CONSTRAINT_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE - WHERE TABLE_SCHEMA = ? + WHERE TABLE_SCHEMA IN (${placeholders}) AND REFERENCED_TABLE_NAME IS NOT NULL - ORDER BY TABLE_NAME, COLUMN_NAME + ORDER BY TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME `, - [database], + databases, ); - const columnsByTable = groupByTable(columns); - const primaryKeysByTable = primaryKeyMap(primaryKeys); - const foreignKeysByTable = groupByTable(foreignKeys); + const columnsByTable = groupByTable(columns, this.poolConfig.database); + const primaryKeysByTable = primaryKeyMap(primaryKeys, this.poolConfig.database); + const foreignKeysByTable = groupByTable(foreignKeys, this.poolConfig.database); const schemaTables = tables.map((table) => - this.toSchemaTable(table, columnsByTable.get(table.TABLE_NAME) ?? [], primaryKeysByTable, foreignKeysByTable), + this.toSchemaTable( + table.TABLE_SCHEMA ?? this.poolConfig.database, + table, + columnsByTable.get(mysqlTableKey(table.TABLE_SCHEMA ?? this.poolConfig.database, table.TABLE_NAME)) ?? [], + primaryKeysByTable, + foreignKeysByTable, + ), ); return { connectionId: this.connectionId, driver: 'mysql', extractedAt: this.now().toISOString(), - scope: { schemas: [database] }, + scope: { schemas: databases }, metadata: { - database, + database: this.poolConfig.database, + schemas: databases, host: this.poolConfig.host, table_count: schemaTables.length, total_columns: schemaTables.reduce((sum, table) => sum + table.columns.length, 0), @@ -487,6 +520,7 @@ export class KtxMysqlScanConnector implements KtxScanConnector { } private toSchemaTable( + database: string, table: MysqlTableRow, columns: MysqlColumnRow[], primaryKeysByTable: Map>, @@ -497,13 +531,17 @@ export class KtxMysqlScanConnector implements KtxScanConnector { const estimatedRows = kind === 'view' ? null : Number(table.TABLE_ROWS ?? 0); return { catalog: null, - db: this.poolConfig.database, + db: database, name: tableName, kind, comment: cleanMySqlTableComment(table.TABLE_COMMENT), estimatedRows: Number.isFinite(estimatedRows) ? estimatedRows : null, - columns: columns.map((column) => this.toSchemaColumn(column, primaryKeysByTable.get(tableName) ?? new Set())), - foreignKeys: (foreignKeysByTable.get(tableName) ?? []).map((row) => this.toSchemaForeignKey(row)), + columns: columns.map((column) => + this.toSchemaColumn(column, primaryKeysByTable.get(mysqlTableKey(database, tableName)) ?? new Set()), + ), + foreignKeys: (foreignKeysByTable.get(mysqlTableKey(database, tableName)) ?? []).map((row) => + this.toSchemaForeignKey(database, row), + ), }; } @@ -519,11 +557,11 @@ export class KtxMysqlScanConnector implements KtxScanConnector { }; } - private toSchemaForeignKey(row: MysqlForeignKeyRow): KtxSchemaForeignKey { + private toSchemaForeignKey(database: string, row: MysqlForeignKeyRow): KtxSchemaForeignKey { return { fromColumn: row.COLUMN_NAME, toCatalog: null, - toDb: this.poolConfig.database, + toDb: database, toTable: row.REFERENCED_TABLE_NAME, toColumn: row.REFERENCED_COLUMN_NAME, constraintName: row.CONSTRAINT_NAME || null,