ktx/packages/connector-bigquery/src/connector.ts
2026-05-13 19:37:25 +02:00

512 lines
18 KiB
TypeScript

import { BigQuery, type TableField } from '@google-cloud/bigquery';
import { assertReadOnlySql, limitSqlForExecution } from '@ktx/context/connections';
import {
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '@ktx/context/scan';
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { KtxBigQueryDialect } from './dialect.js';
export interface KtxBigQueryConnectionConfig {
driver?: string;
dataset_id?: string;
dataset_ids?: string[];
credentials_json?: string;
location?: string;
[key: string]: unknown;
}
export interface KtxBigQueryResolvedConnectionConfig {
projectId: string;
credentials: Record<string, unknown>;
datasetIds: string[];
location?: string;
}
export interface KtxBigQueryReadOnlyQueryInput extends KtxReadOnlyQueryInput {
params?: Record<string, unknown>;
}
export interface KtxBigQueryColumnDistinctValuesOptions {
maxCardinality: number;
limit: number;
sampleSize?: number;
}
export interface KtxBigQueryColumnDistinctValuesResult {
values: string[] | null;
cardinality: number;
}
export interface KtxBigQueryQueryJob {
getQueryResults(): Promise<
[Array<Record<string, unknown>>, unknown, { schema?: { fields?: TableField[] } }?, ...unknown[]]
>;
}
export interface KtxBigQueryTableRef {
id?: string;
metadata?: { type?: string };
get(): Promise<
[
{
metadata: {
type?: string;
numRows?: string | number;
description?: string;
schema?: { fields?: TableField[] };
};
},
...unknown[],
]
>;
}
export interface KtxBigQueryDataset {
get(): Promise<unknown>;
getTables(): Promise<[KtxBigQueryTableRef[], ...unknown[]]>;
}
export interface KtxBigQueryClient {
getDatasets(input?: { maxResults?: number }): Promise<[Array<{ id?: string }>, ...unknown[]]>;
dataset(datasetId: string): KtxBigQueryDataset;
createQueryJob(input: {
query: string;
location?: string;
params?: Record<string, unknown>;
maximumBytesBilled?: string;
jobTimeoutMs?: number;
}): Promise<[KtxBigQueryQueryJob, ...unknown[]]>;
}
export interface KtxBigQueryClientFactory {
createClient(input: { projectId: string; credentials: Record<string, unknown> }): KtxBigQueryClient;
}
export interface KtxBigQueryScanConnectorOptions {
connectionId: string;
connection: KtxBigQueryConnectionConfig | undefined;
clientFactory?: KtxBigQueryClientFactory;
env?: NodeJS.ProcessEnv;
now?: () => Date;
maxBytesBilled?: number | string;
queryTimeoutMs?: number;
}
class DefaultBigQueryClientFactory implements KtxBigQueryClientFactory {
createClient(input: { projectId: string; credentials: Record<string, unknown> }): KtxBigQueryClient {
const client = new BigQuery(input);
return {
getDatasets: (options) => client.getDatasets(options) as Promise<[Array<{ id?: string }>, ...unknown[]]>,
dataset: (datasetId) => {
const dataset = client.dataset(datasetId);
return {
get: () => dataset.get() as Promise<unknown>,
getTables: () => dataset.getTables() as Promise<[KtxBigQueryTableRef[], ...unknown[]]>,
};
},
createQueryJob: (options) => client.createQueryJob(options) as Promise<[KtxBigQueryQueryJob, ...unknown[]]>,
};
}
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function stringConfigValue(
connection: KtxBigQueryConnectionConfig | undefined,
key: keyof KtxBigQueryConnectionConfig,
env: NodeJS.ProcessEnv,
): string | undefined {
const value = connection?.[key];
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function datasetIds(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string[] {
if (Array.isArray(connection.dataset_ids) && connection.dataset_ids.length > 0) {
return connection.dataset_ids
.filter((dataset) => dataset.trim().length > 0)
.map((dataset) => resolveStringReference(dataset, env));
}
const datasetId = stringConfigValue(connection, 'dataset_id', env);
return datasetId ? [datasetId] : [];
}
function tableKind(metadataType: string | undefined): KtxSchemaTable['kind'] {
const type = String(metadataType ?? '').toUpperCase();
if (type === 'VIEW' || type === 'MATERIALIZED_VIEW') {
return 'view';
}
if (type === 'EXTERNAL' || type === 'EXTERNAL_TABLE') {
return 'external';
}
return 'table';
}
function firstNumber(value: unknown): number | null {
const numberValue = Number(value);
return Number.isFinite(numberValue) ? numberValue : null;
}
function normalizeValue(value: unknown): unknown {
if (value === null || value === undefined) {
return null;
}
if (Array.isArray(value)) {
return value.map((item) => String(item)).join(', ');
}
if (typeof value === 'object') {
if ('toNumber' in value && typeof value.toNumber === 'function' && 'toFixed' in value && typeof value.toFixed === 'function') {
return value.toNumber();
}
if ('value' in value && Object.keys(value).length === 1 && typeof value.value !== 'object') {
return value.value;
}
return JSON.stringify(value);
}
return value;
}
export function isKtxBigQueryConnectionConfig(
connection: KtxBigQueryConnectionConfig | undefined,
): connection is KtxBigQueryConnectionConfig {
return String(connection?.driver ?? '').toLowerCase() === 'bigquery';
}
export function bigQueryConnectionConfigFromConfig(input: {
connectionId: string;
connection: KtxBigQueryConnectionConfig | undefined;
env?: NodeJS.ProcessEnv;
}): KtxBigQueryResolvedConnectionConfig {
const inputDriver = input.connection?.driver ?? 'unknown';
if (!isKtxBigQueryConnectionConfig(input.connection)) {
throw new Error(`Native BigQuery connector cannot run driver "${inputDriver}"`);
}
const env = input.env ?? process.env;
const credentialsJson = stringConfigValue(input.connection, 'credentials_json', env);
if (!credentialsJson) {
throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.credentials_json`);
}
const credentials = JSON.parse(credentialsJson) as Record<string, unknown>;
const projectId = typeof credentials.project_id === 'string' ? credentials.project_id : undefined;
if (!projectId) {
throw new Error(`Native BigQuery connector requires credentials_json.project_id for connections.${input.connectionId}`);
}
const resolvedDatasetIds = datasetIds(input.connection, env);
if (resolvedDatasetIds.length === 0) {
throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.dataset_id or dataset_ids`);
}
const location = stringConfigValue(input.connection, 'location', env);
return { projectId, credentials, datasetIds: resolvedDatasetIds, ...(location ? { location } : {}) };
}
export class KtxBigQueryScanConnector implements KtxScanConnector {
readonly id: string;
readonly driver = 'bigquery' as const;
readonly capabilities = createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: true,
nestedAnalysis: true,
formalForeignKeys: false,
estimatedRowCounts: true,
});
private readonly connectionId: string;
private readonly resolved: KtxBigQueryResolvedConnectionConfig;
private readonly clientFactory: KtxBigQueryClientFactory;
private readonly now: () => Date;
private readonly maxBytesBilled?: number | string;
private readonly queryTimeoutMs?: number;
private readonly dialect = new KtxBigQueryDialect();
private client: KtxBigQueryClient | null = null;
constructor(options: KtxBigQueryScanConnectorOptions) {
this.connectionId = options.connectionId;
this.resolved = bigQueryConnectionConfigFromConfig({
connectionId: options.connectionId,
connection: options.connection,
env: options.env,
});
this.clientFactory = options.clientFactory ?? new DefaultBigQueryClientFactory();
this.now = options.now ?? (() => new Date());
this.maxBytesBilled = options.maxBytesBilled;
this.queryTimeoutMs = options.queryTimeoutMs;
this.id = `bigquery:${options.connectionId}`;
}
async testConnection(): Promise<{ success: boolean; error?: string }> {
try {
const client = this.getClient();
await client.getDatasets({ maxResults: 1 });
for (const datasetId of this.resolved.datasetIds) {
await client.dataset(datasetId).get();
}
return { success: true };
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) };
}
}
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const tables: KtxSchemaTable[] = [];
for (const datasetId of this.resolved.datasetIds) {
tables.push(...(await this.introspectDataset(datasetId)));
}
return {
connectionId: this.connectionId,
driver: 'bigquery',
extractedAt: this.now().toISOString(),
scope: { catalogs: [this.resolved.projectId], datasets: this.resolved.datasetIds },
metadata: {
project_id: this.resolved.projectId,
datasets: this.resolved.datasetIds,
table_count: tables.length,
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables,
};
}
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult & { headerTypes?: string[] }> {
this.assertConnection(input.connectionId);
const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns));
return { headers: result.headers, headerTypes: result.headerTypes, rows: result.rows, totalRows: result.totalRows };
}
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.query(
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
);
return { values: result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]), nullCount: null, distinctCount: null };
}
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
return null;
}
async executeReadOnly(input: KtxBigQueryReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
this.assertConnection(input.connectionId);
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
const prepared = this.dialect.prepareQuery(limitedSql, input.params);
const result = await this.query(prepared.sql, prepared.params);
return { ...result, rowCount: result.rows.length };
}
async getColumnDistinctValues(
table: KtxTableRef,
columnName: string,
options: KtxBigQueryColumnDistinctValuesOptions,
): Promise<KtxBigQueryColumnDistinctValuesResult | null> {
const tableName = this.qTableName(table);
const quotedColumn = this.dialect.quoteIdentifier(columnName);
const cardinality = await this.singleNumber(
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, options.sampleSize ?? 10000),
'cardinality',
);
if (cardinality === null) {
return null;
}
if (cardinality === 0) {
return { values: [], cardinality: 0 };
}
if (cardinality > options.maxCardinality) {
return { values: null, cardinality };
}
const valueRows = await this.queryRaw<{ val: unknown }>(
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
);
return { values: valueRows.filter((row) => row.val !== null).map((row) => String(row.val)), cardinality };
}
async getTableRowCount(tableName: string, datasetId = this.resolved.datasetIds[0]): Promise<number> {
if (!datasetId) {
return 0;
}
const tables = await this.introspectDataset(datasetId);
return tables.find((table) => table.name === tableName)?.estimatedRows ?? 0;
}
qTableName(table: Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>): string {
return this.dialect.formatTableName(table);
}
quoteIdentifier(identifier: string): string {
return this.dialect.quoteIdentifier(identifier);
}
async listDatasets(): Promise<string[]> {
const [datasets] = await this.getClient().getDatasets();
return datasets.map((dataset) => dataset.id).filter((id): id is string => Boolean(id));
}
async listTables(datasetIds?: string[]): Promise<KtxTableListEntry[]> {
const filterDatasets = datasetIds ?? (await this.listDatasets());
const entries: KtxTableListEntry[] = [];
for (const datasetId of filterDatasets) {
const dataset = this.getClient().dataset(datasetId);
const [tables] = await dataset.getTables();
for (const table of tables) {
if (!table.id) continue;
entries.push({
schema: datasetId,
name: table.id,
kind: table.metadata?.type === 'VIEW' ? 'view' : 'table',
});
}
}
entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name));
return entries;
}
async cleanup(): Promise<void> {
this.client = null;
}
private getClient(): KtxBigQueryClient {
if (!this.client) {
this.client = this.clientFactory.createClient({
projectId: this.resolved.projectId,
credentials: this.resolved.credentials,
});
}
return this.client;
}
private async query(sql: string, params?: Record<string, unknown>): Promise<KtxQueryResult> {
const [job] = await this.getClient().createQueryJob({
query: sql,
...(this.resolved.location ? { location: this.resolved.location } : {}),
...(params && Object.keys(params).length > 0 ? { params } : {}),
...(this.maxBytesBilled ? { maximumBytesBilled: String(this.maxBytesBilled) } : {}),
...(this.queryTimeoutMs ? { jobTimeoutMs: this.queryTimeoutMs } : {}),
});
const [rows, , response] = await job.getQueryResults();
let headers = response?.schema?.fields?.map((field) => field.name || '') ?? [];
const headerTypes = response?.schema?.fields?.map((field) => String(field.type || 'STRING')) ?? [];
if (headers.length === 0 && rows.length > 0) {
headers = Object.keys(rows[0]!);
}
return {
headers,
headerTypes: headerTypes.length > 0 ? headerTypes : undefined,
rows: rows.map((row) => headers.map((header) => normalizeValue(row[header]))),
totalRows: rows.length,
rowCount: rows.length,
};
}
private async queryRaw<T extends Record<string, unknown>>(sql: string, params?: Record<string, unknown>): Promise<T[]> {
const result = await this.query(sql, params);
return result.rows.map((row) => Object.fromEntries(result.headers.map((header, index) => [header, row[index]])) as T);
}
private async singleNumber(sql: string, header: string): Promise<number | null> {
const rows = await this.queryRaw<Record<string, unknown>>(sql);
return firstNumber(rows[0]?.[header]);
}
private async introspectDataset(datasetId: string): Promise<KtxSchemaTable[]> {
const dataset = this.getClient().dataset(datasetId);
const [tableRefs] = await dataset.getTables();
const primaryKeys = await this.primaryKeys(datasetId);
const tables: KtxSchemaTable[] = [];
for (const tableRef of tableRefs) {
const tableName = tableRef.id || '';
const [table] = await tableRef.get();
const fields = table.metadata.schema?.fields ?? [];
tables.push({
catalog: this.resolved.projectId,
db: datasetId,
name: tableName,
kind: tableKind(table.metadata.type),
comment: table.metadata.description || null,
estimatedRows: firstNumber(table.metadata.numRows) ?? 0,
columns: fields.map((field) => this.toSchemaColumn(tableName, field, primaryKeys)),
foreignKeys: [],
});
}
return tables;
}
private async primaryKeys(datasetId: string): Promise<Map<string, Set<string>>> {
const rows = await this.queryRaw<{ table_name: string; column_name: string }>(
'SELECT tc.table_name, kcu.column_name ' +
'FROM `' +
this.resolved.projectId +
'.' +
datasetId +
'.INFORMATION_SCHEMA.TABLE_CONSTRAINTS` tc ' +
'JOIN `' +
this.resolved.projectId +
'.' +
datasetId +
'.INFORMATION_SCHEMA.KEY_COLUMN_USAGE` kcu ' +
'ON tc.constraint_name = kcu.constraint_name ' +
'AND tc.table_schema = kcu.table_schema ' +
'AND tc.table_name = kcu.table_name ' +
"WHERE tc.constraint_type = 'PRIMARY KEY' " +
"AND tc.table_schema = '" +
datasetId +
"' " +
"AND NOT REGEXP_CONTAINS(kcu.column_name, r'^(stacksync_record_id|sync_primary_key)_') " +
'ORDER BY tc.table_name, kcu.ordinal_position',
);
const grouped = new Map<string, Set<string>>();
for (const row of rows) {
const columns = grouped.get(row.table_name) ?? new Set<string>();
columns.add(row.column_name);
grouped.set(row.table_name, columns);
}
return grouped;
}
private toSchemaColumn(tableName: string, field: TableField, primaryKeys: Map<string, Set<string>>): KtxSchemaColumn {
const nativeType = String(field.type || 'STRING').toUpperCase();
return {
name: field.name || '',
nativeType,
normalizedType: this.dialect.mapDataType(nativeType),
dimensionType: this.dialect.mapToDimensionType(nativeType),
nullable: field.mode !== 'REQUIRED',
primaryKey: primaryKeys.get(tableName)?.has(field.name || '') ?? false,
comment: field.description || null,
};
}
private assertConnection(connectionId: string): void {
if (connectionId !== this.connectionId) {
throw new Error(`BigQuery connector ${this.connectionId} cannot scan connection ${connectionId}`);
}
}
}