feat(cli): redesign database scope picker for searchable schema-first setup (#203)

* feat: add searchable setup prompt pickers

* fix: make snowflake scope discovery single query

* fix: make bigquery table discovery schema scoped

* fix: honor mysql and clickhouse database scope

* feat: wire schema scope discovery for all relational setup drivers

* feat: add schema-first database scope picker

* test: update setup prompt stubs for type-check

* docs: document database scope picker fields

* Fix database setup edit preservation

---------

Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
Andrey Avtomonov 2026-05-22 14:22:11 +02:00 committed by GitHub
parent fd2ba62d92
commit c87d14a554
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 1530 additions and 331 deletions

View file

@ -234,6 +234,89 @@ describe('KtxBigQueryScanConnector', () => {
await connector.cleanup();
});
// Discovery mode: the connection below carries no dataset_id/dataset_ids, yet the connector must
// still construct and list tables through a single region-level INFORMATION_SCHEMA.TABLES query.
it('constructs for discovery without dataset scope and lists tables through one region information schema query', async () => {
// Fake query job: resolves to three rows (base table, clone, materialized view) plus the result
// schema metadata describing the three selected columns.
const createQueryJob = vi.fn(
async (
input: { query: string; params?: Record<string, unknown>; location?: string },
): ReturnType<KtxBigQueryClient['createQueryJob']> => [
{
getQueryResults: async (): ReturnType<KtxBigQueryQueryJob['getQueryResults']> => [
[
{ table_schema: 'analytics', table_name: 'orders', table_type: 'BASE TABLE' },
{ table_schema: 'analytics', table_name: 'order_clone', table_type: 'CLONE' },
{ table_schema: 'mart', table_name: 'orders_mv', table_type: 'MATERIALIZED VIEW' },
],
undefined,
{
schema: {
fields: [
{ name: 'table_schema', type: 'STRING' },
{ name: 'table_name', type: 'STRING' },
{ name: 'table_type', type: 'STRING' },
],
},
},
],
},
],
);
// The per-dataset getTables() stub returns no tables, so any entries in the result can only
// come from the query job above.
const clientFactory: KtxBigQueryClientFactory = {
createClient: vi.fn(() => ({
getDatasets: vi.fn(async () => [[{ id: 'analytics' }, { id: 'mart' }]] as [{ id: string }[]]),
dataset: vi.fn((datasetId: string) => ({
get: vi.fn(async () => [{ id: datasetId }]),
getTables: vi.fn(async () => [[]] as [never[]]),
})),
createQueryJob,
})),
};
const connector = new KtxBigQueryScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'bigquery',
credentials_json: JSON.stringify({ project_id: 'project-1' }),
location: 'US',
},
clientFactory,
});
// CLONE rows are expected to surface as kind 'table'; MATERIALIZED VIEW rows as kind 'view'.
await expect(connector.listTables(['analytics', 'mart'])).resolves.toEqual([
{ schema: 'analytics', name: 'orders', kind: 'table' },
{ schema: 'analytics', name: 'order_clone', kind: 'table' },
{ schema: 'mart', name: 'orders_mv', kind: 'view' },
]);
// Two datasets were requested, but exactly one query job must have been issued.
expect(createQueryJob).toHaveBeenCalledTimes(1);
// The dataset filter travels as a query parameter, and the job runs in the configured location.
expect(createQueryJob).toHaveBeenCalledWith(
expect.objectContaining({
location: 'US',
params: { dataset_ids: ['analytics', 'mart'] },
}),
);
// For location 'US' the query is expected to target the `region-us` qualifier of project-1.
expect(createQueryJob.mock.calls[0]?.[0].query).toContain('`project-1`.`region-us`.INFORMATION_SCHEMA.TABLES');
// The table_type allow-list in the SQL must mention clones and snapshots.
expect(createQueryJob.mock.calls[0]?.[0].query).toContain("'CLONE'");
expect(createQueryJob.mock.calls[0]?.[0].query).toContain("'SNAPSHOT'");
});
// Counterpart to the discovery test: constructing without dataset scope succeeds, but
// introspect() must still reject because no dataset_ids/dataset_id is configured.
it('keeps scan paths requiring dataset scope', async () => {
// No dataset_id / dataset_ids in this connection config.
const connector = new KtxBigQueryScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'bigquery',
credentials_json: JSON.stringify({ project_id: 'project-1' }),
location: 'US',
},
clientFactory: fakeClientFactory(),
});
// The error message names the connection id and both accepted config keys.
await expect(
connector.introspect(
{ connectionId: 'warehouse', driver: 'bigquery' },
{ runId: 'scan-run-1' },
),
).rejects.toThrow('Native BigQuery scan requires connections.warehouse.dataset_ids or dataset_id');
});
it('applies maximumBytesBilled to read-only queries when configured', async () => {
const clientFactory = fakeClientFactory();
const connector = new KtxBigQueryScanConnector({

View file

@ -1,4 +1,5 @@
import { BigQuery, type TableField } from '@google-cloud/bigquery';
import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../context/connections/bigquery-identifiers.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
import { readFileSync } from 'node:fs';
@ -230,9 +231,6 @@ export function bigQueryConnectionConfigFromConfig(input: {
throw new Error(`Native BigQuery connector requires credentials_json.project_id for connections.${input.connectionId}`);
}
const resolvedDatasetIds = datasetIds(input.connection, env);
if (resolvedDatasetIds.length === 0) {
throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.dataset_id or dataset_ids`);
}
const location = stringConfigValue(input.connection, 'location', env);
return { projectId, credentials, datasetIds: resolvedDatasetIds, ...(location ? { location } : {}) };
}
@ -289,17 +287,18 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const tables: KtxSchemaTable[] = [];
for (const datasetId of this.resolved.datasetIds) {
const datasetIds = this.requireDatasetIdsForScan();
for (const datasetId of datasetIds) {
tables.push(...(await this.introspectDataset(datasetId)));
}
return {
connectionId: this.connectionId,
driver: 'bigquery',
extractedAt: this.now().toISOString(),
scope: { catalogs: [this.resolved.projectId], datasets: this.resolved.datasetIds },
scope: { catalogs: [this.resolved.projectId], datasets: datasetIds },
metadata: {
project_id: this.resolved.projectId,
datasets: this.resolved.datasetIds,
datasets: datasetIds,
table_count: tables.length,
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
},
@ -381,22 +380,33 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
}
async listTables(datasetIds?: string[]): Promise<KtxTableListEntry[]> {
const filterDatasets = datasetIds ?? (await this.listDatasets());
const entries: KtxTableListEntry[] = [];
for (const datasetId of filterDatasets) {
const dataset = this.getClient().dataset(datasetId);
const [tables] = await dataset.getTables();
for (const table of tables) {
if (!table.id) continue;
entries.push({
schema: datasetId,
name: table.id,
kind: table.metadata?.type === 'VIEW' ? 'view' : 'table',
});
}
const projectId = normalizeBigQueryProjectId(this.resolved.projectId, 'table discovery');
const region = normalizeBigQueryRegion(this.resolved.location ?? 'US', 'table discovery');
const params: Record<string, unknown> = {};
const filter = datasetIds && datasetIds.length > 0 ? 'AND table_schema IN UNNEST(@dataset_ids)' : '';
if (datasetIds && datasetIds.length > 0) {
params.dataset_ids = datasetIds;
}
entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name));
return entries;
const rows = await this.queryRaw<{ table_schema: string; table_name: string; table_type: string }>(
`
SELECT table_schema, table_name, table_type
FROM \`${projectId}\`.\`region-${region}\`.INFORMATION_SCHEMA.TABLES
WHERE table_type IN (
'BASE TABLE', 'VIEW', 'MATERIALIZED VIEW', 'EXTERNAL', 'CLONE', 'SNAPSHOT'
)
${filter}
ORDER BY table_schema, table_name
`,
params,
);
return rows.map((row) => ({
schema: row.table_schema,
name: row.table_name,
kind:
row.table_type === 'VIEW' || row.table_type === 'MATERIALIZED VIEW'
? ('view' as const)
: ('table' as const),
}));
}
async cleanup(): Promise<void> {
@ -413,6 +423,13 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
return this.client;
}
/**
 * Returns the resolved dataset ids for a scan.
 *
 * @returns the connection's resolved dataset ids (the same array instance held in `this.resolved`).
 * @throws Error when the resolved dataset list is empty.
 */
private requireDatasetIdsForScan(): string[] {
  const { datasetIds } = this.resolved;
  if (datasetIds.length > 0) {
    return datasetIds;
  }
  throw new Error(`Native BigQuery scan requires connections.${this.connectionId}.dataset_ids or dataset_id`);
}
private async query(sql: string, params?: Record<string, unknown>): Promise<KtxQueryResult> {
const [job] = await this.getClient().createQueryJob({
query: sql,

View file

@ -25,7 +25,7 @@ function fakeClientFactory(): KtxClickHouseClientFactory {
{ table: 'event_summary', name: 'event_name', type: 'String', comment: '', is_in_primary_key: 0 },
]);
}
if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY table')) {
if (input.query.includes('FROM system.parts') && input.query.includes('GROUP BY')) {
return result([{ table: 'events', row_count: '2' }]);
}
if (input.query.includes('SELECT `id`, `event_name` FROM `analytics`.`events` LIMIT 1')) {
@ -90,6 +90,50 @@ function fakeClientFactory(): KtxClickHouseClientFactory {
};
}
/**
 * Test double: a ClickHouse client factory whose `query` mock answers the three introspection
 * queries (system.tables, system.columns, system.parts) with rows spread over the `analytics`
 * and `mart` databases. Every recognised query must carry both databases as its `databases`
 * query parameter; any other SQL makes the mock throw.
 */
function multiDatabaseClickHouseClientFactory(): KtxClickHouseClientFactory {
  const expectedParams = { databases: ['analytics', 'mart'] };
  // Both fixture tables expose a single UInt64 primary-key column named `id`.
  const idColumn = (database: string, table: string) => ({
    database,
    table,
    name: 'id',
    type: 'UInt64',
    comment: '',
    is_in_primary_key: 1,
  });

  const query = vi.fn(async (input: { query: string; format: string; query_params?: Record<string, unknown> }) => {
    const sql = input.query;

    if (sql.includes('FROM system.tables')) {
      expect(input.query_params).toEqual(expectedParams);
      const tableRows = [
        { database: 'analytics', name: 'events', engine: 'MergeTree', comment: 'Event stream' },
        { database: 'mart', name: 'order_events', engine: 'MergeTree', comment: '' },
      ];
      return result(tableRows);
    }

    if (sql.includes('FROM system.columns')) {
      expect(input.query_params).toEqual(expectedParams);
      return result([idColumn('analytics', 'events'), idColumn('mart', 'order_events')]);
    }

    const isRowCountQuery = sql.includes('FROM system.parts') && sql.includes('GROUP BY');
    if (isRowCountQuery) {
      expect(input.query_params).toEqual(expectedParams);
      const rowCounts = [
        { database: 'analytics', table: 'events', row_count: '2' },
        { database: 'mart', table: 'order_events', row_count: '5' },
      ];
      return result(rowCounts);
    }

    throw new Error(`Unexpected SQL: ${sql}`);
  });

  const createClient = vi.fn(() => ({ query, close: vi.fn(async () => undefined) }));
  return { createClient };
}
describe('KtxClickHouseScanConnector', () => {
it('resolves ClickHouse connection configuration safely', () => {
expect(isKtxClickHouseConnectionConfig({ driver: 'clickhouse', host: 'localhost', database: 'analytics' })).toBe(
@ -166,6 +210,34 @@ describe('KtxClickHouseScanConnector', () => {
expect(snapshot.tables.find((table) => table.name === 'events')?.foreignKeys).toEqual([]);
});
// `database` stays the connection's default database while `databases` widens the introspection
// scope; both must be reflected in the snapshot.
it('introspects every configured ClickHouse database scope while preserving the default database', async () => {
const connector = new KtxClickHouseScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'clickhouse',
host: 'ch.example.test',
database: 'analytics',
databases: ['analytics', 'mart'],
username: 'reader',
password: 'test-pass', // pragma: allowlist secret
},
// This factory asserts that every introspection query receives both databases as parameters.
clientFactory: multiDatabaseClickHouseClientFactory(),
now: () => new Date('2026-05-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'clickhouse' },
{ runId: 'scan-run-1' },
);
// Scope lists every configured database; metadata keeps the default database alongside the list.
expect(snapshot.scope).toEqual({ schemas: ['analytics', 'mart'] });
expect(snapshot.metadata).toMatchObject({ database: 'analytics', databases: ['analytics', 'mart'] });
// Each table must be attributed to the database it was discovered in, not to the default one.
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual([
'analytics.events',
'mart.order_events',
]);
});
it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
const clientFactory = fakeClientFactory();
const connector = new KtxClickHouseScanConnector({

View file

@ -12,6 +12,7 @@ export interface KtxClickHouseConnectionConfig {
host?: string;
port?: number;
database?: string;
databases?: string[];
username?: string;
user?: string;
password?: string;
@ -87,12 +88,14 @@ export interface KtxClickHouseColumnDistinctValuesResult {
}
/** One row of the `system.tables` introspection query. */
interface ClickHouseTableRow {
// Owning database; optional because introspection falls back to the connection's default database when it is absent.
database?: string;
// Table name as reported by system.tables.
name: string;
// Storage engine name; used to derive the table kind.
engine: string;
// Table comment; an empty string is surfaced as null in the schema snapshot.
comment: string;
}
interface ClickHouseColumnRow {
database?: string;
table: string;
name: string;
type: string;
@ -101,6 +104,7 @@ interface ClickHouseColumnRow {
}
interface ClickHouseRowCountRow {
database?: string;
table?: string;
row_count?: string | number;
count?: string | number;
@ -174,6 +178,25 @@ function isNullableClickHouseType(type: string): boolean {
return type.startsWith('Nullable(') || type.startsWith('LowCardinality(Nullable(');
}
/**
 * Resolves the list of ClickHouse databases an introspection run should cover.
 *
 * Entries of `connection.databases` that are not strings, or that are blank after trimming,
 * are ignored; the remaining names are trimmed and de-duplicated in first-seen order.
 *
 * @param connection connection config that may carry a `databases` list.
 * @param fallbackDatabase database used when no usable entry is configured.
 * @returns the cleaned database list, or `[fallbackDatabase]` when the list is missing or empty.
 */
function configuredClickHouseDatabases(
  connection: KtxClickHouseConnectionConfig,
  fallbackDatabase: string,
): string[] {
  const configured: unknown = connection.databases;
  if (!Array.isArray(configured)) {
    return [fallbackDatabase];
  }

  const unique = new Set<string>();
  for (const entry of configured) {
    if (typeof entry !== 'string') continue;
    const name = entry.trim();
    if (name.length > 0) {
      unique.add(name);
    }
  }

  return unique.size > 0 ? Array.from(unique) : [fallbackDatabase];
}
/**
 * Builds the lookup key that identifies one table inside one database.
 *
 * The pair is serialized as a JSON tuple instead of being joined with `.`: names may
 * themselves contain dots, so a plain join would map (`a.b`, `c`) and (`a`, `b.c`) to the same
 * key and merge the columns / row counts of unrelated tables. The result is opaque and is only
 * meant to be used as a Map key.
 *
 * @param database database the table lives in.
 * @param table table name.
 * @returns a string that is unique per (database, table) pair.
 */
function clickHouseTableKey(database: string, table: string): string {
  return JSON.stringify([database, table]);
}
export function isKtxClickHouseConnectionConfig(
connection: KtxClickHouseConnectionConfig | undefined,
): connection is KtxClickHouseConnectionConfig {
@ -261,52 +284,61 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const database = this.clientConfig.database;
const databases = configuredClickHouseDatabases(this.connection, this.clientConfig.database);
const tables = await this.queryEachRow<ClickHouseTableRow>(
`
SELECT name, engine, comment
SELECT database, name, engine, comment
FROM system.tables
WHERE database = {database:String}
WHERE database IN {databases:Array(String)}
AND engine NOT IN ('Dictionary')
ORDER BY name
ORDER BY database, name
`,
{ database },
{ databases },
);
const columns = await this.queryEachRow<ClickHouseColumnRow>(
`
SELECT table, name, type, comment, is_in_primary_key
SELECT database, table, name, type, comment, is_in_primary_key
FROM system.columns
WHERE database = {database:String}
ORDER BY table, position
WHERE database IN {databases:Array(String)}
ORDER BY database, table, position
`,
{ database },
{ databases },
);
const rowCounts = await this.queryEachRow<ClickHouseRowCountRow>(
`
SELECT table, sum(rows) AS row_count
SELECT database, table, sum(rows) AS row_count
FROM system.parts
WHERE database = {database:String}
WHERE database IN {databases:Array(String)}
AND active = 1
GROUP BY table
GROUP BY database, table
`,
{ database },
{ databases },
);
const columnsByTable = new Map<string, ClickHouseColumnRow[]>();
for (const column of columns) {
columnsByTable.set(column.table, [...(columnsByTable.get(column.table) ?? []), column]);
const key = clickHouseTableKey(column.database ?? this.clientConfig.database, column.table);
columnsByTable.set(key, [...(columnsByTable.get(key) ?? []), column]);
}
const rowCountByTable = new Map(rowCounts.map((row) => [String(row.table), Number(row.row_count ?? 0)]));
const schemaTables = tables.map((table) =>
this.toSchemaTable(table, columnsByTable.get(table.name) ?? [], rowCountByTable.get(table.name) ?? 0),
const rowCountByTable = new Map(
rowCounts.map((row) => [
clickHouseTableKey(row.database ?? this.clientConfig.database, String(row.table)),
Number(row.row_count ?? 0),
]),
);
const schemaTables = tables.map((table) => {
const database = table.database ?? this.clientConfig.database;
const key = clickHouseTableKey(database, table.name);
return this.toSchemaTable(database, table, columnsByTable.get(key) ?? [], rowCountByTable.get(key) ?? 0);
});
return {
connectionId: this.connectionId,
driver: 'clickhouse',
extractedAt: this.now().toISOString(),
scope: { schemas: [database] },
scope: { schemas: databases },
metadata: {
database,
database: this.clientConfig.database,
databases,
host: this.clientConfig.host,
table_count: schemaTables.length,
total_columns: schemaTables.reduce((sum, table) => sum + table.columns.length, 0),
@ -436,11 +468,16 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
}
}
private toSchemaTable(table: ClickHouseTableRow, columns: ClickHouseColumnRow[], estimatedRows: number): KtxSchemaTable {
private toSchemaTable(
database: string,
table: ClickHouseTableRow,
columns: ClickHouseColumnRow[],
estimatedRows: number,
): KtxSchemaTable {
const kind = tableKind(table.engine);
return {
catalog: null,
db: this.clientConfig.database,
db: database,
name: table.name,
kind,
comment: table.comment || null,

View file

@ -85,6 +85,84 @@ function fakePoolFactory(): KtxMysqlPoolFactory {
};
}
/**
 * Test double: a MySQL pool factory whose connection `query` mock answers the four
 * INFORMATION_SCHEMA introspection queries (tables, columns, primary keys, foreign keys) with
 * rows spread over the `analytics` and `mart` schemas. Every recognised query must be bound to
 * exactly those two schema names; any other SQL makes the mock throw.
 */
function multiSchemaMysqlPoolFactory(): KtxMysqlPoolFactory {
  const expectedParams = ['analytics', 'mart'];

  const query = vi.fn(async (sql: string, params?: unknown): Promise<[RowDataPacket[], FieldPacket[]]> => {
    if (sql.includes('INFORMATION_SCHEMA.TABLES')) {
      expect(params).toEqual(expectedParams);
      const tableRows = [
        {
          TABLE_SCHEMA: 'analytics',
          TABLE_NAME: 'customers',
          TABLE_TYPE: 'BASE TABLE',
          TABLE_COMMENT: '',
          TABLE_ROWS: 2,
        },
        {
          TABLE_SCHEMA: 'mart',
          TABLE_NAME: 'orders',
          TABLE_TYPE: 'BASE TABLE',
          TABLE_COMMENT: '',
          TABLE_ROWS: 3,
        },
      ];
      const tableFields = [
        { name: 'TABLE_SCHEMA' },
        { name: 'TABLE_NAME' },
        { name: 'TABLE_TYPE' },
        { name: 'TABLE_COMMENT' },
        { name: 'TABLE_ROWS' },
      ];
      return mysqlResult(tableRows, tableFields);
    }

    if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) {
      expect(params).toEqual(expectedParams);
      const columnRows = [
        {
          TABLE_SCHEMA: 'analytics',
          TABLE_NAME: 'customers',
          COLUMN_NAME: 'id',
          DATA_TYPE: 'int',
          IS_NULLABLE: 'NO',
          COLUMN_COMMENT: '',
        },
        {
          TABLE_SCHEMA: 'mart',
          TABLE_NAME: 'orders',
          COLUMN_NAME: 'id',
          DATA_TYPE: 'int',
          IS_NULLABLE: 'NO',
          COLUMN_COMMENT: '',
        },
      ];
      return mysqlResult(columnRows, []);
    }

    if (sql.includes('INFORMATION_SCHEMA.KEY_COLUMN_USAGE')) {
      // Primary-key lookup is matched before the foreign-key lookup, as both read the same view.
      if (sql.includes("CONSTRAINT_NAME = 'PRIMARY'")) {
        expect(params).toEqual(expectedParams);
        const primaryKeyRows = [
          { TABLE_SCHEMA: 'analytics', TABLE_NAME: 'customers', COLUMN_NAME: 'id' },
          { TABLE_SCHEMA: 'mart', TABLE_NAME: 'orders', COLUMN_NAME: 'id' },
        ];
        return mysqlResult(primaryKeyRows, []);
      }
      if (sql.includes('REFERENCED_TABLE_NAME IS NOT NULL')) {
        expect(params).toEqual(expectedParams);
        return mysqlResult([], []);
      }
    }

    throw new Error(`Unexpected SQL: ${sql} params=${JSON.stringify(params)}`);
  });

  const createPool = vi.fn(() => ({
    getConnection: vi.fn(async () => ({ query, release: vi.fn() })),
    end: vi.fn(async () => undefined),
  }));
  return { createPool };
}
describe('KtxMysqlScanConnector', () => {
it('resolves MySQL connection configuration safely', () => {
expect(isKtxMysqlConnectionConfig({ driver: 'mysql', host: 'localhost', database: 'analytics' })).toBe(true);
@ -169,6 +247,34 @@ describe('KtxMysqlScanConnector', () => {
]);
});
// `database` stays the connection's default database while `schemas` widens the introspection
// scope; both must be reflected in the snapshot.
it('introspects every configured MySQL schema scope', async () => {
const connector = new KtxMysqlScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'mysql',
host: 'db.example.test',
database: 'analytics',
schemas: ['analytics', 'mart'],
username: 'reader',
password: 'secret', // pragma: allowlist secret
},
// This factory asserts that every introspection query is bound to both schema names.
poolFactory: multiSchemaMysqlPoolFactory(),
now: () => new Date('2026-05-21T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'mysql' },
{ runId: 'scan-run-1' },
);
// Scope lists every configured schema; metadata keeps the default database alongside the list.
expect(snapshot.scope).toEqual({ schemas: ['analytics', 'mart'] });
expect(snapshot.metadata).toMatchObject({ database: 'analytics', schemas: ['analytics', 'mart'] });
// Each table must be attributed to the schema it was discovered in, not to the default database.
expect(snapshot.tables.map((table) => `${table.db}.${table.name}`)).toEqual([
'analytics.customers',
'mart.orders',
]);
});
it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
const poolFactory = fakePoolFactory();
const connector = new KtxMysqlScanConnector({

View file

@ -11,6 +11,7 @@ export interface KtxMysqlConnectionConfig {
host?: string;
port?: number;
database?: string;
schemas?: string[];
username?: string;
user?: string;
password?: string;
@ -79,6 +80,7 @@ export interface KtxMysqlColumnDistinctValuesResult {
}
interface MysqlTableRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
TABLE_TYPE: string;
TABLE_COMMENT: string | null;
@ -86,6 +88,7 @@ interface MysqlTableRow extends RowDataPacket {
}
interface MysqlColumnRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
DATA_TYPE: string;
@ -94,11 +97,13 @@ interface MysqlColumnRow extends RowDataPacket {
}
/** One row of the INFORMATION_SCHEMA.KEY_COLUMN_USAGE query restricted to `CONSTRAINT_NAME = 'PRIMARY'`. */
interface MysqlPrimaryKeyRow extends RowDataPacket {
// Schema that owns the table; combined with TABLE_NAME to key the primary-key map.
TABLE_SCHEMA: string;
// Table the primary-key column belongs to.
TABLE_NAME: string;
// Name of a column that is part of the table's primary key.
COLUMN_NAME: string;
}
interface MysqlForeignKeyRow extends RowDataPacket {
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
REFERENCED_TABLE_NAME: string;
@ -185,22 +190,42 @@ function cleanMySqlTableComment(comment: string | null): string | null {
return comment;
}
function groupByTable<T extends { TABLE_NAME: string }>(rows: T[]): Map<string, T[]> {
/**
 * Resolves the list of MySQL schemas an introspection run should cover.
 *
 * Entries of `connection.schemas` that are not strings, or that are blank after trimming, are
 * ignored; the remaining names are trimmed and de-duplicated in first-seen order.
 *
 * @param connection connection config that may carry a `schemas` list.
 * @param fallbackDatabase database used when no usable entry is configured.
 * @returns the cleaned schema list, or `[fallbackDatabase]` when the list is missing or empty.
 */
function configuredMysqlSchemas(connection: KtxMysqlConnectionConfig, fallbackDatabase: string): string[] {
  const configured: unknown = connection.schemas;
  const names = new Set<string>();
  if (Array.isArray(configured)) {
    for (const candidate of configured) {
      if (typeof candidate === 'string' && candidate.trim().length > 0) {
        names.add(candidate.trim());
      }
    }
  }
  return names.size === 0 ? [fallbackDatabase] : Array.from(names);
}
/**
 * Builds the lookup key that identifies one table inside one schema.
 *
 * The pair is serialized as a JSON tuple instead of being joined with `.`: quoted MySQL
 * identifiers may contain dots, so a plain join would map (`a.b`, `c`) and (`a`, `b.c`) to the
 * same key and merge the columns / keys of unrelated tables. The result is opaque and is only
 * meant to be used as a Map key.
 *
 * @param schema schema (database) the table lives in.
 * @param table table name.
 * @returns a string that is unique per (schema, table) pair.
 */
function mysqlTableKey(schema: string, table: string): string {
  return JSON.stringify([schema, table]);
}
function groupByTable<T extends { TABLE_SCHEMA?: string; TABLE_NAME: string }>(
rows: T[],
fallbackDatabase: string,
): Map<string, T[]> {
const grouped = new Map<string, T[]>();
for (const row of rows) {
const tableRows = grouped.get(row.TABLE_NAME) ?? [];
const tableRows = grouped.get(mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME)) ?? [];
tableRows.push(row);
grouped.set(row.TABLE_NAME, tableRows);
grouped.set(mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME), tableRows);
}
return grouped;
}
function primaryKeyMap(rows: MysqlPrimaryKeyRow[]): Map<string, Set<string>> {
function primaryKeyMap(rows: MysqlPrimaryKeyRow[], fallbackDatabase: string): Map<string, Set<string>> {
const grouped = new Map<string, Set<string>>();
for (const row of rows) {
const columns = grouped.get(row.TABLE_NAME) ?? new Set<string>();
const key = mysqlTableKey(row.TABLE_SCHEMA ?? fallbackDatabase, row.TABLE_NAME);
const columns = grouped.get(key) ?? new Set<string>();
columns.add(row.COLUMN_NAME);
grouped.set(row.TABLE_NAME, columns);
grouped.set(key, columns);
}
return grouped;
}
@ -308,60 +333,68 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const database = this.poolConfig.database;
const databases = configuredMysqlSchemas(this.connection, this.poolConfig.database);
const placeholders = databases.map(() => '?').join(', ');
const tables = await this.queryRaw<MysqlTableRow>(
`
SELECT TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS
SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
ORDER BY TABLE_NAME
WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
ORDER BY TABLE_SCHEMA, TABLE_NAME
`,
[database],
databases,
);
const columns = await this.queryRaw<MysqlColumnRow>(
`
SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ?
ORDER BY TABLE_NAME, ORDINAL_POSITION
WHERE TABLE_SCHEMA IN (${placeholders})
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
`,
[database],
databases,
);
const primaryKeys = await this.queryRaw<MysqlPrimaryKeyRow>(
`
SELECT TABLE_NAME, COLUMN_NAME
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = ?
WHERE TABLE_SCHEMA IN (${placeholders})
AND CONSTRAINT_NAME = 'PRIMARY'
ORDER BY TABLE_NAME, ORDINAL_POSITION
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
`,
[database],
databases,
);
const foreignKeys = await this.queryRaw<MysqlForeignKeyRow>(
`
SELECT TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, CONSTRAINT_NAME
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME, CONSTRAINT_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = ?
WHERE TABLE_SCHEMA IN (${placeholders})
AND REFERENCED_TABLE_NAME IS NOT NULL
ORDER BY TABLE_NAME, COLUMN_NAME
ORDER BY TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME
`,
[database],
databases,
);
const columnsByTable = groupByTable(columns);
const primaryKeysByTable = primaryKeyMap(primaryKeys);
const foreignKeysByTable = groupByTable(foreignKeys);
const columnsByTable = groupByTable(columns, this.poolConfig.database);
const primaryKeysByTable = primaryKeyMap(primaryKeys, this.poolConfig.database);
const foreignKeysByTable = groupByTable(foreignKeys, this.poolConfig.database);
const schemaTables = tables.map((table) =>
this.toSchemaTable(table, columnsByTable.get(table.TABLE_NAME) ?? [], primaryKeysByTable, foreignKeysByTable),
this.toSchemaTable(
table.TABLE_SCHEMA ?? this.poolConfig.database,
table,
columnsByTable.get(mysqlTableKey(table.TABLE_SCHEMA ?? this.poolConfig.database, table.TABLE_NAME)) ?? [],
primaryKeysByTable,
foreignKeysByTable,
),
);
return {
connectionId: this.connectionId,
driver: 'mysql',
extractedAt: this.now().toISOString(),
scope: { schemas: [database] },
scope: { schemas: databases },
metadata: {
database,
database: this.poolConfig.database,
schemas: databases,
host: this.poolConfig.host,
table_count: schemaTables.length,
total_columns: schemaTables.reduce((sum, table) => sum + table.columns.length, 0),
@ -487,6 +520,7 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
}
private toSchemaTable(
database: string,
table: MysqlTableRow,
columns: MysqlColumnRow[],
primaryKeysByTable: Map<string, Set<string>>,
@ -497,13 +531,17 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
const estimatedRows = kind === 'view' ? null : Number(table.TABLE_ROWS ?? 0);
return {
catalog: null,
db: this.poolConfig.database,
db: database,
name: tableName,
kind,
comment: cleanMySqlTableComment(table.TABLE_COMMENT),
estimatedRows: Number.isFinite(estimatedRows) ? estimatedRows : null,
columns: columns.map((column) => this.toSchemaColumn(column, primaryKeysByTable.get(tableName) ?? new Set())),
foreignKeys: (foreignKeysByTable.get(tableName) ?? []).map((row) => this.toSchemaForeignKey(row)),
columns: columns.map((column) =>
this.toSchemaColumn(column, primaryKeysByTable.get(mysqlTableKey(database, tableName)) ?? new Set()),
),
foreignKeys: (foreignKeysByTable.get(mysqlTableKey(database, tableName)) ?? []).map((row) =>
this.toSchemaForeignKey(database, row),
),
};
}
@ -519,11 +557,11 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
};
}
private toSchemaForeignKey(row: MysqlForeignKeyRow): KtxSchemaForeignKey {
private toSchemaForeignKey(database: string, row: MysqlForeignKeyRow): KtxSchemaForeignKey {
return {
fromColumn: row.COLUMN_NAME,
toCatalog: null,
toDb: this.poolConfig.database,
toDb: database,
toTable: row.REFERENCED_TABLE_NAME,
toColumn: row.REFERENCED_COLUMN_NAME,
constraintName: row.CONSTRAINT_NAME || null,

View file

@ -215,6 +215,75 @@ describe('KtxSnowflakeScanConnector', () => {
expect(driver.cleanup).toHaveBeenCalledTimes(1);
});
// Listing tables for several schemas must issue a single INFORMATION_SCHEMA.TABLES query
// instead of one query per schema.
it('lists tables across schemas with one information schema query', async () => {
// Records every SQL statement (and its binds) that reaches the fake driver.
const queries: Array<{ sql: string; params?: unknown }> = [];
const driverFactory: KtxSnowflakeDriverFactory = {
createDriver: vi.fn(() => ({
test: vi.fn(async () => ({ success: true })),
query: vi.fn(async (sql: string, params?: unknown) => {
queries.push({ sql, params });
// Rows are positional: [TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE], matching `headers`.
return {
headers: ['TABLE_SCHEMA', 'TABLE_NAME', 'TABLE_TYPE'],
rows: [
['MART', 'ORDERS', 'BASE TABLE'],
['PUBLIC', 'ORDER_SUMMARY', 'VIEW'],
],
totalRows: 2,
rowCount: 2,
};
}),
getSchemaMetadata: vi.fn(async () => []),
listSchemas: vi.fn(async () => []),
// The driver-level listTables stub returns nothing, so the expected entries below can only
// come from the recorded query.
listTables: vi.fn(async () => []),
cleanup: vi.fn(async () => undefined),
})),
};
const connector = new KtxSnowflakeScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'snowflake',
authMethod: 'password',
account: 'acct',
warehouse: 'WH',
database: 'ANALYTICS',
schema_name: 'PUBLIC',
username: 'reader',
password: 'fixture-pass', // pragma: allowlist secret
},
driverFactory,
});
// 'VIEW' rows map to kind 'view'; the 'BASE TABLE' row maps to kind 'table'.
await expect(connector.listTables(['MART', 'PUBLIC'])).resolves.toEqual([
{ schema: 'MART', name: 'ORDERS', kind: 'table' },
{ schema: 'PUBLIC', name: 'ORDER_SUMMARY', kind: 'view' },
]);
// One query for both schemas, addressed to the quoted database's INFORMATION_SCHEMA.
expect(queries).toHaveLength(1);
expect(queries[0]?.sql).toContain('FROM "ANALYTICS".INFORMATION_SCHEMA.TABLES');
expect(queries[0]?.sql).toContain('AND TABLE_SCHEMA IN (?, ?)');
// Bind order: the database (for the TABLE_CATALOG filter) first, then one bind per schema.
expect(queries[0]?.params).toEqual(['ANALYTICS', 'MART', 'PUBLIC']);
});
// An identifier containing `;` must make the connector constructor throw synchronously.
it('rejects unsafe Snowflake identifiers before driver creation', () => {
expect(
() =>
new KtxSnowflakeScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'snowflake',
authMethod: 'password',
account: 'acct',
// Deliberately unsafe warehouse name; every other field is valid.
warehouse: 'WH;DROP',
database: 'ANALYTICS',
schema_name: 'PUBLIC',
username: 'reader',
password: 'fixture-pass', // pragma: allowlist secret
},
driverFactory: fakeDriverFactory(),
}),
).toThrow('Invalid Snowflake warehouse identifier "WH;DROP"');
});
it('converts a native snapshot into a live-database introspection snapshot', async () => {
const introspection = createSnowflakeLiveDatabaseIntrospection({
connections: {

View file

@ -6,6 +6,7 @@ import { assertReadOnlySql, limitSqlForExecution } from '../../context/connectio
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js';
import * as snowflake from 'snowflake-sdk';
import { KtxSnowflakeDialect } from './dialect.js';
import { assertSafeSnowflakeIdentifier, quoteSnowflakeIdentifier } from './identifiers.js';
export interface KtxSnowflakeConnectionConfig {
driver?: string;
@ -206,16 +207,23 @@ export function snowflakeConnectionConfigFromConfig(input: {
if (!username) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.username`);
}
assertSafeSnowflakeIdentifier(warehouse, 'warehouse');
assertSafeSnowflakeIdentifier(database, 'database');
const resolvedSchemas = schemaNames(input.connection!, env);
for (const schema of resolvedSchemas) {
assertSafeSnowflakeIdentifier(schema, 'schema');
}
const resolved: KtxSnowflakeResolvedConnectionConfig = {
authMethod,
account,
warehouse,
database,
schemas: schemaNames(input.connection!, env),
schemas: resolvedSchemas,
username,
};
const role = stringConfigValue(input.connection, 'role', env);
if (role) {
assertSafeSnowflakeIdentifier(role, 'role');
resolved.role = role;
}
if (authMethod === 'rsa') {
@ -322,33 +330,30 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
}
async listSchemas(): Promise<string[]> {
const result = await this.query(`SHOW SCHEMAS IN DATABASE "${this.resolved.database}"`);
const result = await this.query(
`SHOW SCHEMAS IN DATABASE ${quoteSnowflakeIdentifier(this.resolved.database, 'database')}`,
);
return result.rows.map((row) => String(row[1])).filter((name) => name !== 'INFORMATION_SCHEMA');
}
/**
 * Lists tables and views of the configured database with a single
 * `INFORMATION_SCHEMA.TABLES` query.
 *
 * @param schemas Optional schema names to restrict the listing to. When omitted or empty,
 *   every schema except `INFORMATION_SCHEMA` is included.
 * @returns One entry per row, ordered by schema then table name. `kind` is `'view'` only when
 *   `TABLE_TYPE` is exactly `'VIEW'`; every other table type is reported as `'table'`.
 * @throws Error when the configured database name is rejected by `quoteSnowflakeIdentifier`.
 */
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
  // One `?` placeholder per requested schema; null means "no schema restriction".
  const filters = schemas && schemas.length > 0 ? schemas.map(() => '?').join(', ') : null;
  const result = await this.query(
    `
    SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
    FROM ${quoteSnowflakeIdentifier(this.resolved.database, 'database')}.INFORMATION_SCHEMA.TABLES
    WHERE TABLE_CATALOG = ?
    AND TABLE_SCHEMA <> 'INFORMATION_SCHEMA'
    ${filters ? `AND TABLE_SCHEMA IN (${filters})` : ''}
    ORDER BY TABLE_SCHEMA, TABLE_NAME
    `,
    // Bind order mirrors placeholder order: catalog first, then the schema filter values.
    [this.resolved.database, ...(schemas ?? [])],
  );
  return result.rows.map((row) => ({
    schema: String(row[0]),
    name: String(row[1]),
    kind: String(row[2]) === 'VIEW' ? ('view' as const) : ('table' as const),
  }));
}
async cleanup(): Promise<void> {
@ -414,11 +419,20 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
/**
 * Applies the resolved role, warehouse, database and schema to a connection by issuing
 * `USE …` statements in that order.
 *
 * @param connection Connection the statements are executed on.
 * @throws Error when any configured name is rejected by `quoteSnowflakeIdentifier`, or when
 *   `executeSnowflakeQuery` rejects.
 */
private async setConnectionContext(connection: snowflake.Connection): Promise<void> {
  // Role is optional in the resolved config; the other three statements always run.
  if (this.resolved.role) {
    await this.executeSnowflakeQuery(connection, `USE ROLE ${quoteSnowflakeIdentifier(this.resolved.role, 'role')}`);
  }
  await this.executeSnowflakeQuery(
    connection,
    `USE WAREHOUSE ${quoteSnowflakeIdentifier(this.resolved.warehouse, 'warehouse')}`,
  );
  await this.executeSnowflakeQuery(
    connection,
    `USE DATABASE ${quoteSnowflakeIdentifier(this.resolved.database, 'database')}`,
  );
  // The first configured schema becomes the session schema; PUBLIC is the fallback when none is configured.
  await this.executeSnowflakeQuery(
    connection,
    `USE SCHEMA ${quoteSnowflakeIdentifier(this.resolved.schemas[0] ?? 'PUBLIC', 'schema')}`,
  );
}
private async executeSnowflakeQuery(
@ -601,8 +615,24 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
return this.getDriver().listSchemas();
}
/**
 * Lists tables and views of the configured database by running one
 * `INFORMATION_SCHEMA.TABLES` query through the driver.
 *
 * @param schemas Optional schema names to restrict the listing to. When omitted or empty,
 *   every schema except `INFORMATION_SCHEMA` is included.
 * @returns One entry per row, ordered by schema then table name. `kind` is `'view'` only when
 *   `TABLE_TYPE` is exactly `'VIEW'`; every other table type is reported as `'table'`.
 * @throws Error when the configured database name is rejected by `quoteSnowflakeIdentifier`.
 */
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
  // NOTE(review): this statement mirrors the one in SnowflakeSdkDriver.listTables — consider
  // delegating to the driver so the SQL lives in one place.
  // One `?` placeholder per requested schema; null means "no schema restriction".
  const filters = schemas && schemas.length > 0 ? schemas.map(() => '?').join(', ') : null;
  const result = await this.getDriver().query(
    `
    SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE
    FROM ${quoteSnowflakeIdentifier(this.resolved.database, 'database')}.INFORMATION_SCHEMA.TABLES
    WHERE TABLE_CATALOG = ?
    AND TABLE_SCHEMA <> 'INFORMATION_SCHEMA'
    ${filters ? `AND TABLE_SCHEMA IN (${filters})` : ''}
    ORDER BY TABLE_SCHEMA, TABLE_NAME
    `,
    // Bind order mirrors placeholder order: catalog first, then the schema filter values.
    [this.resolved.database, ...(schemas ?? [])],
  );
  return result.rows.map((row) => ({
    schema: String(row[0]),
    name: String(row[1]),
    kind: String(row[2]) === 'VIEW' ? ('view' as const) : ('table' as const),
  }));
}
async cleanup(): Promise<void> {

View file

@ -0,0 +1,18 @@
import { describe, expect, it } from 'vitest';
import { assertSafeSnowflakeIdentifier, quoteSnowflakeIdentifier } from './identifiers.js';
// Covers the two exported guards: quoting of accepted names and the exact rejection message.
describe('Snowflake identifier guards', () => {
  // Tail shared by every rejection message: the hint plus the stringified pattern.
  const patternHint = 'use a simple unquoted identifier matching /^[A-Za-z_][A-Za-z0-9_$]*$/';

  it('quotes simple Snowflake identifiers', () => {
    const quotedDatabase = quoteSnowflakeIdentifier('ANALYTICS_DB', 'database');
    expect(quotedDatabase).toBe('"ANALYTICS_DB"');

    const quotedRole = quoteSnowflakeIdentifier('ROLE_1$', 'role');
    expect(quotedRole).toBe('"ROLE_1$"');
  });

  it('rejects configured identifiers with field and value in the error', () => {
    // A dot is outside the accepted character set.
    const rejectDottedDatabase = (): string => assertSafeSnowflakeIdentifier('bad.db', 'database');
    expect(rejectDottedDatabase).toThrow(`Invalid Snowflake database identifier "bad.db"; ${patternHint}`);

    // The offending value is JSON-encoded in the message, so the embedded quote shows up escaped.
    const rejectQuotedWarehouse = (): string => assertSafeSnowflakeIdentifier('WH"DROP', 'warehouse');
    expect(rejectQuotedWarehouse).toThrow(`Invalid Snowflake warehouse identifier "WH\\"DROP"; ${patternHint}`);
  });
});

View file

@ -0,0 +1,14 @@
/** Accepted shape: a letter or underscore, followed by any number of letters, digits, `_` or `$`. */
const SNOWFLAKE_SIMPLE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_$]*$/;

/**
 * Validates that a configured name is a simple identifier.
 *
 * @param value Candidate identifier.
 * @param field Label of the setting being checked; only used in the error message.
 * @returns `value` unchanged when it matches the accepted pattern.
 * @throws Error naming the field, the JSON-encoded value and the pattern when it does not match.
 */
export function assertSafeSnowflakeIdentifier(value: string, field: string): string {
  if (SNOWFLAKE_SIMPLE_IDENTIFIER.test(value)) {
    return value;
  }
  // JSON encoding keeps quotes and control characters in the rejected value visible and escaped.
  const shownValue = JSON.stringify(value);
  throw new Error(
    `Invalid Snowflake ${field} identifier ${shownValue}; use a simple unquoted identifier matching ${SNOWFLAKE_SIMPLE_IDENTIFIER}`,
  );
}

/**
 * Validates an identifier and wraps it in double quotes for embedding in a statement.
 *
 * NOTE(review): Snowflake resolves double-quoted identifiers case-sensitively — confirm that
 * configured names use the stored casing.
 *
 * @param value Candidate identifier.
 * @param field Label of the setting being checked; forwarded to the validation error.
 * @returns The validated identifier surrounded by double quotes.
 * @throws Error when `value` is rejected by `assertSafeSnowflakeIdentifier`.
 */
export function quoteSnowflakeIdentifier(value: string, field: string): string {
  const safeName = assertSafeSnowflakeIdentifier(value, field);
  return '"' + safeName + '"';
}