mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
512 lines
18 KiB
TypeScript
512 lines
18 KiB
TypeScript
import { BigQuery, type TableField } from '@google-cloud/bigquery';
|
|
import { assertReadOnlySql, limitSqlForExecution } from '@ktx/context/connections';
|
|
import {
|
|
createKtxConnectorCapabilities,
|
|
type KtxColumnSampleInput,
|
|
type KtxColumnSampleResult,
|
|
type KtxColumnStatsInput,
|
|
type KtxColumnStatsResult,
|
|
type KtxQueryResult,
|
|
type KtxReadOnlyQueryInput,
|
|
type KtxScanConnector,
|
|
type KtxScanContext,
|
|
type KtxScanInput,
|
|
type KtxSchemaColumn,
|
|
type KtxSchemaSnapshot,
|
|
type KtxSchemaTable,
|
|
type KtxTableListEntry,
|
|
type KtxTableRef,
|
|
type KtxTableSampleInput,
|
|
type KtxTableSampleResult,
|
|
} from '@ktx/context/scan';
|
|
import { readFileSync } from 'node:fs';
|
|
import { homedir } from 'node:os';
|
|
import { resolve } from 'node:path';
|
|
import { KtxBigQueryDialect } from './dialect.js';
|
|
|
|
export interface KtxBigQueryConnectionConfig {
|
|
driver?: string;
|
|
dataset_id?: string;
|
|
dataset_ids?: string[];
|
|
credentials_json?: string;
|
|
location?: string;
|
|
[key: string]: unknown;
|
|
}
|
|
|
|
export interface KtxBigQueryResolvedConnectionConfig {
|
|
projectId: string;
|
|
credentials: Record<string, unknown>;
|
|
datasetIds: string[];
|
|
location?: string;
|
|
}
|
|
|
|
export interface KtxBigQueryReadOnlyQueryInput extends KtxReadOnlyQueryInput {
|
|
params?: Record<string, unknown>;
|
|
}
|
|
|
|
export interface KtxBigQueryColumnDistinctValuesOptions {
|
|
maxCardinality: number;
|
|
limit: number;
|
|
sampleSize?: number;
|
|
}
|
|
|
|
export interface KtxBigQueryColumnDistinctValuesResult {
|
|
values: string[] | null;
|
|
cardinality: number;
|
|
}
|
|
|
|
export interface KtxBigQueryQueryJob {
|
|
getQueryResults(): Promise<
|
|
[Array<Record<string, unknown>>, unknown, { schema?: { fields?: TableField[] } }?, ...unknown[]]
|
|
>;
|
|
}
|
|
|
|
export interface KtxBigQueryTableRef {
|
|
id?: string;
|
|
metadata?: { type?: string };
|
|
get(): Promise<
|
|
[
|
|
{
|
|
metadata: {
|
|
type?: string;
|
|
numRows?: string | number;
|
|
description?: string;
|
|
schema?: { fields?: TableField[] };
|
|
};
|
|
},
|
|
...unknown[],
|
|
]
|
|
>;
|
|
}
|
|
|
|
export interface KtxBigQueryDataset {
|
|
get(): Promise<unknown>;
|
|
getTables(): Promise<[KtxBigQueryTableRef[], ...unknown[]]>;
|
|
}
|
|
|
|
export interface KtxBigQueryClient {
|
|
getDatasets(input?: { maxResults?: number }): Promise<[Array<{ id?: string }>, ...unknown[]]>;
|
|
dataset(datasetId: string): KtxBigQueryDataset;
|
|
createQueryJob(input: {
|
|
query: string;
|
|
location?: string;
|
|
params?: Record<string, unknown>;
|
|
maximumBytesBilled?: string;
|
|
jobTimeoutMs?: number;
|
|
}): Promise<[KtxBigQueryQueryJob, ...unknown[]]>;
|
|
}
|
|
|
|
export interface KtxBigQueryClientFactory {
|
|
createClient(input: { projectId: string; credentials: Record<string, unknown> }): KtxBigQueryClient;
|
|
}
|
|
|
|
export interface KtxBigQueryScanConnectorOptions {
|
|
connectionId: string;
|
|
connection: KtxBigQueryConnectionConfig | undefined;
|
|
clientFactory?: KtxBigQueryClientFactory;
|
|
env?: NodeJS.ProcessEnv;
|
|
now?: () => Date;
|
|
maxBytesBilled?: number | string;
|
|
queryTimeoutMs?: number;
|
|
}
|
|
|
|
class DefaultBigQueryClientFactory implements KtxBigQueryClientFactory {
|
|
createClient(input: { projectId: string; credentials: Record<string, unknown> }): KtxBigQueryClient {
|
|
const client = new BigQuery(input);
|
|
return {
|
|
getDatasets: (options) => client.getDatasets(options) as Promise<[Array<{ id?: string }>, ...unknown[]]>,
|
|
dataset: (datasetId) => {
|
|
const dataset = client.dataset(datasetId);
|
|
return {
|
|
get: () => dataset.get() as Promise<unknown>,
|
|
getTables: () => dataset.getTables() as Promise<[KtxBigQueryTableRef[], ...unknown[]]>,
|
|
};
|
|
},
|
|
createQueryJob: (options) => client.createQueryJob(options) as Promise<[KtxBigQueryQueryJob, ...unknown[]]>,
|
|
};
|
|
}
|
|
}
|
|
|
|
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
|
|
if (value.startsWith('env:')) {
|
|
return env[value.slice('env:'.length)] ?? '';
|
|
}
|
|
if (value.startsWith('file:')) {
|
|
const rawPath = value.slice('file:'.length);
|
|
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
|
|
return readFileSync(path, 'utf-8').trim();
|
|
}
|
|
return value;
|
|
}
|
|
|
|
function stringConfigValue(
|
|
connection: KtxBigQueryConnectionConfig | undefined,
|
|
key: keyof KtxBigQueryConnectionConfig,
|
|
env: NodeJS.ProcessEnv,
|
|
): string | undefined {
|
|
const value = connection?.[key];
|
|
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
|
|
}
|
|
|
|
function datasetIds(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string[] {
|
|
if (Array.isArray(connection.dataset_ids) && connection.dataset_ids.length > 0) {
|
|
return connection.dataset_ids
|
|
.filter((dataset) => dataset.trim().length > 0)
|
|
.map((dataset) => resolveStringReference(dataset, env));
|
|
}
|
|
const datasetId = stringConfigValue(connection, 'dataset_id', env);
|
|
return datasetId ? [datasetId] : [];
|
|
}
|
|
|
|
function tableKind(metadataType: string | undefined): KtxSchemaTable['kind'] {
|
|
const type = String(metadataType ?? '').toUpperCase();
|
|
if (type === 'VIEW' || type === 'MATERIALIZED_VIEW') {
|
|
return 'view';
|
|
}
|
|
if (type === 'EXTERNAL' || type === 'EXTERNAL_TABLE') {
|
|
return 'external';
|
|
}
|
|
return 'table';
|
|
}
|
|
|
|
function firstNumber(value: unknown): number | null {
|
|
const numberValue = Number(value);
|
|
return Number.isFinite(numberValue) ? numberValue : null;
|
|
}
|
|
|
|
function normalizeValue(value: unknown): unknown {
|
|
if (value === null || value === undefined) {
|
|
return null;
|
|
}
|
|
if (Array.isArray(value)) {
|
|
return value.map((item) => String(item)).join(', ');
|
|
}
|
|
if (typeof value === 'object') {
|
|
if ('toNumber' in value && typeof value.toNumber === 'function' && 'toFixed' in value && typeof value.toFixed === 'function') {
|
|
return value.toNumber();
|
|
}
|
|
if ('value' in value && Object.keys(value).length === 1 && typeof value.value !== 'object') {
|
|
return value.value;
|
|
}
|
|
return JSON.stringify(value);
|
|
}
|
|
return value;
|
|
}
|
|
|
|
export function isKtxBigQueryConnectionConfig(
|
|
connection: KtxBigQueryConnectionConfig | undefined,
|
|
): connection is KtxBigQueryConnectionConfig {
|
|
return String(connection?.driver ?? '').toLowerCase() === 'bigquery';
|
|
}
|
|
|
|
export function bigQueryConnectionConfigFromConfig(input: {
|
|
connectionId: string;
|
|
connection: KtxBigQueryConnectionConfig | undefined;
|
|
env?: NodeJS.ProcessEnv;
|
|
}): KtxBigQueryResolvedConnectionConfig {
|
|
const inputDriver = input.connection?.driver ?? 'unknown';
|
|
if (!isKtxBigQueryConnectionConfig(input.connection)) {
|
|
throw new Error(`Native BigQuery connector cannot run driver "${inputDriver}"`);
|
|
}
|
|
|
|
const env = input.env ?? process.env;
|
|
const credentialsJson = stringConfigValue(input.connection, 'credentials_json', env);
|
|
if (!credentialsJson) {
|
|
throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.credentials_json`);
|
|
}
|
|
const credentials = JSON.parse(credentialsJson) as Record<string, unknown>;
|
|
const projectId = typeof credentials.project_id === 'string' ? credentials.project_id : undefined;
|
|
if (!projectId) {
|
|
throw new Error(`Native BigQuery connector requires credentials_json.project_id for connections.${input.connectionId}`);
|
|
}
|
|
const resolvedDatasetIds = datasetIds(input.connection, env);
|
|
if (resolvedDatasetIds.length === 0) {
|
|
throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.dataset_id or dataset_ids`);
|
|
}
|
|
const location = stringConfigValue(input.connection, 'location', env);
|
|
return { projectId, credentials, datasetIds: resolvedDatasetIds, ...(location ? { location } : {}) };
|
|
}
|
|
|
|
export class KtxBigQueryScanConnector implements KtxScanConnector {
|
|
readonly id: string;
|
|
readonly driver = 'bigquery' as const;
|
|
readonly capabilities = createKtxConnectorCapabilities({
|
|
tableSampling: true,
|
|
columnSampling: true,
|
|
columnStats: false,
|
|
readOnlySql: true,
|
|
nestedAnalysis: true,
|
|
formalForeignKeys: false,
|
|
estimatedRowCounts: true,
|
|
});
|
|
|
|
private readonly connectionId: string;
|
|
private readonly resolved: KtxBigQueryResolvedConnectionConfig;
|
|
private readonly clientFactory: KtxBigQueryClientFactory;
|
|
private readonly now: () => Date;
|
|
private readonly maxBytesBilled?: number | string;
|
|
private readonly queryTimeoutMs?: number;
|
|
private readonly dialect = new KtxBigQueryDialect();
|
|
private client: KtxBigQueryClient | null = null;
|
|
|
|
constructor(options: KtxBigQueryScanConnectorOptions) {
|
|
this.connectionId = options.connectionId;
|
|
this.resolved = bigQueryConnectionConfigFromConfig({
|
|
connectionId: options.connectionId,
|
|
connection: options.connection,
|
|
env: options.env,
|
|
});
|
|
this.clientFactory = options.clientFactory ?? new DefaultBigQueryClientFactory();
|
|
this.now = options.now ?? (() => new Date());
|
|
this.maxBytesBilled = options.maxBytesBilled;
|
|
this.queryTimeoutMs = options.queryTimeoutMs;
|
|
this.id = `bigquery:${options.connectionId}`;
|
|
}
|
|
|
|
async testConnection(): Promise<{ success: boolean; error?: string }> {
|
|
try {
|
|
const client = this.getClient();
|
|
await client.getDatasets({ maxResults: 1 });
|
|
for (const datasetId of this.resolved.datasetIds) {
|
|
await client.dataset(datasetId).get();
|
|
}
|
|
return { success: true };
|
|
} catch (error) {
|
|
return { success: false, error: error instanceof Error ? error.message : String(error) };
|
|
}
|
|
}
|
|
|
|
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
|
|
this.assertConnection(input.connectionId);
|
|
const tables: KtxSchemaTable[] = [];
|
|
for (const datasetId of this.resolved.datasetIds) {
|
|
tables.push(...(await this.introspectDataset(datasetId)));
|
|
}
|
|
return {
|
|
connectionId: this.connectionId,
|
|
driver: 'bigquery',
|
|
extractedAt: this.now().toISOString(),
|
|
scope: { catalogs: [this.resolved.projectId], datasets: this.resolved.datasetIds },
|
|
metadata: {
|
|
project_id: this.resolved.projectId,
|
|
datasets: this.resolved.datasetIds,
|
|
table_count: tables.length,
|
|
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
|
|
},
|
|
tables,
|
|
};
|
|
}
|
|
|
|
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxTableSampleResult & { headerTypes?: string[] }> {
|
|
this.assertConnection(input.connectionId);
|
|
const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns));
|
|
return { headers: result.headers, headerTypes: result.headerTypes, rows: result.rows, totalRows: result.totalRows };
|
|
}
|
|
|
|
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
|
|
this.assertConnection(input.connectionId);
|
|
const result = await this.query(
|
|
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
|
|
);
|
|
return { values: result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]), nullCount: null, distinctCount: null };
|
|
}
|
|
|
|
async columnStats(_input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
|
|
return null;
|
|
}
|
|
|
|
async executeReadOnly(input: KtxBigQueryReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
|
|
this.assertConnection(input.connectionId);
|
|
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
|
|
const prepared = this.dialect.prepareQuery(limitedSql, input.params);
|
|
const result = await this.query(prepared.sql, prepared.params);
|
|
return { ...result, rowCount: result.rows.length };
|
|
}
|
|
|
|
async getColumnDistinctValues(
|
|
table: KtxTableRef,
|
|
columnName: string,
|
|
options: KtxBigQueryColumnDistinctValuesOptions,
|
|
): Promise<KtxBigQueryColumnDistinctValuesResult | null> {
|
|
const tableName = this.qTableName(table);
|
|
const quotedColumn = this.dialect.quoteIdentifier(columnName);
|
|
const cardinality = await this.singleNumber(
|
|
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, options.sampleSize ?? 10000),
|
|
'cardinality',
|
|
);
|
|
if (cardinality === null) {
|
|
return null;
|
|
}
|
|
if (cardinality === 0) {
|
|
return { values: [], cardinality: 0 };
|
|
}
|
|
if (cardinality > options.maxCardinality) {
|
|
return { values: null, cardinality };
|
|
}
|
|
const valueRows = await this.queryRaw<{ val: unknown }>(
|
|
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
|
|
);
|
|
return { values: valueRows.filter((row) => row.val !== null).map((row) => String(row.val)), cardinality };
|
|
}
|
|
|
|
async getTableRowCount(tableName: string, datasetId = this.resolved.datasetIds[0]): Promise<number> {
|
|
if (!datasetId) {
|
|
return 0;
|
|
}
|
|
const tables = await this.introspectDataset(datasetId);
|
|
return tables.find((table) => table.name === tableName)?.estimatedRows ?? 0;
|
|
}
|
|
|
|
qTableName(table: Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>): string {
|
|
return this.dialect.formatTableName(table);
|
|
}
|
|
|
|
quoteIdentifier(identifier: string): string {
|
|
return this.dialect.quoteIdentifier(identifier);
|
|
}
|
|
|
|
async listDatasets(): Promise<string[]> {
|
|
const [datasets] = await this.getClient().getDatasets();
|
|
return datasets.map((dataset) => dataset.id).filter((id): id is string => Boolean(id));
|
|
}
|
|
|
|
async listTables(datasetIds?: string[]): Promise<KtxTableListEntry[]> {
|
|
const filterDatasets = datasetIds ?? (await this.listDatasets());
|
|
const entries: KtxTableListEntry[] = [];
|
|
for (const datasetId of filterDatasets) {
|
|
const dataset = this.getClient().dataset(datasetId);
|
|
const [tables] = await dataset.getTables();
|
|
for (const table of tables) {
|
|
if (!table.id) continue;
|
|
entries.push({
|
|
schema: datasetId,
|
|
name: table.id,
|
|
kind: table.metadata?.type === 'VIEW' ? 'view' : 'table',
|
|
});
|
|
}
|
|
}
|
|
entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name));
|
|
return entries;
|
|
}
|
|
|
|
async cleanup(): Promise<void> {
|
|
this.client = null;
|
|
}
|
|
|
|
private getClient(): KtxBigQueryClient {
|
|
if (!this.client) {
|
|
this.client = this.clientFactory.createClient({
|
|
projectId: this.resolved.projectId,
|
|
credentials: this.resolved.credentials,
|
|
});
|
|
}
|
|
return this.client;
|
|
}
|
|
|
|
private async query(sql: string, params?: Record<string, unknown>): Promise<KtxQueryResult> {
|
|
const [job] = await this.getClient().createQueryJob({
|
|
query: sql,
|
|
...(this.resolved.location ? { location: this.resolved.location } : {}),
|
|
...(params && Object.keys(params).length > 0 ? { params } : {}),
|
|
...(this.maxBytesBilled ? { maximumBytesBilled: String(this.maxBytesBilled) } : {}),
|
|
...(this.queryTimeoutMs ? { jobTimeoutMs: this.queryTimeoutMs } : {}),
|
|
});
|
|
const [rows, , response] = await job.getQueryResults();
|
|
let headers = response?.schema?.fields?.map((field) => field.name || '') ?? [];
|
|
const headerTypes = response?.schema?.fields?.map((field) => String(field.type || 'STRING')) ?? [];
|
|
if (headers.length === 0 && rows.length > 0) {
|
|
headers = Object.keys(rows[0]!);
|
|
}
|
|
return {
|
|
headers,
|
|
headerTypes: headerTypes.length > 0 ? headerTypes : undefined,
|
|
rows: rows.map((row) => headers.map((header) => normalizeValue(row[header]))),
|
|
totalRows: rows.length,
|
|
rowCount: rows.length,
|
|
};
|
|
}
|
|
|
|
private async queryRaw<T extends Record<string, unknown>>(sql: string, params?: Record<string, unknown>): Promise<T[]> {
|
|
const result = await this.query(sql, params);
|
|
return result.rows.map((row) => Object.fromEntries(result.headers.map((header, index) => [header, row[index]])) as T);
|
|
}
|
|
|
|
private async singleNumber(sql: string, header: string): Promise<number | null> {
|
|
const rows = await this.queryRaw<Record<string, unknown>>(sql);
|
|
return firstNumber(rows[0]?.[header]);
|
|
}
|
|
|
|
private async introspectDataset(datasetId: string): Promise<KtxSchemaTable[]> {
|
|
const dataset = this.getClient().dataset(datasetId);
|
|
const [tableRefs] = await dataset.getTables();
|
|
const primaryKeys = await this.primaryKeys(datasetId);
|
|
const tables: KtxSchemaTable[] = [];
|
|
for (const tableRef of tableRefs) {
|
|
const tableName = tableRef.id || '';
|
|
const [table] = await tableRef.get();
|
|
const fields = table.metadata.schema?.fields ?? [];
|
|
tables.push({
|
|
catalog: this.resolved.projectId,
|
|
db: datasetId,
|
|
name: tableName,
|
|
kind: tableKind(table.metadata.type),
|
|
comment: table.metadata.description || null,
|
|
estimatedRows: firstNumber(table.metadata.numRows) ?? 0,
|
|
columns: fields.map((field) => this.toSchemaColumn(tableName, field, primaryKeys)),
|
|
foreignKeys: [],
|
|
});
|
|
}
|
|
return tables;
|
|
}
|
|
|
|
private async primaryKeys(datasetId: string): Promise<Map<string, Set<string>>> {
|
|
const rows = await this.queryRaw<{ table_name: string; column_name: string }>(
|
|
'SELECT tc.table_name, kcu.column_name ' +
|
|
'FROM `' +
|
|
this.resolved.projectId +
|
|
'.' +
|
|
datasetId +
|
|
'.INFORMATION_SCHEMA.TABLE_CONSTRAINTS` tc ' +
|
|
'JOIN `' +
|
|
this.resolved.projectId +
|
|
'.' +
|
|
datasetId +
|
|
'.INFORMATION_SCHEMA.KEY_COLUMN_USAGE` kcu ' +
|
|
'ON tc.constraint_name = kcu.constraint_name ' +
|
|
'AND tc.table_schema = kcu.table_schema ' +
|
|
'AND tc.table_name = kcu.table_name ' +
|
|
"WHERE tc.constraint_type = 'PRIMARY KEY' " +
|
|
"AND tc.table_schema = '" +
|
|
datasetId +
|
|
"' " +
|
|
"AND NOT REGEXP_CONTAINS(kcu.column_name, r'^(stacksync_record_id|sync_primary_key)_') " +
|
|
'ORDER BY tc.table_name, kcu.ordinal_position',
|
|
);
|
|
const grouped = new Map<string, Set<string>>();
|
|
for (const row of rows) {
|
|
const columns = grouped.get(row.table_name) ?? new Set<string>();
|
|
columns.add(row.column_name);
|
|
grouped.set(row.table_name, columns);
|
|
}
|
|
return grouped;
|
|
}
|
|
|
|
private toSchemaColumn(tableName: string, field: TableField, primaryKeys: Map<string, Set<string>>): KtxSchemaColumn {
|
|
const nativeType = String(field.type || 'STRING').toUpperCase();
|
|
return {
|
|
name: field.name || '',
|
|
nativeType,
|
|
normalizedType: this.dialect.mapDataType(nativeType),
|
|
dimensionType: this.dialect.mapToDimensionType(nativeType),
|
|
nullable: field.mode !== 'REQUIRED',
|
|
primaryKey: primaryKeys.get(tableName)?.has(field.name || '') ?? false,
|
|
comment: field.description || null,
|
|
};
|
|
}
|
|
|
|
private assertConnection(connectionId: string): void {
|
|
if (connectionId !== this.connectionId) {
|
|
throw new Error(`BigQuery connector ${this.connectionId} cannot scan connection ${connectionId}`);
|
|
}
|
|
}
|
|
}
|