mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat(scan): apply tableScope during metadata fetch
This commit is contained in:
parent
b1a2d4c378
commit
a698389bc9
34 changed files with 1129 additions and 134 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { bigQueryConnectionConfigFromConfig, isKtxBigQueryConnectionConfig, type KtxBigQueryClient, KtxBigQueryScanConnector, type KtxBigQueryClientFactory, type KtxBigQueryDataset, type KtxBigQueryQueryJob, type KtxBigQueryTableRef } from '../../connectors/bigquery/connector.js';
|
||||
import { createBigQueryLiveDatabaseIntrospection } from '../../connectors/bigquery/live-database-introspection.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
function fakeClientFactory(): KtxBigQueryClientFactory {
|
||||
const queryResults = vi.fn(async (): ReturnType<KtxBigQueryQueryJob['getQueryResults']> => [
|
||||
|
|
@ -234,6 +235,59 @@ describe('KtxBigQueryScanConnector', () => {
|
|||
await connector.cleanup();
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const ordersGet = vi.fn(async (): ReturnType<KtxBigQueryTableRef['get']> => [
|
||||
{
|
||||
metadata: {
|
||||
type: 'TABLE',
|
||||
numRows: '12',
|
||||
schema: { fields: [{ name: 'id', type: 'INT64', mode: 'REQUIRED' }] },
|
||||
},
|
||||
},
|
||||
]);
|
||||
const skippedGet = vi.fn(async (): ReturnType<KtxBigQueryTableRef['get']> => [
|
||||
{ metadata: { type: 'TABLE', numRows: '1', schema: { fields: [] } } },
|
||||
]);
|
||||
const clientFactory: KtxBigQueryClientFactory = {
|
||||
createClient: vi.fn(() => ({
|
||||
getDatasets: vi.fn(async (): ReturnType<KtxBigQueryClient['getDatasets']> => [[{ id: 'analytics' }]]),
|
||||
dataset: vi.fn(
|
||||
(): KtxBigQueryDataset => ({
|
||||
get: vi.fn(async () => [{ id: 'analytics' }]),
|
||||
getTables: vi.fn(async (): ReturnType<KtxBigQueryDataset['getTables']> => [
|
||||
[
|
||||
{ id: 'orders', get: ordersGet },
|
||||
{ id: 'customers', get: skippedGet },
|
||||
],
|
||||
]),
|
||||
}),
|
||||
),
|
||||
createQueryJob: vi.fn(async (): ReturnType<KtxBigQueryClient['createQueryJob']> => [
|
||||
{
|
||||
getQueryResults: async (): ReturnType<KtxBigQueryQueryJob['getQueryResults']> => [
|
||||
[],
|
||||
undefined,
|
||||
{ schema: { fields: [{ name: 'table_name', type: 'STRING' }, { name: 'column_name', type: 'STRING' }] } },
|
||||
],
|
||||
},
|
||||
]),
|
||||
})),
|
||||
};
|
||||
const connector = new KtxBigQueryScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection,
|
||||
clientFactory,
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: 'project-1', db: 'analytics', name: 'orders' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'bigquery', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']);
|
||||
expect(ordersGet).toHaveBeenCalledTimes(1);
|
||||
expect(skippedGet).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('constructs for discovery without dataset scope and lists tables through one region information schema query', async () => {
|
||||
const createQueryJob = vi.fn(
|
||||
async (
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { BigQuery, type TableField } from '@google-cloud/bigquery';
|
|||
import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../context/connections/bigquery-identifiers.js';
|
||||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import { readFileSync } from 'node:fs';
|
||||
import { homedir } from 'node:os';
|
||||
import { resolve } from 'node:path';
|
||||
|
|
@ -289,7 +290,10 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
|
|||
const tables: KtxSchemaTable[] = [];
|
||||
const datasetIds = this.requireDatasetIdsForScan();
|
||||
for (const datasetId of datasetIds) {
|
||||
tables.push(...(await this.introspectDataset(datasetId)));
|
||||
const scopedNames = input.tableScope
|
||||
? scopedTableNames(input.tableScope, { catalog: this.resolved.projectId, db: datasetId })
|
||||
: null;
|
||||
tables.push(...(await this.introspectDataset(datasetId, scopedNames)));
|
||||
}
|
||||
return {
|
||||
connectionId: this.connectionId,
|
||||
|
|
@ -362,7 +366,7 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
|
|||
if (!datasetId) {
|
||||
return 0;
|
||||
}
|
||||
const tables = await this.introspectDataset(datasetId);
|
||||
const tables = await this.introspectDataset(datasetId, null);
|
||||
return tables.find((table) => table.name === tableName)?.estimatedRows ?? 0;
|
||||
}
|
||||
|
||||
|
|
@ -463,12 +467,15 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
|
|||
return firstNumber(rows[0]?.[header]);
|
||||
}
|
||||
|
||||
private async introspectDataset(datasetId: string): Promise<KtxSchemaTable[]> {
|
||||
private async introspectDataset(datasetId: string, scopedNames: readonly string[] | null): Promise<KtxSchemaTable[]> {
|
||||
if (scopedNames && scopedNames.length === 0) return [];
|
||||
const dataset = this.getClient().dataset(datasetId);
|
||||
const [tableRefs] = await dataset.getTables();
|
||||
const scopeSet = scopedNames ? new Set(scopedNames) : null;
|
||||
const filteredTableRefs = scopeSet ? tableRefs.filter((tableRef) => scopeSet.has(tableRef.id ?? '')) : tableRefs;
|
||||
const primaryKeys = await this.primaryKeys(datasetId);
|
||||
const tables: KtxSchemaTable[] = [];
|
||||
for (const tableRef of tableRefs) {
|
||||
for (const tableRef of filteredTableRefs) {
|
||||
const tableName = tableRef.id || '';
|
||||
const [table] = await tableRef.get();
|
||||
const fields = table.metadata.schema?.fields ?? [];
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import {
|
||||
KtxBigQueryScanConnector,
|
||||
|
|
@ -16,7 +19,7 @@ export function createBigQueryLiveDatabaseIntrospection(
|
|||
options: CreateBigQueryLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxBigQueryConnectionConfig | undefined;
|
||||
const connector = new KtxBigQueryScanConnector({
|
||||
connectionId,
|
||||
|
|
@ -25,7 +28,14 @@ export function createBigQueryLiveDatabaseIntrospection(
|
|||
now: options.now,
|
||||
});
|
||||
try {
|
||||
return await connector.introspect({ connectionId, driver: 'bigquery' }, { runId: `bigquery-${connectionId}` });
|
||||
return await connector.introspect(
|
||||
{
|
||||
connectionId,
|
||||
driver: 'bigquery',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `bigquery-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
await connector.cleanup();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { clickHouseClientConfigFromConfig, isKtxClickHouseConnectionConfig, KtxClickHouseScanConnector, type KtxClickHouseClientFactory } from '../../connectors/clickhouse/connector.js';
|
||||
import { createClickHouseLiveDatabaseIntrospection } from '../../connectors/clickhouse/live-database-introspection.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
function result<T>(payload: T) {
|
||||
return {
|
||||
|
|
@ -238,6 +239,57 @@ describe('KtxClickHouseScanConnector', () => {
|
|||
]);
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const queries: Array<{ query: string; query_params?: Record<string, unknown> }> = [];
|
||||
const clientFactory: KtxClickHouseClientFactory = {
|
||||
createClient: vi.fn(() => ({
|
||||
query: vi.fn(async (input: { query: string; format: string; query_params?: Record<string, unknown> }) => {
|
||||
queries.push({ query: input.query, query_params: input.query_params });
|
||||
if (input.query.includes('FROM system.tables')) {
|
||||
return result([{ database: 'analytics', name: 'events', engine: 'MergeTree', comment: '' }]);
|
||||
}
|
||||
if (input.query.includes('FROM system.columns')) {
|
||||
return result([
|
||||
{
|
||||
database: 'analytics',
|
||||
table: 'events',
|
||||
name: 'id',
|
||||
type: 'UInt64',
|
||||
comment: '',
|
||||
is_in_primary_key: 1,
|
||||
},
|
||||
]);
|
||||
}
|
||||
if (input.query.includes('FROM system.parts')) {
|
||||
return result([{ database: 'analytics', table: 'events', row_count: '2' }]);
|
||||
}
|
||||
throw new Error(`Unexpected SQL: ${input.query}`);
|
||||
}),
|
||||
close: vi.fn(async () => undefined),
|
||||
})),
|
||||
};
|
||||
const connector = new KtxClickHouseScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'clickhouse',
|
||||
host: 'ch.example.test',
|
||||
database: 'analytics',
|
||||
username: 'reader',
|
||||
password: 'test-pass', // pragma: allowlist secret
|
||||
},
|
||||
clientFactory,
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: null, db: 'analytics', name: 'events' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'clickhouse', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['events']);
|
||||
const tablesQuery = queries.find((query) => query.query.includes('FROM system.tables'));
|
||||
expect(tablesQuery?.query).toContain('AND name IN {table_names:Array(String)}');
|
||||
expect(tablesQuery?.query_params).toEqual({ databases: ['analytics'], table_names: ['events'] });
|
||||
});
|
||||
|
||||
it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
|
||||
const clientFactory = fakeClientFactory();
|
||||
const connector = new KtxClickHouseScanConnector({
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { createClient } from '@clickhouse/client';
|
||||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import { readFileSync } from 'node:fs';
|
||||
import { Agent as HttpsAgent } from 'node:https';
|
||||
import { homedir } from 'node:os';
|
||||
|
|
@ -285,24 +286,42 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
|
|||
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
|
||||
this.assertConnection(input.connectionId);
|
||||
const databases = configuredClickHouseDatabases(this.connection, this.clientConfig.database);
|
||||
let allScopedTables: string[] | null = null;
|
||||
if (input.tableScope) {
|
||||
allScopedTables = [];
|
||||
for (const database of databases) {
|
||||
allScopedTables.push(...scopedTableNames(input.tableScope, { catalog: null, db: database }));
|
||||
}
|
||||
if (allScopedTables.length === 0) {
|
||||
return this.emptySnapshot(databases);
|
||||
}
|
||||
}
|
||||
const queryParams: Record<string, unknown> = { databases };
|
||||
const tableNameClause = allScopedTables ? 'AND name IN {table_names:Array(String)}' : '';
|
||||
const columnTableNameClause = allScopedTables ? 'AND table IN {table_names:Array(String)}' : '';
|
||||
if (allScopedTables) {
|
||||
queryParams.table_names = allScopedTables;
|
||||
}
|
||||
const tables = await this.queryEachRow<ClickHouseTableRow>(
|
||||
`
|
||||
SELECT database, name, engine, comment
|
||||
FROM system.tables
|
||||
WHERE database IN {databases:Array(String)}
|
||||
AND engine NOT IN ('Dictionary')
|
||||
${tableNameClause}
|
||||
ORDER BY database, name
|
||||
`,
|
||||
{ databases },
|
||||
queryParams,
|
||||
);
|
||||
const columns = await this.queryEachRow<ClickHouseColumnRow>(
|
||||
`
|
||||
SELECT database, table, name, type, comment, is_in_primary_key
|
||||
FROM system.columns
|
||||
WHERE database IN {databases:Array(String)}
|
||||
${columnTableNameClause}
|
||||
ORDER BY database, table, position
|
||||
`,
|
||||
{ databases },
|
||||
queryParams,
|
||||
);
|
||||
const rowCounts = await this.queryEachRow<ClickHouseRowCountRow>(
|
||||
`
|
||||
|
|
@ -310,9 +329,10 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
|
|||
FROM system.parts
|
||||
WHERE database IN {databases:Array(String)}
|
||||
AND active = 1
|
||||
${columnTableNameClause}
|
||||
GROUP BY database, table
|
||||
`,
|
||||
{ databases },
|
||||
queryParams,
|
||||
);
|
||||
const columnsByTable = new Map<string, ClickHouseColumnRow[]>();
|
||||
for (const column of columns) {
|
||||
|
|
@ -347,6 +367,23 @@ export class KtxClickHouseScanConnector implements KtxScanConnector {
|
|||
};
|
||||
}
|
||||
|
||||
private emptySnapshot(databases: string[]): KtxSchemaSnapshot {
|
||||
return {
|
||||
connectionId: this.connectionId,
|
||||
driver: 'clickhouse',
|
||||
extractedAt: this.now().toISOString(),
|
||||
scope: { schemas: databases },
|
||||
metadata: {
|
||||
database: this.clientConfig.database,
|
||||
databases,
|
||||
host: this.clientConfig.host,
|
||||
table_count: 0,
|
||||
total_columns: 0,
|
||||
},
|
||||
tables: [],
|
||||
};
|
||||
}
|
||||
|
||||
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult> {
|
||||
this.assertConnection(input.connectionId);
|
||||
const result = await this.query(
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import {
|
||||
KtxClickHouseScanConnector,
|
||||
|
|
@ -18,7 +21,7 @@ export function createClickHouseLiveDatabaseIntrospection(
|
|||
options: CreateClickHouseLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxClickHouseConnectionConfig | undefined;
|
||||
const connector = new KtxClickHouseScanConnector({
|
||||
connectionId,
|
||||
|
|
@ -29,7 +32,11 @@ export function createClickHouseLiveDatabaseIntrospection(
|
|||
});
|
||||
try {
|
||||
return await connector.introspect(
|
||||
{ connectionId, driver: 'clickhouse' },
|
||||
{
|
||||
connectionId,
|
||||
driver: 'clickhouse',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `clickhouse-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest';
|
|||
import type { FieldPacket, RowDataPacket } from 'mysql2/promise';
|
||||
import { createMysqlLiveDatabaseIntrospection } from '../../connectors/mysql/live-database-introspection.js';
|
||||
import { isKtxMysqlConnectionConfig, KtxMysqlScanConnector, mysqlConnectionPoolConfigFromConfig, type KtxMysqlPoolFactory } from '../../connectors/mysql/connector.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
function mysqlResult(rows: Record<string, unknown>[], fields: Array<{ name: string; type?: number }>): [RowDataPacket[], FieldPacket[]] {
|
||||
return [rows as RowDataPacket[], fields as FieldPacket[]];
|
||||
|
|
@ -275,6 +276,71 @@ describe('KtxMysqlScanConnector', () => {
|
|||
]);
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const queries: Array<{ sql: string; params?: unknown }> = [];
|
||||
const poolFactory: KtxMysqlPoolFactory = {
|
||||
createPool: vi.fn(() => ({
|
||||
getConnection: vi.fn(async () => ({
|
||||
query: vi.fn(async (sql: string, params?: unknown): Promise<[RowDataPacket[], FieldPacket[]]> => {
|
||||
queries.push({ sql, params });
|
||||
if (sql.includes('INFORMATION_SCHEMA.TABLES')) {
|
||||
return mysqlResult(
|
||||
[
|
||||
{
|
||||
TABLE_SCHEMA: 'analytics',
|
||||
TABLE_NAME: 'orders',
|
||||
TABLE_TYPE: 'BASE TABLE',
|
||||
TABLE_COMMENT: '',
|
||||
TABLE_ROWS: 2,
|
||||
},
|
||||
],
|
||||
[],
|
||||
);
|
||||
}
|
||||
if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) {
|
||||
return mysqlResult(
|
||||
[
|
||||
{
|
||||
TABLE_SCHEMA: 'analytics',
|
||||
TABLE_NAME: 'orders',
|
||||
COLUMN_NAME: 'id',
|
||||
DATA_TYPE: 'int',
|
||||
IS_NULLABLE: 'NO',
|
||||
COLUMN_COMMENT: '',
|
||||
},
|
||||
],
|
||||
[],
|
||||
);
|
||||
}
|
||||
return mysqlResult([], []);
|
||||
}),
|
||||
release: vi.fn(),
|
||||
})),
|
||||
end: vi.fn(async () => undefined),
|
||||
})),
|
||||
};
|
||||
const connector = new KtxMysqlScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'mysql',
|
||||
host: 'db.example.test',
|
||||
database: 'analytics',
|
||||
username: 'reader',
|
||||
password: 'secret', // pragma: allowlist secret
|
||||
},
|
||||
poolFactory,
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: null, db: 'analytics', name: 'orders' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'mysql', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']);
|
||||
const tablesQuery = queries.find((query) => query.sql.includes('INFORMATION_SCHEMA.TABLES'));
|
||||
expect(tablesQuery?.sql).toMatch(/TABLE_NAME IN \(\?\)/);
|
||||
expect(tablesQuery?.params).toEqual(['analytics', 'orders']);
|
||||
});
|
||||
|
||||
it('runs samples, distinct values, read-only SQL, row count, schema list, and cleanup', async () => {
|
||||
const poolFactory = fakePoolFactory();
|
||||
const connector = new KtxMysqlScanConnector({
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import { homedir } from 'node:os';
|
|||
import { resolve } from 'node:path';
|
||||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxTableListEntry, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import { KtxMysqlDialect } from './dialect.js';
|
||||
|
||||
export interface KtxMysqlConnectionConfig {
|
||||
|
|
@ -335,23 +336,37 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
|
|||
this.assertConnection(input.connectionId);
|
||||
const databases = configuredMysqlSchemas(this.connection, this.poolConfig.database);
|
||||
const placeholders = databases.map(() => '?').join(', ');
|
||||
let allScopedTables: string[] | null = null;
|
||||
if (input.tableScope) {
|
||||
allScopedTables = [];
|
||||
for (const database of databases) {
|
||||
allScopedTables.push(...scopedTableNames(input.tableScope, { catalog: null, db: database }));
|
||||
}
|
||||
if (allScopedTables.length === 0) {
|
||||
return this.emptySnapshot(databases);
|
||||
}
|
||||
}
|
||||
const tableNameClause = allScopedTables
|
||||
? `AND TABLE_NAME IN (${allScopedTables.map(() => '?').join(', ')})`
|
||||
: '';
|
||||
const tableNameParams = allScopedTables ?? [];
|
||||
const tables = await this.queryRaw<MysqlTableRow>(
|
||||
`
|
||||
SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, TABLE_ROWS
|
||||
FROM INFORMATION_SCHEMA.TABLES
|
||||
WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
|
||||
WHERE TABLE_SCHEMA IN (${placeholders}) AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') ${tableNameClause}
|
||||
ORDER BY TABLE_SCHEMA, TABLE_NAME
|
||||
`,
|
||||
databases,
|
||||
[...databases, ...tableNameParams],
|
||||
);
|
||||
const columns = await this.queryRaw<MysqlColumnRow>(
|
||||
`
|
||||
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_COMMENT
|
||||
FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_SCHEMA IN (${placeholders})
|
||||
WHERE TABLE_SCHEMA IN (${placeholders}) ${tableNameClause}
|
||||
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
|
||||
`,
|
||||
databases,
|
||||
[...databases, ...tableNameParams],
|
||||
);
|
||||
const primaryKeys = await this.queryRaw<MysqlPrimaryKeyRow>(
|
||||
`
|
||||
|
|
@ -359,9 +374,10 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
|
|||
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
|
||||
WHERE TABLE_SCHEMA IN (${placeholders})
|
||||
AND CONSTRAINT_NAME = 'PRIMARY'
|
||||
${tableNameClause}
|
||||
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION
|
||||
`,
|
||||
databases,
|
||||
[...databases, ...tableNameParams],
|
||||
);
|
||||
const foreignKeys = await this.queryRaw<MysqlForeignKeyRow>(
|
||||
`
|
||||
|
|
@ -369,9 +385,10 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
|
|||
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
|
||||
WHERE TABLE_SCHEMA IN (${placeholders})
|
||||
AND REFERENCED_TABLE_NAME IS NOT NULL
|
||||
${tableNameClause}
|
||||
ORDER BY TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME
|
||||
`,
|
||||
databases,
|
||||
[...databases, ...tableNameParams],
|
||||
);
|
||||
|
||||
const columnsByTable = groupByTable(columns, this.poolConfig.database);
|
||||
|
|
@ -403,6 +420,23 @@ export class KtxMysqlScanConnector implements KtxScanConnector {
|
|||
};
|
||||
}
|
||||
|
||||
private emptySnapshot(databases: string[]): KtxSchemaSnapshot {
|
||||
return {
|
||||
connectionId: this.connectionId,
|
||||
driver: 'mysql',
|
||||
extractedAt: this.now().toISOString(),
|
||||
scope: { schemas: databases },
|
||||
metadata: {
|
||||
database: this.poolConfig.database,
|
||||
schemas: databases,
|
||||
host: this.poolConfig.host,
|
||||
table_count: 0,
|
||||
total_columns: 0,
|
||||
},
|
||||
tables: [],
|
||||
};
|
||||
}
|
||||
|
||||
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult> {
|
||||
this.assertConnection(input.connectionId);
|
||||
const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns));
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import {
|
||||
KtxMysqlScanConnector,
|
||||
|
|
@ -18,7 +21,7 @@ export function createMysqlLiveDatabaseIntrospection(
|
|||
options: CreateMysqlLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxMysqlConnectionConfig | undefined;
|
||||
const connector = new KtxMysqlScanConnector({
|
||||
connectionId,
|
||||
|
|
@ -28,7 +31,14 @@ export function createMysqlLiveDatabaseIntrospection(
|
|||
now: options.now,
|
||||
});
|
||||
try {
|
||||
return await connector.introspect({ connectionId, driver: 'mysql' }, { runId: `mysql-${connectionId}` });
|
||||
return await connector.introspect(
|
||||
{
|
||||
connectionId,
|
||||
driver: 'mysql',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `mysql-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
await connector.cleanup();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { createPostgresLiveDatabaseIntrospection } from '../../connectors/postgres/live-database-introspection.js';
|
||||
import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
interface FakeQueryResult {
|
||||
rows: Record<string, unknown>[];
|
||||
|
|
@ -259,6 +260,63 @@ describe('KtxPostgresScanConnector', () => {
|
|||
).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally');
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const queries: Array<{ sql: string; params?: unknown[] }> = [];
|
||||
const poolFactory: KtxPostgresPoolFactory = {
|
||||
createPool() {
|
||||
return {
|
||||
async connect() {
|
||||
return {
|
||||
query: vi.fn(async (sql: string, params?: unknown[]) => {
|
||||
queries.push({ sql, params });
|
||||
if (sql.includes('FROM pg_catalog.pg_class c')) {
|
||||
return { rows: [{ table_name: 'orders', table_kind: 'r', row_count: '3', table_comment: null }] };
|
||||
}
|
||||
if (sql.includes('FROM pg_catalog.pg_attribute a')) {
|
||||
return {
|
||||
rows: [
|
||||
{
|
||||
table_name: 'orders',
|
||||
column_name: 'id',
|
||||
data_type: 'integer',
|
||||
is_nullable: false,
|
||||
column_comment: null,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
return { rows: [] };
|
||||
}),
|
||||
release: vi.fn(),
|
||||
};
|
||||
},
|
||||
end: vi.fn(async () => undefined),
|
||||
};
|
||||
},
|
||||
};
|
||||
const connector = new KtxPostgresScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'postgres',
|
||||
host: 'db.example.test',
|
||||
database: 'analytics',
|
||||
username: 'reader',
|
||||
password: 'test-password', // pragma: allowlist secret
|
||||
schema: 'public',
|
||||
},
|
||||
poolFactory,
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: null, db: 'public', name: 'orders' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'postgres', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']);
|
||||
const tablesQuery = queries.find((query) => query.sql.includes('FROM pg_catalog.pg_class c'));
|
||||
expect(tablesQuery?.sql).toMatch(/c\.relname = ANY\(\$2\)/);
|
||||
expect(tablesQuery?.params).toEqual(['public', ['orders']]);
|
||||
});
|
||||
|
||||
it('adapts native PostgreSQL snapshots to live-database introspection for local ingest', async () => {
|
||||
const introspection = createPostgresLiveDatabaseIntrospection({
|
||||
connections: {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { homedir } from 'node:os';
|
|||
import { resolve } from 'node:path';
|
||||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import { Pool } from 'pg';
|
||||
import { KtxPostgresDialect } from './dialect.js';
|
||||
|
||||
|
|
@ -379,7 +380,9 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
const schemas = schemasFromConnection(this.connection);
|
||||
const allTables: KtxSchemaTable[] = [];
|
||||
for (const schema of schemas) {
|
||||
const tables = await this.loadSchemaTables(schema);
|
||||
const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: schema }) : null;
|
||||
if (scopedNames && scopedNames.length === 0) continue;
|
||||
const tables = await this.loadSchemaTables(schema, scopedNames);
|
||||
allTables.push(...tables);
|
||||
}
|
||||
return {
|
||||
|
|
@ -543,7 +546,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
}
|
||||
}
|
||||
|
||||
private async loadSchemaTables(schema: string): Promise<KtxSchemaTable[]> {
|
||||
private async loadSchemaTables(schema: string, scopedNames: readonly string[] | null): Promise<KtxSchemaTable[]> {
|
||||
if (scopedNames && scopedNames.length === 0) return [];
|
||||
const pgCatalogScopeClause = scopedNames ? 'AND c.relname = ANY($2)' : '';
|
||||
const tableConstraintScopeClause = scopedNames ? 'AND tc.table_name = ANY($2)' : '';
|
||||
const scopeValues = scopedNames ? [scopedNames] : [];
|
||||
const tables = await this.queryRaw<PostgresTableRow>(
|
||||
`
|
||||
SELECT
|
||||
|
|
@ -557,9 +564,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
ON d.objoid = c.oid AND d.objsubid = 0
|
||||
WHERE n.nspname = $1
|
||||
AND c.relkind IN ('r', 'v')
|
||||
${pgCatalogScopeClause}
|
||||
ORDER BY c.relname
|
||||
`,
|
||||
[schema],
|
||||
[schema, ...scopeValues],
|
||||
);
|
||||
const columns = await this.queryRaw<PostgresColumnRow>(
|
||||
`
|
||||
|
|
@ -578,9 +586,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
AND c.relkind IN ('r', 'v')
|
||||
AND a.attnum > 0
|
||||
AND NOT a.attisdropped
|
||||
${pgCatalogScopeClause}
|
||||
ORDER BY c.relname, a.attnum
|
||||
`,
|
||||
[schema],
|
||||
[schema, ...scopeValues],
|
||||
);
|
||||
const primaryKeys = await this.queryRaw<PostgresPrimaryKeyRow>(
|
||||
`
|
||||
|
|
@ -591,9 +600,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
AND tc.table_schema = kcu.table_schema
|
||||
WHERE tc.constraint_type = 'PRIMARY KEY'
|
||||
AND tc.table_schema = $1
|
||||
${tableConstraintScopeClause}
|
||||
ORDER BY tc.table_name, kcu.ordinal_position
|
||||
`,
|
||||
[schema],
|
||||
[schema, ...scopeValues],
|
||||
);
|
||||
const foreignKeys = await this.queryRaw<PostgresForeignKeyRow>(
|
||||
`
|
||||
|
|
@ -613,9 +623,10 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
AND ccu.table_schema = tc.table_schema
|
||||
WHERE tc.constraint_type = 'FOREIGN KEY'
|
||||
AND tc.table_schema = $1
|
||||
${tableConstraintScopeClause}
|
||||
ORDER BY tc.table_name, kcu.column_name
|
||||
`,
|
||||
[schema],
|
||||
[schema, ...scopeValues],
|
||||
);
|
||||
|
||||
const columnsByTable = groupByTable(columns);
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import {
|
||||
KtxPostgresScanConnector,
|
||||
|
|
@ -18,7 +21,7 @@ export function createPostgresLiveDatabaseIntrospection(
|
|||
options: CreatePostgresLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxPostgresConnectionConfig | undefined;
|
||||
const connector = new KtxPostgresScanConnector({
|
||||
connectionId,
|
||||
|
|
@ -28,7 +31,14 @@ export function createPostgresLiveDatabaseIntrospection(
|
|||
now: options.now,
|
||||
});
|
||||
try {
|
||||
return await connector.introspect({ connectionId, driver: 'postgres' }, { runId: `postgres-${connectionId}` });
|
||||
return await connector.introspect(
|
||||
{
|
||||
connectionId,
|
||||
driver: 'postgres',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `postgres-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
await connector.cleanup();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { createSnowflakeLiveDatabaseIntrospection } from '../../connectors/snowflake/live-database-introspection.js';
|
||||
import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
function fakeDriverFactory(): KtxSnowflakeDriverFactory {
|
||||
const driver: KtxSnowflakeDriver = {
|
||||
|
|
@ -206,6 +207,61 @@ describe('KtxSnowflakeScanConnector', () => {
|
|||
}
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const queries: Array<{ sql: string; params?: unknown }> = [];
|
||||
const getSchemaMetadata = vi.fn(async (_schemaName?: string, scopedNames?: readonly string[] | null) =>
|
||||
scopedNames?.includes('ORDERS')
|
||||
? [
|
||||
{
|
||||
name: 'ORDERS',
|
||||
catalog: 'ANALYTICS',
|
||||
db: 'MARTS',
|
||||
rowCount: 10,
|
||||
comment: null,
|
||||
columns: [{ name: 'ID', type: 'NUMBER', nullable: false, comment: null }],
|
||||
},
|
||||
]
|
||||
: [],
|
||||
);
|
||||
const driverFactory: KtxSnowflakeDriverFactory = {
|
||||
createDriver: vi.fn(() => ({
|
||||
test: vi.fn(async () => ({ success: true })),
|
||||
query: vi.fn(async (sql: string, params?: unknown) => {
|
||||
queries.push({ sql, params });
|
||||
return { headers: [], rows: [], totalRows: 0, rowCount: 0 };
|
||||
}),
|
||||
getSchemaMetadata,
|
||||
listSchemas: vi.fn(async () => []),
|
||||
listTables: vi.fn(async () => []),
|
||||
cleanup: vi.fn(async () => undefined),
|
||||
})),
|
||||
};
|
||||
const connector = new KtxSnowflakeScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'snowflake',
|
||||
authMethod: 'password',
|
||||
account: 'acct',
|
||||
warehouse: 'WH',
|
||||
database: 'ANALYTICS',
|
||||
schema_name: 'MARTS',
|
||||
username: 'reader',
|
||||
password: 'fixture-pass', // pragma: allowlist secret
|
||||
},
|
||||
driverFactory,
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: 'ANALYTICS', db: 'MARTS', name: 'ORDERS' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'snowflake', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['ORDERS']);
|
||||
expect(getSchemaMetadata).toHaveBeenCalledWith('MARTS', ['ORDERS']);
|
||||
const primaryKeysQuery = queries.find((query) => query.sql.includes('TABLE_CONSTRAINTS'));
|
||||
expect(primaryKeysQuery?.sql).toMatch(/AND tc\.TABLE_NAME IN \(\?\)/);
|
||||
expect(primaryKeysQuery?.params).toEqual(['MARTS', 'ANALYTICS', 'ORDERS']);
|
||||
});
|
||||
|
||||
it('supports read-only query, sampling, distinct values, row counts, schema listing, and cleanup', async () => {
|
||||
const driverFactory = fakeDriverFactory();
|
||||
const connector = new KtxSnowflakeScanConnector({
|
||||
|
|
|
|||
|
|
@ -4,10 +4,12 @@ import { homedir } from 'node:os';
|
|||
import { resolve } from 'node:path';
|
||||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import snowflake from 'snowflake-sdk';
|
||||
import type { Bind, Binds, Connection, ConnectionOptions } from 'snowflake-sdk';
|
||||
import { KtxSnowflakeDialect } from './dialect.js';
|
||||
import { assertSafeSnowflakeIdentifier, quoteSnowflakeIdentifier } from './identifiers.js';
|
||||
import { configureSnowflakeSdkLogger } from './sdk-logger.js';
|
||||
|
||||
export interface KtxSnowflakeConnectionConfig {
|
||||
driver?: string;
|
||||
|
|
@ -57,7 +59,7 @@ export interface KtxSnowflakeRawTableMetadata {
|
|||
export interface KtxSnowflakeDriver {
|
||||
test(): Promise<{ success: boolean; error?: string }>;
|
||||
query(sql: string, params?: unknown): Promise<KtxQueryResult>;
|
||||
getSchemaMetadata(schemaName?: string): Promise<KtxSnowflakeRawTableMetadata[]>;
|
||||
getSchemaMetadata(schemaName?: string, scopedTableNames?: readonly string[] | null): Promise<KtxSnowflakeRawTableMetadata[]>;
|
||||
listSchemas(): Promise<string[]>;
|
||||
listTables(schemas?: string[]): Promise<KtxTableListEntry[]>;
|
||||
cleanup(): Promise<void>;
|
||||
|
|
@ -80,6 +82,12 @@ export interface KtxSnowflakeSdkOptionsProvider {
|
|||
export interface KtxSnowflakeScanConnectorOptions {
|
||||
connectionId: string;
|
||||
connection: KtxSnowflakeConnectionConfig | undefined;
|
||||
/**
|
||||
* KTX project directory. When provided, snowflake-sdk's logger is redirected to
|
||||
* `<projectDir>/.ktx/logs/snowflake.log` so its JSON output does not bleed into
|
||||
* the CLI's TTY. Tests that use a fake driverFactory can leave this undefined.
|
||||
*/
|
||||
projectDir?: string;
|
||||
driverFactory?: KtxSnowflakeDriverFactory;
|
||||
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
|
|
@ -290,24 +298,32 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
|
|||
}
|
||||
}
|
||||
|
||||
async getSchemaMetadata(schemaName = this.resolved.schemas[0] ?? 'PUBLIC'): Promise<KtxSnowflakeRawTableMetadata[]> {
|
||||
async getSchemaMetadata(
|
||||
schemaName = this.resolved.schemas[0] ?? 'PUBLIC',
|
||||
scopedTableNames: readonly string[] | null = null,
|
||||
): Promise<KtxSnowflakeRawTableMetadata[]> {
|
||||
const scopeClause =
|
||||
scopedTableNames && scopedTableNames.length > 0
|
||||
? `AND TABLE_NAME IN (${scopedTableNames.map(() => '?').join(', ')})`
|
||||
: '';
|
||||
const scopeParams = scopedTableNames ?? [];
|
||||
const tablesResult = await this.query(
|
||||
`
|
||||
SELECT TABLE_NAME, TABLE_TYPE, COMMENT, ROW_COUNT
|
||||
FROM INFORMATION_SCHEMA.TABLES
|
||||
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
|
||||
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ? ${scopeClause}
|
||||
ORDER BY TABLE_NAME
|
||||
`,
|
||||
[schemaName, this.resolved.database],
|
||||
[schemaName, this.resolved.database, ...scopeParams],
|
||||
);
|
||||
const columnsResult = await this.query(
|
||||
`
|
||||
SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COMMENT, ORDINAL_POSITION
|
||||
FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
|
||||
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ? ${scopeClause}
|
||||
ORDER BY TABLE_NAME, ORDINAL_POSITION
|
||||
`,
|
||||
[schemaName, this.resolved.database],
|
||||
[schemaName, this.resolved.database, ...scopeParams],
|
||||
);
|
||||
const columnsByTable = new Map<string, KtxSnowflakeRawColumnMetadata[]>();
|
||||
for (const row of columnsResult.rows) {
|
||||
|
|
@ -513,6 +529,9 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
|
|||
this.driverFactory = options.driverFactory ?? new DefaultSnowflakeDriverFactory();
|
||||
this.now = options.now ?? (() => new Date());
|
||||
this.id = `snowflake:${options.connectionId}`;
|
||||
if (options.projectDir) {
|
||||
configureSnowflakeSdkLogger(options.projectDir);
|
||||
}
|
||||
}
|
||||
|
||||
async testConnection(): Promise<{ success: boolean; error?: string }> {
|
||||
|
|
@ -523,7 +542,11 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
|
|||
this.assertConnection(input.connectionId);
|
||||
const tables: KtxSchemaTable[] = [];
|
||||
for (const schemaName of this.resolved.schemas) {
|
||||
const rawTables = await this.getDriver().getSchemaMetadata(schemaName);
|
||||
const scopedNames = input.tableScope
|
||||
? scopedTableNames(input.tableScope, { catalog: this.resolved.database, db: schemaName })
|
||||
: null;
|
||||
if (scopedNames && scopedNames.length === 0) continue;
|
||||
const rawTables = await this.getDriver().getSchemaMetadata(schemaName, scopedNames);
|
||||
const primaryKeys = await this.primaryKeys(rawTables.map((table) => table.name), schemaName);
|
||||
tables.push(...rawTables.map((table) => this.toSchemaTable(table, primaryKeys)));
|
||||
}
|
||||
|
|
@ -663,6 +686,7 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
|
|||
if (tableNames.length === 0) {
|
||||
return grouped;
|
||||
}
|
||||
const tableNamePlaceholders = tableNames.map(() => '?').join(', ');
|
||||
try {
|
||||
const result = await this.getDriver().query(
|
||||
`
|
||||
|
|
@ -675,9 +699,10 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
|
|||
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
|
||||
AND tc.TABLE_SCHEMA = ?
|
||||
AND tc.TABLE_CATALOG = ?
|
||||
AND tc.TABLE_NAME IN (${tableNamePlaceholders})
|
||||
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
|
||||
`,
|
||||
[schemaName, this.resolved.database],
|
||||
[schemaName, this.resolved.database, ...tableNames],
|
||||
);
|
||||
for (const row of result.rows) {
|
||||
const tableName = String(row[0]);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
import { KtxSnowflakeScanConnector, type KtxSnowflakeScanConnectorOptions } from './connector.js';
|
||||
|
||||
export type KtxSnowflakeHistoricSqlQueryClientOptions = KtxSnowflakeScanConnectorOptions;
|
||||
|
||||
export class KtxSnowflakeHistoricSqlQueryClient {
|
||||
private readonly connectionId: string;
|
||||
private readonly connector: KtxSnowflakeScanConnector;
|
||||
|
||||
constructor(options: KtxSnowflakeHistoricSqlQueryClientOptions) {
|
||||
this.connectionId = options.connectionId;
|
||||
this.connector = new KtxSnowflakeScanConnector(options);
|
||||
}
|
||||
|
||||
async executeQuery(
|
||||
sql: string,
|
||||
): Promise<{ headers: string[]; rows: unknown[][]; totalRows: number }> {
|
||||
const result = await this.connector.executeReadOnly(
|
||||
{ connectionId: this.connectionId, sql },
|
||||
{} as never,
|
||||
);
|
||||
return {
|
||||
headers: result.headers,
|
||||
rows: result.rows,
|
||||
totalRows: result.totalRows,
|
||||
};
|
||||
}
|
||||
|
||||
async cleanup(): Promise<void> {
|
||||
await this.connector.cleanup();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import {
|
||||
KtxSnowflakeScanConnector,
|
||||
|
|
@ -9,6 +12,7 @@ import {
|
|||
|
||||
interface CreateSnowflakeLiveDatabaseIntrospectionOptions {
|
||||
connections: Record<string, KtxProjectConnectionConfig>;
|
||||
projectDir?: string;
|
||||
driverFactory?: KtxSnowflakeDriverFactory;
|
||||
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
|
||||
now?: () => Date;
|
||||
|
|
@ -18,18 +22,23 @@ export function createSnowflakeLiveDatabaseIntrospection(
|
|||
options: CreateSnowflakeLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxSnowflakeConnectionConfig | undefined;
|
||||
const connector = new KtxSnowflakeScanConnector({
|
||||
connectionId,
|
||||
connection,
|
||||
...(options.projectDir ? { projectDir: options.projectDir } : {}),
|
||||
driverFactory: options.driverFactory,
|
||||
sdkOptionsProvider: options.sdkOptionsProvider,
|
||||
now: options.now,
|
||||
});
|
||||
try {
|
||||
return await connector.introspect(
|
||||
{ connectionId, driver: 'snowflake' },
|
||||
{
|
||||
connectionId,
|
||||
driver: 'snowflake',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `snowflake-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
|
|
|
|||
57
packages/cli/src/connectors/snowflake/sdk-logger.test.ts
Normal file
57
packages/cli/src/connectors/snowflake/sdk-logger.test.ts
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
import { mkdtempSync, rmSync, statSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join, resolve } from 'node:path';
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
const { configure } = vi.hoisted(() => ({ configure: vi.fn() }));
|
||||
vi.mock('snowflake-sdk', () => ({
|
||||
default: { configure },
|
||||
}));
|
||||
|
||||
import {
|
||||
configureSnowflakeSdkLogger,
|
||||
resetSnowflakeSdkLoggerConfigurationForTests,
|
||||
} from './sdk-logger.js';
|
||||
|
||||
describe('configureSnowflakeSdkLogger', () => {
|
||||
let projectDir: string;
|
||||
|
||||
beforeEach(() => {
|
||||
configure.mockReset();
|
||||
resetSnowflakeSdkLoggerConfigurationForTests();
|
||||
projectDir = mkdtempSync(join(tmpdir(), 'ktx-snowflake-logger-'));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
rmSync(projectDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it('routes logs to <projectDir>/.ktx/logs/snowflake.log with console output disabled', () => {
|
||||
const expected = resolve(projectDir, '.ktx', 'logs', 'snowflake.log');
|
||||
const returned = configureSnowflakeSdkLogger(projectDir);
|
||||
expect(returned).toBe(expected);
|
||||
expect(configure).toHaveBeenCalledTimes(1);
|
||||
expect(configure).toHaveBeenCalledWith({
|
||||
logFilePath: expected,
|
||||
additionalLogToConsole: false,
|
||||
});
|
||||
expect(statSync(resolve(projectDir, '.ktx', 'logs')).isDirectory()).toBe(true);
|
||||
});
|
||||
|
||||
it('is idempotent for the same projectDir', () => {
|
||||
configureSnowflakeSdkLogger(projectDir);
|
||||
configureSnowflakeSdkLogger(projectDir);
|
||||
expect(configure).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('reconfigures when projectDir changes', () => {
|
||||
const other = mkdtempSync(join(tmpdir(), 'ktx-snowflake-logger-other-'));
|
||||
try {
|
||||
configureSnowflakeSdkLogger(projectDir);
|
||||
configureSnowflakeSdkLogger(other);
|
||||
expect(configure).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
rmSync(other, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
32
packages/cli/src/connectors/snowflake/sdk-logger.ts
Normal file
32
packages/cli/src/connectors/snowflake/sdk-logger.ts
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
import { mkdirSync } from 'node:fs';
|
||||
import { resolve } from 'node:path';
|
||||
import snowflake from 'snowflake-sdk';
|
||||
|
||||
let configuredLogFilePath: string | null = null;
|
||||
|
||||
/**
|
||||
* Redirects the snowflake-sdk logger to a project-scoped file so its JSON output
|
||||
* does not bleed into the CLI's TTY (which would pollute the setup wizard and
|
||||
* break the in-place progress repainter in `context-build-view.ts`).
|
||||
*
|
||||
* Idempotent per process: re-calling with the same projectDir is a no-op.
|
||||
*/
|
||||
export function configureSnowflakeSdkLogger(projectDir: string): string {
|
||||
const logDir = resolve(projectDir, '.ktx', 'logs');
|
||||
const logFilePath = resolve(logDir, 'snowflake.log');
|
||||
if (configuredLogFilePath === logFilePath) {
|
||||
return logFilePath;
|
||||
}
|
||||
mkdirSync(logDir, { recursive: true });
|
||||
snowflake.configure({
|
||||
logFilePath,
|
||||
additionalLogToConsole: false,
|
||||
});
|
||||
configuredLogFilePath = logFilePath;
|
||||
return logFilePath;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export function resetSnowflakeSdkLoggerConfigurationForTests(): void {
|
||||
configuredLogFilePath = null;
|
||||
}
|
||||
|
|
@ -6,6 +6,7 @@ import { join } from 'node:path';
|
|||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
import { createSqliteLiveDatabaseIntrospection } from '../../connectors/sqlite/live-database-introspection.js';
|
||||
import { isKtxSqliteConnectionConfig, KtxSqliteScanConnector, sqliteDatabasePathFromConfig } from '../../connectors/sqlite/connector.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
describe('KtxSqliteScanConnector', () => {
|
||||
let tempDir: string;
|
||||
|
|
@ -196,6 +197,19 @@ describe('KtxSqliteScanConnector', () => {
|
|||
).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const connector = new KtxSqliteScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: { driver: 'sqlite', path: dbPath },
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: null, db: null, name: 'orders' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'sqlite', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']);
|
||||
});
|
||||
|
||||
it('adapts native SQLite snapshots to live-database introspection for local ingest', async () => {
|
||||
const introspection = createSqliteLiveDatabaseIntrospection({
|
||||
projectDir: tempDir,
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import { fileURLToPath } from 'node:url';
|
|||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { normalizeQueryRows } from '../../context/connections/query-executor.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import { KtxSqliteDialect } from './dialect.js';
|
||||
|
||||
export interface KtxSqliteConnectionConfig {
|
||||
|
|
@ -181,11 +182,16 @@ export class KtxSqliteScanConnector implements KtxScanConnector {
|
|||
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
|
||||
this.assertConnection(input.connectionId);
|
||||
const database = this.database();
|
||||
const rawTables = database
|
||||
.prepare(
|
||||
`SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') AND name NOT LIKE 'sqlite_%' ORDER BY name`,
|
||||
)
|
||||
.all() as SqliteMasterRow[];
|
||||
const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: null }) : null;
|
||||
const scopeClause = scopedNames ? `AND name IN (${scopedNames.map(() => '?').join(', ')})` : '';
|
||||
const rawTables =
|
||||
scopedNames && scopedNames.length === 0
|
||||
? []
|
||||
: (database
|
||||
.prepare(
|
||||
`SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') AND name NOT LIKE 'sqlite_%' ${scopeClause} ORDER BY name`,
|
||||
)
|
||||
.all(...(scopedNames ?? [])) as SqliteMasterRow[]);
|
||||
const tables = rawTables.map((table) => this.readTable(database, table));
|
||||
const fileStats = existsSync(this.dbPath) ? statSync(this.dbPath) : null;
|
||||
return {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import { KtxSqliteScanConnector, type KtxSqliteConnectionConfig } from './connector.js';
|
||||
|
||||
|
|
@ -12,7 +15,7 @@ export function createSqliteLiveDatabaseIntrospection(
|
|||
options: CreateSqliteLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxSqliteConnectionConfig | undefined;
|
||||
const connector = new KtxSqliteScanConnector({
|
||||
connectionId,
|
||||
|
|
@ -21,7 +24,14 @@ export function createSqliteLiveDatabaseIntrospection(
|
|||
now: options.now,
|
||||
});
|
||||
try {
|
||||
return await connector.introspect({ connectionId, driver: 'sqlite' }, { runId: `sqlite-${connectionId}` });
|
||||
return await connector.introspect(
|
||||
{
|
||||
connectionId,
|
||||
driver: 'sqlite',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `sqlite-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
await connector.cleanup();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { createSqlServerLiveDatabaseIntrospection } from '../../connectors/sqlserver/live-database-introspection.js';
|
||||
import { isKtxSqlServerConnectionConfig, KtxSqlServerScanConnector, sqlServerConnectionPoolConfigFromConfig, type KtxSqlServerPoolFactory, type KtxSqlServerQueryResult } from '../../connectors/sqlserver/connector.js';
|
||||
import { tableRefSet } from '../../context/scan/table-ref.js';
|
||||
|
||||
function recordset<T extends Record<string, unknown>>(
|
||||
rows: T[],
|
||||
|
|
@ -290,6 +291,55 @@ describe('KtxSqlServerScanConnector', () => {
|
|||
await connector.cleanup();
|
||||
});
|
||||
|
||||
it('limits introspection to tables in tableScope', async () => {
|
||||
const queries: string[] = [];
|
||||
const inputs: Array<{ name: string; value: unknown }> = [];
|
||||
const request = {
|
||||
input: vi.fn((name: string, value: unknown) => {
|
||||
inputs.push({ name, value });
|
||||
return request;
|
||||
}),
|
||||
query: vi.fn(async (sql: string): Promise<KtxSqlServerQueryResult> => {
|
||||
queries.push(sql);
|
||||
if (sql.includes('INFORMATION_SCHEMA.TABLES')) {
|
||||
return result([{ table_name: 'orders', table_type: 'BASE TABLE' }], ['table_name', 'table_type']);
|
||||
}
|
||||
if (sql.includes('INFORMATION_SCHEMA.COLUMNS')) {
|
||||
return result(
|
||||
[{ table_name: 'orders', column_name: 'id', data_type: 'int', is_nullable: 'NO' }],
|
||||
['table_name', 'column_name', 'data_type', 'is_nullable'],
|
||||
);
|
||||
}
|
||||
return result([], []);
|
||||
}),
|
||||
};
|
||||
const poolFactory: KtxSqlServerPoolFactory = {
|
||||
createPool: vi.fn(async () => ({
|
||||
request: () => request,
|
||||
close: vi.fn(async () => undefined),
|
||||
})),
|
||||
};
|
||||
const connector = new KtxSqlServerScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'sqlserver',
|
||||
host: 'db.example.test',
|
||||
database: 'analytics',
|
||||
username: 'reader',
|
||||
schema: 'dbo',
|
||||
},
|
||||
poolFactory,
|
||||
});
|
||||
const scope = tableRefSet([{ catalog: 'analytics', db: 'dbo', name: 'orders' }]);
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'sqlserver', tableScope: scope },
|
||||
{ runId: 'scope-test' },
|
||||
);
|
||||
expect(snapshot.tables.map((table) => table.name)).toEqual(['orders']);
|
||||
expect(queries.find((query) => query.includes('INFORMATION_SCHEMA.TABLES'))).toMatch(/TABLE_NAME IN \(@table_0\)/);
|
||||
expect(inputs).toEqual(expect.arrayContaining([{ name: 'table_0', value: 'orders' }]));
|
||||
});
|
||||
|
||||
it('adapts native SQL Server snapshots to live-database introspection for local ingest', async () => {
|
||||
const introspection = createSqlServerLiveDatabaseIntrospection({
|
||||
connections: {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { assertReadOnlySql } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import { readFileSync } from 'node:fs';
|
||||
import { homedir } from 'node:os';
|
||||
import { resolve } from 'node:path';
|
||||
|
|
@ -121,6 +122,20 @@ function sqlRecordset(
|
|||
return recordset;
|
||||
}
|
||||
|
||||
function tableScopeSql(
|
||||
scopedNames: readonly string[] | null,
|
||||
columnExpression: string,
|
||||
): { clause: string; params: Record<string, unknown> } {
|
||||
if (!scopedNames) return { clause: '', params: {} };
|
||||
const params: Record<string, unknown> = {};
|
||||
const placeholders = scopedNames.map((name, index) => {
|
||||
const key = `table_${index}`;
|
||||
params[key] = name;
|
||||
return `@${key}`;
|
||||
});
|
||||
return { clause: `AND ${columnExpression} IN (${placeholders.join(', ')})`, params };
|
||||
}
|
||||
|
||||
class DefaultSqlServerPoolFactory implements KtxSqlServerPoolFactory {
|
||||
async createPool(config: KtxSqlServerPoolConfig): Promise<KtxSqlServerPool> {
|
||||
const pool = await new sql.ConnectionPool(config as sql.config).connect();
|
||||
|
|
@ -314,7 +329,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
this.assertConnection(input.connectionId);
|
||||
const tables: KtxSchemaTable[] = [];
|
||||
for (const schemaName of this.schemas) {
|
||||
tables.push(...(await this.introspectSchema(schemaName)));
|
||||
const scopedNames = input.tableScope
|
||||
? scopedTableNames(input.tableScope, { catalog: this.poolConfig.database, db: schemaName })
|
||||
: null;
|
||||
tables.push(...(await this.introspectSchema(schemaName, scopedNames)));
|
||||
}
|
||||
return {
|
||||
connectionId: this.connectionId,
|
||||
|
|
@ -461,16 +479,19 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
}
|
||||
}
|
||||
|
||||
private async introspectSchema(schemaName: string): Promise<KtxSchemaTable[]> {
|
||||
private async introspectSchema(schemaName: string, scopedNames: readonly string[] | null): Promise<KtxSchemaTable[]> {
|
||||
if (scopedNames && scopedNames.length === 0) return [];
|
||||
const tableScope = tableScopeSql(scopedNames, 'TABLE_NAME');
|
||||
const tables = await this.queryRaw<{ table_name: string; table_type: string }>(
|
||||
`
|
||||
SELECT TABLE_NAME AS table_name, TABLE_TYPE AS table_type
|
||||
FROM INFORMATION_SCHEMA.TABLES
|
||||
WHERE TABLE_SCHEMA = @schemaName
|
||||
AND TABLE_TYPE IN ('BASE TABLE', 'VIEW')
|
||||
${tableScope.clause}
|
||||
ORDER BY TABLE_NAME
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
const columns = await this.queryRaw<{
|
||||
table_name: string;
|
||||
|
|
@ -482,15 +503,16 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
SELECT TABLE_NAME AS table_name, COLUMN_NAME AS column_name, DATA_TYPE AS data_type, IS_NULLABLE AS is_nullable
|
||||
FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_SCHEMA = @schemaName
|
||||
${tableScope.clause}
|
||||
ORDER BY TABLE_NAME, ORDINAL_POSITION
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
const tableComments = await this.tableComments(schemaName);
|
||||
const columnComments = await this.columnComments(schemaName);
|
||||
const primaryKeys = await this.primaryKeys(schemaName);
|
||||
const foreignKeys = await this.foreignKeys(schemaName);
|
||||
const rowCounts = await this.rowCounts(schemaName);
|
||||
const tableComments = await this.tableComments(schemaName, scopedNames);
|
||||
const columnComments = await this.columnComments(schemaName, scopedNames);
|
||||
const primaryKeys = await this.primaryKeys(schemaName, scopedNames);
|
||||
const foreignKeys = await this.foreignKeys(schemaName, scopedNames);
|
||||
const rowCounts = await this.rowCounts(schemaName, scopedNames);
|
||||
const columnsByTable = groupByTable(columns);
|
||||
const foreignKeysByTable = groupByTable(foreignKeys);
|
||||
|
||||
|
|
@ -508,7 +530,8 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
}));
|
||||
}
|
||||
|
||||
private async tableComments(schemaName: string): Promise<Map<string, string>> {
|
||||
private async tableComments(schemaName: string, scopedNames: readonly string[] | null): Promise<Map<string, string>> {
|
||||
const tableScope = tableScopeSql(scopedNames, 'o.name');
|
||||
const rows = await this.queryRaw<{ table_name: string; table_comment: string }>(
|
||||
`
|
||||
SELECT o.name AS table_name, CAST(ep.value AS NVARCHAR(MAX)) AS table_comment
|
||||
|
|
@ -519,13 +542,15 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
AND ep.name = 'MS_Description'
|
||||
WHERE s.name = @schemaName
|
||||
AND o.type IN ('U', 'V')
|
||||
${tableScope.clause}
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
return new Map(rows.map((row) => [row.table_name, row.table_comment]));
|
||||
}
|
||||
|
||||
private async columnComments(schemaName: string): Promise<Map<string, string>> {
|
||||
private async columnComments(schemaName: string, scopedNames: readonly string[] | null): Promise<Map<string, string>> {
|
||||
const tableScope = tableScopeSql(scopedNames, 'o.name');
|
||||
const rows = await this.queryRaw<{ table_name: string; column_name: string; column_comment: string }>(
|
||||
`
|
||||
SELECT o.name AS table_name, c.name AS column_name, CAST(ep.value AS NVARCHAR(MAX)) AS column_comment
|
||||
|
|
@ -537,13 +562,18 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
AND ep.name = 'MS_Description'
|
||||
WHERE s.name = @schemaName
|
||||
AND o.type IN ('U', 'V')
|
||||
${tableScope.clause}
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
return new Map(rows.map((row) => [`${row.table_name}.${row.column_name}`, row.column_comment]));
|
||||
}
|
||||
|
||||
private async primaryKeys(schemaName: string): Promise<Map<string, Set<string>>> {
|
||||
private async primaryKeys(
|
||||
schemaName: string,
|
||||
scopedNames: readonly string[] | null,
|
||||
): Promise<Map<string, Set<string>>> {
|
||||
const tableScope = tableScopeSql(scopedNames, 'tc.TABLE_NAME');
|
||||
const rows = await this.queryRaw<{ table_name: string; column_name: string }>(
|
||||
`
|
||||
SELECT tc.TABLE_NAME AS table_name, kcu.COLUMN_NAME AS column_name
|
||||
|
|
@ -553,9 +583,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
|
||||
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
|
||||
AND tc.TABLE_SCHEMA = @schemaName
|
||||
${tableScope.clause}
|
||||
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
const grouped = new Map<string, Set<string>>();
|
||||
for (const row of rows) {
|
||||
|
|
@ -566,7 +597,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
return grouped;
|
||||
}
|
||||
|
||||
private async foreignKeys(schemaName: string): Promise<
|
||||
private async foreignKeys(
|
||||
schemaName: string,
|
||||
scopedNames: readonly string[] | null,
|
||||
): Promise<
|
||||
Array<{
|
||||
table_name: string;
|
||||
column_name: string;
|
||||
|
|
@ -576,6 +610,7 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
constraint_name: string;
|
||||
}>
|
||||
> {
|
||||
const tableScope = tableScopeSql(scopedNames, 'fk.TABLE_NAME');
|
||||
return this.queryRaw(
|
||||
`
|
||||
SELECT
|
||||
|
|
@ -596,13 +631,15 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
AND pk.CONSTRAINT_NAME = rc.UNIQUE_CONSTRAINT_NAME
|
||||
AND pk.ORDINAL_POSITION = fk.ORDINAL_POSITION
|
||||
WHERE fk.TABLE_SCHEMA = @schemaName
|
||||
${tableScope.clause}
|
||||
ORDER BY fk.TABLE_NAME, fk.COLUMN_NAME
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
}
|
||||
|
||||
private async rowCounts(schemaName: string): Promise<Map<string, number>> {
|
||||
private async rowCounts(schemaName: string, scopedNames: readonly string[] | null): Promise<Map<string, number>> {
|
||||
const tableScope = tableScopeSql(scopedNames, 't.name');
|
||||
const rows = await this.queryRaw<{ table_name: string; row_count: unknown }>(
|
||||
`
|
||||
SELECT t.name AS table_name, SUM(p.rows) AS row_count
|
||||
|
|
@ -611,9 +648,10 @@ export class KtxSqlServerScanConnector implements KtxScanConnector {
|
|||
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
|
||||
WHERE s.name = @schemaName
|
||||
AND p.index_id IN (0, 1)
|
||||
${tableScope.clause}
|
||||
GROUP BY t.name
|
||||
`,
|
||||
{ schemaName },
|
||||
{ schemaName, ...tableScope.params },
|
||||
);
|
||||
return new Map(rows.map((row) => [row.table_name, firstNumber(row.row_count) ?? 0]));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type {
|
||||
LiveDatabaseIntrospectionOptions,
|
||||
LiveDatabaseIntrospectionPort,
|
||||
} from '../../context/ingest/adapters/live-database/types.js';
|
||||
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
|
||||
import {
|
||||
KtxSqlServerScanConnector,
|
||||
|
|
@ -18,7 +21,7 @@ export function createSqlServerLiveDatabaseIntrospection(
|
|||
options: CreateSqlServerLiveDatabaseIntrospectionOptions,
|
||||
): LiveDatabaseIntrospectionPort {
|
||||
return {
|
||||
async extractSchema(connectionId: string) {
|
||||
async extractSchema(connectionId: string, introspectionOptions?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = options.connections[connectionId] as KtxSqlServerConnectionConfig | undefined;
|
||||
const connector = new KtxSqlServerScanConnector({
|
||||
connectionId,
|
||||
|
|
@ -29,7 +32,11 @@ export function createSqlServerLiveDatabaseIntrospection(
|
|||
});
|
||||
try {
|
||||
return await connector.introspect(
|
||||
{ connectionId, driver: 'sqlserver' },
|
||||
{
|
||||
connectionId,
|
||||
driver: 'sqlserver',
|
||||
...(introspectionOptions?.tableScope ? { tableScope: introspectionOptions.tableScope } : {}),
|
||||
},
|
||||
{ runId: `sqlserver-${connectionId}` },
|
||||
);
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -319,7 +319,8 @@ function renderPhaseRow(phase: PhaseState, frame: number, styled: boolean): stri
|
|||
} else if (phase.status === 'skipped') {
|
||||
trailing = styled ? dim('skipped') : 'skipped';
|
||||
} else if (phase.status === 'failed') {
|
||||
trailing = styled ? red('failed') : 'failed';
|
||||
const label = styled ? red('failed') : 'failed';
|
||||
trailing = phase.summary ? `${label} ${phase.summary}` : label;
|
||||
}
|
||||
const bar = `${segments.join(' ')} ${trailing}`.trimEnd();
|
||||
return ` ${icon} ${name} ${bar}`;
|
||||
|
|
|
|||
|
|
@ -8,7 +8,14 @@ import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
|
|||
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../../context/project/project.js';
|
||||
import { filterSnapshotTables, resolveEnabledTables } from './enabled-tables.js';
|
||||
import { getLocalScanReport, getLocalScanStatus, runLocalScan } from './local-scan.js';
|
||||
import type { KtxQueryResult, KtxReadOnlyQueryInput, KtxSchemaSnapshot, KtxSchemaTable } from './types.js';
|
||||
import { tableRefKey, tableRefSet } from './table-ref.js';
|
||||
import type {
|
||||
KtxQueryResult,
|
||||
KtxReadOnlyQueryInput,
|
||||
KtxScanConnector,
|
||||
KtxSchemaSnapshot,
|
||||
KtxSchemaTable,
|
||||
} from './types.js';
|
||||
|
||||
function relationshipSqlResult(
|
||||
input: KtxReadOnlyQueryInput,
|
||||
|
|
@ -551,6 +558,142 @@ describe('local scan', () => {
|
|||
expect(result.report.warnings).toEqual([]);
|
||||
});
|
||||
|
||||
it('keeps prototype connector methods when enabled_tables is configured', async () => {
|
||||
project.config.connections.warehouse = {
|
||||
...project.config.connections.warehouse,
|
||||
enabled_tables: ['public.customers', 'public.orders'],
|
||||
};
|
||||
const scopedAdapter: SourceAdapter = {
|
||||
source: 'live-database',
|
||||
skillNames: ['live_database_ingest'],
|
||||
async fetch(_pullConfig, stagedDir) {
|
||||
await mkdir(join(stagedDir, 'tables'), { recursive: true });
|
||||
await writeFile(
|
||||
join(stagedDir, 'connection.json'),
|
||||
'{"connectionId":"warehouse","driver":"postgres","scope":{"schemas":["public"]},"metadata":{}}\n',
|
||||
'utf-8',
|
||||
);
|
||||
await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
|
||||
await writeFile(
|
||||
join(stagedDir, 'tables', 'customers.json'),
|
||||
'{"name":"customers","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":100,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
|
||||
'utf-8',
|
||||
);
|
||||
await writeFile(
|
||||
join(stagedDir, 'tables', 'orders.json'),
|
||||
'{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":1000,"columns":[{"name":"customer_id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":false,"comment":null}],"foreignKeys":[]}\n',
|
||||
'utf-8',
|
||||
);
|
||||
},
|
||||
async detect() {
|
||||
return true;
|
||||
},
|
||||
async chunk() {
|
||||
return {
|
||||
workUnits: [
|
||||
{
|
||||
unitKey: 'live-database-public-customers',
|
||||
rawFiles: ['tables/customers.json'],
|
||||
dependencyPaths: ['connection.json', 'foreign-keys.json'],
|
||||
peerFileIndex: [],
|
||||
},
|
||||
{
|
||||
unitKey: 'live-database-public-orders',
|
||||
rawFiles: ['tables/orders.json'],
|
||||
dependencyPaths: ['connection.json', 'foreign-keys.json'],
|
||||
peerFileIndex: [],
|
||||
},
|
||||
],
|
||||
};
|
||||
},
|
||||
};
|
||||
class FakeClassConnector implements KtxScanConnector {
|
||||
readonly id = 'test:warehouse';
|
||||
readonly driver = 'postgres' as const;
|
||||
readonly capabilities = {
|
||||
structuralIntrospection: true as const,
|
||||
tableSampling: false,
|
||||
columnSampling: false,
|
||||
columnStats: true,
|
||||
readOnlySql: true,
|
||||
nestedAnalysis: false,
|
||||
eventStreamDiscovery: false,
|
||||
formalForeignKeys: false,
|
||||
estimatedRowCounts: true,
|
||||
};
|
||||
|
||||
async introspect(): Promise<KtxSchemaSnapshot> {
|
||||
return {
|
||||
connectionId: 'warehouse',
|
||||
driver: 'postgres',
|
||||
extractedAt: '2026-05-22T00:00:00.000Z',
|
||||
scope: { schemas: ['public'] },
|
||||
metadata: {},
|
||||
tables: [
|
||||
{
|
||||
catalog: null,
|
||||
db: 'public',
|
||||
name: 'customers',
|
||||
kind: 'table',
|
||||
comment: null,
|
||||
estimatedRows: 100,
|
||||
foreignKeys: [],
|
||||
columns: [
|
||||
{
|
||||
name: 'id',
|
||||
nativeType: 'integer',
|
||||
normalizedType: 'integer',
|
||||
dimensionType: 'number',
|
||||
nullable: false,
|
||||
primaryKey: true,
|
||||
comment: null,
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
catalog: null,
|
||||
db: 'public',
|
||||
name: 'orders',
|
||||
kind: 'table',
|
||||
comment: null,
|
||||
estimatedRows: 1000,
|
||||
foreignKeys: [],
|
||||
columns: [
|
||||
{
|
||||
name: 'customer_id',
|
||||
nativeType: 'integer',
|
||||
normalizedType: 'integer',
|
||||
dimensionType: 'number',
|
||||
nullable: false,
|
||||
primaryKey: false,
|
||||
comment: null,
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
async executeReadOnly(input: KtxReadOnlyQueryInput): Promise<KtxQueryResult> {
|
||||
return relationshipSqlResult(input);
|
||||
}
|
||||
}
|
||||
|
||||
const result = await runLocalScan({
|
||||
project,
|
||||
adapters: [scopedAdapter],
|
||||
connectionId: 'warehouse',
|
||||
mode: 'relationships',
|
||||
detectRelationships: true,
|
||||
connector: new FakeClassConnector(),
|
||||
jobId: 'scan-prototype-connector-scope',
|
||||
now: () => new Date('2026-05-22T00:00:00.000Z'),
|
||||
});
|
||||
|
||||
expect(result.report.relationships.accepted).toBe(1);
|
||||
expect(result.report.warnings).toEqual([]);
|
||||
});
|
||||
|
||||
it('threads scan relationship settings into relationship-only local scans', async () => {
|
||||
project.config.scan.enrichment = { mode: 'deterministic' };
|
||||
project.config.scan.relationships = {
|
||||
|
|
@ -1512,15 +1655,15 @@ describe('resolveEnabledTables', () => {
|
|||
expect(resolveEnabledTables({ driver: 'postgres', enabled_tables: [] })).toBeNull();
|
||||
});
|
||||
|
||||
it('returns Set of enabled table names', () => {
|
||||
it('returns a canonical set of enabled table refs', () => {
|
||||
const result = resolveEnabledTables({
|
||||
driver: 'postgres',
|
||||
enabled_tables: ['public.users', 'public.orders'],
|
||||
});
|
||||
expect(result).toBeInstanceOf(Set);
|
||||
expect(result!.size).toBe(2);
|
||||
expect(result!.has('public.users')).toBe(true);
|
||||
expect(result!.has('public.orders')).toBe(true);
|
||||
expect(result!.has(tableRefKey({ catalog: null, db: 'public', name: 'users' }))).toBe(true);
|
||||
expect(result!.has(tableRefKey({ catalog: null, db: 'public', name: 'orders' }))).toBe(true);
|
||||
});
|
||||
|
||||
it('returns null for undefined connection', () => {
|
||||
|
|
@ -1557,7 +1700,10 @@ describe('filterSnapshotTables', () => {
|
|||
{ db: 'public', name: 'orders' },
|
||||
{ db: 'public', name: 'logs' },
|
||||
]);
|
||||
const enabled = new Set(['public.users', 'public.orders']);
|
||||
const enabled = tableRefSet([
|
||||
{ catalog: null, db: 'public', name: 'users' },
|
||||
{ catalog: null, db: 'public', name: 'orders' },
|
||||
]);
|
||||
const filtered = filterSnapshotTables(snapshot, enabled);
|
||||
expect(filtered.tables).toHaveLength(2);
|
||||
expect(filtered.tables.map((t) => t.name)).toEqual(['users', 'orders']);
|
||||
|
|
@ -1565,14 +1711,14 @@ describe('filterSnapshotTables', () => {
|
|||
|
||||
it('returns empty tables when none match', () => {
|
||||
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
|
||||
const enabled = new Set(['public.orders']);
|
||||
const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'orders' }]);
|
||||
const filtered = filterSnapshotTables(snapshot, enabled);
|
||||
expect(filtered.tables).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('preserves other snapshot fields', () => {
|
||||
const snapshot = makeSnapshot([{ db: 'public', name: 'users' }]);
|
||||
const enabled = new Set(['public.users']);
|
||||
const enabled = tableRefSet([{ catalog: null, db: 'public', name: 'users' }]);
|
||||
const filtered = filterSnapshotTables(snapshot, enabled);
|
||||
expect(filtered.connectionId).toBe('test');
|
||||
expect(filtered.driver).toBe('postgres');
|
||||
|
|
|
|||
|
|
@ -25,14 +25,11 @@ import type {
|
|||
KtxConnectionDriver,
|
||||
KtxProgressPort,
|
||||
KtxScanConnector,
|
||||
KtxScanContext,
|
||||
KtxScanEnrichmentStateSummary,
|
||||
KtxScanInput,
|
||||
KtxScanMode,
|
||||
KtxScanReport,
|
||||
KtxScanTrigger,
|
||||
KtxScanWarning,
|
||||
KtxSchemaSnapshot,
|
||||
} from './types.js';
|
||||
|
||||
function enrichmentResolutionWarning(
|
||||
|
|
@ -370,17 +367,6 @@ async function readScanReport(
|
|||
}
|
||||
}
|
||||
|
||||
function createFilteredConnector(connector: KtxScanConnector, enabledTables: Set<string>): KtxScanConnector {
|
||||
return {
|
||||
...connector,
|
||||
async introspect(input: KtxScanInput, ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
|
||||
const snapshot = await connector.introspect(input, ctx);
|
||||
return filterSnapshotTables(snapshot, enabledTables);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
function withInternalLiveDatabaseAdapter(project: KtxLocalProject): KtxLocalProject {
|
||||
if (project.config.ingest.adapters.includes(LIVE_DATABASE_ADAPTER)) {
|
||||
return project;
|
||||
|
|
@ -408,8 +394,8 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
|
|||
throw new Error(`Connection "${options.connectionId}" is not configured in ktx.yaml`);
|
||||
}
|
||||
const driver = normalizeDriver(connection.driver);
|
||||
const enabledTables = resolveEnabledTables(connection);
|
||||
const connector = rawConnector && enabledTables ? createFilteredConnector(rawConnector, enabledTables) : rawConnector;
|
||||
const tableScope = resolveEnabledTables(connection) ?? undefined;
|
||||
const connector = rawConnector;
|
||||
const adapters =
|
||||
options.adapters ??
|
||||
createDefaultLocalIngestAdapters(options.project, { databaseIntrospectionUrl: options.databaseIntrospectionUrl });
|
||||
|
|
@ -476,8 +462,8 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
|
|||
rawSourcesDir: report.artifactPaths.rawSourcesDir,
|
||||
extractedAtFallback: report.createdAt,
|
||||
});
|
||||
const structuralSnapshot = enabledTables ? filterSnapshotTables(rawSnapshot, enabledTables) : rawSnapshot;
|
||||
if (enabledTables && structuralSnapshot.tables.length < rawSnapshot.tables.length) {
|
||||
const structuralSnapshot = tableScope ? filterSnapshotTables(rawSnapshot, tableScope) : rawSnapshot;
|
||||
if (tableScope && structuralSnapshot.tables.length < rawSnapshot.tables.length) {
|
||||
const excluded = rawSnapshot.tables.length - structuralSnapshot.tables.length;
|
||||
let remaining = excluded;
|
||||
const ds = report.diffSummary;
|
||||
|
|
|
|||
|
|
@ -12,10 +12,12 @@ export type KtxTableRefKey = string & { readonly __brand: 'KtxTableRefKey' };
|
|||
|
||||
const SEPARATOR = '\x1f';
|
||||
|
||||
/** @internal */
|
||||
export function tableRefKey(ref: KtxTableRef): KtxTableRefKey {
|
||||
return `${ref.catalog ?? ''}${SEPARATOR}${ref.db ?? ''}${SEPARATOR}${ref.name}` as KtxTableRefKey;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export function tableRefFromKey(key: KtxTableRefKey): KtxTableRef {
|
||||
const [catalog = '', db = '', name = ''] = key.split(SEPARATOR);
|
||||
return {
|
||||
|
|
|
|||
|
|
@ -123,22 +123,22 @@ function createKtxCliLiveDatabaseIntrospection(
|
|||
async extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions) {
|
||||
const connection = project.config.connections[connectionId];
|
||||
if (isKtxPostgresConnectionConfig(connection)) {
|
||||
return postgres.extractSchema(connectionId);
|
||||
return postgres.extractSchema(connectionId, options);
|
||||
}
|
||||
if (isKtxSqliteConnectionConfig(connection)) {
|
||||
return sqlite.extractSchema(connectionId);
|
||||
return sqlite.extractSchema(connectionId, options);
|
||||
}
|
||||
if (isKtxMysqlConnectionConfig(connection)) {
|
||||
return mysql.extractSchema(connectionId);
|
||||
return mysql.extractSchema(connectionId, options);
|
||||
}
|
||||
if (isKtxClickHouseConnectionConfig(connection)) {
|
||||
return clickhouse.extractSchema(connectionId);
|
||||
return clickhouse.extractSchema(connectionId, options);
|
||||
}
|
||||
if (isKtxSqlServerConnectionConfig(connection)) {
|
||||
return sqlserver.extractSchema(connectionId);
|
||||
return sqlserver.extractSchema(connectionId, options);
|
||||
}
|
||||
if (isKtxBigQueryConnectionConfig(connection)) {
|
||||
return bigquery.extractSchema(connectionId);
|
||||
return bigquery.extractSchema(connectionId, options);
|
||||
}
|
||||
if (hasSnowflakeDriver(connection)) {
|
||||
const { createSnowflakeLiveDatabaseIntrospection } = await import('./connectors/snowflake/live-database-introspection.js');
|
||||
|
|
@ -150,7 +150,7 @@ function createKtxCliLiveDatabaseIntrospection(
|
|||
connections: project.config.connections,
|
||||
projectDir: project.projectDir,
|
||||
});
|
||||
return snowflake.extractSchema(connectionId);
|
||||
return snowflake.extractSchema(connectionId, options);
|
||||
}
|
||||
return daemon.extractSchema(connectionId, options);
|
||||
},
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ export async function createKtxCliScanConnector(
|
|||
if (!isKtxSnowflakeConnectionConfig(connection)) {
|
||||
throw invalidConnectionConfigError(connectionId, driver);
|
||||
}
|
||||
return new KtxSnowflakeScanConnector({ connectionId, connection });
|
||||
return new KtxSnowflakeScanConnector({ connectionId, connection, projectDir: project.projectDir });
|
||||
}
|
||||
throw new Error(
|
||||
`Connection "${connectionId}" uses driver "${driver}", which has no native standalone KTX scan connector. Supported drivers: ${SUPPORTED_DRIVERS}.`,
|
||||
|
|
|
|||
|
|
@ -908,7 +908,7 @@ describe('runKtxPublicIngest', () => {
|
|||
expect(io.stdout()).not.toContain('Debug:');
|
||||
});
|
||||
|
||||
it('prints query-history retry guidance for query-history facet failures', async () => {
|
||||
it('skips the query-history facet but keeps the target green when query-history fails', async () => {
|
||||
const io = makeIo();
|
||||
const project = deepReadyProject({
|
||||
warehouse: { driver: 'postgres', context: { depth: 'deep' } },
|
||||
|
|
@ -935,11 +935,13 @@ describe('runKtxPublicIngest', () => {
|
|||
io.io,
|
||||
{ loadProject: vi.fn(async () => project), runScan, runIngest },
|
||||
),
|
||||
).resolves.toBe(1);
|
||||
).resolves.toBe(0);
|
||||
|
||||
expect(io.stdout()).toMatch(/warehouse\s+done\s+failed\s+skipped\s+skipped/);
|
||||
expect(io.stdout()).toContain('Ingest finished with skipped query history');
|
||||
expect(io.stdout()).toMatch(/warehouse\s+done\s+skipped\s+skipped\s+skipped/);
|
||||
expect(io.stdout()).toContain('Skipped query history:');
|
||||
expect(io.stdout()).toContain(
|
||||
'warehouse failed: Query history failed for 60 tasks. First failure: Google Cloud authentication failed while analyzing query history',
|
||||
'Query history failed for 60 tasks. First failure: Google Cloud authentication failed while analyzing query history',
|
||||
);
|
||||
expect(io.stdout()).not.toContain('warehouse failed: Error:');
|
||||
expect(io.stdout()).toContain('Retry: ktx ingest warehouse --project-dir /tmp/project --deep --query-history');
|
||||
|
|
@ -973,8 +975,9 @@ describe('runKtxPublicIngest', () => {
|
|||
io.io,
|
||||
{ loadProject: vi.fn(async () => project), runScan, runIngest },
|
||||
),
|
||||
).resolves.toBe(1);
|
||||
).resolves.toBe(0);
|
||||
|
||||
expect(io.stdout()).toContain('Ingest finished with skipped query history');
|
||||
expect(io.stdout()).toContain('Missing bundled Python runtime manifest');
|
||||
expect(io.stdout()).toContain(
|
||||
'In a source checkout, build the local runtime assets with: pnpm run artifacts:build',
|
||||
|
|
|
|||
|
|
@ -599,17 +599,64 @@ function markTargetResult(
|
|||
};
|
||||
}
|
||||
|
||||
function markTargetWithSkippedQueryHistory(
|
||||
target: KtxPublicIngestPlanTarget,
|
||||
args: Extract<KtxPublicIngestArgs, { command: 'run' }>,
|
||||
detail: string,
|
||||
): KtxPublicIngestTargetResult {
|
||||
const baseline = markTargetResult(target, args, 'done');
|
||||
return {
|
||||
...baseline,
|
||||
steps: baseline.steps.map((step) =>
|
||||
step.operation === 'query-history' ? { ...step, status: 'skipped', detail } : step,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
function queryHistoryFailureDetail(input: {
|
||||
target: KtxPublicIngestPlanTarget;
|
||||
args: Extract<KtxPublicIngestArgs, { command: 'run' }>;
|
||||
capturedOutput?: string;
|
||||
}): string {
|
||||
const captured = capturedFailureMessage(input.capturedOutput ?? '');
|
||||
return failureDetailWithRetry({
|
||||
target: input.target,
|
||||
args: input.args,
|
||||
failedOperation: 'query-history',
|
||||
failureDetail: captured,
|
||||
});
|
||||
}
|
||||
|
||||
function resultFailed(result: KtxPublicIngestTargetResult): boolean {
|
||||
return result.steps.some((step) => step.status === 'failed');
|
||||
}
|
||||
|
||||
function resultSkippedQueryHistory(
|
||||
result: KtxPublicIngestTargetResult,
|
||||
): { connectionId: string; detail: string } | null {
|
||||
const skipped = result.steps.find(
|
||||
(step) => step.operation === 'query-history' && step.status === 'skipped' && step.detail !== undefined,
|
||||
);
|
||||
return skipped?.detail ? { connectionId: result.connectionId, detail: skipped.detail } : null;
|
||||
}
|
||||
|
||||
function stepStatus(result: KtxPublicIngestTargetResult, operation: KtxPublicIngestStepName): string {
|
||||
return result.steps.find((step) => step.operation === operation)?.status ?? 'not-run';
|
||||
}
|
||||
|
||||
function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo): void {
|
||||
const failures = results.filter(resultFailed);
|
||||
io.stdout.write(failures.length > 0 ? 'Ingest finished with partial failures\n' : 'Ingest finished\n');
|
||||
const skippedQueryHistory = results.map(resultSkippedQueryHistory).filter((entry) => entry !== null) as Array<{
|
||||
connectionId: string;
|
||||
detail: string;
|
||||
}>;
|
||||
const headerSuffix =
|
||||
failures.length > 0
|
||||
? ' with partial failures'
|
||||
: skippedQueryHistory.length > 0
|
||||
? ' with skipped query history'
|
||||
: '';
|
||||
io.stdout.write(`Ingest finished${headerSuffix}\n`);
|
||||
io.stdout.write('\n');
|
||||
io.stdout.write('Source Database schema Query history Source ingest Memory update\n');
|
||||
for (const result of results) {
|
||||
|
|
@ -624,17 +671,22 @@ function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo
|
|||
);
|
||||
}
|
||||
|
||||
if (failures.length === 0) {
|
||||
return;
|
||||
if (failures.length > 0) {
|
||||
io.stdout.write('\nFailed sources:\n');
|
||||
for (const result of failures) {
|
||||
const failedStep = result.steps.find((step) => step.status === 'failed');
|
||||
if (!failedStep) {
|
||||
continue;
|
||||
}
|
||||
io.stdout.write(` ${failedStep.detail ?? `${result.connectionId} failed.`}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
io.stdout.write('\nFailed sources:\n');
|
||||
for (const result of failures) {
|
||||
const failedStep = result.steps.find((step) => step.status === 'failed');
|
||||
if (!failedStep) {
|
||||
continue;
|
||||
if (skippedQueryHistory.length > 0) {
|
||||
io.stdout.write('\nSkipped query history:\n');
|
||||
for (const { detail } of skippedQueryHistory) {
|
||||
io.stdout.write(` ${detail}\n`);
|
||||
}
|
||||
io.stdout.write(` ${failedStep.detail ?? `${result.connectionId} failed.`}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -814,14 +866,13 @@ export async function executePublicIngestTarget(
|
|||
? await runIngest(ingestArgs, ingestIo, ingestDeps)
|
||||
: await runIngest(ingestArgs, ingestIo);
|
||||
if (qhExitCode !== 0) {
|
||||
deps.onPhaseEnd?.('query-history', 'failed');
|
||||
return markTargetResult(
|
||||
const detail = queryHistoryFailureDetail({
|
||||
target,
|
||||
args,
|
||||
'failed',
|
||||
'query-history',
|
||||
capturedIngestIo ? capturedFailureMessage(capturedIngestIo.capturedOutput()) : undefined,
|
||||
);
|
||||
capturedOutput: capturedIngestIo ? capturedIngestIo.capturedOutput() : undefined,
|
||||
});
|
||||
deps.onPhaseEnd?.('query-history', 'failed', detail);
|
||||
return markTargetWithSkippedQueryHistory(target, args, detail);
|
||||
}
|
||||
deps.onPhaseEnd?.('query-history', 'done');
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2041,6 +2041,7 @@ describe('setup databases step', () => {
|
|||
|
||||
it('writes query history config for supported Snowflake databases after validation succeeds', async () => {
|
||||
const io = makeIo();
|
||||
const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] }));
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
|
|
@ -2058,12 +2059,20 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
prompts: makePromptAdapter({
|
||||
textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''],
|
||||
passwordValues: ['env:SNOWFLAKE_PASSWORD'],
|
||||
}),
|
||||
},
|
||||
);
|
||||
expect(historicSqlProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'snowflake',
|
||||
dialect: 'snowflake',
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
const configText = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8');
|
||||
|
|
@ -2453,7 +2462,53 @@ describe('setup databases step', () => {
|
|||
expect(io.stdout()).toContain('Query history probe...');
|
||||
expect(io.stdout()).not.toContain('Historic SQL probe...');
|
||||
expect(io.stdout()).toContain('pg_stat_statements extension is not installed');
|
||||
expect(io.stdout()).toContain('Setup written; first ingest run will fail until fixed.');
|
||||
expect(io.stdout()).toContain('Setup written; query history will be skipped until fixed.');
|
||||
});
|
||||
|
||||
it('prints a non-blocking Snowflake query history probe failure with the grants remediation', async () => {
|
||||
const io = makeIo();
|
||||
const historicSqlProbe = vi.fn(async () => ({
|
||||
ok: false,
|
||||
lines: [
|
||||
' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
],
|
||||
}));
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
inputMode: 'disabled',
|
||||
databaseDrivers: ['snowflake'],
|
||||
databaseConnectionId: 'warehouse',
|
||||
databaseSchemas: [],
|
||||
enableQueryHistory: true,
|
||||
skipDatabases: false,
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
prompts: makePromptAdapter({
|
||||
textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''],
|
||||
passwordValues: ['env:SNOWFLAKE_PASSWORD'],
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
expect(historicSqlProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'warehouse',
|
||||
dialect: 'snowflake',
|
||||
}),
|
||||
);
|
||||
expect(io.stdout()).toContain('Query history probe...');
|
||||
expect(io.stdout()).toContain('Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY');
|
||||
expect(io.stdout()).toContain('GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE');
|
||||
expect(io.stdout()).toContain('Setup written; query history will be skipped until fixed.');
|
||||
});
|
||||
|
||||
it('does not run the query history probe when the regular connection test fails', async () => {
|
||||
|
|
|
|||
|
|
@ -341,6 +341,13 @@ function historicSqlProbeFailureLines(error: unknown): string[] {
|
|||
];
|
||||
}
|
||||
if (error instanceof Error && error.name === 'HistoricSqlGrantsMissingError') {
|
||||
const dialect = (error as { dialect?: unknown }).dialect;
|
||||
if (dialect === 'snowflake') {
|
||||
return [
|
||||
' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
];
|
||||
}
|
||||
return [
|
||||
' FAIL Postgres connection role lacks pg_read_all_stats',
|
||||
' Fix: Run: GRANT pg_read_all_stats TO <connection role>;',
|
||||
|
|
@ -353,10 +360,18 @@ function historicSqlProbeFailureLines(error: unknown): string[] {
|
|||
}
|
||||
|
||||
async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Promise<KtxSetupHistoricSqlProbeResult> {
|
||||
if (input.dialect !== 'postgres') {
|
||||
return { ok: true, lines: [] };
|
||||
if (input.dialect === 'postgres') {
|
||||
return probePostgresHistoricSql(input);
|
||||
}
|
||||
if (input.dialect === 'snowflake') {
|
||||
return probeSnowflakeHistoricSql(input);
|
||||
}
|
||||
return { ok: true, lines: [] };
|
||||
}
|
||||
|
||||
async function probePostgresHistoricSql(
|
||||
input: KtxSetupHistoricSqlProbeInput,
|
||||
): Promise<KtxSetupHistoricSqlProbeResult> {
|
||||
const project = await loadKtxProject({ projectDir: input.projectDir });
|
||||
const connection = project.config.connections[input.connectionId];
|
||||
const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient }, { isKtxPostgresConnectionConfig }] =
|
||||
|
|
@ -394,6 +409,46 @@ async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Pr
|
|||
}
|
||||
}
|
||||
|
||||
async function probeSnowflakeHistoricSql(
|
||||
input: KtxSetupHistoricSqlProbeInput,
|
||||
): Promise<KtxSetupHistoricSqlProbeResult> {
|
||||
const project = await loadKtxProject({ projectDir: input.projectDir });
|
||||
const connection = project.config.connections[input.connectionId];
|
||||
const [{ SnowflakeHistoricSqlQueryHistoryReader }, { KtxSnowflakeHistoricSqlQueryClient }, { isKtxSnowflakeConnectionConfig }] =
|
||||
await Promise.all([
|
||||
import('./context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'),
|
||||
import('./connectors/snowflake/historic-sql-query-client.js'),
|
||||
import('./connectors/snowflake/connector.js'),
|
||||
]);
|
||||
|
||||
if (!isKtxSnowflakeConnectionConfig(connection)) {
|
||||
return {
|
||||
ok: false,
|
||||
lines: [` FAIL Connection ${input.connectionId} is not a native Snowflake connection.`],
|
||||
};
|
||||
}
|
||||
|
||||
const client = new KtxSnowflakeHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection,
|
||||
projectDir: input.projectDir,
|
||||
});
|
||||
try {
|
||||
const result = await new SnowflakeHistoricSqlQueryHistoryReader().probe(client);
|
||||
return {
|
||||
ok: true,
|
||||
lines: [
|
||||
' OK SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY accessible',
|
||||
...result.warnings.map((warning: string) => ` ! ${warning}`),
|
||||
],
|
||||
};
|
||||
} catch (error) {
|
||||
return { ok: false, lines: historicSqlProbeFailureLines(error) };
|
||||
} finally {
|
||||
await client.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
async function defaultListSchemas(projectDir: string, connectionId: string): Promise<string[]> {
|
||||
const project = await loadKtxProject({ projectDir });
|
||||
const connection = project.config.connections[connectionId];
|
||||
|
|
@ -457,7 +512,7 @@ async function defaultListSchemas(projectDir: string, connectionId: string): Pro
|
|||
if (driver === 'snowflake') {
|
||||
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;
|
||||
if (!isKtxSnowflakeConnectionConfig(connection)) return [];
|
||||
const connector = new KtxSnowflakeScanConnector({ connectionId, connection });
|
||||
const connector = new KtxSnowflakeScanConnector({ connectionId, connection, projectDir });
|
||||
try {
|
||||
return await connector.listSchemas();
|
||||
} finally {
|
||||
|
|
@ -533,7 +588,7 @@ async function defaultListTables(
|
|||
if (driver === 'snowflake') {
|
||||
const { KtxSnowflakeScanConnector, isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');;
|
||||
if (!isKtxSnowflakeConnectionConfig(connection)) return [];
|
||||
const connector = new KtxSnowflakeScanConnector({ connectionId, connection });
|
||||
const connector = new KtxSnowflakeScanConnector({ connectionId, connection, projectDir });
|
||||
try {
|
||||
return await connector.listTables(schemas);
|
||||
} finally {
|
||||
|
|
@ -1651,7 +1706,12 @@ async function maybeRunHistoricSqlSetupProbe(input: {
|
|||
const connection = project.config.connections[input.connectionId];
|
||||
const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection);
|
||||
const driver = normalizeDriver(connection?.driver);
|
||||
if (queryHistory?.enabled !== true || driver !== 'postgres') {
|
||||
if (queryHistory?.enabled !== true) {
|
||||
return;
|
||||
}
|
||||
const dialect: 'postgres' | 'snowflake' | null =
|
||||
driver === 'postgres' ? 'postgres' : driver === 'snowflake' ? 'snowflake' : null;
|
||||
if (!dialect) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -1660,13 +1720,13 @@ async function maybeRunHistoricSqlSetupProbe(input: {
|
|||
const result = await probe({
|
||||
projectDir: input.projectDir,
|
||||
connectionId: input.connectionId,
|
||||
dialect: 'postgres',
|
||||
dialect,
|
||||
});
|
||||
for (const line of result.lines) {
|
||||
input.io.stdout.write(`│${line}\n`);
|
||||
}
|
||||
if (!result.ok) {
|
||||
input.io.stdout.write('│ Setup written; first ingest run will fail until fixed.\n');
|
||||
input.io.stdout.write('│ Setup written; query history will be skipped until fixed.\n');
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue