import { BigQuery, type TableField } from '@google-cloud/bigquery'; import { assertReadOnlySql, limitSqlForExecution } from '@ktx/context/connections'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult, } from '@ktx/context/scan'; import { readFileSync } from 'node:fs'; import { homedir } from 'node:os'; import { resolve } from 'node:path'; import { KtxBigQueryDialect } from './dialect.js'; export interface KtxBigQueryConnectionConfig { driver?: string; dataset_id?: string; dataset_ids?: string[]; credentials_json?: string; location?: string; readonly?: boolean; [key: string]: unknown; } export interface KtxBigQueryResolvedConnectionConfig { projectId: string; credentials: Record; datasetIds: string[]; location?: string; } export interface KtxBigQueryReadOnlyQueryInput extends KtxReadOnlyQueryInput { params?: Record; } export interface KtxBigQueryColumnDistinctValuesOptions { maxCardinality: number; limit: number; sampleSize?: number; } export interface KtxBigQueryColumnDistinctValuesResult { values: string[] | null; cardinality: number; } export interface KtxBigQueryQueryJob { getQueryResults(): Promise< [Array>, unknown, { schema?: { fields?: TableField[] } }?, ...unknown[]] >; } export interface KtxBigQueryTableRef { id?: string; get(): Promise< [ { metadata: { type?: string; numRows?: string | number; description?: string; schema?: { fields?: TableField[] }; }; }, ...unknown[], ] >; } export interface KtxBigQueryDataset { get(): Promise; getTables(): Promise<[KtxBigQueryTableRef[], ...unknown[]]>; } export interface KtxBigQueryClient { getDatasets(input?: { maxResults?: number }): Promise<[Array<{ id?: string }>, ...unknown[]]>; dataset(datasetId: string): KtxBigQueryDataset; createQueryJob(input: { query: string; location?: string; params?: Record; maximumBytesBilled?: string; jobTimeoutMs?: number; }): Promise<[KtxBigQueryQueryJob, ...unknown[]]>; } export interface KtxBigQueryClientFactory { createClient(input: { projectId: string; credentials: Record }): KtxBigQueryClient; } export interface KtxBigQueryScanConnectorOptions { connectionId: string; connection: KtxBigQueryConnectionConfig | undefined; clientFactory?: KtxBigQueryClientFactory; env?: NodeJS.ProcessEnv; now?: () => Date; maxBytesBilled?: number | string; queryTimeoutMs?: number; } class DefaultBigQueryClientFactory implements KtxBigQueryClientFactory { createClient(input: { projectId: string; credentials: Record }): KtxBigQueryClient { const client = new BigQuery(input); return { getDatasets: (options) => client.getDatasets(options) as Promise<[Array<{ id?: string }>, ...unknown[]]>, dataset: (datasetId) => { const dataset = client.dataset(datasetId); return { get: () => dataset.get() as Promise, getTables: () => dataset.getTables() as Promise<[KtxBigQueryTableRef[], ...unknown[]]>, }; }, createQueryJob: (options) => client.createQueryJob(options) as Promise<[KtxBigQueryQueryJob, ...unknown[]]>, }; } } function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string { if (value.startsWith('env:')) { return env[value.slice('env:'.length)] ?? ''; } if (value.startsWith('file:')) { const rawPath = value.slice('file:'.length); const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath; return readFileSync(path, 'utf-8').trim(); } return value; } function stringConfigValue( connection: KtxBigQueryConnectionConfig | undefined, key: keyof KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv, ): string | undefined { const value = connection?.[key]; return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined; } function datasetIds(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string[] { if (Array.isArray(connection.dataset_ids) && connection.dataset_ids.length > 0) { return connection.dataset_ids .filter((dataset) => dataset.trim().length > 0) .map((dataset) => resolveStringReference(dataset, env)); } const datasetId = stringConfigValue(connection, 'dataset_id', env); return datasetId ? [datasetId] : []; } function tableKind(metadataType: string | undefined): KtxSchemaTable['kind'] { const type = String(metadataType ?? '').toUpperCase(); if (type === 'VIEW' || type === 'MATERIALIZED_VIEW') { return 'view'; } if (type === 'EXTERNAL' || type === 'EXTERNAL_TABLE') { return 'external'; } return 'table'; } function firstNumber(value: unknown): number | null { const numberValue = Number(value); return Number.isFinite(numberValue) ? numberValue : null; } function normalizeValue(value: unknown): unknown { if (value === null || value === undefined) { return null; } if (Array.isArray(value)) { return value.map((item) => String(item)).join(', '); } if (typeof value === 'object') { if ('toNumber' in value && typeof value.toNumber === 'function' && 'toFixed' in value && typeof value.toFixed === 'function') { return value.toNumber(); } if ('value' in value && Object.keys(value).length === 1 && typeof value.value !== 'object') { return value.value; } return JSON.stringify(value); } return value; } export function isKtxBigQueryConnectionConfig(connection: KtxBigQueryConnectionConfig | undefined): boolean { return String(connection?.driver ?? '').toLowerCase() === 'bigquery'; } export function bigQueryConnectionConfigFromConfig(input: { connectionId: string; connection: KtxBigQueryConnectionConfig | undefined; env?: NodeJS.ProcessEnv; }): KtxBigQueryResolvedConnectionConfig { if (!isKtxBigQueryConnectionConfig(input.connection)) { throw new Error(`Native BigQuery connector cannot run driver "${input.connection?.driver ?? 'unknown'}"`); } if (input.connection?.readonly !== true) { throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.readonly: true`); } const env = input.env ?? process.env; const credentialsJson = stringConfigValue(input.connection, 'credentials_json', env); if (!credentialsJson) { throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.credentials_json`); } const credentials = JSON.parse(credentialsJson) as Record; const projectId = typeof credentials.project_id === 'string' ? credentials.project_id : undefined; if (!projectId) { throw new Error(`Native BigQuery connector requires credentials_json.project_id for connections.${input.connectionId}`); } const resolvedDatasetIds = datasetIds(input.connection, env); if (resolvedDatasetIds.length === 0) { throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.dataset_id or dataset_ids`); } const location = stringConfigValue(input.connection, 'location', env); return { projectId, credentials, datasetIds: resolvedDatasetIds, ...(location ? { location } : {}) }; } export class KtxBigQueryScanConnector implements KtxScanConnector { readonly id: string; readonly driver = 'bigquery' as const; readonly capabilities = createKtxConnectorCapabilities({ tableSampling: true, columnSampling: true, columnStats: false, readOnlySql: true, nestedAnalysis: true, formalForeignKeys: false, estimatedRowCounts: true, }); private readonly connectionId: string; private readonly resolved: KtxBigQueryResolvedConnectionConfig; private readonly clientFactory: KtxBigQueryClientFactory; private readonly now: () => Date; private readonly maxBytesBilled?: number | string; private readonly queryTimeoutMs?: number; private readonly dialect = new KtxBigQueryDialect(); private client: KtxBigQueryClient | null = null; constructor(options: KtxBigQueryScanConnectorOptions) { this.connectionId = options.connectionId; this.resolved = bigQueryConnectionConfigFromConfig({ connectionId: options.connectionId, connection: options.connection, env: options.env, }); this.clientFactory = options.clientFactory ?? new DefaultBigQueryClientFactory(); this.now = options.now ?? (() => new Date()); this.maxBytesBilled = options.maxBytesBilled; this.queryTimeoutMs = options.queryTimeoutMs; this.id = `bigquery:${options.connectionId}`; } async testConnection(): Promise<{ success: boolean; error?: string }> { try { const client = this.getClient(); await client.getDatasets({ maxResults: 1 }); for (const datasetId of this.resolved.datasetIds) { await client.dataset(datasetId).get(); } return { success: true }; } catch (error) { return { success: false, error: error instanceof Error ? error.message : String(error) }; } } async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const tables: KtxSchemaTable[] = []; for (const datasetId of this.resolved.datasetIds) { tables.push(...(await this.introspectDataset(datasetId))); } return { connectionId: this.connectionId, driver: 'bigquery', extractedAt: this.now().toISOString(), scope: { catalogs: [this.resolved.projectId], datasets: this.resolved.datasetIds }, metadata: { project_id: this.resolved.projectId, datasets: this.resolved.datasetIds, table_count: tables.length, total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0), }, tables, }; } async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns)); return { headers: result.headers, headerTypes: result.headerTypes, rows: result.rows, totalRows: result.totalRows }; } async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const result = await this.query( this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit), ); return { values: result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]), nullCount: null, distinctCount: null }; } async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise { return null; } async executeReadOnly(input: KtxBigQueryReadOnlyQueryInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); const prepared = this.dialect.prepareQuery(limitedSql, input.params); const result = await this.query(prepared.sql, prepared.params); return { ...result, rowCount: result.rows.length }; } async getColumnDistinctValues( table: KtxTableRef, columnName: string, options: KtxBigQueryColumnDistinctValuesOptions, ): Promise { const tableName = this.qTableName(table); const quotedColumn = this.dialect.quoteIdentifier(columnName); const cardinality = await this.singleNumber( this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, options.sampleSize ?? 10000), 'cardinality', ); if (cardinality === null) { return null; } if (cardinality === 0) { return { values: [], cardinality: 0 }; } if (cardinality > options.maxCardinality) { return { values: null, cardinality }; } const valueRows = await this.queryRaw<{ val: unknown }>( this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit), ); return { values: valueRows.filter((row) => row.val !== null).map((row) => String(row.val)), cardinality }; } async getTableRowCount(tableName: string, datasetId = this.resolved.datasetIds[0]): Promise { if (!datasetId) { return 0; } const tables = await this.introspectDataset(datasetId); return tables.find((table) => table.name === tableName)?.estimatedRows ?? 0; } qTableName(table: Pick & Partial>): string { return this.dialect.formatTableName(table); } quoteIdentifier(identifier: string): string { return this.dialect.quoteIdentifier(identifier); } async listDatasets(): Promise { const [datasets] = await this.getClient().getDatasets(); return datasets.map((dataset) => dataset.id).filter((id): id is string => Boolean(id)); } async cleanup(): Promise { this.client = null; } private getClient(): KtxBigQueryClient { if (!this.client) { this.client = this.clientFactory.createClient({ projectId: this.resolved.projectId, credentials: this.resolved.credentials, }); } return this.client; } private async query(sql: string, params?: Record): Promise { const [job] = await this.getClient().createQueryJob({ query: sql, ...(this.resolved.location ? { location: this.resolved.location } : {}), ...(params && Object.keys(params).length > 0 ? { params } : {}), ...(this.maxBytesBilled ? { maximumBytesBilled: String(this.maxBytesBilled) } : {}), ...(this.queryTimeoutMs ? { jobTimeoutMs: this.queryTimeoutMs } : {}), }); const [rows, , response] = await job.getQueryResults(); let headers = response?.schema?.fields?.map((field) => field.name || '') ?? []; const headerTypes = response?.schema?.fields?.map((field) => String(field.type || 'STRING')) ?? []; if (headers.length === 0 && rows.length > 0) { headers = Object.keys(rows[0]!); } return { headers, headerTypes: headerTypes.length > 0 ? headerTypes : undefined, rows: rows.map((row) => headers.map((header) => normalizeValue(row[header]))), totalRows: rows.length, rowCount: rows.length, }; } private async queryRaw>(sql: string, params?: Record): Promise { const result = await this.query(sql, params); return result.rows.map((row) => Object.fromEntries(result.headers.map((header, index) => [header, row[index]])) as T); } private async singleNumber(sql: string, header: string): Promise { const rows = await this.queryRaw>(sql); return firstNumber(rows[0]?.[header]); } private async introspectDataset(datasetId: string): Promise { const dataset = this.getClient().dataset(datasetId); const [tableRefs] = await dataset.getTables(); const primaryKeys = await this.primaryKeys(datasetId); const tables: KtxSchemaTable[] = []; for (const tableRef of tableRefs) { const tableName = tableRef.id || ''; const [table] = await tableRef.get(); const fields = table.metadata.schema?.fields ?? []; tables.push({ catalog: this.resolved.projectId, db: datasetId, name: tableName, kind: tableKind(table.metadata.type), comment: table.metadata.description || null, estimatedRows: firstNumber(table.metadata.numRows) ?? 0, columns: fields.map((field) => this.toSchemaColumn(tableName, field, primaryKeys)), foreignKeys: [], }); } return tables; } private async primaryKeys(datasetId: string): Promise>> { const rows = await this.queryRaw<{ table_name: string; column_name: string }>( 'SELECT tc.table_name, kcu.column_name ' + 'FROM `' + this.resolved.projectId + '.' + datasetId + '.INFORMATION_SCHEMA.TABLE_CONSTRAINTS` tc ' + 'JOIN `' + this.resolved.projectId + '.' + datasetId + '.INFORMATION_SCHEMA.KEY_COLUMN_USAGE` kcu ' + 'ON tc.constraint_name = kcu.constraint_name ' + 'AND tc.table_schema = kcu.table_schema ' + 'AND tc.table_name = kcu.table_name ' + "WHERE tc.constraint_type = 'PRIMARY KEY' " + "AND tc.table_schema = '" + datasetId + "' " + "AND NOT REGEXP_CONTAINS(kcu.column_name, r'^(stacksync_record_id|sync_primary_key)_') " + 'ORDER BY tc.table_name, kcu.ordinal_position', ); const grouped = new Map>(); for (const row of rows) { const columns = grouped.get(row.table_name) ?? new Set(); columns.add(row.column_name); grouped.set(row.table_name, columns); } return grouped; } private toSchemaColumn(tableName: string, field: TableField, primaryKeys: Map>): KtxSchemaColumn { const nativeType = String(field.type || 'STRING').toUpperCase(); return { name: field.name || '', nativeType, normalizedType: this.dialect.mapDataType(nativeType), dimensionType: this.dialect.mapToDimensionType(nativeType), nullable: field.mode !== 'REQUIRED', primaryKey: primaryKeys.get(tableName)?.has(field.name || '') ?? false, comment: field.description || null, }; } private assertConnection(connectionId: string): void { if (connectionId !== this.connectionId) { throw new Error(`BigQuery connector ${this.connectionId} cannot scan connection ${connectionId}`); } } }