mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-13 08:15:14 +02:00
719 lines
25 KiB
TypeScript
719 lines
25 KiB
TypeScript
import { createPrivateKey } from 'node:crypto';
|
|
import { readFileSync } from 'node:fs';
|
|
import { homedir } from 'node:os';
|
|
import { resolve } from 'node:path';
|
|
import { assertReadOnlySql, limitSqlForExecution } from '@ktx/context/connections';
|
|
import {
|
|
createKtxConnectorCapabilities,
|
|
type KtxColumnSampleInput,
|
|
type KtxColumnSampleResult,
|
|
type KtxColumnStatsInput,
|
|
type KtxColumnStatsResult,
|
|
type KtxQueryResult,
|
|
type KtxReadOnlyQueryInput,
|
|
type KtxScanConnector,
|
|
type KtxScanContext,
|
|
type KtxScanInput,
|
|
type KtxSchemaColumn,
|
|
type KtxSchemaSnapshot,
|
|
type KtxSchemaTable,
|
|
type KtxTableRef,
|
|
type KtxTableSampleInput,
|
|
type KtxTableListEntry,
|
|
type KtxTableSampleResult,
|
|
} from '@ktx/context/scan';
|
|
import * as snowflake from 'snowflake-sdk';
|
|
import { KtxSnowflakeDialect } from './dialect.js';
|
|
|
|
export interface KtxSnowflakeConnectionConfig {
|
|
driver?: string;
|
|
authMethod?: 'password' | 'rsa';
|
|
account?: string;
|
|
warehouse?: string;
|
|
database?: string;
|
|
schema_name?: string;
|
|
schema_names?: string[];
|
|
username?: string;
|
|
password?: string;
|
|
privateKey?: string;
|
|
passphrase?: string;
|
|
role?: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
export interface KtxSnowflakeResolvedConnectionConfig {
|
|
authMethod: 'password' | 'rsa';
|
|
account: string;
|
|
warehouse: string;
|
|
database: string;
|
|
schemas: string[];
|
|
username: string;
|
|
password?: string;
|
|
privateKey?: string;
|
|
passphrase?: string;
|
|
role?: string;
|
|
}
|
|
|
|
export interface KtxSnowflakeRawColumnMetadata {
|
|
name: string;
|
|
type: string;
|
|
nullable: boolean;
|
|
comment: string | null;
|
|
}
|
|
|
|
export interface KtxSnowflakeRawTableMetadata {
|
|
name: string;
|
|
catalog: string;
|
|
db: string;
|
|
rowCount: number | null;
|
|
comment: string | null;
|
|
columns: KtxSnowflakeRawColumnMetadata[];
|
|
}
|
|
|
|
export interface KtxSnowflakeDriver {
|
|
test(): Promise<{ success: boolean; error?: string }>;
|
|
query(sql: string, params?: unknown): Promise<KtxQueryResult>;
|
|
getSchemaMetadata(schemaName?: string): Promise<KtxSnowflakeRawTableMetadata[]>;
|
|
listSchemas(): Promise<string[]>;
|
|
listTables(schemas?: string[]): Promise<KtxTableListEntry[]>;
|
|
cleanup(): Promise<void>;
|
|
}
|
|
|
|
export interface KtxSnowflakeDriverFactory {
|
|
createDriver(input: {
|
|
resolved: KtxSnowflakeResolvedConnectionConfig;
|
|
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
|
|
}): KtxSnowflakeDriver;
|
|
}
|
|
|
|
export interface KtxSnowflakeSdkOptionsProvider {
|
|
resolve(input: {
|
|
account: string;
|
|
connection: KtxSnowflakeConnectionConfig;
|
|
}): Promise<{ sdkOptions: Record<string, unknown>; close?: () => Promise<void> } | undefined>;
|
|
}
|
|
|
|
export interface KtxSnowflakeScanConnectorOptions {
|
|
connectionId: string;
|
|
connection: KtxSnowflakeConnectionConfig | undefined;
|
|
driverFactory?: KtxSnowflakeDriverFactory;
|
|
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
|
|
env?: NodeJS.ProcessEnv;
|
|
now?: () => Date;
|
|
}
|
|
|
|
export interface KtxSnowflakeReadOnlyQueryInput extends KtxReadOnlyQueryInput {
|
|
params?: Record<string, unknown>;
|
|
}
|
|
|
|
export interface KtxSnowflakeColumnDistinctValuesOptions {
|
|
maxCardinality: number;
|
|
limit: number;
|
|
sampleSize?: number;
|
|
}
|
|
|
|
export interface KtxSnowflakeColumnDistinctValuesResult {
|
|
values: string[] | null;
|
|
cardinality: number;
|
|
}
|
|
|
|
const DATE_TYPES = ['DATE', 'TIMESTAMP', 'TIMESTAMP_LTZ', 'TIMESTAMP_NTZ', 'TIMESTAMP_TZ', 'TIME'];
|
|
|
|
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
|
|
if (value.startsWith('env:')) {
|
|
return env[value.slice('env:'.length)] ?? '';
|
|
}
|
|
if (value.startsWith('file:')) {
|
|
const rawPath = value.slice('file:'.length);
|
|
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
|
|
return readFileSync(path, 'utf-8').trim();
|
|
}
|
|
return value;
|
|
}
|
|
|
|
function stringConfigValue(
|
|
connection: KtxSnowflakeConnectionConfig | undefined,
|
|
key: keyof KtxSnowflakeConnectionConfig,
|
|
env: NodeJS.ProcessEnv,
|
|
): string | undefined {
|
|
const value = connection?.[key];
|
|
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
|
|
}
|
|
|
|
function schemaNames(connection: KtxSnowflakeConnectionConfig, env: NodeJS.ProcessEnv): string[] {
|
|
if (Array.isArray(connection.schema_names) && connection.schema_names.length > 0) {
|
|
return connection.schema_names
|
|
.filter((schema) => schema.trim().length > 0)
|
|
.map((schema) => resolveStringReference(schema, env));
|
|
}
|
|
return [stringConfigValue(connection, 'schema_name', env) ?? 'PUBLIC'];
|
|
}
|
|
|
|
function firstNumber(value: unknown): number | null {
|
|
const numberValue = Number(value);
|
|
return Number.isFinite(numberValue) ? numberValue : null;
|
|
}
|
|
|
|
function normalizeSnowflakeValue(value: unknown, columnType?: string): unknown {
|
|
if (columnType && DATE_TYPES.some((type) => columnType.toUpperCase().includes(type))) {
|
|
if (typeof value === 'number') {
|
|
return new Date(value).toISOString();
|
|
}
|
|
if (value instanceof Date) {
|
|
return value.toISOString();
|
|
}
|
|
}
|
|
if (typeof value === 'string') {
|
|
const trimmed = value.trim();
|
|
if ((trimmed.startsWith('{') && trimmed.endsWith('}')) || (trimmed.startsWith('[') && trimmed.endsWith(']'))) {
|
|
try {
|
|
return JSON.parse(trimmed) as unknown;
|
|
} catch {
|
|
return value;
|
|
}
|
|
}
|
|
}
|
|
return value;
|
|
}
|
|
|
|
function toSnowflakeBind(value: unknown): snowflake.Bind {
|
|
if (value === null || typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') {
|
|
return value;
|
|
}
|
|
if (value instanceof Date) {
|
|
return value.toISOString();
|
|
}
|
|
return String(value);
|
|
}
|
|
|
|
function toSnowflakeBinds(params: unknown[] | undefined): snowflake.Binds | undefined {
|
|
return params?.map((value) => toSnowflakeBind(value));
|
|
}
|
|
|
|
export function isKtxSnowflakeConnectionConfig(
|
|
connection: KtxSnowflakeConnectionConfig | undefined,
|
|
): connection is KtxSnowflakeConnectionConfig {
|
|
return String(connection?.driver ?? '').toLowerCase() === 'snowflake';
|
|
}
|
|
|
|
export function snowflakeConnectionConfigFromConfig(input: {
|
|
connectionId: string;
|
|
connection: KtxSnowflakeConnectionConfig | undefined;
|
|
env?: NodeJS.ProcessEnv;
|
|
}): KtxSnowflakeResolvedConnectionConfig {
|
|
const inputDriver = input.connection?.driver ?? 'unknown';
|
|
if (!isKtxSnowflakeConnectionConfig(input.connection)) {
|
|
throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`);
|
|
}
|
|
const env = input.env ?? process.env;
|
|
const authMethod = input.connection?.authMethod ?? 'password';
|
|
const account = stringConfigValue(input.connection, 'account', env);
|
|
const warehouse = stringConfigValue(input.connection, 'warehouse', env);
|
|
const database = stringConfigValue(input.connection, 'database', env);
|
|
const username = stringConfigValue(input.connection, 'username', env);
|
|
if (!account) {
|
|
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.account`);
|
|
}
|
|
if (!warehouse) {
|
|
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.warehouse`);
|
|
}
|
|
if (!database) {
|
|
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.database`);
|
|
}
|
|
if (!username) {
|
|
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.username`);
|
|
}
|
|
const resolved: KtxSnowflakeResolvedConnectionConfig = {
|
|
authMethod,
|
|
account,
|
|
warehouse,
|
|
database,
|
|
schemas: schemaNames(input.connection!, env),
|
|
username,
|
|
};
|
|
const role = stringConfigValue(input.connection, 'role', env);
|
|
if (role) {
|
|
resolved.role = role;
|
|
}
|
|
if (authMethod === 'rsa') {
|
|
resolved.privateKey = stringConfigValue(input.connection, 'privateKey', env);
|
|
const passphrase = stringConfigValue(input.connection, 'passphrase', env);
|
|
if (passphrase) {
|
|
resolved.passphrase = passphrase;
|
|
}
|
|
if (!resolved.privateKey) {
|
|
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.privateKey for RSA auth`);
|
|
}
|
|
} else {
|
|
resolved.password = stringConfigValue(input.connection, 'password', env);
|
|
if (!resolved.password) {
|
|
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.password`);
|
|
}
|
|
}
|
|
return resolved;
|
|
}
|
|
|
|
class DefaultSnowflakeDriverFactory implements KtxSnowflakeDriverFactory {
|
|
createDriver(input: {
|
|
resolved: KtxSnowflakeResolvedConnectionConfig;
|
|
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
|
|
}): KtxSnowflakeDriver {
|
|
return new SnowflakeSdkDriver(input.resolved, input.sdkOptionsProvider);
|
|
}
|
|
}
|
|
|
|
class SnowflakeSdkDriver implements KtxSnowflakeDriver {
|
|
private closeSdkOptions: Array<() => Promise<void>> = [];
|
|
|
|
constructor(
|
|
private readonly resolved: KtxSnowflakeResolvedConnectionConfig,
|
|
private readonly sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider,
|
|
) {}
|
|
|
|
async test(): Promise<{ success: boolean; error?: string }> {
|
|
const timeoutMs = 60_000;
|
|
return Promise.race([
|
|
this.runTest(),
|
|
new Promise<{ success: boolean; error: string }>((resolveTest) =>
|
|
setTimeout(
|
|
() => resolveTest({ success: false, error: `Connection test timed out after ${timeoutMs / 1000}s` }),
|
|
timeoutMs,
|
|
),
|
|
),
|
|
]);
|
|
}
|
|
|
|
async query(sql: string, params?: unknown): Promise<KtxQueryResult> {
|
|
let connection: snowflake.Connection | null = null;
|
|
try {
|
|
connection = await this.createConnection();
|
|
const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined;
|
|
const result = await this.executeSnowflakeQuery(connection, sql, binds);
|
|
return { ...result, totalRows: result.rows.length, rowCount: result.rows.length };
|
|
} catch {
|
|
return { headers: [], rows: [], totalRows: 0, rowCount: 0 };
|
|
} finally {
|
|
if (connection) {
|
|
await this.destroyConnection(connection);
|
|
}
|
|
}
|
|
}
|
|
|
|
async getSchemaMetadata(schemaName = this.resolved.schemas[0] ?? 'PUBLIC'): Promise<KtxSnowflakeRawTableMetadata[]> {
|
|
const tablesResult = await this.query(
|
|
`
|
|
SELECT TABLE_NAME, TABLE_TYPE, COMMENT, ROW_COUNT
|
|
FROM INFORMATION_SCHEMA.TABLES
|
|
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
|
|
ORDER BY TABLE_NAME
|
|
`,
|
|
[schemaName, this.resolved.database],
|
|
);
|
|
const columnsResult = await this.query(
|
|
`
|
|
SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COMMENT, ORDINAL_POSITION
|
|
FROM INFORMATION_SCHEMA.COLUMNS
|
|
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
|
|
ORDER BY TABLE_NAME, ORDINAL_POSITION
|
|
`,
|
|
[schemaName, this.resolved.database],
|
|
);
|
|
const columnsByTable = new Map<string, KtxSnowflakeRawColumnMetadata[]>();
|
|
for (const row of columnsResult.rows) {
|
|
const tableName = String(row[0]);
|
|
const columns = columnsByTable.get(tableName) ?? [];
|
|
columns.push({
|
|
name: String(row[1]),
|
|
type: String(row[2]),
|
|
nullable: row[3] === 'YES',
|
|
comment: row[4] ? String(row[4]) : null,
|
|
});
|
|
columnsByTable.set(tableName, columns);
|
|
}
|
|
return tablesResult.rows.map((row) => ({
|
|
name: String(row[0]),
|
|
catalog: this.resolved.database,
|
|
db: schemaName,
|
|
rowCount: firstNumber(row[3]) ?? 0,
|
|
comment: row[2] ? String(row[2]) : null,
|
|
columns: columnsByTable.get(String(row[0])) ?? [],
|
|
}));
|
|
}
|
|
|
|
async listSchemas(): Promise<string[]> {
|
|
const result = await this.query(`SHOW SCHEMAS IN DATABASE "${this.resolved.database}"`);
|
|
return result.rows.map((row) => String(row[1])).filter((name) => name !== 'INFORMATION_SCHEMA');
|
|
}
|
|
|
|
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
|
|
const filterSchemas = schemas ?? (await this.listSchemas());
|
|
if (filterSchemas.length === 0) return [];
|
|
const entries: KtxTableListEntry[] = [];
|
|
for (const schemaName of filterSchemas) {
|
|
const result = await this.query(
|
|
`
|
|
SELECT TABLE_NAME, TABLE_TYPE
|
|
FROM INFORMATION_SCHEMA.TABLES
|
|
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
|
|
ORDER BY TABLE_NAME
|
|
`,
|
|
[schemaName, this.resolved.database],
|
|
);
|
|
for (const row of result.rows) {
|
|
entries.push({
|
|
schema: schemaName,
|
|
name: String(row[0]),
|
|
kind: String(row[1]) === 'VIEW' ? 'view' : 'table',
|
|
});
|
|
}
|
|
}
|
|
return entries;
|
|
}
|
|
|
|
async cleanup(): Promise<void> {
|
|
const closers = this.closeSdkOptions;
|
|
this.closeSdkOptions = [];
|
|
await Promise.all(closers.map((close) => close()));
|
|
}
|
|
|
|
private async runTest(): Promise<{ success: boolean; error?: string }> {
|
|
let connection: snowflake.Connection | null = null;
|
|
try {
|
|
connection = await this.createConnection();
|
|
await this.executeSnowflakeQuery(connection, 'SELECT 1');
|
|
return { success: true };
|
|
} catch (error) {
|
|
return { success: false, error: error instanceof Error ? error.message : String(error) };
|
|
} finally {
|
|
if (connection) {
|
|
await this.destroyConnection(connection);
|
|
}
|
|
}
|
|
}
|
|
|
|
private async createConnection(): Promise<snowflake.Connection> {
|
|
const patch = await this.sdkOptionsProvider?.resolve({
|
|
account: this.resolved.account,
|
|
connection: { ...this.resolved, driver: 'snowflake' },
|
|
});
|
|
if (patch?.close) {
|
|
this.closeSdkOptions.push(patch.close);
|
|
}
|
|
const baseConfig: snowflake.ConnectionOptions = {
|
|
account: this.resolved.account,
|
|
username: this.resolved.username,
|
|
warehouse: this.resolved.warehouse,
|
|
database: this.resolved.database,
|
|
schema: this.resolved.schemas[0] ?? 'PUBLIC',
|
|
role: this.resolved.role,
|
|
...patch?.sdkOptions,
|
|
};
|
|
const connectionConfig: snowflake.ConnectionOptions =
|
|
this.resolved.authMethod === 'rsa'
|
|
? { ...baseConfig, authenticator: 'SNOWFLAKE_JWT', privateKey: this.decryptPrivateKey() }
|
|
: { ...baseConfig, password: this.resolved.password };
|
|
const connection = snowflake.createConnection(connectionConfig);
|
|
return new Promise((resolveConnection, rejectConnection) => {
|
|
connection.connect((error, connected) => {
|
|
if (error) {
|
|
rejectConnection(error);
|
|
return;
|
|
}
|
|
const resolvedConnection = connected ?? connection;
|
|
this.setConnectionContext(resolvedConnection).then(
|
|
() => resolveConnection(resolvedConnection),
|
|
(contextError) => {
|
|
resolvedConnection.destroy(() => undefined);
|
|
rejectConnection(contextError);
|
|
},
|
|
);
|
|
});
|
|
});
|
|
}
|
|
|
|
private async setConnectionContext(connection: snowflake.Connection): Promise<void> {
|
|
if (this.resolved.role) {
|
|
await this.executeSnowflakeQuery(connection, `USE ROLE "${this.resolved.role}"`);
|
|
}
|
|
await this.executeSnowflakeQuery(connection, `USE WAREHOUSE "${this.resolved.warehouse}"`);
|
|
await this.executeSnowflakeQuery(connection, `USE DATABASE "${this.resolved.database}"`);
|
|
await this.executeSnowflakeQuery(connection, `USE SCHEMA "${this.resolved.schemas[0] ?? 'PUBLIC'}"`);
|
|
}
|
|
|
|
private async executeSnowflakeQuery(
|
|
connection: snowflake.Connection,
|
|
sqlText: string,
|
|
binds?: snowflake.Binds,
|
|
): Promise<{ headers: string[]; headerTypes?: string[]; rows: unknown[][] }> {
|
|
return new Promise((resolveQuery, rejectQuery) => {
|
|
connection.execute({
|
|
sqlText,
|
|
binds,
|
|
complete: (error, statement, rows) => {
|
|
if (error) {
|
|
rejectQuery(error);
|
|
return;
|
|
}
|
|
const columns = statement.getColumns();
|
|
const headers = columns ? columns.map((column) => column.getName()) : [];
|
|
const headerTypes = columns ? columns.map((column) => column.getType()) : [];
|
|
const normalizedRows = rows
|
|
? rows.map((row) => headers.map((header, index) => normalizeSnowflakeValue(row[header], headerTypes[index])))
|
|
: [];
|
|
resolveQuery({ headers, headerTypes, rows: normalizedRows });
|
|
},
|
|
});
|
|
});
|
|
}
|
|
|
|
private destroyConnection(connection: snowflake.Connection): Promise<void> {
|
|
return new Promise((resolveDestroy, rejectDestroy) => {
|
|
connection.destroy((error) => {
|
|
if (error) {
|
|
rejectDestroy(error);
|
|
return;
|
|
}
|
|
resolveDestroy();
|
|
});
|
|
});
|
|
}
|
|
|
|
private decryptPrivateKey(): string {
|
|
if (!this.resolved.privateKey) {
|
|
throw new Error('Private key is required for RSA authentication');
|
|
}
|
|
const privateKeyObject = createPrivateKey({
|
|
key: this.resolved.privateKey,
|
|
format: 'pem',
|
|
...(this.resolved.passphrase ? { passphrase: this.resolved.passphrase } : {}),
|
|
});
|
|
return privateKeyObject.export({ format: 'pem', type: 'pkcs8' }) as string;
|
|
}
|
|
}
|
|
|
|
export class KtxSnowflakeScanConnector implements KtxScanConnector {
|
|
readonly id: string;
|
|
readonly driver = 'snowflake' as const;
|
|
readonly capabilities = createKtxConnectorCapabilities({
|
|
tableSampling: true,
|
|
columnSampling: true,
|
|
columnStats: false,
|
|
readOnlySql: true,
|
|
nestedAnalysis: true,
|
|
formalForeignKeys: false,
|
|
estimatedRowCounts: true,
|
|
});
|
|
|
|
private readonly resolved: KtxSnowflakeResolvedConnectionConfig;
|
|
private readonly driverFactory: KtxSnowflakeDriverFactory;
|
|
private readonly dialect = new KtxSnowflakeDialect();
|
|
private readonly now: () => Date;
|
|
private driverInstance: KtxSnowflakeDriver | null = null;
|
|
|
|
constructor(private readonly options: KtxSnowflakeScanConnectorOptions) {
|
|
this.resolved = snowflakeConnectionConfigFromConfig(options);
|
|
this.driverFactory = options.driverFactory ?? new DefaultSnowflakeDriverFactory();
|
|
this.now = options.now ?? (() => new Date());
|
|
this.id = `snowflake:${options.connectionId}`;
|
|
}
|
|
|
|
async testConnection(): Promise<{ success: boolean; error?: string }> {
|
|
return this.getDriver().test();
|
|
}
|
|
|
|
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
|
|
this.assertConnection(input.connectionId);
|
|
const tables: KtxSchemaTable[] = [];
|
|
for (const schemaName of this.resolved.schemas) {
|
|
const rawTables = await this.getDriver().getSchemaMetadata(schemaName);
|
|
const primaryKeys = await this.primaryKeys(rawTables.map((table) => table.name), schemaName);
|
|
tables.push(...rawTables.map((table) => this.toSchemaTable(table, primaryKeys)));
|
|
}
|
|
return {
|
|
connectionId: this.options.connectionId,
|
|
driver: 'snowflake',
|
|
extractedAt: this.now().toISOString(),
|
|
scope: { catalogs: [this.resolved.database], schemas: this.resolved.schemas },
|
|
metadata: {
|
|
account: this.resolved.account,
|
|
warehouse: this.resolved.warehouse,
|
|
database: this.resolved.database,
|
|
schemas: this.resolved.schemas,
|
|
table_count: tables.length,
|
|
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
|
|
},
|
|
tables,
|
|
};
|
|
}
|
|
|
|
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult> {
|
|
this.assertConnection(input.connectionId);
|
|
const result = await this.getDriver().query(
|
|
this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns),
|
|
);
|
|
return { headers: result.headers, rows: result.rows, totalRows: result.totalRows };
|
|
}
|
|
|
|
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
|
|
this.assertConnection(input.connectionId);
|
|
const result = await this.getDriver().query(
|
|
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
|
|
);
|
|
return {
|
|
values: result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]),
|
|
nullCount: null,
|
|
distinctCount: null,
|
|
};
|
|
}
|
|
|
|
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
|
|
return null;
|
|
}
|
|
|
|
async executeReadOnly(input: KtxSnowflakeReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
|
|
this.assertConnection(input.connectionId);
|
|
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
|
|
const prepared = this.dialect.prepareQuery(limitedSql, input.params);
|
|
return this.getDriver().query(prepared.sql, prepared.params);
|
|
}
|
|
|
|
async getColumnDistinctValues(
|
|
table: KtxTableRef,
|
|
columnName: string,
|
|
options: KtxSnowflakeColumnDistinctValuesOptions,
|
|
): Promise<KtxSnowflakeColumnDistinctValuesResult | null> {
|
|
const tableName = this.qTableName(table);
|
|
const quotedColumn = this.dialect.quoteIdentifier(columnName);
|
|
const cardinality = await this.singleNumber(
|
|
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, options.sampleSize ?? 10000),
|
|
'CARDINALITY',
|
|
);
|
|
if (cardinality === null) {
|
|
return null;
|
|
}
|
|
if (cardinality === 0) {
|
|
return { values: [], cardinality: 0 };
|
|
}
|
|
if (cardinality > options.maxCardinality) {
|
|
return { values: null, cardinality };
|
|
}
|
|
const valueRows = await this.queryRaw<Record<string, unknown>>(
|
|
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
|
|
);
|
|
return { values: valueRows.map((row) => String(row.VAL ?? row.val)).filter((value) => value !== 'null'), cardinality };
|
|
}
|
|
|
|
async getTableRowCount(tableName: string, schemaName = this.resolved.schemas[0] ?? 'PUBLIC'): Promise<number> {
|
|
const tables = await this.getDriver().getSchemaMetadata(schemaName);
|
|
return tables.find((table) => table.name === tableName)?.rowCount ?? 0;
|
|
}
|
|
|
|
qTableName(table: Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>): string {
|
|
return this.dialect.formatTableName(table);
|
|
}
|
|
|
|
quoteIdentifier(identifier: string): string {
|
|
return this.dialect.quoteIdentifier(identifier);
|
|
}
|
|
|
|
listSchemas(): Promise<string[]> {
|
|
return this.getDriver().listSchemas();
|
|
}
|
|
|
|
listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
|
|
return this.getDriver().listTables(schemas);
|
|
}
|
|
|
|
async cleanup(): Promise<void> {
|
|
if (this.driverInstance) {
|
|
await this.driverInstance.cleanup();
|
|
this.driverInstance = null;
|
|
}
|
|
}
|
|
|
|
private getDriver(): KtxSnowflakeDriver {
|
|
if (!this.driverInstance) {
|
|
this.driverInstance = this.driverFactory.createDriver({
|
|
resolved: this.resolved,
|
|
sdkOptionsProvider: this.options.sdkOptionsProvider,
|
|
});
|
|
}
|
|
return this.driverInstance;
|
|
}
|
|
|
|
private async primaryKeys(tableNames: string[], schemaName: string): Promise<Map<string, Set<string>>> {
|
|
if (tableNames.length === 0) {
|
|
return new Map();
|
|
}
|
|
const result = await this.getDriver().query(
|
|
`
|
|
SELECT tc.TABLE_NAME, kcu.COLUMN_NAME
|
|
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
|
|
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
|
|
ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
|
|
AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
|
|
AND tc.TABLE_CATALOG = kcu.TABLE_CATALOG
|
|
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
|
|
AND tc.TABLE_SCHEMA = ?
|
|
AND tc.TABLE_CATALOG = ?
|
|
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
|
|
`,
|
|
[schemaName, this.resolved.database],
|
|
);
|
|
const grouped = new Map<string, Set<string>>();
|
|
for (const tableName of tableNames) {
|
|
grouped.set(tableName, new Set());
|
|
}
|
|
for (const row of result.rows) {
|
|
const tableName = String(row[0]);
|
|
const columnName = String(row[1]);
|
|
grouped.get(tableName)?.add(columnName);
|
|
}
|
|
return grouped;
|
|
}
|
|
|
|
private toSchemaTable(table: KtxSnowflakeRawTableMetadata, primaryKeys: Map<string, Set<string>>): KtxSchemaTable {
|
|
return {
|
|
catalog: table.catalog,
|
|
db: table.db,
|
|
name: table.name,
|
|
kind: 'table',
|
|
comment: table.comment,
|
|
estimatedRows: table.rowCount,
|
|
columns: table.columns.map((column) => this.toSchemaColumn(table.name, column, primaryKeys)),
|
|
foreignKeys: [],
|
|
};
|
|
}
|
|
|
|
private toSchemaColumn(
|
|
tableName: string,
|
|
column: KtxSnowflakeRawColumnMetadata,
|
|
primaryKeys: Map<string, Set<string>>,
|
|
): KtxSchemaColumn {
|
|
return {
|
|
name: column.name,
|
|
nativeType: column.type,
|
|
normalizedType: this.dialect.mapDataType(column.type),
|
|
dimensionType: this.dialect.mapToDimensionType(column.type),
|
|
nullable: column.nullable,
|
|
primaryKey: primaryKeys.get(tableName)?.has(column.name) ?? false,
|
|
comment: column.comment,
|
|
};
|
|
}
|
|
|
|
private async queryRaw<T extends Record<string, unknown>>(sql: string, params?: unknown): Promise<T[]> {
|
|
const result = await this.getDriver().query(sql, params);
|
|
return result.rows.map((row) => Object.fromEntries(result.headers.map((header, index) => [header, row[index]])) as T);
|
|
}
|
|
|
|
private async singleNumber(sql: string, header: string): Promise<number | null> {
|
|
const rows = await this.queryRaw<Record<string, unknown>>(sql);
|
|
return firstNumber(rows[0]?.[header] ?? rows[0]?.[header.toLowerCase()]);
|
|
}
|
|
|
|
private assertConnection(connectionId: string): void {
|
|
if (connectionId !== this.options.connectionId) {
|
|
throw new Error(`Snowflake connector ${this.options.connectionId} cannot scan connection ${connectionId}`);
|
|
}
|
|
}
|
|
}
|