ktx/packages/connector-snowflake/src/connector.ts
2026-05-13 19:37:25 +02:00

719 lines
25 KiB
TypeScript

import { createPrivateKey } from 'node:crypto';
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { assertReadOnlySql, limitSqlForExecution } from '@ktx/context/connections';
import {
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableListEntry,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import * as snowflake from 'snowflake-sdk';
import { KtxSnowflakeDialect } from './dialect.js';
export interface KtxSnowflakeConnectionConfig {
driver?: string;
authMethod?: 'password' | 'rsa';
account?: string;
warehouse?: string;
database?: string;
schema_name?: string;
schema_names?: string[];
username?: string;
password?: string;
privateKey?: string;
passphrase?: string;
role?: string;
[key: string]: unknown;
}
export interface KtxSnowflakeResolvedConnectionConfig {
authMethod: 'password' | 'rsa';
account: string;
warehouse: string;
database: string;
schemas: string[];
username: string;
password?: string;
privateKey?: string;
passphrase?: string;
role?: string;
}
export interface KtxSnowflakeRawColumnMetadata {
name: string;
type: string;
nullable: boolean;
comment: string | null;
}
export interface KtxSnowflakeRawTableMetadata {
name: string;
catalog: string;
db: string;
rowCount: number | null;
comment: string | null;
columns: KtxSnowflakeRawColumnMetadata[];
}
export interface KtxSnowflakeDriver {
test(): Promise<{ success: boolean; error?: string }>;
query(sql: string, params?: unknown): Promise<KtxQueryResult>;
getSchemaMetadata(schemaName?: string): Promise<KtxSnowflakeRawTableMetadata[]>;
listSchemas(): Promise<string[]>;
listTables(schemas?: string[]): Promise<KtxTableListEntry[]>;
cleanup(): Promise<void>;
}
export interface KtxSnowflakeDriverFactory {
createDriver(input: {
resolved: KtxSnowflakeResolvedConnectionConfig;
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
}): KtxSnowflakeDriver;
}
export interface KtxSnowflakeSdkOptionsProvider {
resolve(input: {
account: string;
connection: KtxSnowflakeConnectionConfig;
}): Promise<{ sdkOptions: Record<string, unknown>; close?: () => Promise<void> } | undefined>;
}
export interface KtxSnowflakeScanConnectorOptions {
connectionId: string;
connection: KtxSnowflakeConnectionConfig | undefined;
driverFactory?: KtxSnowflakeDriverFactory;
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
env?: NodeJS.ProcessEnv;
now?: () => Date;
}
export interface KtxSnowflakeReadOnlyQueryInput extends KtxReadOnlyQueryInput {
params?: Record<string, unknown>;
}
export interface KtxSnowflakeColumnDistinctValuesOptions {
maxCardinality: number;
limit: number;
sampleSize?: number;
}
export interface KtxSnowflakeColumnDistinctValuesResult {
values: string[] | null;
cardinality: number;
}
const DATE_TYPES = ['DATE', 'TIMESTAMP', 'TIMESTAMP_LTZ', 'TIMESTAMP_NTZ', 'TIMESTAMP_TZ', 'TIME'];
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function stringConfigValue(
connection: KtxSnowflakeConnectionConfig | undefined,
key: keyof KtxSnowflakeConnectionConfig,
env: NodeJS.ProcessEnv,
): string | undefined {
const value = connection?.[key];
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function schemaNames(connection: KtxSnowflakeConnectionConfig, env: NodeJS.ProcessEnv): string[] {
if (Array.isArray(connection.schema_names) && connection.schema_names.length > 0) {
return connection.schema_names
.filter((schema) => schema.trim().length > 0)
.map((schema) => resolveStringReference(schema, env));
}
return [stringConfigValue(connection, 'schema_name', env) ?? 'PUBLIC'];
}
function firstNumber(value: unknown): number | null {
const numberValue = Number(value);
return Number.isFinite(numberValue) ? numberValue : null;
}
function normalizeSnowflakeValue(value: unknown, columnType?: string): unknown {
if (columnType && DATE_TYPES.some((type) => columnType.toUpperCase().includes(type))) {
if (typeof value === 'number') {
return new Date(value).toISOString();
}
if (value instanceof Date) {
return value.toISOString();
}
}
if (typeof value === 'string') {
const trimmed = value.trim();
if ((trimmed.startsWith('{') && trimmed.endsWith('}')) || (trimmed.startsWith('[') && trimmed.endsWith(']'))) {
try {
return JSON.parse(trimmed) as unknown;
} catch {
return value;
}
}
}
return value;
}
function toSnowflakeBind(value: unknown): snowflake.Bind {
if (value === null || typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') {
return value;
}
if (value instanceof Date) {
return value.toISOString();
}
return String(value);
}
function toSnowflakeBinds(params: unknown[] | undefined): snowflake.Binds | undefined {
return params?.map((value) => toSnowflakeBind(value));
}
export function isKtxSnowflakeConnectionConfig(
connection: KtxSnowflakeConnectionConfig | undefined,
): connection is KtxSnowflakeConnectionConfig {
return String(connection?.driver ?? '').toLowerCase() === 'snowflake';
}
export function snowflakeConnectionConfigFromConfig(input: {
connectionId: string;
connection: KtxSnowflakeConnectionConfig | undefined;
env?: NodeJS.ProcessEnv;
}): KtxSnowflakeResolvedConnectionConfig {
const inputDriver = input.connection?.driver ?? 'unknown';
if (!isKtxSnowflakeConnectionConfig(input.connection)) {
throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`);
}
const env = input.env ?? process.env;
const authMethod = input.connection?.authMethod ?? 'password';
const account = stringConfigValue(input.connection, 'account', env);
const warehouse = stringConfigValue(input.connection, 'warehouse', env);
const database = stringConfigValue(input.connection, 'database', env);
const username = stringConfigValue(input.connection, 'username', env);
if (!account) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.account`);
}
if (!warehouse) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.warehouse`);
}
if (!database) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.database`);
}
if (!username) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.username`);
}
const resolved: KtxSnowflakeResolvedConnectionConfig = {
authMethod,
account,
warehouse,
database,
schemas: schemaNames(input.connection!, env),
username,
};
const role = stringConfigValue(input.connection, 'role', env);
if (role) {
resolved.role = role;
}
if (authMethod === 'rsa') {
resolved.privateKey = stringConfigValue(input.connection, 'privateKey', env);
const passphrase = stringConfigValue(input.connection, 'passphrase', env);
if (passphrase) {
resolved.passphrase = passphrase;
}
if (!resolved.privateKey) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.privateKey for RSA auth`);
}
} else {
resolved.password = stringConfigValue(input.connection, 'password', env);
if (!resolved.password) {
throw new Error(`Native Snowflake connector requires connections.${input.connectionId}.password`);
}
}
return resolved;
}
class DefaultSnowflakeDriverFactory implements KtxSnowflakeDriverFactory {
createDriver(input: {
resolved: KtxSnowflakeResolvedConnectionConfig;
sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider;
}): KtxSnowflakeDriver {
return new SnowflakeSdkDriver(input.resolved, input.sdkOptionsProvider);
}
}
class SnowflakeSdkDriver implements KtxSnowflakeDriver {
private closeSdkOptions: Array<() => Promise<void>> = [];
constructor(
private readonly resolved: KtxSnowflakeResolvedConnectionConfig,
private readonly sdkOptionsProvider?: KtxSnowflakeSdkOptionsProvider,
) {}
async test(): Promise<{ success: boolean; error?: string }> {
const timeoutMs = 60_000;
return Promise.race([
this.runTest(),
new Promise<{ success: boolean; error: string }>((resolveTest) =>
setTimeout(
() => resolveTest({ success: false, error: `Connection test timed out after ${timeoutMs / 1000}s` }),
timeoutMs,
),
),
]);
}
async query(sql: string, params?: unknown): Promise<KtxQueryResult> {
let connection: snowflake.Connection | null = null;
try {
connection = await this.createConnection();
const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined;
const result = await this.executeSnowflakeQuery(connection, sql, binds);
return { ...result, totalRows: result.rows.length, rowCount: result.rows.length };
} catch {
return { headers: [], rows: [], totalRows: 0, rowCount: 0 };
} finally {
if (connection) {
await this.destroyConnection(connection);
}
}
}
async getSchemaMetadata(schemaName = this.resolved.schemas[0] ?? 'PUBLIC'): Promise<KtxSnowflakeRawTableMetadata[]> {
const tablesResult = await this.query(
`
SELECT TABLE_NAME, TABLE_TYPE, COMMENT, ROW_COUNT
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
ORDER BY TABLE_NAME
`,
[schemaName, this.resolved.database],
);
const columnsResult = await this.query(
`
SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COMMENT, ORDINAL_POSITION
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
ORDER BY TABLE_NAME, ORDINAL_POSITION
`,
[schemaName, this.resolved.database],
);
const columnsByTable = new Map<string, KtxSnowflakeRawColumnMetadata[]>();
for (const row of columnsResult.rows) {
const tableName = String(row[0]);
const columns = columnsByTable.get(tableName) ?? [];
columns.push({
name: String(row[1]),
type: String(row[2]),
nullable: row[3] === 'YES',
comment: row[4] ? String(row[4]) : null,
});
columnsByTable.set(tableName, columns);
}
return tablesResult.rows.map((row) => ({
name: String(row[0]),
catalog: this.resolved.database,
db: schemaName,
rowCount: firstNumber(row[3]) ?? 0,
comment: row[2] ? String(row[2]) : null,
columns: columnsByTable.get(String(row[0])) ?? [],
}));
}
async listSchemas(): Promise<string[]> {
const result = await this.query(`SHOW SCHEMAS IN DATABASE "${this.resolved.database}"`);
return result.rows.map((row) => String(row[1])).filter((name) => name !== 'INFORMATION_SCHEMA');
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const entries: KtxTableListEntry[] = [];
for (const schemaName of filterSchemas) {
const result = await this.query(
`
SELECT TABLE_NAME, TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_CATALOG = ?
ORDER BY TABLE_NAME
`,
[schemaName, this.resolved.database],
);
for (const row of result.rows) {
entries.push({
schema: schemaName,
name: String(row[0]),
kind: String(row[1]) === 'VIEW' ? 'view' : 'table',
});
}
}
return entries;
}
async cleanup(): Promise<void> {
const closers = this.closeSdkOptions;
this.closeSdkOptions = [];
await Promise.all(closers.map((close) => close()));
}
private async runTest(): Promise<{ success: boolean; error?: string }> {
let connection: snowflake.Connection | null = null;
try {
connection = await this.createConnection();
await this.executeSnowflakeQuery(connection, 'SELECT 1');
return { success: true };
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) };
} finally {
if (connection) {
await this.destroyConnection(connection);
}
}
}
private async createConnection(): Promise<snowflake.Connection> {
const patch = await this.sdkOptionsProvider?.resolve({
account: this.resolved.account,
connection: { ...this.resolved, driver: 'snowflake' },
});
if (patch?.close) {
this.closeSdkOptions.push(patch.close);
}
const baseConfig: snowflake.ConnectionOptions = {
account: this.resolved.account,
username: this.resolved.username,
warehouse: this.resolved.warehouse,
database: this.resolved.database,
schema: this.resolved.schemas[0] ?? 'PUBLIC',
role: this.resolved.role,
...patch?.sdkOptions,
};
const connectionConfig: snowflake.ConnectionOptions =
this.resolved.authMethod === 'rsa'
? { ...baseConfig, authenticator: 'SNOWFLAKE_JWT', privateKey: this.decryptPrivateKey() }
: { ...baseConfig, password: this.resolved.password };
const connection = snowflake.createConnection(connectionConfig);
return new Promise((resolveConnection, rejectConnection) => {
connection.connect((error, connected) => {
if (error) {
rejectConnection(error);
return;
}
const resolvedConnection = connected ?? connection;
this.setConnectionContext(resolvedConnection).then(
() => resolveConnection(resolvedConnection),
(contextError) => {
resolvedConnection.destroy(() => undefined);
rejectConnection(contextError);
},
);
});
});
}
private async setConnectionContext(connection: snowflake.Connection): Promise<void> {
if (this.resolved.role) {
await this.executeSnowflakeQuery(connection, `USE ROLE "${this.resolved.role}"`);
}
await this.executeSnowflakeQuery(connection, `USE WAREHOUSE "${this.resolved.warehouse}"`);
await this.executeSnowflakeQuery(connection, `USE DATABASE "${this.resolved.database}"`);
await this.executeSnowflakeQuery(connection, `USE SCHEMA "${this.resolved.schemas[0] ?? 'PUBLIC'}"`);
}
private async executeSnowflakeQuery(
connection: snowflake.Connection,
sqlText: string,
binds?: snowflake.Binds,
): Promise<{ headers: string[]; headerTypes?: string[]; rows: unknown[][] }> {
return new Promise((resolveQuery, rejectQuery) => {
connection.execute({
sqlText,
binds,
complete: (error, statement, rows) => {
if (error) {
rejectQuery(error);
return;
}
const columns = statement.getColumns();
const headers = columns ? columns.map((column) => column.getName()) : [];
const headerTypes = columns ? columns.map((column) => column.getType()) : [];
const normalizedRows = rows
? rows.map((row) => headers.map((header, index) => normalizeSnowflakeValue(row[header], headerTypes[index])))
: [];
resolveQuery({ headers, headerTypes, rows: normalizedRows });
},
});
});
}
private destroyConnection(connection: snowflake.Connection): Promise<void> {
return new Promise((resolveDestroy, rejectDestroy) => {
connection.destroy((error) => {
if (error) {
rejectDestroy(error);
return;
}
resolveDestroy();
});
});
}
private decryptPrivateKey(): string {
if (!this.resolved.privateKey) {
throw new Error('Private key is required for RSA authentication');
}
const privateKeyObject = createPrivateKey({
key: this.resolved.privateKey,
format: 'pem',
...(this.resolved.passphrase ? { passphrase: this.resolved.passphrase } : {}),
});
return privateKeyObject.export({ format: 'pem', type: 'pkcs8' }) as string;
}
}
export class KtxSnowflakeScanConnector implements KtxScanConnector {
readonly id: string;
readonly driver = 'snowflake' as const;
readonly capabilities = createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: true,
nestedAnalysis: true,
formalForeignKeys: false,
estimatedRowCounts: true,
});
private readonly resolved: KtxSnowflakeResolvedConnectionConfig;
private readonly driverFactory: KtxSnowflakeDriverFactory;
private readonly dialect = new KtxSnowflakeDialect();
private readonly now: () => Date;
private driverInstance: KtxSnowflakeDriver | null = null;
constructor(private readonly options: KtxSnowflakeScanConnectorOptions) {
this.resolved = snowflakeConnectionConfigFromConfig(options);
this.driverFactory = options.driverFactory ?? new DefaultSnowflakeDriverFactory();
this.now = options.now ?? (() => new Date());
this.id = `snowflake:${options.connectionId}`;
}
async testConnection(): Promise<{ success: boolean; error?: string }> {
return this.getDriver().test();
}
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const tables: KtxSchemaTable[] = [];
for (const schemaName of this.resolved.schemas) {
const rawTables = await this.getDriver().getSchemaMetadata(schemaName);
const primaryKeys = await this.primaryKeys(rawTables.map((table) => table.name), schemaName);
tables.push(...rawTables.map((table) => this.toSchemaTable(table, primaryKeys)));
}
return {
connectionId: this.options.connectionId,
driver: 'snowflake',
extractedAt: this.now().toISOString(),
scope: { catalogs: [this.resolved.database], schemas: this.resolved.schemas },
metadata: {
account: this.resolved.account,
warehouse: this.resolved.warehouse,
database: this.resolved.database,
schemas: this.resolved.schemas,
table_count: tables.length,
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables,
};
}
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.getDriver().query(
this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns),
);
return { headers: result.headers, rows: result.rows, totalRows: result.totalRows };
}
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.getDriver().query(
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
);
return {
values: result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]),
nullCount: null,
distinctCount: null,
};
}
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
return null;
}
async executeReadOnly(input: KtxSnowflakeReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
this.assertConnection(input.connectionId);
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
const prepared = this.dialect.prepareQuery(limitedSql, input.params);
return this.getDriver().query(prepared.sql, prepared.params);
}
async getColumnDistinctValues(
table: KtxTableRef,
columnName: string,
options: KtxSnowflakeColumnDistinctValuesOptions,
): Promise<KtxSnowflakeColumnDistinctValuesResult | null> {
const tableName = this.qTableName(table);
const quotedColumn = this.dialect.quoteIdentifier(columnName);
const cardinality = await this.singleNumber(
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, options.sampleSize ?? 10000),
'CARDINALITY',
);
if (cardinality === null) {
return null;
}
if (cardinality === 0) {
return { values: [], cardinality: 0 };
}
if (cardinality > options.maxCardinality) {
return { values: null, cardinality };
}
const valueRows = await this.queryRaw<Record<string, unknown>>(
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
);
return { values: valueRows.map((row) => String(row.VAL ?? row.val)).filter((value) => value !== 'null'), cardinality };
}
async getTableRowCount(tableName: string, schemaName = this.resolved.schemas[0] ?? 'PUBLIC'): Promise<number> {
const tables = await this.getDriver().getSchemaMetadata(schemaName);
return tables.find((table) => table.name === tableName)?.rowCount ?? 0;
}
qTableName(table: Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>): string {
return this.dialect.formatTableName(table);
}
quoteIdentifier(identifier: string): string {
return this.dialect.quoteIdentifier(identifier);
}
listSchemas(): Promise<string[]> {
return this.getDriver().listSchemas();
}
listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
return this.getDriver().listTables(schemas);
}
async cleanup(): Promise<void> {
if (this.driverInstance) {
await this.driverInstance.cleanup();
this.driverInstance = null;
}
}
private getDriver(): KtxSnowflakeDriver {
if (!this.driverInstance) {
this.driverInstance = this.driverFactory.createDriver({
resolved: this.resolved,
sdkOptionsProvider: this.options.sdkOptionsProvider,
});
}
return this.driverInstance;
}
private async primaryKeys(tableNames: string[], schemaName: string): Promise<Map<string, Set<string>>> {
if (tableNames.length === 0) {
return new Map();
}
const result = await this.getDriver().query(
`
SELECT tc.TABLE_NAME, kcu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
AND tc.TABLE_CATALOG = kcu.TABLE_CATALOG
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND tc.TABLE_SCHEMA = ?
AND tc.TABLE_CATALOG = ?
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
`,
[schemaName, this.resolved.database],
);
const grouped = new Map<string, Set<string>>();
for (const tableName of tableNames) {
grouped.set(tableName, new Set());
}
for (const row of result.rows) {
const tableName = String(row[0]);
const columnName = String(row[1]);
grouped.get(tableName)?.add(columnName);
}
return grouped;
}
private toSchemaTable(table: KtxSnowflakeRawTableMetadata, primaryKeys: Map<string, Set<string>>): KtxSchemaTable {
return {
catalog: table.catalog,
db: table.db,
name: table.name,
kind: 'table',
comment: table.comment,
estimatedRows: table.rowCount,
columns: table.columns.map((column) => this.toSchemaColumn(table.name, column, primaryKeys)),
foreignKeys: [],
};
}
private toSchemaColumn(
tableName: string,
column: KtxSnowflakeRawColumnMetadata,
primaryKeys: Map<string, Set<string>>,
): KtxSchemaColumn {
return {
name: column.name,
nativeType: column.type,
normalizedType: this.dialect.mapDataType(column.type),
dimensionType: this.dialect.mapToDimensionType(column.type),
nullable: column.nullable,
primaryKey: primaryKeys.get(tableName)?.has(column.name) ?? false,
comment: column.comment,
};
}
private async queryRaw<T extends Record<string, unknown>>(sql: string, params?: unknown): Promise<T[]> {
const result = await this.getDriver().query(sql, params);
return result.rows.map((row) => Object.fromEntries(result.headers.map((header, index) => [header, row[index]])) as T);
}
private async singleNumber(sql: string, header: string): Promise<number | null> {
const rows = await this.queryRaw<Record<string, unknown>>(sql);
return firstNumber(rows[0]?.[header] ?? rows[0]?.[header.toLowerCase()]);
}
private assertConnection(connectionId: string): void {
if (connectionId !== this.options.connectionId) {
throw new Error(`Snowflake connector ${this.options.connectionId} cannot scan connection ${connectionId}`);
}
}
}