import { describe, expect, it, vi } from 'vitest';

import {
  clickHouseClientConfigFromConfig,
  createClickHouseLiveDatabaseIntrospection,
  isKtxClickHouseConnectionConfig,
  KtxClickHouseScanConnector,
  type KtxClickHouseClientFactory,
} from './index.js';

/**
 * Wraps a canned payload in the minimal result shape the fake client hands back
 * from `query`: an object whose async `json()` resolves to that payload.
 *
 * @param payload - The value `json()` should resolve to.
 * @returns An object exposing `json(): Promise<T>`.
 */
function result<T>(payload: T): { json(): Promise<T> } {
  return {
    async json(): Promise<T> {
      return payload;
    },
  };
}

/**
 * Builds a fake ClickHouse client factory for the `analytics` database.
 *
 * Every client it creates shares one `query` mock and one `close` mock. `query`
 * picks a canned response by substring-matching the SQL text. Catalog queries
 * (system.tables / system.columns / system.databases) answer with arrays of row
 * objects. Data queries answer with a `{ meta, data, rows }` envelope.
 *
 * The checks run top to bottom and the first match wins, so keep more specific
 * patterns above more general ones when adding cases. SQL that matches nothing
 * throws, so an unexpected query fails the test loudly instead of silently
 * returning nothing.
 */
function fakeClientFactory(): KtxClickHouseClientFactory {
  const query = vi.fn(
    async (input: { query: string; format: string; query_params?: Record<string, unknown> }) => {
      if (input.query.includes('FROM system.tables')) {
        return result([
          { name: 'events', engine: 'MergeTree', comment: 'Event stream' },
          { name: 'event_summary', engine: 'View', comment: '' },
        ]);
      }
      if (input.query.includes('FROM system.columns')) {
        return result([
          { table: 'events', name: 'id', type: 'UInt64', comment: 'PK', is_in_primary_key: 1 },
          { table: 'events', name: 'event_name', type: 'LowCardinality(String)', comment: '', is_in_primary_key: 0 },
          { table: 'event_summary', name: 'event_name', type: 'String', comment: '', is_in_primary_key: 0 },
        ]);
      }
      // Only `events` has parts; the view gets no row-count entry.
      if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY table')) {
        return result([{ table: 'events', row_count: '2' }]);
      }
      if (input.query.includes('SELECT `id`, `event_name` FROM `analytics`.`events` LIMIT 1')) {
        return result({
          meta: [
            { name: 'id', type: 'UInt64' },
            { name: 'event_name', type: 'String' },
          ],
          data: [[10, 'signup']],
          rows: 1,
        });
      }
      if (input.query.includes('SELECT `event_name` FROM `analytics`.`events`')) {
        return result({
          meta: [{ name: 'event_name', type: 'String' }],
          data: [['signup'], ['purchase']],
          rows: 2,
        });
      }
      // Checked before the DISTINCT-values pattern below; presumably the cardinality
      // query embeds that same DISTINCT subquery (TODO confirm against index.ts).
      if (input.query.includes('COUNT(DISTINCT val)')) {
        return result({
          meta: [{ name: 'cardinality', type: 'UInt64' }],
          data: [[2]],
          rows: 1,
        });
      }
      if (input.query.includes('SELECT DISTINCT toString(`event_name`) AS val')) {
        return result({
          meta: [{ name: 'val', type: 'String' }],
          data: [['purchase'], ['signup']],
          rows: 2,
        });
      }
      if (input.query.includes('sum(rows) AS count')) {
        return result({
          meta: [{ name: 'count', type: 'UInt64' }],
          data: [[2]],
          rows: 1,
        });
      }
      if (input.query.includes('FROM system.databases')) {
        return result([{ name: 'analytics' }, { name: 'warehouse' }]);
      }
      if (input.query.trim() === 'SELECT 1') {
        return result({ meta: [{ name: '1', type: 'UInt8' }], data: [[1]], rows: 1 });
      }
      // User SQL passed to executeReadOnly, after the connector wraps it with a row limit.
      if (input.query.includes('select * from (select id, event_name from analytics.events) as ktx_query_result limit 1')) {
        return result({
          meta: [
            { name: 'id', type: 'UInt64' },
            { name: 'event_name', type: 'String' },
          ],
          data: [[10, 'signup']],
          rows: 1,
        });
      }
      throw new Error(`Unexpected SQL: ${input.query}`);
    },
  );
  const close = vi.fn(async () => undefined);
  return {
    createClient: vi.fn(() => ({ query, close })),
  };
}

