fix: make bigquery table discovery schema scoped

This commit is contained in:
Andrey Avtomonov 2026-05-21 19:35:47 +02:00
parent 5bd6f1a2a6
commit 5273406b6a
5 changed files with 158 additions and 38 deletions

View file

@ -234,6 +234,87 @@ describe('KtxBigQueryScanConnector', () => {
await connector.cleanup();
});
// Discovery path: the connection below carries no dataset_id / dataset_ids, yet constructing the
// connector and calling listTables() must still succeed.
it('constructs for discovery without dataset scope and lists tables through one region information schema query', async () => {
// Stub query job: getQueryResults() yields three table rows, an unused second element, and the result schema.
const createQueryJob = vi.fn(
async (input: { query: string; params?: Record<string, unknown>; location?: string }) => [
{
getQueryResults: async () => [
[
{ table_schema: 'analytics', table_name: 'orders', table_type: 'BASE TABLE' },
{ table_schema: 'analytics', table_name: 'order_clone', table_type: 'CLONE' },
{ table_schema: 'mart', table_name: 'orders_mv', table_type: 'MATERIALIZED VIEW' },
],
undefined,
{
schema: {
fields: [
{ name: 'table_schema', type: 'STRING' },
{ name: 'table_name', type: 'STRING' },
{ name: 'table_type', type: 'STRING' },
],
},
},
],
},
],
);
// The dataset-level client methods are stubbed too; the assertions below only inspect createQueryJob.
const clientFactory: KtxBigQueryClientFactory = {
createClient: vi.fn(() => ({
getDatasets: vi.fn(async () => [[{ id: 'analytics' }, { id: 'mart' }]]),
dataset: vi.fn((datasetId: string) => ({
get: vi.fn(async () => [{ id: datasetId }]),
getTables: vi.fn(async () => [[]]),
})),
createQueryJob,
})),
};
const connector = new KtxBigQueryScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'bigquery',
credentials_json: JSON.stringify({ project_id: 'project-1' }),
location: 'US',
},
clientFactory,
});
// CLONE is reported as kind 'table' and MATERIALIZED VIEW as kind 'view'.
await expect(connector.listTables(['analytics', 'mart'])).resolves.toEqual([
{ schema: 'analytics', name: 'orders', kind: 'table' },
{ schema: 'analytics', name: 'order_clone', kind: 'table' },
{ schema: 'mart', name: 'orders_mv', kind: 'view' },
]);
// Exactly one query is issued, with the dataset ids bound as a parameter and the configured location forwarded.
expect(createQueryJob).toHaveBeenCalledTimes(1);
expect(createQueryJob).toHaveBeenCalledWith(
expect.objectContaining({
location: 'US',
params: { dataset_ids: ['analytics', 'mart'] },
}),
);
// The SQL text must use the lower-cased region qualifier and mention the CLONE and SNAPSHOT table types.
expect(createQueryJob.mock.calls[0]?.[0].query).toContain('`project-1`.`region-us`.INFORMATION_SCHEMA.TABLES');
expect(createQueryJob.mock.calls[0]?.[0].query).toContain("'CLONE'");
expect(createQueryJob.mock.calls[0]?.[0].query).toContain("'SNAPSHOT'");
});
// Scan path: the connection below has no dataset scope, so introspect() must reject with the
// dataset-scope error instead of scanning.
it('keeps scan paths requiring dataset scope', async () => {
const connector = new KtxBigQueryScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'bigquery',
credentials_json: JSON.stringify({ project_id: 'project-1' }),
location: 'US',
},
clientFactory: fakeClientFactory(),
});
// The expected message names the connection id ('warehouse') taken from the connector options.
await expect(
connector.introspect(
{ connectionId: 'warehouse', driver: 'bigquery' },
{ runId: 'scan-run-1' },
),
).rejects.toThrow('Native BigQuery scan requires connections.warehouse.dataset_ids or dataset_id');
});
it('applies maximumBytesBilled to read-only queries when configured', async () => {
const clientFactory = fakeClientFactory();
const connector = new KtxBigQueryScanConnector({

View file

@ -1,4 +1,5 @@
import { BigQuery, type TableField } from '@google-cloud/bigquery';
import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../context/connections/bigquery-identifiers.js';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
import { readFileSync } from 'node:fs';
@ -230,9 +231,6 @@ export function bigQueryConnectionConfigFromConfig(input: {
throw new Error(`Native BigQuery connector requires credentials_json.project_id for connections.${input.connectionId}`);
}
const resolvedDatasetIds = datasetIds(input.connection, env);
if (resolvedDatasetIds.length === 0) {
throw new Error(`Native BigQuery connector requires connections.${input.connectionId}.dataset_id or dataset_ids`);
}
const location = stringConfigValue(input.connection, 'location', env);
return { projectId, credentials, datasetIds: resolvedDatasetIds, ...(location ? { location } : {}) };
}
@ -289,17 +287,18 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const tables: KtxSchemaTable[] = [];
for (const datasetId of this.resolved.datasetIds) {
const datasetIds = this.requireDatasetIdsForScan();
for (const datasetId of datasetIds) {
tables.push(...(await this.introspectDataset(datasetId)));
}
return {
connectionId: this.connectionId,
driver: 'bigquery',
extractedAt: this.now().toISOString(),
scope: { catalogs: [this.resolved.projectId], datasets: this.resolved.datasetIds },
scope: { catalogs: [this.resolved.projectId], datasets: datasetIds },
metadata: {
project_id: this.resolved.projectId,
datasets: this.resolved.datasetIds,
datasets: datasetIds,
table_count: tables.length,
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
},
@ -381,22 +380,33 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
}
/**
 * Lists tables and views as `{ schema, name, kind }` entries, optionally limited to `datasetIds`.
 *
 * NOTE(review): this hunk is rendered without +/- markers, so two implementations appear back to
 * back: a per-dataset `getTables()` loop and a single region-level `INFORMATION_SCHEMA.TABLES`
 * query. Presumably the loop is the removed side of the change; confirm against the raw diff.
 */
async listTables(datasetIds?: string[]): Promise<KtxTableListEntry[]> {
// Per-dataset variant: falls back to listDatasets() when no ids are passed, then calls getTables() once per dataset.
const filterDatasets = datasetIds ?? (await this.listDatasets());
const entries: KtxTableListEntry[] = [];
for (const datasetId of filterDatasets) {
const dataset = this.getClient().dataset(datasetId);
const [tables] = await dataset.getTables();
for (const table of tables) {
if (!table.id) continue;
entries.push({
schema: datasetId,
name: table.id,
kind: table.metadata?.type === 'VIEW' ? 'view' : 'table',
});
}
// Region-level variant: project id and region are validated before being interpolated into the SQL text below;
// the region falls back to 'US' when the connection has no location.
const projectId = normalizeBigQueryProjectId(this.resolved.projectId, 'table discovery');
const region = normalizeBigQueryRegion(this.resolved.location ?? 'US', 'table discovery');
const params: Record<string, unknown> = {};
// Dataset ids are bound as the @dataset_ids query parameter rather than spliced into the SQL;
// an absent or empty list adds no table_schema filter.
const filter = datasetIds && datasetIds.length > 0 ? 'AND table_schema IN UNNEST(@dataset_ids)' : '';
if (datasetIds && datasetIds.length > 0) {
params.dataset_ids = datasetIds;
}
entries.sort((a, b) => a.schema.localeCompare(b.schema) || a.name.localeCompare(b.name));
return entries;
const rows = await this.queryRaw<{ table_schema: string; table_name: string; table_type: string }>(
`
SELECT table_schema, table_name, table_type
FROM \`${projectId}\`.\`region-${region}\`.INFORMATION_SCHEMA.TABLES
WHERE table_type IN (
'BASE TABLE', 'VIEW', 'MATERIALIZED VIEW', 'EXTERNAL', 'CLONE', 'SNAPSHOT'
)
${filter}
ORDER BY table_schema, table_name
`,
params,
);
// VIEW and MATERIALIZED VIEW map to 'view'; every other table_type the query returns maps to 'table'.
return rows.map((row) => ({
schema: row.table_schema,
name: row.table_name,
kind:
row.table_type === 'VIEW' || row.table_type === 'MATERIALIZED VIEW'
? ('view' as const)
: ('table' as const),
}));
}
async cleanup(): Promise<void> {
@ -413,6 +423,13 @@ export class KtxBigQueryScanConnector implements KtxScanConnector {
return this.client;
}
/**
 * Returns the dataset ids resolved for this connection, for code paths that cannot run unscoped.
 *
 * @returns The resolved dataset ids; the array is never empty.
 * @throws Error naming the connection id when no dataset ids were resolved.
 */
private requireDatasetIdsForScan(): string[] {
  const { datasetIds } = this.resolved;
  if (datasetIds.length > 0) {
    return datasetIds;
  }
  throw new Error(`Native BigQuery scan requires connections.${this.connectionId}.dataset_ids or dataset_id`);
}
private async query(sql: string, params?: Record<string, unknown>): Promise<KtxQueryResult> {
const [job] = await this.getClient().createQueryJob({
query: sql,

View file

@ -0,0 +1,19 @@
import { describe, expect, it } from 'vitest';
import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from './bigquery-identifiers.js';
// Covers the project-id and region validators imported from ./bigquery-identifiers.js.
describe('BigQuery identifier normalization', () => {
// Valid input: the project id is returned unchanged; the region is lower-cased and a leading 'region-' is removed.
it('normalizes project ids and regions for information schema paths', () => {
expect(normalizeBigQueryProjectId('project-1')).toBe('project-1');
expect(normalizeBigQueryRegion('US')).toBe('us');
expect(normalizeBigQueryRegion('region-eu')).toBe('eu');
});
// Invalid input: the second argument names the caller and must appear in the error message,
// followed by the offending value exactly as it was passed in.
it('rejects malformed project ids and regions with caller-specific context', () => {
expect(() => normalizeBigQueryProjectId('project`1', 'table discovery')).toThrow(
'Invalid BigQuery project id for table discovery: project`1',
);
expect(() => normalizeBigQueryRegion('US;DROP', 'table discovery')).toThrow(
'Invalid BigQuery region for table discovery: US;DROP',
);
});
});

View file

@ -0,0 +1,17 @@
/** Accepts one or more ASCII letters, digits, underscores or hyphens. */
const BIGQUERY_PROJECT_ID_PATTERN = /^[A-Za-z0-9_-]+$/;
/** Accepts one or more lower-case ASCII letters, digits or hyphens. */
const BIGQUERY_REGION_PATTERN = /^[a-z0-9-]+$/;

/**
 * Validates a BigQuery project id and returns it unchanged.
 *
 * @param value Project id to validate; it is not trimmed or otherwise altered.
 * @param context Caller description embedded in the error message.
 * @returns `value`, when it matches the allowed character set.
 * @throws Error when `value` is empty or contains any other character.
 */
export function normalizeBigQueryProjectId(value: string, context = 'historic-SQL ingest'): string {
  if (BIGQUERY_PROJECT_ID_PATTERN.test(value)) {
    return value;
  }
  throw new Error(`Invalid BigQuery project id for ${context}: ${value}`);
}

/**
 * Normalizes a BigQuery region: trims whitespace, lower-cases, and removes one leading `region-`.
 *
 * @param value Region as configured, e.g. `US` or `region-eu`.
 * @param context Caller description embedded in the error message.
 * @returns The normalized region, e.g. `us` or `eu`.
 * @throws Error quoting the original `value` when the normalized form is empty or has other characters.
 */
export function normalizeBigQueryRegion(value: string, context = 'historic-SQL ingest'): string {
  const prefix = 'region-';
  const lowered = value.trim().toLowerCase();
  // Only a single leading prefix is removed, so 'region-region-eu' becomes 'region-eu'.
  const region = lowered.startsWith(prefix) ? lowered.slice(prefix.length) : lowered;
  if (BIGQUERY_REGION_PATTERN.test(region)) {
    return region;
  }
  throw new Error(`Invalid BigQuery region for ${context}: ${value}`);
}

View file

@ -5,6 +5,7 @@ import {
type HistoricSqlTimeWindow,
type HistoricSqlUnifiedPullConfig,
} from './types.js';
import { normalizeBigQueryProjectId, normalizeBigQueryRegion } from '../../../connections/bigquery-identifiers.js';
interface QueryResultLike {
headers: string[];
@ -52,21 +53,6 @@ function grantsError(cause: unknown): HistoricSqlGrantsMissingError {
});
}
/**
 * Validates a BigQuery project id and returns it unchanged.
 *
 * @param value Project id to validate; it is not trimmed or otherwise altered.
 * @returns `value`, when it consists only of ASCII letters, digits, underscores or hyphens.
 * @throws Error when `value` is empty or contains any other character.
 */
function normalizeProjectId(value: string): string {
  const isWellFormed = /^[A-Za-z0-9_-]+$/.test(value);
  if (isWellFormed) {
    return value;
  }
  throw new Error(`Invalid BigQuery project id for historic-SQL ingest: ${value}`);
}
/**
 * Normalizes a BigQuery region: trims whitespace, lower-cases, and removes one leading `region-`.
 *
 * @param value Region as configured, e.g. `US` or `region-eu`.
 * @returns The normalized region, e.g. `us` or `eu`.
 * @throws Error quoting the original `value` when the normalized form is empty or contains
 *   anything other than lower-case ASCII letters, digits or hyphens.
 */
function normalizeRegion(value: string): string {
  const prefix = 'region-';
  const lowered = value.trim().toLowerCase();
  // Only a single leading prefix is removed, so 'region-region-eu' becomes 'region-eu'.
  const candidate = lowered.startsWith(prefix) ? lowered.slice(prefix.length) : lowered;
  if (/^[a-z0-9-]+$/.test(candidate)) {
    return candidate;
  }
  throw new Error(`Invalid BigQuery region for historic-SQL ingest: ${value}`);
}
function timestampExpression(value: Date | string): string {
const date = value instanceof Date ? value : new Date(value);
if (Number.isNaN(date.getTime())) {
@ -190,8 +176,8 @@ export class BigQueryHistoricSqlQueryHistoryReader {
private readonly viewPath: string;
// Builds the backtick-quoted JOBS_BY_PROJECT view path once, at construction time.
// Project id and region pass through the normalizers first, so a malformed value throws here
// instead of reaching the interpolated path.
constructor(options: BigQueryHistoricSqlQueryHistoryReaderOptions) {
const projectId = normalizeProjectId(options.projectId);
const region = normalizeRegion(options.region);
const projectId = normalizeBigQueryProjectId(options.projectId);
const region = normalizeBigQueryRegion(options.region);
this.viewPath = `\`${projectId}.region-${region}.INFORMATION_SCHEMA.JOBS_BY_PROJECT\``;
}