From 5273406b6a10538b09776ba22f2cd49f55a932c1 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 21 May 2026 19:35:47 +0200 Subject: [PATCH] fix: make bigquery table discovery schema scoped --- .../src/connectors/bigquery/connector.test.ts | 81 +++++++++++++++++++ .../cli/src/connectors/bigquery/connector.ts | 59 +++++++++----- .../connections/bigquery-identifiers.test.ts | 19 +++++ .../connections/bigquery-identifiers.ts | 17 ++++ .../bigquery-query-history-reader.ts | 20 +---- 5 files changed, 158 insertions(+), 38 deletions(-) create mode 100644 packages/cli/src/context/connections/bigquery-identifiers.test.ts create mode 100644 packages/cli/src/context/connections/bigquery-identifiers.ts diff --git a/packages/cli/src/connectors/bigquery/connector.test.ts b/packages/cli/src/connectors/bigquery/connector.test.ts index 5a890612..314534dc 100644 --- a/packages/cli/src/connectors/bigquery/connector.test.ts +++ b/packages/cli/src/connectors/bigquery/connector.test.ts @@ -234,6 +234,87 @@ describe('KtxBigQueryScanConnector', () => { await connector.cleanup(); }); + it('constructs for discovery without dataset scope and lists tables through one region information schema query', async () => { + const createQueryJob = vi.fn( + async (input: { query: string; params?: Record; location?: string }) => [ + { + getQueryResults: async () => [ + [ + { table_schema: 'analytics', table_name: 'orders', table_type: 'BASE TABLE' }, + { table_schema: 'analytics', table_name: 'order_clone', table_type: 'CLONE' }, + { table_schema: 'mart', table_name: 'orders_mv', table_type: 'MATERIALIZED VIEW' }, + ], + undefined, + { + schema: { + fields: [ + { name: 'table_schema', type: 'STRING' }, + { name: 'table_name', type: 'STRING' }, + { name: 'table_type', type: 'STRING' }, + ], + }, + }, + ], + }, + ], + ); + const clientFactory: KtxBigQueryClientFactory = { + createClient: vi.fn(() => ({ + getDatasets: vi.fn(async () => [[{ id: 'analytics' }, { id: 'mart' }]]), + dataset: vi.fn((datasetId: string) => ({ + get: vi.fn(async () => [{ id: datasetId }]), + getTables: vi.fn(async () => [[]]), + })), + createQueryJob, + })), + }; + const connector = new KtxBigQueryScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'bigquery', + credentials_json: JSON.stringify({ project_id: 'project-1' }), + location: 'US', + }, + clientFactory, + }); + + await expect(connector.listTables(['analytics', 'mart'])).resolves.toEqual([ + { schema: 'analytics', name: 'orders', kind: 'table' }, + { schema: 'analytics', name: 'order_clone', kind: 'table' }, + { schema: 'mart', name: 'orders_mv', kind: 'view' }, + ]); + + expect(createQueryJob).toHaveBeenCalledTimes(1); + expect(createQueryJob).toHaveBeenCalledWith( + expect.objectContaining({ + location: 'US', + params: { dataset_ids: ['analytics', 'mart'] }, + }), + ); + expect(createQueryJob.mock.calls[0]?.[0].query).toContain('`project-1`.`region-us`.INFORMATION_SCHEMA.TABLES'); + expect(createQueryJob.mock.calls[0]?.[0].query).toContain("'CLONE'"); + expect(createQueryJob.mock.calls[0]?.[0].query).toContain("'SNAPSHOT'"); + }); + + it('keeps scan paths requiring dataset scope', async () => { + const connector = new KtxBigQueryScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'bigquery', + credentials_json: JSON.stringify({ project_id: 'project-1' }), + location: 'US', + }, + clientFactory: fakeClientFactory(), + }); + + await expect( + connector.introspect( + { connectionId: 'warehouse', driver: 'bigquery' }, + { runId: 'scan-run-1' }, + ), + ).rejects.toThrow('Native BigQuery scan requires connections.warehouse.dataset_ids or dataset_id'); + }); + it('applies maximumBytesBilled to read-only queries when configured', async () => { const clientFactory = fakeClientFactory(); const connector = new KtxBigQueryScanConnector({ diff --git a/packages/cli/src/connectors/bigquery/connector.ts b/packages/cli/src/connectors/bigquery/connector.ts index 1c6c964b..6a93ccb0 100644 --- a/packages/cli/src/connectors/bigquery/connector.ts +++ b/packages/cli/src/connectors/bigquery/connector.ts @@ -1,4 +1,5 @@ import { BigQuery, type TableField } from '@google-cloud/bigquery'; +import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../context/connections/bigquery-identifiers.js'; import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js'; import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js'; import { readFileSync } from 'node:fs'; @@ -230,9 +231,6 @@ export function bigQueryConnectionConfigFromConfig(input: { throw new Error(`Native BigQuery connector requires credentials_json.project_id for connections.${input.connectionId}`); } const resolvedDatasetIds = datasetIds(input.connection, env); - if (resolvedDatasetIds.length === 0) { - throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.dataset_id or dataset_ids`); - } const location = stringConfigValue(input.connection, 'location', env); return { projectId, credentials, datasetIds: resolvedDatasetIds, ...(location ? { location } : {}) }; } @@ -289,17 +287,18 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const tables: KtxSchemaTable[] = []; - for (const datasetId of this.resolved.datasetIds) { + const datasetIds = this.requireDatasetIdsForScan(); + for (const datasetId of datasetIds) { tables.push(...(await this.introspectDataset(datasetId))); } return { connectionId: this.connectionId, driver: 'bigquery', extractedAt: this.now().toISOString(), - scope: { catalogs: [this.resolved.projectId], datasets: this.resolved.datasetIds }, + scope: { catalogs: [this.resolved.projectId], datasets: datasetIds }, metadata: { project_id: this.resolved.projectId, - datasets: this.resolved.datasetIds, + datasets: datasetIds, table_count: tables.length, total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0), }, @@ -381,22 +380,33 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { } async listTables(datasetIds?: string[]): Promise { - const filterDatasets = datasetIds ?? (await this.listDatasets()); - const entries: KtxTableListEntry[] = []; - for (const datasetId of filterDatasets) { - const dataset = this.getClient().dataset(datasetId); - const [tables] = await dataset.getTables(); - for (const table of tables) { - if (!table.id) continue; - entries.push({ - schema: datasetId, - name: table.id, - kind: table.metadata?.type === 'VIEW' ? 'view' : 'table', - }); - } + const projectId = normalizeBigQueryProjectId(this.resolved.projectId, 'table discovery'); + const region = normalizeBigQueryRegion(this.resolved.location ?? 'US', 'table discovery'); + const params: Record = {}; + const filter = datasetIds && datasetIds.length > 0 ? 'AND table_schema IN UNNEST(@dataset_ids)' : ''; + if (datasetIds && datasetIds.length > 0) { + params.dataset_ids = datasetIds; } - entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name)); - return entries; + const rows = await this.queryRaw<{ table_schema: string; table_name: string; table_type: string }>( + ` + SELECT table_schema, table_name, table_type + FROM \`${projectId}\`.\`region-${region}\`.INFORMATION_SCHEMA.TABLES + WHERE table_type IN ( + 'BASE TABLE', 'VIEW', 'MATERIALIZED VIEW', 'EXTERNAL', 'CLONE', 'SNAPSHOT' + ) + ${filter} + ORDER BY table_schema, table_name + `, + params, + ); + return rows.map((row) => ({ + schema: row.table_schema, + name: row.table_name, + kind: + row.table_type === 'VIEW' || row.table_type === 'MATERIALIZED VIEW' + ? ('view' as const) + : ('table' as const), + })); } async cleanup(): Promise { @@ -413,6 +423,13 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { return this.client; } + private requireDatasetIdsForScan(): string[] { + if (this.resolved.datasetIds.length === 0) { + throw new Error(`Native BigQuery scan requires connections.${this.connectionId}.dataset_ids or dataset_id`); + } + return this.resolved.datasetIds; + } + private async query(sql: string, params?: Record): Promise { const [job] = await this.getClient().createQueryJob({ query: sql, diff --git a/packages/cli/src/context/connections/bigquery-identifiers.test.ts b/packages/cli/src/context/connections/bigquery-identifiers.test.ts new file mode 100644 index 00000000..a1fd2e09 --- /dev/null +++ b/packages/cli/src/context/connections/bigquery-identifiers.test.ts @@ -0,0 +1,19 @@ +import { describe, expect, it } from 'vitest'; +import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from './bigquery-identifiers.js'; + +describe('BigQuery identifier normalization', () => { + it('normalizes project ids and regions for information schema paths', () => { + expect(normalizeBigQueryProjectId('project-1')).toBe('project-1'); + expect(normalizeBigQueryRegion('US')).toBe('us'); + expect(normalizeBigQueryRegion('region-eu')).toBe('eu'); + }); + + it('rejects malformed project ids and regions with caller-specific context', () => { + expect(() => normalizeBigQueryProjectId('project`1', 'table discovery')).toThrow( + 'Invalid BigQuery project id for table discovery: project`1', + ); + expect(() => normalizeBigQueryRegion('US;DROP', 'table discovery')).toThrow( + 'Invalid BigQuery region for table discovery: US;DROP', + ); + }); +}); diff --git a/packages/cli/src/context/connections/bigquery-identifiers.ts b/packages/cli/src/context/connections/bigquery-identifiers.ts new file mode 100644 index 00000000..f2aa29f9 --- /dev/null +++ b/packages/cli/src/context/connections/bigquery-identifiers.ts @@ -0,0 +1,17 @@ +const BIGQUERY_PROJECT_ID_PATTERN = /^[A-Za-z0-9_-]+$/; +const BIGQUERY_REGION_PATTERN = /^[a-z0-9-]+$/; + +export function normalizeBigQueryProjectId(value: string, context = 'historic-SQL ingest'): string { + if (!BIGQUERY_PROJECT_ID_PATTERN.test(value)) { + throw new Error(`Invalid BigQuery project id for ${context}: ${value}`); + } + return value; +} + +export function normalizeBigQueryRegion(value: string, context = 'historic-SQL ingest'): string { + const normalized = value.trim().toLowerCase().replace(/^region-/, ''); + if (!BIGQUERY_REGION_PATTERN.test(normalized)) { + throw new Error(`Invalid BigQuery region for ${context}: ${value}`); + } + return normalized; +} diff --git a/packages/cli/src/context/ingest/adapters/historic-sql/bigquery-query-history-reader.ts b/packages/cli/src/context/ingest/adapters/historic-sql/bigquery-query-history-reader.ts index e24c50cf..ac4d4c71 100644 --- a/packages/cli/src/context/ingest/adapters/historic-sql/bigquery-query-history-reader.ts +++ b/packages/cli/src/context/ingest/adapters/historic-sql/bigquery-query-history-reader.ts @@ -5,6 +5,7 @@ import { type HistoricSqlTimeWindow, type HistoricSqlUnifiedPullConfig, } from './types.js'; +import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../../connections/bigquery-identifiers.js'; interface QueryResultLike { headers: string[]; @@ -52,21 +53,6 @@ function grantsError(cause: unknown): HistoricSqlGrantsMissingError { }); } -function normalizeProjectId(value: string): string { - if (!/^[A-Za-z0-9_-]+$/.test(value)) { - throw new Error(`Invalid BigQuery project id for historic-SQL ingest: ${value}`); - } - return value; -} - -function normalizeRegion(value: string): string { - const region = value.trim().toLowerCase().replace(/^region-/, ''); - if (!/^[a-z0-9-]+$/.test(region)) { - throw new Error(`Invalid BigQuery region for historic-SQL ingest: ${value}`); - } - return region; -} - function timestampExpression(value: Date | string): string { const date = value instanceof Date ? value : new Date(value); if (Number.isNaN(date.getTime())) { @@ -190,8 +176,8 @@ export class BigQueryHistoricSqlQueryHistoryReader { private readonly viewPath: string; constructor(options: BigQueryHistoricSqlQueryHistoryReaderOptions) { - const projectId = normalizeProjectId(options.projectId); - const region = normalizeRegion(options.region); + const projectId = normalizeBigQueryProjectId(options.projectId); + const region = normalizeBigQueryRegion(options.region); this.viewPath = `\`${projectId}.region-${region}.INFORMATION_SCHEMA.JOBS_BY_PROJECT\``; }