describe('KtxClickHouseScanConnector', () => {
  it('resolves ClickHouse connection configuration safely', () => {
    expect(isKtxClickHouseConnectionConfig({ driver: 'clickhouse', host: 'localhost', database: 'analytics' })).toBe(
      true,
    );
    expect(isKtxClickHouseConnectionConfig({ driver: 'mysql', host: 'localhost', database: 'analytics' })).toBe(false);
    expect(
      clickHouseClientConfigFromConfig({
        connectionId: 'warehouse',
        connection: {
          driver: 'clickhouse',
          host: 'ch.example.test',
          port: 9440,
          database: 'analytics',
          username: 'reader',
          password: 'test-pass', // pragma: allowlist secret
          ssl: true,
          readonly: true,
        },
      }),
    ).toMatchObject({
      host: 'ch.example.test',
      port: 9440,
      database: 'analytics',
      username: 'reader',
      password: 'test-pass', // pragma: allowlist secret
      ssl: true,
    });
    // A connection that is not explicitly read-only must be rejected.
    expect(() =>
      clickHouseClientConfigFromConfig({
        connectionId: 'warehouse',
        connection: { driver: 'clickhouse', host: 'ch.example.test', database: 'analytics', readonly: false },
      }),
    ).toThrow('Native ClickHouse connector requires connections.warehouse.readonly: true');
  });

  it('introspects schema, primary keys, comments, row counts, and views', async () => {
    const connector = new KtxClickHouseScanConnector({
      connectionId: 'warehouse',
      connection: {
        driver: 'clickhouse',
        host: 'ch.example.test',
        database: 'analytics',
        username: 'reader',
        password: 'test-pass', // pragma: allowlist secret
        readonly: true,
      },
      clientFactory: fakeClientFactory(),
      now: () => new Date('2026-04-29T14:00:00.000Z'),
    });

    const snapshot = await connector.introspect(
      { connectionId: 'warehouse', driver: 'clickhouse' },
      { runId: 'scan-run-1' },
    );

    expect(snapshot).toMatchObject({
      connectionId: 'warehouse',
      driver: 'clickhouse',
      extractedAt: '2026-04-29T14:00:00.000Z',
      scope: { schemas: ['analytics'] },
      metadata: {
        database: 'analytics',
        host: 'ch.example.test',
        table_count: 2,
        total_columns: 3,
      },
    });
    // The view has no system.parts entry (null estimate) and an empty comment (null).
    expect(snapshot.tables.map((table) => [table.name, table.kind, table.estimatedRows, table.comment])).toEqual([
      ['events', 'table', 2, 'Event stream'],
      ['event_summary', 'view', null, null],
    ]);
    expect(snapshot.tables.find((table) => table.name === 'events')?.columns[0]).toMatchObject({
      name: 'id',
      nativeType: 'UInt64',
      normalizedType: 'UInt64',
      dimensionType: 'number',
      nullable: false,
      primaryKey: true,
      comment: 'PK',
    });
    expect(snapshot.tables.find((table) => table.name === 'events')?.foreignKeys).toEqual([]);
  });

  it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
    const clientFactory = fakeClientFactory();
    const connector = new KtxClickHouseScanConnector({
      connectionId: 'warehouse',
      connection: {
        driver: 'clickhouse',
        host: 'ch.example.test',
        database: 'analytics',
        username: 'reader',
        password: 'test-pass', // pragma: allowlist secret
        readonly: true,
      },
      clientFactory,
    });

    await expect(
      connector.sampleTable(
        {
          connectionId: 'warehouse',
          table: { catalog: null, db: 'analytics', name: 'events' },
          columns: ['id', 'event_name'],
          limit: 1,
        },
        { runId: 'scan-run-1' },
      ),
    ).resolves.toEqual({ headers: ['id', 'event_name'], rows: [[10, 'signup']], totalRows: 1 });

    await expect(
      connector.sampleColumn(
        {
          connectionId: 'warehouse',
          table: { catalog: null, db: 'analytics', name: 'events' },
          column: 'event_name',
          limit: 5,
        },
        { runId: 'scan-run-1' },
      ),
    ).resolves.toMatchObject({ values: ['signup', 'purchase'], nullCount: null, distinctCount: null });

    await expect(
      connector.getColumnDistinctValues(
        { catalog: null, db: 'analytics', name: 'events' },
        'event_name',
        { maxCardinality: 5, limit: 10, sampleSize: 100 },
      ),
    ).resolves.toEqual({ values: ['purchase', 'signup'], cardinality: 2 });

    await expect(
      connector.executeReadOnly(
        { connectionId: 'warehouse', sql: 'select id, event_name from analytics.events', maxRows: 1 },
        { runId: 'scan-run-1' },
      ),
    ).resolves.toMatchObject({ headers: ['id', 'event_name'], rows: [[10, 'signup']], totalRows: 1, rowCount: 1 });

    // Mutating SQL is refused before it ever reaches the client.
    await expect(
      connector.executeReadOnly({ connectionId: 'warehouse', sql: 'delete from events' }, { runId: 'scan-run-1' }),
    ).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally');

    await expect(connector.getTableRowCount('events')).resolves.toBe(2);
    await expect(connector.listSchemas()).resolves.toEqual(['analytics', 'warehouse']);
    await expect(
      connector.columnStats(
        { connectionId: 'warehouse', table: { catalog: null, db: 'analytics', name: 'events' }, column: 'event_name' },
        { runId: 'scan-run-1' },
      ),
    ).resolves.toBeNull();

    await connector.cleanup();
  });

  it('adapts native ClickHouse snapshots to live-database introspection for local ingest', async () => {
    const introspection = createClickHouseLiveDatabaseIntrospection({
      connections: {
        warehouse: {
          driver: 'clickhouse',
          host: 'ch.example.test',
          database: 'analytics',
          username: 'reader',
          password: 'test-pass', // pragma: allowlist secret
          readonly: true,
        },
      },
      clientFactory: fakeClientFactory(),
      now: () => new Date('2026-04-29T14:00:00.000Z'),
    });

    const snapshot = await introspection.extractSchema('warehouse');

    expect(snapshot).toMatchObject({
      connectionId: 'warehouse',
      extractedAt: '2026-04-29T14:00:00.000Z',
    });
    expect(snapshot.tables.find((table) => table.name === 'events')).toMatchObject({
      name: 'events',
      catalog: null,
      db: 'analytics',
      columns: [
        {
          name: 'id',
          nativeType: 'UInt64',
          normalizedType: 'UInt64',
          dimensionType: 'number',
          nullable: false,
          primaryKey: true,
          comment: 'PK',
        },
        {
          name: 'event_name',
          nativeType: 'LowCardinality(String)',
          normalizedType: 'LowCardinality(String)',
          dimensionType: 'string',
          nullable: false,
          primaryKey: false,
          comment: null,
        },
      ],
      foreignKeys: [],
    });
  });
});