From a698389bc92a43b24e4b0628ae3ea76509420755 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Fri, 22 May 2026 18:22:28 +0200 Subject: [PATCH] feat(scan): apply tableScope during metadata fetch --- .../src/connectors/bigquery/connector.test.ts | 54 ++++++ .../cli/src/connectors/bigquery/connector.ts | 15 +- .../bigquery/live-database-introspection.ts | 16 +- .../connectors/clickhouse/connector.test.ts | 52 ++++++ .../src/connectors/clickhouse/connector.ts | 43 ++++- .../clickhouse/live-database-introspection.ts | 13 +- .../src/connectors/mysql/connector.test.ts | 66 ++++++++ .../cli/src/connectors/mysql/connector.ts | 46 ++++- .../mysql/live-database-introspection.ts | 16 +- .../src/connectors/postgres/connector.test.ts | 58 +++++++ .../cli/src/connectors/postgres/connector.ts | 23 ++- .../postgres/live-database-introspection.ts | 16 +- .../connectors/snowflake/connector.test.ts | 56 ++++++ .../cli/src/connectors/snowflake/connector.ts | 41 ++++- .../snowflake/historic-sql-query-client.ts | 31 ++++ .../snowflake/live-database-introspection.ts | 15 +- .../connectors/snowflake/sdk-logger.test.ts | 57 +++++++ .../src/connectors/snowflake/sdk-logger.ts | 32 ++++ .../src/connectors/sqlite/connector.test.ts | 14 ++ .../cli/src/connectors/sqlite/connector.ts | 16 +- .../sqlite/live-database-introspection.ts | 16 +- .../connectors/sqlserver/connector.test.ts | 50 ++++++ .../cli/src/connectors/sqlserver/connector.ts | 76 ++++++--- .../sqlserver/live-database-introspection.ts | 13 +- packages/cli/src/context-build-view.ts | 3 +- .../cli/src/context/scan/local-scan.test.ts | 160 +++++++++++++++++- packages/cli/src/context/scan/local-scan.ts | 22 +-- packages/cli/src/context/scan/table-ref.ts | 2 + packages/cli/src/local-adapters.ts | 14 +- packages/cli/src/local-scan-connectors.ts | 2 +- packages/cli/src/public-ingest.test.ts | 13 +- packages/cli/src/public-ingest.ts | 81 +++++++-- packages/cli/src/setup-databases.test.ts | 57 ++++++- packages/cli/src/setup-databases.ts | 74 +++++++- 34 files changed, 1129 insertions(+), 134 deletions(-) create mode 100644 packages/cli/src/connectors/snowflake/historic-sql-query-client.ts create mode 100644 packages/cli/src/connectors/snowflake/sdk-logger.test.ts create mode 100644 packages/cli/src/connectors/snowflake/sdk-logger.ts diff --git a/packages/cli/src/connectors/bigquery/connector.test.ts b/packages/cli/src/connectors/bigquery/connector.test.ts index c517100a..be65af1e 100644 --- a/packages/cli/src/connectors/bigquery/connector.test.ts +++ b/packages/cli/src/connectors/bigquery/connector.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import { bigQueryConnectionConfigFromConfig, isKtxBigQueryConnectionConfig, type KtxBigQueryClient, KtxBigQueryScanConnector, type KtxBigQueryClientFactory, type KtxBigQueryDataset, type KtxBigQueryQueryJob, type KtxBigQueryTableRef } from '../../connectors/bigquery/connector.js'; import { createBigQueryLiveDatabaseIntrospection } from '../../connectors/bigquery/live-database-introspection.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; function fakeClientFactory(): KtxBigQueryClientFactory { const queryResults = vi.fn(async (): ReturnType => [ @@ -234,6 +235,59 @@ describe('KtxBigQueryScanConnector', () => { await connector.cleanup(); }); + it('limits introspection to tables in tableScope', async () => { + const ordersGet = vi.fn(async (): ReturnType => [ + { + metadata: { + type: 'TABLE', + numRows: '12', + schema: { fields: [{ name: 'id', type: 'INT64', mode: 'REQUIRED' }] }, + }, + }, + ]); + const skippedGet = vi.fn(async (): ReturnType => [ + { metadata: { type: 'TABLE', numRows: '1', schema: { fields: [] } } }, + ]); + const clientFactory: KtxBigQueryClientFactory = { + createClient: vi.fn(() => ({ + getDatasets: vi.fn(async (): ReturnType => [[{ id: 'analytics' }]]), + dataset: vi.fn( + (): KtxBigQueryDataset => ({ + get: vi.fn(async () => [{ id: 'analytics' }]), + getTables: vi.fn(async (): ReturnType => [ + [ + { id: 'orders', get: ordersGet }, + { id: 'customers', get: skippedGet }, + ], + ]), + }), + ), + createQueryJob: vi.fn(async (): ReturnType => [ + { + getQueryResults: async (): ReturnType => [ + [], + undefined, + { schema: { fields: [{ name: 'table_name', type: 'STRING' }, { name: 'column_name', type: 'STRING' }] } }, + ], + }, + ]), + })), + }; + const connector = new KtxBigQueryScanConnector({ + connectionId: 'warehouse', + connection, + clientFactory, + }); + const scope = tableRefSet([{ catalog: 'project-1', db: 'analytics', name: 'orders' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'bigquery', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']); + expect(ordersGet).toHaveBeenCalledTimes(1); + expect(skippedGet).not.toHaveBeenCalled(); + }); + it('constructs for discovery without dataset scope and lists tables through one region information schema query', async () => { const createQueryJob = vi.fn( async ( diff --git a/packages/cli/src/connectors/bigquery/connector.ts b/packages/cli/src/connectors/bigquery/connector.ts index 6a93ccb0..7810e251 100644 --- a/packages/cli/src/connectors/bigquery/connector.ts +++ b/packages/cli/src/connectors/bigquery/connector.ts @@ -2,6 +2,7 @@ import { BigQuery, type TableField } from '@google-cloud/bigquery'; import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../context/connections/bigquery-identifiers.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import { readFileSync } from 'node:fs'; import { homedir } from 'node:os'; import { resolve } from 'node:path'; @@ -289,7 +290,10 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { const tables: KtxSchemaTable[] = []; const datasetIds = this.requireDatasetIdsForScan(); for (const datasetId of datasetIds) { - tables.push(...(await this.introspectDataset(datasetId))); + const scopedNames = input.tableScope + ? scopedTableNames(input.tableScope, { catalog: this.resolved.projectId, db: datasetId }) + : null; + tables.push(...(await this.introspectDataset(datasetId, scopedNames))); } return { connectionId: this.connectionId, @@ -362,7 +366,7 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { if (!datasetId) { return 0; } - const tables = await this.introspectDataset(datasetId); + const tables = await this.introspectDataset(datasetId, null); return tables.find((table) => table.name === tableName)?.estimatedRows ?? 0; } @@ -463,12 +467,15 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { return firstNumber(rows[0]?.[header]); } - private async introspectDataset(datasetId: string): Promise { + private async introspectDataset(datasetId: string, scopedNames: readonly string[] | null): Promise { + if (scopedNames && scopedNames.length === 0) return []; const dataset = this.getClient().dataset(datasetId); const [tableRefs] = await dataset.getTables(); + const scopeSet = scopedNames ? new Set(scopedNames) : null; + const filteredTableRefs = scopeSet ? tableRefs.filter((tableRef) => scopeSet.has(tableRef.id ?? '')) : tableRefs; const primaryKeys = await this.primaryKeys(datasetId); const tables: KtxSchemaTable[] = []; - for (const tableRef of tableRefs) { + for (const tableRef of filteredTableRefs) { const tableName = tableRef.id || ''; const [table] = await tableRef.get(); const fields = table.metadata.schema?.fields ?? []; diff --git a/packages/cli/src/connectors/bigquery/live-database-introspection.ts b/packages/cli/src/connectors/bigquery/live-database-introspection.ts index 5e854b9e..4e701dc4 100644 --- a/packages/cli/src/connectors/bigquery/live-database-introspection.ts +++ b/packages/cli/src/connectors/bigquery/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxBigQueryScanConnector, @@ -16,7 +19,7 @@ export function createBigQueryLiveDatabaseIntrospection( options: CreateBigQueryLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxBigQueryConnectionConfig | undefined; const connector = new KtxBigQueryScanConnector({ connectionId, @@ -25,7 +28,14 @@ export function createBigQueryLiveDatabaseIntrospection( now: options.now, }); try { - return await connector.introspect({ connectionId, driver: 'bigquery' }, { runId: `bigquery-${connectionId}` }); + return await connector.introspect( + { + connectionId, + driver: 'bigquery', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, + { runId: `bigquery-${connectionId}` }, + ); } finally { await connector.cleanup(); } diff --git a/packages/cli/src/connectors/clickhouse/connector.test.ts b/packages/cli/src/connectors/clickhouse/connector.test.ts index a3ab11f6..abc7cad5 100644 --- a/packages/cli/src/connectors/clickhouse/connector.test.ts +++ b/packages/cli/src/connectors/clickhouse/connector.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import { clickHouseClientConfigFromConfig, isKtxClickHouseConnectionConfig, KtxClickHouseScanConnector, type KtxClickHouseClientFactory } from '../../connectors/clickhouse/connector.js'; import { createClickHouseLiveDatabaseIntrospection } from '../../connectors/clickhouse/live-database-introspection.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; function result(payload: T) { return { @@ -238,6 +239,57 @@ describe('KtxClickHouseScanConnector', () => { ]); }); + it('limits introspection to tables in tableScope', async () => { + const queries: Array<{ query: string; query_params?: Record }> = []; + const clientFactory: KtxClickHouseClientFactory = { + createClient: vi.fn(() => ({ + query: vi.fn(async (input: { query: string; format: string; query_params?: Record }) => { + queries.push({ query: input.query, query_params: input.query_params }); + if (input.query.includes('FROM system.tables')) { + return result([{ database: 'analytics', name: 'events', engine: 'MergeTree', comment: '' }]); + } + if (input.query.includes('FROM system.columns')) { + return result([ + { + database: 'analytics', + table: 'events', + name: 'id', + type: 'UInt64', + comment: '', + is_in_primary_key: 1, + }, + ]); + } + if (input.query.includes('FROM system.parts')) { + return result([{ database: 'analytics', table: 'events', row_count: '2' }]); + } + throw new Error(`Unexpected SQL: ${input.query}`); + }), + close: vi.fn(async () => undefined), + })), + }; + const connector = new KtxClickHouseScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'clickhouse', + host: 'ch.example.test', + database: 'analytics', + username: 'reader', + password: 'test-pass', // pragma: allowlist secret + }, + clientFactory, + }); + const scope = tableRefSet([{ catalog: null, db: 'analytics', name: 'events' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'clickhouse', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['events']); + const tablesQuery = queries.find((query) => query.query.includes('FROM system.tables')); + expect(tablesQuery?.query).toContain('AND name IN {table_names:Array(String)}'); + expect(tablesQuery?.query_params).toEqual({ databases: ['analytics'], table_names: ['events'] }); + }); + it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => { const clientFactory = fakeClientFactory(); const connector = new KtxClickHouseScanConnector({ diff --git a/packages/cli/src/connectors/clickhouse/connector.ts b/packages/cli/src/connectors/clickhouse/connector.ts index 1d851001..a2ee568c 100644 --- a/packages/cli/src/connectors/clickhouse/connector.ts +++ b/packages/cli/src/connectors/clickhouse/connector.ts @@ -1,6 +1,7 @@ import { createClient } from '@clickhouse/client'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import { readFileSync } from 'node:fs'; import { Agent as HttpsAgent } from 'node:https'; import { homedir } from 'node:os'; @@ -285,24 +286,42 @@ export class KtxClickHouseScanConnector implements KtxScanConnector { async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const databases = configuredClickHouseDatabases(this.connection, this.clientConfig.database); + let allScopedTables: string[] | null = null; + if (input.tableScope) { + allScopedTables = []; + for (const database of databases) { + allScopedTables.push(...scopedTableNames(input.tableScope, { catalog: null, db: database })); + } + if (allScopedTables.length === 0) { + return this.emptySnapshot(databases); + } + } + const queryParams: Record = { databases }; + const tableNameClause = allScopedTables ? 'AND name IN {table_names:Array(String)}' : ''; + const columnTableNameClause = allScopedTables ? 'AND table IN {table_names:Array(String)}' : ''; + if (allScopedTables) { + queryParams.table_names = allScopedTables; + } const tables = await this.queryEachRow( ` SELECT database, name, engine, comment FROM system.tables WHERE database IN {databases:Array(String)} AND engine NOT IN ('Dictionary') + ${tableNameClause} ORDER BY database, name `, - { databases }, + queryParams, ); const columns = await this.queryEachRow( ` SELECT database, table, name, type, comment, is_in_primary_key FROM system.columns WHERE database IN {databases:Array(String)} + ${columnTableNameClause} ORDER BY database, table, position `, - { databases }, + queryParams, ); const rowCounts = await this.queryEachRow( ` @@ -310,9 +329,10 @@ export class KtxClickHouseScanConnector implements KtxScanConnector { FROM system.parts WHERE database IN {databases:Array(String)} AND active = 1 + ${columnTableNameClause} GROUP BY database, table `, - { databases }, + queryParams, ); const columnsByTable = new Map(); for (const column of columns) { @@ -347,6 +367,23 @@ export class KtxClickHouseScanConnector implements KtxScanConnector { }; } + private emptySnapshot(databases: string[]): KtxSchemaSnapshot { + return { + connectionId: this.connectionId, + driver: 'clickhouse', + extractedAt: this.now().toISOString(), + scope: { schemas: databases }, + metadata: { + database: this.clientConfig.database, + databases, + host: this.clientConfig.host, + table_count: 0, + total_columns: 0, + }, + tables: [], + }; + } + async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const result = await this.query( diff --git a/packages/cli/src/connectors/clickhouse/live-database-introspection.ts b/packages/cli/src/connectors/clickhouse/live-database-introspection.ts index 1e0ec918..74f9475d 100644 --- a/packages/cli/src/connectors/clickhouse/live-database-introspection.ts +++ b/packages/cli/src/connectors/clickhouse/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxClickHouseScanConnector, @@ -18,7 +21,7 @@ export function createClickHouseLiveDatabaseIntrospection( options: CreateClickHouseLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxClickHouseConnectionConfig | undefined; const connector = new KtxClickHouseScanConnector({ connectionId, @@ -29,7 +32,11 @@ export function createClickHouseLiveDatabaseIntrospection( }); try { return await connector.introspect( - { connectionId, driver: 'clickhouse' }, + { + connectionId, + driver: 'clickhouse', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, { runId: `clickhouse-${connectionId}` }, ); } finally { diff --git a/packages/cli/src/connectors/mysql/connector.test.ts b/packages/cli/src/connectors/mysql/connector.test.ts index f9f2d0ad..5a21ada7 100644 --- a/packages/cli/src/connectors/mysql/connector.test.ts +++ b/packages/cli/src/connectors/mysql/connector.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest'; import type { FieldPacket, RowDataPacket } from 'mysql2/promise'; import { createMysqlLiveDatabaseIntrospection } from '../../connectors/mysql/live-database-introspection.js'; import { isKtxMysqlConnectionConfig, KtxMysqlScanConnector, mysqlConnectionPoolConfigFromConfig, type KtxMysqlPoolFactory } from '../../connectors/mysql/connector.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; function mysqlResult(rows: Record[], fields: Array<{ name: string; type?: number }>): [RowDataPacket[], FieldPacket[]] { return [rows as RowDataPacket[], fields as FieldPacket[]]; @@ -275,6 +276,71 @@ describe('KtxMysqlScanConnector', () => { ]); }); + it('limits introspection to tables in tableScope', async () => { + const queries: Array<{ sql: string; params?: unknown }> = []; + const poolFactory: KtxMysqlPoolFactory = { + createPool: vi.fn(() => ({ + getConnection: vi.fn(async () => ({ + query: vi.fn(async (sql: string, params?: unknown): Promise<[RowDataPacket[], FieldPacket[]]> => { + queries.push({ sql, params }); + if (sql.includes('INFORMATION_SCHEMA.TABLES')) { + return mysqlResult( + [ + { + TABLE_SCHEMA: 'analytics', + TABLE_NAME: 'orders', + TABLE_TYPE: 'BASE TABLE', + TABLE_COMMENT: '', + TABLE_ROWS: 2, + }, + ], + [], + ); + } + if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) { + return mysqlResult( + [ + { + TABLE_SCHEMA: 'analytics', + TABLE_NAME: 'orders', + COLUMN_NAME: 'id', + DATA_TYPE: 'int', + IS_NULLABLE: 'NO', + COLUMN_COMMENT: '', + }, + ], + [], + ); + } + return mysqlResult([], []); + }), + release: vi.fn(), + })), + end: vi.fn(async () => undefined), + })), + }; + const connector = new KtxMysqlScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'mysql', + host: 'db.example.test', + database: 'analytics', + username: 'reader', + password: 'secret', // pragma: allowlist secret + }, + poolFactory, + }); + const scope = tableRefSet([{ catalog: null, db: 'analytics', name: 'orders' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'mysql', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']); + const tablesQuery = queries.find((query) => query.sql.includes('INFORMATION_SCHEMA.TABLES')); + expect(tablesQuery?.sql).toMatch(/TABLE_NAME IN \(\?\)/); + expect(tablesQuery?.params).toEqual(['analytics', 'orders']); + }); + it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => { const poolFactory = fakePoolFactory(); const connector = new KtxMysqlScanConnector({ diff --git a/packages/cli/src/connectors/mysql/connector.ts b/packages/cli/src/connectors/mysql/connector.ts index 9d92c2e0..82a2384c 100644 --- a/packages/cli/src/connectors/mysql/connector.ts +++ b/packages/cli/src/connectors/mysql/connector.ts @@ -4,6 +4,7 @@ import { homedir } from 'node:os'; import { resolve } from 'node:path'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxTableListEntry, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import { KtxMysqlDialect } from './dialect.js'; export interface KtxMysqlConnectionConfig { @@ -335,23 +336,37 @@ export class KtxMysqlScanConnector implements KtxScanConnector { this.assertConnection(input.connectionId); const databases = configuredMysqlSchemas(this.connection, this.poolConfig.database); const placeholders = databases.map(() => '?').join(', '); + let allScopedTables: string[] | null = null; + if (input.tableScope) { + allScopedTables = []; + for (const database of databases) { + allScopedTables.push(...scopedTableNames(input.tableScope, { catalog: null, db: database })); + } + if (allScopedTables.length === 0) { + return this.emptySnapshot(databases); + } + } + const tableNameClause = allScopedTables + ? `AND TABLE_NAME IN (${allScopedTables.map(() => '?').join(', ')})` + : ''; + const tableNameParams = allScopedTables ?? []; const tables = await this.queryRaw( ` SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS FROM INFORMATION_SCHEMA.TABLES - WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') + WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') ${tableNameClause} ORDER BY TABLE_SCHEMA, TABLE_NAME `, - databases, + [...databases, ...tableNameParams], ); const columns = await this.queryRaw( ` SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA IN (${placeholders}) + WHERE TABLE_SCHEMA IN (${placeholders}) ${tableNameClause} ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION `, - databases, + [...databases, ...tableNameParams], ); const primaryKeys = await this.queryRaw( ` @@ -359,9 +374,10 @@ export class KtxMysqlScanConnector implements KtxScanConnector { FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA IN (${placeholders}) AND CONSTRAINT_NAME = 'PRIMARY' + ${tableNameClause} ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION `, - databases, + [...databases, ...tableNameParams], ); const foreignKeys = await this.queryRaw( ` @@ -369,9 +385,10 @@ export class KtxMysqlScanConnector implements KtxScanConnector { FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_SCHEMA IN (${placeholders}) AND REFERENCED_TABLE_NAME IS NOT NULL + ${tableNameClause} ORDER BY TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME `, - databases, + [...databases, ...tableNameParams], ); const columnsByTable = groupByTable(columns, this.poolConfig.database); @@ -403,6 +420,23 @@ export class KtxMysqlScanConnector implements KtxScanConnector { }; } + private emptySnapshot(databases: string[]): KtxSchemaSnapshot { + return { + connectionId: this.connectionId, + driver: 'mysql', + extractedAt: this.now().toISOString(), + scope: { schemas: databases }, + metadata: { + database: this.poolConfig.database, + schemas: databases, + host: this.poolConfig.host, + table_count: 0, + total_columns: 0, + }, + tables: [], + }; + } + async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns)); diff --git a/packages/cli/src/connectors/mysql/live-database-introspection.ts b/packages/cli/src/connectors/mysql/live-database-introspection.ts index ea649761..897244d5 100644 --- a/packages/cli/src/connectors/mysql/live-database-introspection.ts +++ b/packages/cli/src/connectors/mysql/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxMysqlScanConnector, @@ -18,7 +21,7 @@ export function createMysqlLiveDatabaseIntrospection( options: CreateMysqlLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxMysqlConnectionConfig | undefined; const connector = new KtxMysqlScanConnector({ connectionId, @@ -28,7 +31,14 @@ export function createMysqlLiveDatabaseIntrospection( now: options.now, }); try { - return await connector.introspect({ connectionId, driver: 'mysql' }, { runId: `mysql-${connectionId}` }); + return await connector.introspect( + { + connectionId, + driver: 'mysql', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, + { runId: `mysql-${connectionId}` }, + ); } finally { await connector.cleanup(); } diff --git a/packages/cli/src/connectors/postgres/connector.test.ts b/packages/cli/src/connectors/postgres/connector.test.ts index cf595f5c..346c2ef2 100644 --- a/packages/cli/src/connectors/postgres/connector.test.ts +++ b/packages/cli/src/connectors/postgres/connector.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import { createPostgresLiveDatabaseIntrospection } from '../../connectors/postgres/live-database-introspection.js'; import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; interface FakeQueryResult { rows: Record[]; @@ -259,6 +260,63 @@ describe('KtxPostgresScanConnector', () => { ).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally'); }); + it('limits introspection to tables in tableScope', async () => { + const queries: Array<{ sql: string; params?: unknown[] }> = []; + const poolFactory: KtxPostgresPoolFactory = { + createPool() { + return { + async connect() { + return { + query: vi.fn(async (sql: string, params?: unknown[]) => { + queries.push({ sql, params }); + if (sql.includes('FROM pg_catalog.pg_class c')) { + return { rows: [{ table_name: 'orders', table_kind: 'r', row_count: '3', table_comment: null }] }; + } + if (sql.includes('FROM pg_catalog.pg_attribute a')) { + return { + rows: [ + { + table_name: 'orders', + column_name: 'id', + data_type: 'integer', + is_nullable: false, + column_comment: null, + }, + ], + }; + } + return { rows: [] }; + }), + release: vi.fn(), + }; + }, + end: vi.fn(async () => undefined), + }; + }, + }; + const connector = new KtxPostgresScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'postgres', + host: 'db.example.test', + database: 'analytics', + username: 'reader', + password: 'test-password', // pragma: allowlist secret + schema: 'public', + }, + poolFactory, + }); + const scope = tableRefSet([{ catalog: null, db: 'public', name: 'orders' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'postgres', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']); + const tablesQuery = queries.find((query) => query.sql.includes('FROM pg_catalog.pg_class c')); + expect(tablesQuery?.sql).toMatch(/c\.relname = ANY\(\$2\)/); + expect(tablesQuery?.params).toEqual(['public', ['orders']]); + }); + it('adapts native PostgreSQL snapshots to live-database introspection for local ingest', async () => { const introspection = createPostgresLiveDatabaseIntrospection({ connections: { diff --git a/packages/cli/src/connectors/postgres/connector.ts b/packages/cli/src/connectors/postgres/connector.ts index 36a2bda6..5cb94bf4 100644 --- a/packages/cli/src/connectors/postgres/connector.ts +++ b/packages/cli/src/connectors/postgres/connector.ts @@ -3,6 +3,7 @@ import { homedir } from 'node:os'; import { resolve } from 'node:path'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import { Pool } from 'pg'; import { KtxPostgresDialect } from './dialect.js'; @@ -379,7 +380,9 @@ export class KtxPostgresScanConnector implements KtxScanConnector { const schemas = schemasFromConnection(this.connection); const allTables: KtxSchemaTable[] = []; for (const schema of schemas) { - const tables = await this.loadSchemaTables(schema); + const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: schema }) : null; + if (scopedNames && scopedNames.length === 0) continue; + const tables = await this.loadSchemaTables(schema, scopedNames); allTables.push(...tables); } return { @@ -543,7 +546,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector { } } - private async loadSchemaTables(schema: string): Promise { + private async loadSchemaTables(schema: string, scopedNames: readonly string[] | null): Promise { + if (scopedNames && scopedNames.length === 0) return []; + const pgCatalogScopeClause = scopedNames ? 'AND c.relname = ANY($2)' : ''; + const tableConstraintScopeClause = scopedNames ? 'AND tc.table_name = ANY($2)' : ''; + const scopeValues = scopedNames ? [scopedNames] : []; const tables = await this.queryRaw( ` SELECT @@ -557,9 +564,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector { ON d.objoid = c.oid AND d.objsubid = 0 WHERE n.nspname = $1 AND c.relkind IN ('r', 'v') + ${pgCatalogScopeClause} ORDER BY c.relname `, - [schema], + [schema, ...scopeValues], ); const columns = await this.queryRaw( ` @@ -578,9 +586,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector { AND c.relkind IN ('r', 'v') AND a.attnum > 0 AND NOT a.attisdropped + ${pgCatalogScopeClause} ORDER BY c.relname, a.attnum `, - [schema], + [schema, ...scopeValues], ); const primaryKeys = await this.queryRaw( ` @@ -591,9 +600,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector { AND tc.table_schema = kcu.table_schema WHERE tc.constraint_type = 'PRIMARY KEY' AND tc.table_schema = $1 + ${tableConstraintScopeClause} ORDER BY tc.table_name, kcu.ordinal_position `, - [schema], + [schema, ...scopeValues], ); const foreignKeys = await this.queryRaw( ` @@ -613,9 +623,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector { AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_schema = $1 + ${tableConstraintScopeClause} ORDER BY tc.table_name, kcu.column_name `, - [schema], + [schema, ...scopeValues], ); const columnsByTable = groupByTable(columns); diff --git a/packages/cli/src/connectors/postgres/live-database-introspection.ts b/packages/cli/src/connectors/postgres/live-database-introspection.ts index 83e29489..8b4454bc 100644 --- a/packages/cli/src/connectors/postgres/live-database-introspection.ts +++ b/packages/cli/src/connectors/postgres/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxPostgresScanConnector, @@ -18,7 +21,7 @@ export function createPostgresLiveDatabaseIntrospection( options: CreatePostgresLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxPostgresConnectionConfig | undefined; const connector = new KtxPostgresScanConnector({ connectionId, @@ -28,7 +31,14 @@ export function createPostgresLiveDatabaseIntrospection( now: options.now, }); try { - return await connector.introspect({ connectionId, driver: 'postgres' }, { runId: `postgres-${connectionId}` }); + return await connector.introspect( + { + connectionId, + driver: 'postgres', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, + { runId: `postgres-${connectionId}` }, + ); } finally { await connector.cleanup(); } diff --git a/packages/cli/src/connectors/snowflake/connector.test.ts b/packages/cli/src/connectors/snowflake/connector.test.ts index 2e181f0e..58761066 100644 --- a/packages/cli/src/connectors/snowflake/connector.test.ts +++ b/packages/cli/src/connectors/snowflake/connector.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import { createSnowflakeLiveDatabaseIntrospection } from '../../connectors/snowflake/live-database-introspection.js'; import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; function fakeDriverFactory(): KtxSnowflakeDriverFactory { const driver: KtxSnowflakeDriver = { @@ -206,6 +207,61 @@ describe('KtxSnowflakeScanConnector', () => { } }); + it('limits introspection to tables in tableScope', async () => { + const queries: Array<{ sql: string; params?: unknown }> = []; + const getSchemaMetadata = vi.fn(async (_schemaName?: string, scopedNames?: readonly string[] | null) => + scopedNames?.includes('ORDERS') + ? [ + { + name: 'ORDERS', + catalog: 'ANALYTICS', + db: 'MARTS', + rowCount: 10, + comment: null, + columns: [{ name: 'ID', type: 'NUMBER', nullable: false, comment: null }], + }, + ] + : [], + ); + const driverFactory: KtxSnowflakeDriverFactory = { + createDriver: vi.fn(() => ({ + test: vi.fn(async () => ({ success: true })), + query: vi.fn(async (sql: string, params?: unknown) => { + queries.push({ sql, params }); + return { headers: [], rows: [], totalRows: 0, rowCount: 0 }; + }), + getSchemaMetadata, + listSchemas: vi.fn(async () => []), + listTables: vi.fn(async () => []), + cleanup: vi.fn(async () => undefined), + })), + }; + const connector = new KtxSnowflakeScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'snowflake', + authMethod: 'password', + account: 'acct', + warehouse: 'WH', + database: 'ANALYTICS', + schema_name: 'MARTS', + username: 'reader', + password: 'fixture-pass', // pragma: allowlist secret + }, + driverFactory, + }); + const scope = tableRefSet([{ catalog: 'ANALYTICS', db: 'MARTS', name: 'ORDERS' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'snowflake', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['ORDERS']); + expect(getSchemaMetadata).toHaveBeenCalledWith('MARTS', ['ORDERS']); + const primaryKeysQuery = queries.find((query) => query.sql.includes('TABLE_CONSTRAINTS')); + expect(primaryKeysQuery?.sql).toMatch(/AND tc\.TABLE_NAME IN \(\?\)/); + expect(primaryKeysQuery?.params).toEqual(['MARTS', 'ANALYTICS', 'ORDERS']); + }); + it('supports read-only query, sampling, distinct values, row counts, schema listing, and cleanup', async () => { const driverFactory = fakeDriverFactory(); const connector = new KtxSnowflakeScanConnector({ diff --git a/packages/cli/src/connectors/snowflake/connector.ts b/packages/cli/src/connectors/snowflake/connector.ts index 61649ea8..25d5eaf3 100644 --- a/packages/cli/src/connectors/snowflake/connector.ts +++ b/packages/cli/src/connectors/snowflake/connector.ts @@ -4,10 +4,12 @@ import { homedir } from 'node:os'; import { resolve } from 'node:path'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import snowflake from 'snowflake-sdk'; import type { Bind, Binds, Connection, ConnectionOptions } from 'snowflake-sdk'; import { KtxSnowflakeDialect } from './dialect.js'; import { assertSafeSnowflakeIdentifier, quoteSnowflakeIdentifier } from './identifiers.js'; +import { configureSnowflakeSdkLogger } from './sdk-logger.js'; export interface KtxSnowflakeConnectionConfig { driver?: string; @@ -57,7 +59,7 @@ export interface KtxSnowflakeRawTableMetadata { export interface KtxSnowflakeDriver { test(): Promise<{ success: boolean; error?: string }>; query(sql: string, params?: unknown): Promise; - getSchemaMetadata(schemaName?: string): Promise; + getSchemaMetadata(schemaName?: string, scopedTableNames?: readonly string[] | null): Promise; listSchemas(): Promise; listTables(schemas?: string[]): Promise; cleanup(): Promise; @@ -80,6 +82,12 @@ export interface KtxSnowflakeSdkOptionsProvider { export interface KtxSnowflakeScanConnectorOptions { connectionId: string; connection: KtxSnowflakeConnectionConfig | undefined; + /** + * KTX project directory. When provided, snowflake-sdk's logger is redirected to + * `/.ktx/logs/snowflake.log` so its JSON output does not bleed into + * the CLI's TTY. Tests that use a fake driverFactory can leave this undefined. + */ + projectDir?: string; driverFactory?: KtxSnowflakeDriverFactory; sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider; env?: NodeJS.ProcessEnv; @@ -290,24 +298,32 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver { } } - async getSchemaMetadata(schemaName = this.resolved.schemas[0] ?? 'PUBLIC'): Promise { + async getSchemaMetadata( + schemaName = this.resolved.schemas[0] ?? 'PUBLIC', + scopedTableNames: readonly string[] | null = null, + ): Promise { + const scopeClause = + scopedTableNames && scopedTableNames.length > 0 + ? `AND TABLE_NAME IN (${scopedTableNames.map(() => '?').join(', ')})` + : ''; + const scopeParams = scopedTableNames ?? []; const tablesResult = await this.query( ` SELECT TABLE_NAME, TABLE_TYPE, COMMENT, ROW_COUNT FROM INFORMATION_SCHEMA.TABLES - WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ? + WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ? ${scopeClause} ORDER BY TABLE_NAME `, - [schemaName, this.resolved.database], + [schemaName, this.resolved.database, ...scopeParams], ); const columnsResult = await this.query( ` SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COMMENT, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ? + WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ? ${scopeClause} ORDER BY TABLE_NAME, ORDINAL_POSITION `, - [schemaName, this.resolved.database], + [schemaName, this.resolved.database, ...scopeParams], ); const columnsByTable = new Map(); for (const row of columnsResult.rows) { @@ -513,6 +529,9 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector { this.driverFactory = options.driverFactory ?? new DefaultSnowflakeDriverFactory(); this.now = options.now ?? (() => new Date()); this.id = `snowflake:${options.connectionId}`; + if (options.projectDir) { + configureSnowflakeSdkLogger(options.projectDir); + } } async testConnection(): Promise<{ success: boolean; error?: string }> { @@ -523,7 +542,11 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector { this.assertConnection(input.connectionId); const tables: KtxSchemaTable[] = []; for (const schemaName of this.resolved.schemas) { - const rawTables = await this.getDriver().getSchemaMetadata(schemaName); + const scopedNames = input.tableScope + ? scopedTableNames(input.tableScope, { catalog: this.resolved.database, db: schemaName }) + : null; + if (scopedNames && scopedNames.length === 0) continue; + const rawTables = await this.getDriver().getSchemaMetadata(schemaName, scopedNames); const primaryKeys = await this.primaryKeys(rawTables.map((table) => table.name), schemaName); tables.push(...rawTables.map((table) => this.toSchemaTable(table, primaryKeys))); } @@ -663,6 +686,7 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector { if (tableNames.length === 0) { return grouped; } + const tableNamePlaceholders = tableNames.map(() => '?').join(', '); try { const result = await this.getDriver().query( ` @@ -675,9 +699,10 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector { WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY' AND tc.TABLE_SCHEMA = ? AND tc.TABLE_CATALOG = ? + AND tc.TABLE_NAME IN (${tableNamePlaceholders}) ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION `, - [schemaName, this.resolved.database], + [schemaName, this.resolved.database, ...tableNames], ); for (const row of result.rows) { const tableName = String(row[0]); diff --git a/packages/cli/src/connectors/snowflake/historic-sql-query-client.ts b/packages/cli/src/connectors/snowflake/historic-sql-query-client.ts new file mode 100644 index 00000000..7d4070f5 --- /dev/null +++ b/packages/cli/src/connectors/snowflake/historic-sql-query-client.ts @@ -0,0 +1,31 @@ +import { KtxSnowflakeScanConnector, type KtxSnowflakeScanConnectorOptions } from './connector.js'; + +export type KtxSnowflakeHistoricSqlQueryClientOptions = KtxSnowflakeScanConnectorOptions; + +export class KtxSnowflakeHistoricSqlQueryClient { + private readonly connectionId: string; + private readonly connector: KtxSnowflakeScanConnector; + + constructor(options: KtxSnowflakeHistoricSqlQueryClientOptions) { + this.connectionId = options.connectionId; + this.connector = new KtxSnowflakeScanConnector(options); + } + + async executeQuery( + sql: string, + ): Promise<{ headers: string[]; rows: unknown[][]; totalRows: number }> { + const result = await this.connector.executeReadOnly( + { connectionId: this.connectionId, sql }, + {} as never, + ); + return { + headers: result.headers, + rows: result.rows, + totalRows: result.totalRows, + }; + } + + async cleanup(): Promise { + await this.connector.cleanup(); + } +} diff --git a/packages/cli/src/connectors/snowflake/live-database-introspection.ts b/packages/cli/src/connectors/snowflake/live-database-introspection.ts index 58812c1a..2becd219 100644 --- a/packages/cli/src/connectors/snowflake/live-database-introspection.ts +++ b/packages/cli/src/connectors/snowflake/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxSnowflakeScanConnector, @@ -9,6 +12,7 @@ import { interface CreateSnowflakeLiveDatabaseIntrospectionOptions { connections: Record; + projectDir?: string; driverFactory?: KtxSnowflakeDriverFactory; sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider; now?: () => Date; @@ -18,18 +22,23 @@ export function createSnowflakeLiveDatabaseIntrospection( options: CreateSnowflakeLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxSnowflakeConnectionConfig | undefined; const connector = new KtxSnowflakeScanConnector({ connectionId, connection, + ...(options.projectDir ? { projectDir: options.projectDir } : {}), driverFactory: options.driverFactory, sdkOptionsProvider: options.sdkOptionsProvider, now: options.now, }); try { return await connector.introspect( - { connectionId, driver: 'snowflake' }, + { + connectionId, + driver: 'snowflake', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, { runId: `snowflake-${connectionId}` }, ); } finally { diff --git a/packages/cli/src/connectors/snowflake/sdk-logger.test.ts b/packages/cli/src/connectors/snowflake/sdk-logger.test.ts new file mode 100644 index 00000000..73bf0c76 --- /dev/null +++ b/packages/cli/src/connectors/snowflake/sdk-logger.test.ts @@ -0,0 +1,57 @@ +import { mkdtempSync, rmSync, statSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join, resolve } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +const { configure } = vi.hoisted(() => ({ configure: vi.fn() })); +vi.mock('snowflake-sdk', () => ({ + default: { configure }, +})); + +import { + configureSnowflakeSdkLogger, + resetSnowflakeSdkLoggerConfigurationForTests, +} from './sdk-logger.js'; + +describe('configureSnowflakeSdkLogger', () => { + let projectDir: string; + + beforeEach(() => { + configure.mockReset(); + resetSnowflakeSdkLoggerConfigurationForTests(); + projectDir = mkdtempSync(join(tmpdir(), 'ktx-snowflake-logger-')); + }); + + afterEach(() => { + rmSync(projectDir, { recursive: true, force: true }); + }); + + it('routes logs to /.ktx/logs/snowflake.log with console output disabled', () => { + const expected = resolve(projectDir, '.ktx', 'logs', 'snowflake.log'); + const returned = configureSnowflakeSdkLogger(projectDir); + expect(returned).toBe(expected); + expect(configure).toHaveBeenCalledTimes(1); + expect(configure).toHaveBeenCalledWith({ + logFilePath: expected, + additionalLogToConsole: false, + }); + expect(statSync(resolve(projectDir, '.ktx', 'logs')).isDirectory()).toBe(true); + }); + + it('is idempotent for the same projectDir', () => { + configureSnowflakeSdkLogger(projectDir); + configureSnowflakeSdkLogger(projectDir); + expect(configure).toHaveBeenCalledTimes(1); + }); + + it('reconfigures when projectDir changes', () => { + const other = mkdtempSync(join(tmpdir(), 'ktx-snowflake-logger-other-')); + try { + configureSnowflakeSdkLogger(projectDir); + configureSnowflakeSdkLogger(other); + expect(configure).toHaveBeenCalledTimes(2); + } finally { + rmSync(other, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/cli/src/connectors/snowflake/sdk-logger.ts b/packages/cli/src/connectors/snowflake/sdk-logger.ts new file mode 100644 index 00000000..f6eed277 --- /dev/null +++ b/packages/cli/src/connectors/snowflake/sdk-logger.ts @@ -0,0 +1,32 @@ +import { mkdirSync } from 'node:fs'; +import { resolve } from 'node:path'; +import snowflake from 'snowflake-sdk'; + +let configuredLogFilePath: string | null = null; + +/** + * Redirects the snowflake-sdk logger to a project-scoped file so its JSON output + * does not bleed into the CLI's TTY (which would pollute the setup wizard and + * break the in-place progress repainter in `context-build-view.ts`). + * + * Idempotent per process: re-calling with the same projectDir is a no-op. + */ +export function configureSnowflakeSdkLogger(projectDir: string): string { + const logDir = resolve(projectDir, '.ktx', 'logs'); + const logFilePath = resolve(logDir, 'snowflake.log'); + if (configuredLogFilePath === logFilePath) { + return logFilePath; + } + mkdirSync(logDir, { recursive: true }); + snowflake.configure({ + logFilePath, + additionalLogToConsole: false, + }); + configuredLogFilePath = logFilePath; + return logFilePath; +} + +/** @internal */ +export function resetSnowflakeSdkLoggerConfigurationForTests(): void { + configuredLogFilePath = null; +} diff --git a/packages/cli/src/connectors/sqlite/connector.test.ts b/packages/cli/src/connectors/sqlite/connector.test.ts index 77ec4b3c..ecd283b7 100644 --- a/packages/cli/src/connectors/sqlite/connector.test.ts +++ b/packages/cli/src/connectors/sqlite/connector.test.ts @@ -6,6 +6,7 @@ import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { createSqliteLiveDatabaseIntrospection } from '../../connectors/sqlite/live-database-introspection.js'; import { isKtxSqliteConnectionConfig, KtxSqliteScanConnector, sqliteDatabasePathFromConfig } from '../../connectors/sqlite/connector.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; describe('KtxSqliteScanConnector', () => { let tempDir: string; @@ -196,6 +197,19 @@ describe('KtxSqliteScanConnector', () => { ).resolves.toBeNull(); }); + it('limits introspection to tables in tableScope', async () => { + const connector = new KtxSqliteScanConnector({ + connectionId: 'warehouse', + connection: { driver: 'sqlite', path: dbPath }, + }); + const scope = tableRefSet([{ catalog: null, db: null, name: 'orders' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'sqlite', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']); + }); + it('adapts native SQLite snapshots to live-database introspection for local ingest', async () => { const introspection = createSqliteLiveDatabaseIntrospection({ projectDir: tempDir, diff --git a/packages/cli/src/connectors/sqlite/connector.ts b/packages/cli/src/connectors/sqlite/connector.ts index e915c776..17b33a71 100644 --- a/packages/cli/src/connectors/sqlite/connector.ts +++ b/packages/cli/src/connectors/sqlite/connector.ts @@ -6,6 +6,7 @@ import { fileURLToPath } from 'node:url'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { normalizeQueryRows } from '../../context/connections/query-executor.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import { KtxSqliteDialect } from './dialect.js'; export interface KtxSqliteConnectionConfig { @@ -181,11 +182,16 @@ export class KtxSqliteScanConnector implements KtxScanConnector { async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const database = this.database(); - const rawTables = database - .prepare( - `SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') AND name NOT LIKE 'sqlite_%' ORDER BY name`, - ) - .all() as SqliteMasterRow[]; + const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: null }) : null; + const scopeClause = scopedNames ? `AND name IN (${scopedNames.map(() => '?').join(', ')})` : ''; + const rawTables = + scopedNames && scopedNames.length === 0 + ? [] + : (database + .prepare( + `SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') AND name NOT LIKE 'sqlite_%' ${scopeClause} ORDER BY name`, + ) + .all(...(scopedNames ?? [])) as SqliteMasterRow[]); const tables = rawTables.map((table) => this.readTable(database, table)); const fileStats = existsSync(this.dbPath) ? statSync(this.dbPath) : null; return { diff --git a/packages/cli/src/connectors/sqlite/live-database-introspection.ts b/packages/cli/src/connectors/sqlite/live-database-introspection.ts index 62a1f8c5..93fae6a9 100644 --- a/packages/cli/src/connectors/sqlite/live-database-introspection.ts +++ b/packages/cli/src/connectors/sqlite/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxSqliteScanConnector, type KtxSqliteConnectionConfig } from './connector.js'; @@ -12,7 +15,7 @@ export function createSqliteLiveDatabaseIntrospection( options: CreateSqliteLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxSqliteConnectionConfig | undefined; const connector = new KtxSqliteScanConnector({ connectionId, @@ -21,7 +24,14 @@ export function createSqliteLiveDatabaseIntrospection( now: options.now, }); try { - return await connector.introspect({ connectionId, driver: 'sqlite' }, { runId: `sqlite-${connectionId}` }); + return await connector.introspect( + { + connectionId, + driver: 'sqlite', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, + { runId: `sqlite-${connectionId}` }, + ); } finally { await connector.cleanup(); } diff --git a/packages/cli/src/connectors/sqlserver/connector.test.ts b/packages/cli/src/connectors/sqlserver/connector.test.ts index bd9a8af1..ef00bd3a 100644 --- a/packages/cli/src/connectors/sqlserver/connector.test.ts +++ b/packages/cli/src/connectors/sqlserver/connector.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import { createSqlServerLiveDatabaseIntrospection } from '../../connectors/sqlserver/live-database-introspection.js'; import { isKtxSqlServerConnectionConfig, KtxSqlServerScanConnector, sqlServerConnectionPoolConfigFromConfig, type KtxSqlServerPoolFactory, type KtxSqlServerQueryResult } from '../../connectors/sqlserver/connector.js'; +import { tableRefSet } from '../../context/scan/table-ref.js'; function recordset>( rows: T[], @@ -290,6 +291,55 @@ describe('KtxSqlServerScanConnector', () => { await connector.cleanup(); }); + it('limits introspection to tables in tableScope', async () => { + const queries: string[] = []; + const inputs: Array<{ name: string; value: unknown }> = []; + const request = { + input: vi.fn((name: string, value: unknown) => { + inputs.push({ name, value }); + return request; + }), + query: vi.fn(async (sql: string): Promise => { + queries.push(sql); + if (sql.includes('INFORMATION_SCHEMA.TABLES')) { + return result([{ table_name: 'orders', table_type: 'BASE TABLE' }], ['table_name', 'table_type']); + } + if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) { + return result( + [{ table_name: 'orders', column_name: 'id', data_type: 'int', is_nullable: 'NO' }], + ['table_name', 'column_name', 'data_type', 'is_nullable'], + ); + } + return result([], []); + }), + }; + const poolFactory: KtxSqlServerPoolFactory = { + createPool: vi.fn(async () => ({ + request: () => request, + close: vi.fn(async () => undefined), + })), + }; + const connector = new KtxSqlServerScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'sqlserver', + host: 'db.example.test', + database: 'analytics', + username: 'reader', + schema: 'dbo', + }, + poolFactory, + }); + const scope = tableRefSet([{ catalog: 'analytics', db: 'dbo', name: 'orders' }]); + const snapshot = await connector.introspect( + { connectionId: 'warehouse', driver: 'sqlserver', tableScope: scope }, + { runId: 'scope-test' }, + ); + expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']); + expect(queries.find((query) => query.includes('INFORMATION_SCHEMA.TABLES'))).toMatch(/TABLE_NAME IN \(@table_0\)/); + expect(inputs).toEqual(expect.arrayContaining([{ name: 'table_0', value: 'orders' }])); + }); + it('adapts native SQL Server snapshots to live-database introspection for local ingest', async () => { const introspection = createSqlServerLiveDatabaseIntrospection({ connections: { diff --git a/packages/cli/src/connectors/sqlserver/connector.ts b/packages/cli/src/connectors/sqlserver/connector.ts index d9c227d7..64b8075e 100644 --- a/packages/cli/src/connectors/sqlserver/connector.ts +++ b/packages/cli/src/connectors/sqlserver/connector.ts @@ -1,5 +1,6 @@ import { assertReadOnlySql } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js'; +import { scopedTableNames } from '../../context/scan/table-ref.js'; import { readFileSync } from 'node:fs'; import { homedir } from 'node:os'; import { resolve } from 'node:path'; @@ -121,6 +122,20 @@ function sqlRecordset( return recordset; } +function tableScopeSql( + scopedNames: readonly string[] | null, + columnExpression: string, +): { clause: string; params: Record } { + if (!scopedNames) return { clause: '', params: {} }; + const params: Record = {}; + const placeholders = scopedNames.map((name, index) => { + const key = `table_${index}`; + params[key] = name; + return `@${key}`; + }); + return { clause: `AND ${columnExpression} IN (${placeholders.join(', ')})`, params }; +} + class DefaultSqlServerPoolFactory implements KtxSqlServerPoolFactory { async createPool(config: KtxSqlServerPoolConfig): Promise { const pool = await new sql.ConnectionPool(config as sql.config).connect(); @@ -314,7 +329,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { this.assertConnection(input.connectionId); const tables: KtxSchemaTable[] = []; for (const schemaName of this.schemas) { - tables.push(...(await this.introspectSchema(schemaName))); + const scopedNames = input.tableScope + ? scopedTableNames(input.tableScope, { catalog: this.poolConfig.database, db: schemaName }) + : null; + tables.push(...(await this.introspectSchema(schemaName, scopedNames))); } return { connectionId: this.connectionId, @@ -461,16 +479,19 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { } } - private async introspectSchema(schemaName: string): Promise { + private async introspectSchema(schemaName: string, scopedNames: readonly string[] | null): Promise { + if (scopedNames && scopedNames.length === 0) return []; + const tableScope = tableScopeSql(scopedNames, 'TABLE_NAME'); const tables = await this.queryRaw<{ table_name: string; table_type: string }>( ` SELECT TABLE_NAME AS table_name, TABLE_TYPE AS table_type FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = @schemaName AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') + ${tableScope.clause} ORDER BY TABLE_NAME `, - { schemaName }, + { schemaName, ...tableScope.params }, ); const columns = await this.queryRaw<{ table_name: string; @@ -482,15 +503,16 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { SELECT TABLE_NAME AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type, IS_NULLABLE AS is_nullable FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = @schemaName + ${tableScope.clause} ORDER BY TABLE_NAME, ORDINAL_POSITION `, - { schemaName }, + { schemaName, ...tableScope.params }, ); - const tableComments = await this.tableComments(schemaName); - const columnComments = await this.columnComments(schemaName); - const primaryKeys = await this.primaryKeys(schemaName); - const foreignKeys = await this.foreignKeys(schemaName); - const rowCounts = await this.rowCounts(schemaName); + const tableComments = await this.tableComments(schemaName, scopedNames); + const columnComments = await this.columnComments(schemaName, scopedNames); + const primaryKeys = await this.primaryKeys(schemaName, scopedNames); + const foreignKeys = await this.foreignKeys(schemaName, scopedNames); + const rowCounts = await this.rowCounts(schemaName, scopedNames); const columnsByTable = groupByTable(columns); const foreignKeysByTable = groupByTable(foreignKeys); @@ -508,7 +530,8 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { })); } - private async tableComments(schemaName: string): Promise> { + private async tableComments(schemaName: string, scopedNames: readonly string[] | null): Promise> { + const tableScope = tableScopeSql(scopedNames, 'o.name'); const rows = await this.queryRaw<{ table_name: string; table_comment: string }>( ` SELECT o.name AS table_name, CAST(ep.value AS NVARCHAR(MAX)) AS table_comment @@ -519,13 +542,15 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { AND ep.name = 'MS_Description' WHERE s.name = @schemaName AND o.type IN ('U', 'V') + ${tableScope.clause} `, - { schemaName }, + { schemaName, ...tableScope.params }, ); return new Map(rows.map((row) => [row.table_name, row.table_comment])); } - private async columnComments(schemaName: string): Promise> { + private async columnComments(schemaName: string, scopedNames: readonly string[] | null): Promise> { + const tableScope = tableScopeSql(scopedNames, 'o.name'); const rows = await this.queryRaw<{ table_name: string; column_name: string; column_comment: string }>( ` SELECT o.name AS table_name, c.name AS column_name, CAST(ep.value AS NVARCHAR(MAX)) AS column_comment @@ -537,13 +562,18 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { AND ep.name = 'MS_Description' WHERE s.name = @schemaName AND o.type IN ('U', 'V') + ${tableScope.clause} `, - { schemaName }, + { schemaName, ...tableScope.params }, ); return new Map(rows.map((row) => [`${row.table_name}.${row.column_name}`, row.column_comment])); } - private async primaryKeys(schemaName: string): Promise>> { + private async primaryKeys( + schemaName: string, + scopedNames: readonly string[] | null, + ): Promise>> { + const tableScope = tableScopeSql(scopedNames, 'tc.TABLE_NAME'); const rows = await this.queryRaw<{ table_name: string; column_name: string }>( ` SELECT tc.TABLE_NAME AS table_name, kcu.COLUMN_NAME AS column_name @@ -553,9 +583,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY' AND tc.TABLE_SCHEMA = @schemaName + ${tableScope.clause} ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION `, - { schemaName }, + { schemaName, ...tableScope.params }, ); const grouped = new Map>(); for (const row of rows) { @@ -566,7 +597,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { return grouped; } - private async foreignKeys(schemaName: string): Promise< + private async foreignKeys( + schemaName: string, + scopedNames: readonly string[] | null, + ): Promise< Array<{ table_name: string; column_name: string; @@ -576,6 +610,7 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { constraint_name: string; }> > { + const tableScope = tableScopeSql(scopedNames, 'fk.TABLE_NAME'); return this.queryRaw( ` SELECT @@ -596,13 +631,15 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { AND pk.CONSTRAINT_NAME = rc.UNIQUE_CONSTRAINT_NAME AND pk.ORDINAL_POSITION = fk.ORDINAL_POSITION WHERE fk.TABLE_SCHEMA = @schemaName + ${tableScope.clause} ORDER BY fk.TABLE_NAME, fk.COLUMN_NAME `, - { schemaName }, + { schemaName, ...tableScope.params }, ); } - private async rowCounts(schemaName: string): Promise> { + private async rowCounts(schemaName: string, scopedNames: readonly string[] | null): Promise> { + const tableScope = tableScopeSql(scopedNames, 't.name'); const rows = await this.queryRaw<{ table_name: string; row_count: unknown }>( ` SELECT t.name AS table_name, SUM(p.rows) AS row_count @@ -611,9 +648,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = @schemaName AND p.index_id IN (0, 1) + ${tableScope.clause} GROUP BY t.name `, - { schemaName }, + { schemaName, ...tableScope.params }, ); return new Map(rows.map((row) => [row.table_name, firstNumber(row.row_count) ?? 0])); } diff --git a/packages/cli/src/connectors/sqlserver/live-database-introspection.ts b/packages/cli/src/connectors/sqlserver/live-database-introspection.ts index 6bd54ba1..6468856d 100644 --- a/packages/cli/src/connectors/sqlserver/live-database-introspection.ts +++ b/packages/cli/src/connectors/sqlserver/live-database-introspection.ts @@ -1,4 +1,7 @@ -import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js'; +import type { + LiveDatabaseIntrospectionOptions, + LiveDatabaseIntrospectionPort, +} from '../../context/ingest/adapters/live-database/types.js'; import type { KtxProjectConnectionConfig } from '../../context/project/config.js'; import { KtxSqlServerScanConnector, @@ -18,7 +21,7 @@ export function createSqlServerLiveDatabaseIntrospection( options: CreateSqlServerLiveDatabaseIntrospectionOptions, ): LiveDatabaseIntrospectionPort { return { - async extractSchema(connectionId: string) { + async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) { const connection = options.connections[connectionId] as KtxSqlServerConnectionConfig | undefined; const connector = new KtxSqlServerScanConnector({ connectionId, @@ -29,7 +32,11 @@ export function createSqlServerLiveDatabaseIntrospection( }); try { return await connector.introspect( - { connectionId, driver: 'sqlserver' }, + { + connectionId, + driver: 'sqlserver', + ...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}), + }, { runId: `sqlserver-${connectionId}` }, ); } finally { diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index c734c6b9..9f6e5f78 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -319,7 +319,8 @@ function renderPhaseRow(phase: PhaseState, frame: number, styled: boolean): stri } else if (phase.status === 'skipped') { trailing = styled ? dim('skipped') : 'skipped'; } else if (phase.status === 'failed') { - trailing = styled ? red('failed') : 'failed'; + const label = styled ? red('failed') : 'failed'; + trailing = phase.summary ? `${label} ${phase.summary}` : label; } const bar = `${segments.join(' ')} ${trailing}`.trimEnd(); return ` ${icon} ${name} ${bar}`; diff --git a/packages/cli/src/context/scan/local-scan.test.ts b/packages/cli/src/context/scan/local-scan.test.ts index 081fa055..9e8c7391 100644 --- a/packages/cli/src/context/scan/local-scan.test.ts +++ b/packages/cli/src/context/scan/local-scan.test.ts @@ -8,7 +8,14 @@ import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js'; import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js'; import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js'; -import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js'; +import { tableRefKey, tableRefSet } from './table-ref.js'; +import type { + KtxQueryResult, + KtxReadOnlyQueryInput, + KtxScanConnector, + KtxSchemaSnapshot, + KtxSchemaTable, +} from './types.js'; function relationshipSqlResult( input: KtxReadOnlyQueryInput, @@ -551,6 +558,142 @@ describe('local scan', () => { expect(result.report.warnings).toEqual([]); }); + it('keeps prototype connector methods when enabled_tables is configured', async () => { + project.config.connections.warehouse = { + ...project.config.connections.warehouse, + enabled_tables: ['public.customers', 'public.orders'], + }; + const scopedAdapter: SourceAdapter = { + source: 'live-database', + skillNames: ['live_database_ingest'], + async fetch(_pullConfig, stagedDir) { + await mkdir(join(stagedDir, 'tables'), { recursive: true }); + await writeFile( + join(stagedDir, 'connection.json'), + '{"connectionId":"warehouse","driver":"postgres","scope":{"schemas":["public"]},"metadata":{}}\n', + 'utf-8', + ); + await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8'); + await writeFile( + join(stagedDir, 'tables', 'customers.json'), + '{"name":"customers","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":100,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n', + 'utf-8', + ); + await writeFile( + join(stagedDir, 'tables', 'orders.json'), + '{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":1000,"columns":[{"name":"customer_id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":false,"comment":null}],"foreignKeys":[]}\n', + 'utf-8', + ); + }, + async detect() { + return true; + }, + async chunk() { + return { + workUnits: [ + { + unitKey: 'live-database-public-customers', + rawFiles: ['tables/customers.json'], + dependencyPaths: ['connection.json', 'foreign-keys.json'], + peerFileIndex: [], + }, + { + unitKey: 'live-database-public-orders', + rawFiles: ['tables/orders.json'], + dependencyPaths: ['connection.json', 'foreign-keys.json'], + peerFileIndex: [], + }, + ], + }; + }, + }; + class FakeClassConnector implements KtxScanConnector { + readonly id = 'test:warehouse'; + readonly driver = 'postgres' as const; + readonly capabilities = { + structuralIntrospection: true as const, + tableSampling: false, + columnSampling: false, + columnStats: true, + readOnlySql: true, + nestedAnalysis: false, + eventStreamDiscovery: false, + formalForeignKeys: false, + estimatedRowCounts: true, + }; + + async introspect(): Promise { + return { + connectionId: 'warehouse', + driver: 'postgres', + extractedAt: '2026-05-22T00:00:00.000Z', + scope: { schemas: ['public'] }, + metadata: {}, + tables: [ + { + catalog: null, + db: 'public', + name: 'customers', + kind: 'table', + comment: null, + estimatedRows: 100, + foreignKeys: [], + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: null, + }, + ], + }, + { + catalog: null, + db: 'public', + name: 'orders', + kind: 'table', + comment: null, + estimatedRows: 1000, + foreignKeys: [], + columns: [ + { + name: 'customer_id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: false, + comment: null, + }, + ], + }, + ], + }; + } + + async executeReadOnly(input: KtxReadOnlyQueryInput): Promise { + return relationshipSqlResult(input); + } + } + + const result = await runLocalScan({ + project, + adapters: [scopedAdapter], + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + connector: new FakeClassConnector(), + jobId: 'scan-prototype-connector-scope', + now: () => new Date('2026-05-22T00:00:00.000Z'), + }); + + expect(result.report.relationships.accepted).toBe(1); + expect(result.report.warnings).toEqual([]); + }); + it('threads scan relationship settings into relationship-only local scans', async () => { project.config.scan.enrichment = { mode: 'deterministic' }; project.config.scan.relationships = { @@ -1512,15 +1655,15 @@ describe('resolveEnabledTables', () => { expect(resolveEnabledTables({ driver: 'postgres', enabled_tables: [] })).toBeNull(); }); - it('returns Set of enabled table names', () => { + it('returns a canonical set of enabled table refs', () => { const result = resolveEnabledTables({ driver: 'postgres', enabled_tables: ['public.users', 'public.orders'], }); expect(result).toBeInstanceOf(Set); expect(result!.size).toBe(2); - expect(result!.has('public.users')).toBe(true); - expect(result!.has('public.orders')).toBe(true); + expect(result!.has(tableRefKey({ catalog: null, db: 'public', name: 'users' }))).toBe(true); + expect(result!.has(tableRefKey({ catalog: null, db: 'public', name: 'orders' }))).toBe(true); }); it('returns null for undefined connection', () => { @@ -1557,7 +1700,10 @@ describe('filterSnapshotTables', () => { { db: 'public', name: 'orders' }, { db: 'public', name: 'logs' }, ]); - const enabled = new Set(['public.users', 'public.orders']); + const enabled = tableRefSet([ + { catalog: null, db: 'public', name: 'users' }, + { catalog: null, db: 'public', name: 'orders' }, + ]); const filtered = filterSnapshotTables(snapshot, enabled); expect(filtered.tables).toHaveLength(2); expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']); @@ -1565,14 +1711,14 @@ describe('filterSnapshotTables', () => { it('returns empty tables when none match', () => { const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]); - const enabled = new Set(['public.orders']); + const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'orders' }]); const filtered = filterSnapshotTables(snapshot, enabled); expect(filtered.tables).toHaveLength(0); }); it('preserves other snapshot fields', () => { const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]); - const enabled = new Set(['public.users']); + const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'users' }]); const filtered = filterSnapshotTables(snapshot, enabled); expect(filtered.connectionId).toBe('test'); expect(filtered.driver).toBe('postgres'); diff --git a/packages/cli/src/context/scan/local-scan.ts b/packages/cli/src/context/scan/local-scan.ts index 35333f79..17aa587a 100644 --- a/packages/cli/src/context/scan/local-scan.ts +++ b/packages/cli/src/context/scan/local-scan.ts @@ -25,14 +25,11 @@ import type { KtxConnectionDriver, KtxProgressPort, KtxScanConnector, - KtxScanContext, KtxScanEnrichmentStateSummary, - KtxScanInput, KtxScanMode, KtxScanReport, KtxScanTrigger, KtxScanWarning, - KtxSchemaSnapshot, } from './types.js'; function enrichmentResolutionWarning( @@ -370,17 +367,6 @@ async function readScanReport( } } -function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set): KtxScanConnector { - return { - ...connector, - async introspect(input: KtxScanInput, ctx: KtxScanContext): Promise { - const snapshot = await connector.introspect(input, ctx); - return filterSnapshotTables(snapshot, enabledTables); - }, - }; -} - - function withInternalLiveDatabaseAdapter(project: KtxLocalProject): KtxLocalProject { if (project.config.ingest.adapters.includes(LIVE_DATABASE_ADAPTER)) { return project; @@ -408,8 +394,8 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise { expect(io.stdout()).not.toContain('Debug:'); }); - it('prints query-history retry guidance for query-history facet failures', async () => { + it('skips the query-history facet but keeps the target green when query-history fails', async () => { const io = makeIo(); const project = deepReadyProject({ warehouse: { driver: 'postgres', context: { depth: 'deep' } }, @@ -935,11 +935,13 @@ describe('runKtxPublicIngest', () => { io.io, { loadProject: vi.fn(async () => project), runScan, runIngest }, ), - ).resolves.toBe(1); + ).resolves.toBe(0); - expect(io.stdout()).toMatch(/warehouse\s+done\s+failed\s+skipped\s+skipped/); + expect(io.stdout()).toContain('Ingest finished with skipped query history'); + expect(io.stdout()).toMatch(/warehouse\s+done\s+skipped\s+skipped\s+skipped/); + expect(io.stdout()).toContain('Skipped query history:'); expect(io.stdout()).toContain( - 'warehouse failed: Query history failed for 60 tasks. First failure: Google Cloud authentication failed while analyzing query history', + 'Query history failed for 60 tasks. First failure: Google Cloud authentication failed while analyzing query history', ); expect(io.stdout()).not.toContain('warehouse failed: Error:'); expect(io.stdout()).toContain('Retry: ktx ingest warehouse --project-dir /tmp/project --deep --query-history'); @@ -973,8 +975,9 @@ describe('runKtxPublicIngest', () => { io.io, { loadProject: vi.fn(async () => project), runScan, runIngest }, ), - ).resolves.toBe(1); + ).resolves.toBe(0); + expect(io.stdout()).toContain('Ingest finished with skipped query history'); expect(io.stdout()).toContain('Missing bundled Python runtime manifest'); expect(io.stdout()).toContain( 'In a source checkout, build the local runtime assets with: pnpm run artifacts:build', diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 60b9622c..4e51a27a 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -599,17 +599,64 @@ function markTargetResult( }; } +function markTargetWithSkippedQueryHistory( + target: KtxPublicIngestPlanTarget, + args: Extract, + detail: string, +): KtxPublicIngestTargetResult { + const baseline = markTargetResult(target, args, 'done'); + return { + ...baseline, + steps: baseline.steps.map((step) => + step.operation === 'query-history' ? { ...step, status: 'skipped', detail } : step, + ), + }; +} + +function queryHistoryFailureDetail(input: { + target: KtxPublicIngestPlanTarget; + args: Extract; + capturedOutput?: string; +}): string { + const captured = capturedFailureMessage(input.capturedOutput ?? ''); + return failureDetailWithRetry({ + target: input.target, + args: input.args, + failedOperation: 'query-history', + failureDetail: captured, + }); +} + function resultFailed(result: KtxPublicIngestTargetResult): boolean { return result.steps.some((step) => step.status === 'failed'); } +function resultSkippedQueryHistory( + result: KtxPublicIngestTargetResult, +): { connectionId: string; detail: string } | null { + const skipped = result.steps.find( + (step) => step.operation === 'query-history' && step.status === 'skipped' && step.detail !== undefined, + ); + return skipped?.detail ? { connectionId: result.connectionId, detail: skipped.detail } : null; +} + function stepStatus(result: KtxPublicIngestTargetResult, operation: KtxPublicIngestStepName): string { return result.steps.find((step) => step.operation === operation)?.status ?? 'not-run'; } function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo): void { const failures = results.filter(resultFailed); - io.stdout.write(failures.length > 0 ? 'Ingest finished with partial failures\n' : 'Ingest finished\n'); + const skippedQueryHistory = results.map(resultSkippedQueryHistory).filter((entry) => entry !== null) as Array<{ + connectionId: string; + detail: string; + }>; + const headerSuffix = + failures.length > 0 + ? ' with partial failures' + : skippedQueryHistory.length > 0 + ? ' with skipped query history' + : ''; + io.stdout.write(`Ingest finished${headerSuffix}\n`); io.stdout.write('\n'); io.stdout.write('Source Database schema Query history Source ingest Memory update\n'); for (const result of results) { @@ -624,17 +671,22 @@ function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo ); } - if (failures.length === 0) { - return; + if (failures.length > 0) { + io.stdout.write('\nFailed sources:\n'); + for (const result of failures) { + const failedStep = result.steps.find((step) => step.status === 'failed'); + if (!failedStep) { + continue; + } + io.stdout.write(` ${failedStep.detail ?? `${result.connectionId} failed.`}\n`); + } } - io.stdout.write('\nFailed sources:\n'); - for (const result of failures) { - const failedStep = result.steps.find((step) => step.status === 'failed'); - if (!failedStep) { - continue; + if (skippedQueryHistory.length > 0) { + io.stdout.write('\nSkipped query history:\n'); + for (const { detail } of skippedQueryHistory) { + io.stdout.write(` ${detail}\n`); } - io.stdout.write(` ${failedStep.detail ?? `${result.connectionId} failed.`}\n`); } } @@ -814,14 +866,13 @@ export async function executePublicIngestTarget( ? await runIngest(ingestArgs, ingestIo, ingestDeps) : await runIngest(ingestArgs, ingestIo); if (qhExitCode !== 0) { - deps.onPhaseEnd?.('query-history', 'failed'); - return markTargetResult( + const detail = queryHistoryFailureDetail({ target, args, - 'failed', - 'query-history', - capturedIngestIo ? capturedFailureMessage(capturedIngestIo.capturedOutput()) : undefined, - ); + capturedOutput: capturedIngestIo ? capturedIngestIo.capturedOutput() : undefined, + }); + deps.onPhaseEnd?.('query-history', 'failed', detail); + return markTargetWithSkippedQueryHistory(target, args, detail); } deps.onPhaseEnd?.('query-history', 'done'); } diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index 1f3426df..1f678254 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -2041,6 +2041,7 @@ describe('setup databases step', () => { it('writes query history config for supported Snowflake databases after validation succeeds', async () => { const io = makeIo(); + const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] })); const result = await runKtxSetupDatabasesStep( { projectDir: tempDir, @@ -2058,12 +2059,20 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), + historicSqlProbe, prompts: makePromptAdapter({ textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''], passwordValues: ['env:SNOWFLAKE_PASSWORD'], }), }, ); + expect(historicSqlProbe).toHaveBeenCalledWith( + expect.objectContaining({ + projectDir: tempDir, + connectionId: 'snowflake', + dialect: 'snowflake', + }), + ); expect(result.status).toBe('ready'); const configText = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'); @@ -2453,7 +2462,53 @@ describe('setup databases step', () => { expect(io.stdout()).toContain('Query history probe...'); expect(io.stdout()).not.toContain('Historic SQL probe...'); expect(io.stdout()).toContain('pg_stat_statements extension is not installed'); - expect(io.stdout()).toContain('Setup written; first ingest run will fail until fixed.'); + expect(io.stdout()).toContain('Setup written; query history will be skipped until fixed.'); + }); + + it('prints a non-blocking Snowflake query history probe failure with the grants remediation', async () => { + const io = makeIo(); + const historicSqlProbe = vi.fn(async () => ({ + ok: false, + lines: [ + ' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + ' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', + ], + })); + + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'disabled', + databaseDrivers: ['snowflake'], + databaseConnectionId: 'warehouse', + databaseSchemas: [], + enableQueryHistory: true, + skipDatabases: false, + }, + io.io, + { + testConnection: vi.fn(async () => 0), + scanConnection: vi.fn(async () => 0), + historicSqlProbe, + prompts: makePromptAdapter({ + textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''], + passwordValues: ['env:SNOWFLAKE_PASSWORD'], + }), + }, + ); + + expect(result.status).toBe('ready'); + expect(historicSqlProbe).toHaveBeenCalledWith( + expect.objectContaining({ + projectDir: tempDir, + connectionId: 'warehouse', + dialect: 'snowflake', + }), + ); + expect(io.stdout()).toContain('Query history probe...'); + expect(io.stdout()).toContain('Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'); + expect(io.stdout()).toContain('GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE'); + expect(io.stdout()).toContain('Setup written; query history will be skipped until fixed.'); }); it('does not run the query history probe when the regular connection test fails', async () => { diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index b070e5dd..a801f8f7 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -341,6 +341,13 @@ function historicSqlProbeFailureLines(error: unknown): string[] { ]; } if (error instanceof Error && error.name === 'HistoricSqlGrantsMissingError') { + const dialect = (error as { dialect?: unknown }).dialect; + if (dialect === 'snowflake') { + return [ + ' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + ' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', + ]; + } return [ ' FAIL Postgres connection role lacks pg_read_all_stats', ' Fix: Run: GRANT pg_read_all_stats TO ;', @@ -353,10 +360,18 @@ function historicSqlProbeFailureLines(error: unknown): string[] { } async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Promise { - if (input.dialect !== 'postgres') { - return { ok: true, lines: [] }; + if (input.dialect === 'postgres') { + return probePostgresHistoricSql(input); } + if (input.dialect === 'snowflake') { + return probeSnowflakeHistoricSql(input); + } + return { ok: true, lines: [] }; +} +async function probePostgresHistoricSql( + input: KtxSetupHistoricSqlProbeInput, +): Promise { const project = await loadKtxProject({ projectDir: input.projectDir }); const connection = project.config.connections[input.connectionId]; const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient }, { isKtxPostgresConnectionConfig }] = @@ -394,6 +409,46 @@ async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Pr } } +async function probeSnowflakeHistoricSql( + input: KtxSetupHistoricSqlProbeInput, +): Promise { + const project = await loadKtxProject({ projectDir: input.projectDir }); + const connection = project.config.connections[input.connectionId]; + const [{ SnowflakeHistoricSqlQueryHistoryReader }, { KtxSnowflakeHistoricSqlQueryClient }, { isKtxSnowflakeConnectionConfig }] = + await Promise.all([ + import('./context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'), + import('./connectors/snowflake/historic-sql-query-client.js'), + import('./connectors/snowflake/connector.js'), + ]); + + if (!isKtxSnowflakeConnectionConfig(connection)) { + return { + ok: false, + lines: [` FAIL Connection ${input.connectionId} is not a native Snowflake connection.`], + }; + } + + const client = new KtxSnowflakeHistoricSqlQueryClient({ + connectionId: input.connectionId, + connection, + projectDir: input.projectDir, + }); + try { + const result = await new SnowflakeHistoricSqlQueryHistoryReader().probe(client); + return { + ok: true, + lines: [ + ' OK SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY accessible', + ...result.warnings.map((warning: string) => ` ! ${warning}`), + ], + }; + } catch (error) { + return { ok: false, lines: historicSqlProbeFailureLines(error) }; + } finally { + await client.cleanup(); + } +} + async function defaultListSchemas(projectDir: string, connectionId: string): Promise { const project = await loadKtxProject({ projectDir }); const connection = project.config.connections[connectionId]; @@ -457,7 +512,7 @@ async function defaultListSchemas(projectDir: string, connectionId: string): Pro if (driver === 'snowflake') { const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');; if (!isKtxSnowflakeConnectionConfig(connection)) return []; - const connector = new KtxSnowflakeScanConnector({ connectionId, connection }); + const connector = new KtxSnowflakeScanConnector({ connectionId, connection, projectDir }); try { return await connector.listSchemas(); } finally { @@ -533,7 +588,7 @@ async function defaultListTables( if (driver === 'snowflake') { const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');; if (!isKtxSnowflakeConnectionConfig(connection)) return []; - const connector = new KtxSnowflakeScanConnector({ connectionId, connection }); + const connector = new KtxSnowflakeScanConnector({ connectionId, connection, projectDir }); try { return await connector.listTables(schemas); } finally { @@ -1651,7 +1706,12 @@ async function maybeRunHistoricSqlSetupProbe(input: { const connection = project.config.connections[input.connectionId]; const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection); const driver = normalizeDriver(connection?.driver); - if (queryHistory?.enabled !== true || driver !== 'postgres') { + if (queryHistory?.enabled !== true) { + return; + } + const dialect: 'postgres' | 'snowflake' | null = + driver === 'postgres' ? 'postgres' : driver === 'snowflake' ? 'snowflake' : null; + if (!dialect) { return; } @@ -1660,13 +1720,13 @@ async function maybeRunHistoricSqlSetupProbe(input: { const result = await probe({ projectDir: input.projectDir, connectionId: input.connectionId, - dialect: 'postgres', + dialect, }); for (const line of result.lines) { input.io.stdout.write(`│${line}\n`); } if (!result.ok) { - input.io.stdout.write('│ Setup written; first ingest run will fail until fixed.\n'); + input.io.stdout.write('│ Setup written; query history will be skipped until fixed.\n'); } }