fix: honor mysql and clickhouse database scope

This commit is contained in:
Andrey Avtomonov 2026-05-21 19:37:32 +02:00
parent 5273406b6a
commit 633d53a2cb
4 changed files with 309 additions and 56 deletions

View file

@ -25,7 +25,7 @@ function fakeClientFactory(): KtxClickHouseClientFactory {
{ table: 'event_summary', name: 'event_name', type: 'String', comment: '', is_in_primary_key: 0 },
]);
}
if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY table')) {
if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY')) {
return result([{ table: 'events', row_count: '2' }]);
}
if (input.query.includes('SELECT `id`, `event_name` FROM `analytics`.`events` LIMIT 1')) {
@ -90,6 +90,50 @@ function fakeClientFactory(): KtxClickHouseClientFactory {
};
}
function multiDatabaseClickHouseClientFactory(): KtxClickHouseClientFactory {
const query = vi.fn(async (input: { query: string; format: string; query_params?: Record<string, unknown> }) => {
if (input.query.includes('FROM system.tables')) {
expect(input.query_params).toEqual({ databases: ['analytics', 'mart'] });
return result([
{ database: 'analytics', name: 'events', engine: 'MergeTree', comment: 'Event stream' },
{ database: 'mart', name: 'order_events', engine: 'MergeTree', comment: '' },
]);
}
if (input.query.includes('FROM system.columns')) {
expect(input.query_params).toEqual({ databases: ['analytics', 'mart'] });
return result([
{
database: 'analytics',
table: 'events',
name: 'id',
type: 'UInt64',
comment: '',
is_in_primary_key: 1,
},
{
database: 'mart',
table: 'order_events',
name: 'id',
type: 'UInt64',
comment: '',
is_in_primary_key: 1,
},
]);
}
if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY')) {
expect(input.query_params).toEqual({ databases: ['analytics', 'mart'] });
return result([
{ database: 'analytics', table: 'events', row_count: '2' },
{ database: 'mart', table: 'order_events', row_count: '5' },
]);
}
throw new Error(`Unexpected SQL: ${input.query}`);
});
return {
createClient: vi.fn(() => ({ query, close: vi.fn(async () => undefined) })),
};
}
describe('KtxClickHouseScanConnector', () => {
it('resolves ClickHouse connection configuration safely', () => {
expect(isKtxClickHouseConnectionConfig({ driver: 'clickhouse', host: 'localhost', database: 'analytics' })).toBe(
@ -166,6 +210,34 @@ describe('KtxClickHouseScanConnector', () => {
expect(snapshot.tables.find((table) => table.name === 'events')?.foreignKeys).toEqual([]);
});
it('introspects every configured ClickHouse database scope while preserving the default database', async () => {
const connector = new KtxClickHouseScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'clickhouse',
host: 'ch.example.test',
database: 'analytics',
databases: ['analytics', 'mart'],
username: 'reader',
password: 'test-pass', // pragma: allowlist secret
},
clientFactory: multiDatabaseClickHouseClientFactory(),
now: () => new Date('2026-05-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'clickhouse' },
{ runId: 'scan-run-1' },
);
expect(snapshot.scope).toEqual({ schemas: ['analytics', 'mart'] });
expect(snapshot.metadata).toMatchObject({ database: 'analytics', databases: ['analytics', 'mart'] });
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual([
'analytics.events',
'mart.order_events',
]);
});
it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
const clientFactory = fakeClientFactory();
const connector = new KtxClickHouseScanConnector({

View file

@ -12,6 +12,7 @@ export interface KtxClickHouseConnectionConfig {
host?: string;
port?: number;
database?: string;
databases?: string[];
username?: string;
user?: string;
password?: string;
@ -87,12 +88,14 @@ export interface KtxClickHouseColumnDistinctValuesResult {
}
interface ClickHouseTableRow {
database?: string;
name: string;
engine: string;
comment: string;
}
interface ClickHouseColumnRow {
database?: string;
table: string;
name: string;
type: string;
@ -101,6 +104,7 @@ interface ClickHouseColumnRow {
}
interface ClickHouseRowCountRow {
database?: string;
table?: string;
row_count?: string | number;
count?: string | number;
@ -174,6 +178,25 @@ function isNullableClickHouseType(type: string): boolean {
return type.startsWith('Nullable(') || type.startsWith('LowCardinality(Nullable(');
}
function configuredClickHouseDatabases(
connection: KtxClickHouseConnectionConfig,
fallbackDatabase: string,
): string[] {
if (Array.isArray(connection.databases) && connection.databases.length > 0) {
const selected = connection.databases
.filter((database): database is string => typeof database === 'string' && database.trim().length > 0)
.map((database) => database.trim());
if (selected.length > 0) {
return [...new Set(selected)];
}
}
return [fallbackDatabase];
}
function clickHouseTableKey(database: string, table: string): string {
return `${database}.${table}`;
}
export function isKtxClickHouseConnectionConfig(
connection: KtxClickHouseConnectionConfig | undefined,
): connection is KtxClickHouseConnectionConfig {
@ -261,52 +284,61 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const database = this.clientConfig.database;
const databases = configuredClickHouseDatabases(this.connection, this.clientConfig.database);
const tables = await this.queryEachRow<ClickHouseTableRow>(
`
SELECT name, engine, comment
SELECT database, name, engine, comment
FROM system.tables
WHERE database = {database:String}
WHERE database IN {databases:Array(String)}
AND engine NOT IN ('Dictionary')
ORDER BY name
ORDER BY database, name
`,
{ database },
{ databases },
);
const columns = await this.queryEachRow<ClickHouseColumnRow>(
`
SELECT table, name, type, comment, is_in_primary_key
SELECT database, table, name, type, comment, is_in_primary_key
FROM system.columns
WHERE database = {database:String}
ORDER BY table, position
WHERE database IN {databases:Array(String)}
ORDER BY database, table, position
`,
{ database },
{ databases },
);
const rowCounts = await this.queryEachRow<ClickHouseRowCountRow>(
`
SELECT table, sum(rows) AS row_count
SELECT database, table, sum(rows) AS row_count
FROM system.parts
WHERE database = {database:String}
WHERE database IN {databases:Array(String)}
AND active = 1
GROUP BY table
GROUP BY database, table
`,
{ database },
{ databases },
);
const columnsByTable = new Map<string, ClickHouseColumnRow[]>();
for (const column of columns) {
columnsByTable.set(column.table, [...(columnsByTable.get(column.table) ?? []), column]);
const key = clickHouseTableKey(column.database ?? this.clientConfig.database, column.table);
columnsByTable.set(key, [...(columnsByTable.get(key) ?? []), column]);
}
const rowCountByTable = new Map(rowCounts.map((row) => [String(row.table), Number(row.row_count ?? 0)]));
const schemaTables = tables.map((table) =>
this.toSchemaTable(table, columnsByTable.get(table.name) ?? [], rowCountByTable.get(table.name) ?? 0),
const rowCountByTable = new Map(
rowCounts.map((row) => [
clickHouseTableKey(row.database ?? this.clientConfig.database, String(row.table)),
Number(row.row_count ?? 0),
]),
);
const schemaTables = tables.map((table) => {
const database = table.database ?? this.clientConfig.database;
const key = clickHouseTableKey(database, table.name);
return this.toSchemaTable(database, table, columnsByTable.get(key) ?? [], rowCountByTable.get(key) ?? 0);
});
return {
connectionId: this.connectionId,
driver: 'clickhouse',
extractedAt: this.now().toISOString(),
scope: { schemas: [database] },
scope: { schemas: databases },
metadata: {
database,
database: this.clientConfig.database,
databases,
host: this.clientConfig.host,
table_count: schemaTables.length,
total_columns: schemaTables.reduce((sum, table) => sum + table.columns.length, 0),
@ -436,11 +468,16 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
}
}
private toSchemaTable(table: ClickHouseTableRow, columns: ClickHouseColumnRow[], estimatedRows: number): KtxSchemaTable {
private toSchemaTable(
database: string,
table: ClickHouseTableRow,
columns: ClickHouseColumnRow[],
estimatedRows: number,
): KtxSchemaTable {
const kind = tableKind(table.engine);
return {
catalog: null,
db: this.clientConfig.database,
db: database,
name: table.name,
kind,
comment: table.comment || null,

View file

@ -85,6 +85,84 @@ function fakePoolFactory(): KtxMysqlPoolFactory {
};
}
function multiSchemaMysqlPoolFactory(): KtxMysqlPoolFactory {
const query = vi.fn(async (sql: string, params?: unknown): Promise<[RowDataPacket[], FieldPacket[]]> => {
if (sql.includes('INFORMATION_SCHEMA.TABLES')) {
expect(params).toEqual(['analytics', 'mart']);
return mysqlResult(
[
{
TABLE_SCHEMA: 'analytics',
TABLE_NAME: 'customers',
TABLE_TYPE: 'BASE TABLE',
TABLE_COMMENT: '',
TABLE_ROWS: 2,
},
{
TABLE_SCHEMA: 'mart',
TABLE_NAME: 'orders',
TABLE_TYPE: 'BASE TABLE',
TABLE_COMMENT: '',
TABLE_ROWS: 3,
},
],
[
{ name: 'TABLE_SCHEMA' },
{ name: 'TABLE_NAME' },
{ name: 'TABLE_TYPE' },
{ name: 'TABLE_COMMENT' },
{ name: 'TABLE_ROWS' },
],
);
}
if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) {
expect(params).toEqual(['analytics', 'mart']);
return mysqlResult(
[
{
TABLE_SCHEMA: 'analytics',
TABLE_NAME: 'customers',
COLUMN_NAME: 'id',
DATA_TYPE: 'int',
IS_NULLABLE: 'NO',
COLUMN_COMMENT: '',
},
{
TABLE_SCHEMA: 'mart',
TABLE_NAME: 'orders',
COLUMN_NAME: 'id',
DATA_TYPE: 'int',
IS_NULLABLE: 'NO',
COLUMN_COMMENT: '',
},
],
[],
);
}
if (sql.includes('INFORMATION_SCHEMA.KEY_COLUMN_USAGE') && sql.includes("CONSTRAINT_NAME = 'PRIMARY'")) {
expect(params).toEqual(['analytics', 'mart']);
return mysqlResult(
[
{ TABLE_SCHEMA: 'analytics', TABLE_NAME: 'customers', COLUMN_NAME: 'id' },
{ TABLE_SCHEMA: 'mart', TABLE_NAME: 'orders', COLUMN_NAME: 'id' },
],
[],
);
}
if (sql.includes('INFORMATION_SCHEMA.KEY_COLUMN_USAGE') && sql.includes('REFERENCED_TABLE_NAME IS NOT NULL')) {
expect(params).toEqual(['analytics', 'mart']);
return mysqlResult([], []);
}
throw new Error(`Unexpected SQL: ${sql} params=${JSON.stringify(params)}`);
});
return {
createPool: vi.fn(() => ({
getConnection: vi.fn(async () => ({ query, release: vi.fn() })),
end: vi.fn(async () => undefined),
})),
};
}
describe('KtxMysqlScanConnector', () => {
it('resolves MySQL connection configuration safely', () => {
expect(isKtxMysqlConnectionConfig({ driver: 'mysql', host: 'localhost', database: 'analytics' })).toBe(true);
@ -169,6 +247,34 @@ describe('KtxMysqlScanConnector', () => {
]);
});
it('introspects every configured MySQL schema scope', async () => {
const connector = new KtxMysqlScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'mysql',
host: 'db.example.test',
database: 'analytics',
schemas: ['analytics', 'mart'],
username: 'reader',
password: 'secret', // pragma: allowlist secret
},
poolFactory: multiSchemaMysqlPoolFactory(),
now: () => new Date('2026-05-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'mysql' },
{ runId: 'scan-run-1' },
);
expect(snapshot.scope).toEqual({ schemas: ['analytics', 'mart'] });
expect(snapshot.metadata).toMatchObject({ database: 'analytics', schemas: ['analytics', 'mart'] });
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual([
'analytics.customers',
'mart.orders',
]);
});
it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
const poolFactory = fakePoolFactory();
const connector = new KtxMysqlScanConnector({

View file

@ -11,6 +11,7 @@ export interface KtxMysqlConnectionConfig {
host?: string;
port?: number;
database?: string;
schemas?: string[];
username?: string;
user?: string;
password?: string;
@ -79,6 +80,7 @@ export interface KtxMysqlColumnDistinctValuesResult {
}
interface MysqlTableRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
TABLE_TYPE: string;
TABLE_COMMENT: string | null;
@ -86,6 +88,7 @@ interface MysqlTableRow extends RowDataPacket {
}
interface MysqlColumnRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
DATA_TYPE: string;
@ -94,11 +97,13 @@ interface MysqlColumnRow extends RowDataPacket {
}
interface MysqlPrimaryKeyRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
}
interface MysqlForeignKeyRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
REFERENCED_TABLE_NAME: string;
@ -185,22 +190,42 @@ function cleanMySqlTableComment(comment: string | null): string | null {
return comment;
}
function groupByTable<T extends { TABLE_NAME: string }>(rows: T[]): Map<string, T[]> {
function configuredMysqlSchemas(connection: KtxMysqlConnectionConfig, fallbackDatabase: string): string[] {
if (Array.isArray(connection.schemas) && connection.schemas.length > 0) {
const selected = connection.schemas
.filter((schema): schema is string => typeof schema === 'string' && schema.trim().length > 0)
.map((schema) => schema.trim());
if (selected.length > 0) {
return [...new Set(selected)];
}
}
return [fallbackDatabase];
}
function mysqlTableKey(schema: string, table: string): string {
return `${schema}.${table}`;
}
function groupByTable<T extends { TABLE_SCHEMA?: string; TABLE_NAME: string }>(
rows: T[],
fallbackDatabase: string,
): Map<string, T[]> {
const grouped = new Map<string, T[]>();
for (const row of rows) {
const tableRows = grouped.get(row.TABLE_NAME) ?? [];
const tableRows = grouped.get(mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME)) ?? [];
tableRows.push(row);
grouped.set(row.TABLE_NAME, tableRows);
grouped.set(mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME), tableRows);
}
return grouped;
}
function primaryKeyMap(rows: MysqlPrimaryKeyRow[]): Map<string, Set<string>> {
function primaryKeyMap(rows: MysqlPrimaryKeyRow[], fallbackDatabase: string): Map<string, Set<string>> {
const grouped = new Map<string, Set<string>>();
for (const row of rows) {
const columns = grouped.get(row.TABLE_NAME) ?? new Set<string>();
const key = mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME);
const columns = grouped.get(key) ?? new Set<string>();
columns.add(row.COLUMN_NAME);
grouped.set(row.TABLE_NAME, columns);
grouped.set(key, columns);
}
return grouped;
}
@ -308,60 +333,68 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const database = this.poolConfig.database;
const databases = configuredMysqlSchemas(this.connection, this.poolConfig.database);
const placeholders = databases.map(() => '?').join(', ');
const tables = await this.queryRaw<MysqlTableRow>(
`
SELECT TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS
SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
ORDER BY TABLE_NAME
WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
ORDER BY TABLE_SCHEMA, TABLE_NAME
`,
[database],
databases,
);
const columns = await this.queryRaw<MysqlColumnRow>(
`
SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ?
ORDER BY TABLE_NAME, ORDINAL_POSITION
WHERE TABLE_SCHEMA IN (${placeholders})
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
`,
[database],
databases,
);
const primaryKeys = await this.queryRaw<MysqlPrimaryKeyRow>(
`
SELECT TABLE_NAME, COLUMN_NAME
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = ?
WHERE TABLE_SCHEMA IN (${placeholders})
AND CONSTRAINT_NAME = 'PRIMARY'
ORDER BY TABLE_NAME, ORDINAL_POSITION
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
`,
[database],
databases,
);
const foreignKeys = await this.queryRaw<MysqlForeignKeyRow>(
`
SELECT TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, CONSTRAINT_NAME
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, CONSTRAINT_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = ?
WHERE TABLE_SCHEMA IN (${placeholders})
AND REFERENCED_TABLE_NAME IS NOT NULL
ORDER BY TABLE_NAME, COLUMN_NAME
ORDER BY TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME
`,
[database],
databases,
);
const columnsByTable = groupByTable(columns);
const primaryKeysByTable = primaryKeyMap(primaryKeys);
const foreignKeysByTable = groupByTable(foreignKeys);
const columnsByTable = groupByTable(columns, this.poolConfig.database);
const primaryKeysByTable = primaryKeyMap(primaryKeys, this.poolConfig.database);
const foreignKeysByTable = groupByTable(foreignKeys, this.poolConfig.database);
const schemaTables = tables.map((table) =>
this.toSchemaTable(table, columnsByTable.get(table.TABLE_NAME) ?? [], primaryKeysByTable, foreignKeysByTable),
this.toSchemaTable(
table.TABLE_SCHEMA ?? this.poolConfig.database,
table,
columnsByTable.get(mysqlTableKey(table.TABLE_SCHEMA ?? this.poolConfig.database, table.TABLE_NAME)) ?? [],
primaryKeysByTable,
foreignKeysByTable,
),
);
return {
connectionId: this.connectionId,
driver: 'mysql',
extractedAt: this.now().toISOString(),
scope: { schemas: [database] },
scope: { schemas: databases },
metadata: {
database,
database: this.poolConfig.database,
schemas: databases,
host: this.poolConfig.host,
table_count: schemaTables.length,
total_columns: schemaTables.reduce((sum, table) => sum + table.columns.length, 0),
@ -487,6 +520,7 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
}
private toSchemaTable(
database: string,
table: MysqlTableRow,
columns: MysqlColumnRow[],
primaryKeysByTable: Map<string, Set<string>>,
@ -497,13 +531,17 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
const estimatedRows = kind === 'view' ? null : Number(table.TABLE_ROWS ?? 0);
return {
catalog: null,
db: this.poolConfig.database,
db: database,
name: tableName,
kind,
comment: cleanMySqlTableComment(table.TABLE_COMMENT),
estimatedRows: Number.isFinite(estimatedRows) ? estimatedRows : null,
columns: columns.map((column) => this.toSchemaColumn(column, primaryKeysByTable.get(tableName) ?? new Set())),
foreignKeys: (foreignKeysByTable.get(tableName) ?? []).map((row) => this.toSchemaForeignKey(row)),
columns: columns.map((column) =>
this.toSchemaColumn(column, primaryKeysByTable.get(mysqlTableKey(database, tableName)) ?? new Set()),
),
foreignKeys: (foreignKeysByTable.get(mysqlTableKey(database, tableName)) ?? []).map((row) =>
this.toSchemaForeignKey(database, row),
),
};
}
@ -519,11 +557,11 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
};
}
private toSchemaForeignKey(row: MysqlForeignKeyRow): KtxSchemaForeignKey {
private toSchemaForeignKey(database: string, row: MysqlForeignKeyRow): KtxSchemaForeignKey {
return {
fromColumn: row.COLUMN_NAME,
toCatalog: null,
toDb: this.poolConfig.database,
toDb: database,
toTable: row.REFERENCED_TABLE_NAME,
toColumn: row.REFERENCED_COLUMN_NAME,
constraintName: row.CONSTRAINT_NAME || null,