feat(snowflake): report denied primary key discovery

This commit is contained in:
Andrey Avtomonov 2026-05-24 01:11:38 +02:00
parent 1f35868708
commit 86dc85869e
2 changed files with 92 additions and 15 deletions

View file

@ -359,12 +359,56 @@ describe('KtxSnowflakeScanConnector', () => {
expect(snapshot.tables.map((table) => table.name).sort()).toEqual(['ORDERS', 'ORDER_SUMMARY']);
expect(snapshot.tables.every((table) => table.columns.every((column) => column.primaryKey === false))).toBe(true);
expect(snapshot.warnings).toEqual([
{
code: 'constraint_discovery_unauthorized',
message: 'Skipped primary-key discovery in PUBLIC (insufficient grants on system catalogs)',
recoverable: true,
metadata: { schema: 'PUBLIC', kind: 'primary_key' },
},
]);
expect(warn).not.toHaveBeenCalled();
} finally {
warn.mockRestore();
}
});
it('propagates non-denial Snowflake primary-key discovery errors', async () => {
const driverFactory = fakeDriverFactory();
const driver = (driverFactory.createDriver as ReturnType<typeof vi.fn>).getMockImplementation() as
| (() => KtxSnowflakeDriver)
| undefined;
if (!driver) throw new Error('driver mock missing');
const built = driver();
const networkError = new Error('network unavailable');
(built.query as ReturnType<typeof vi.fn>).mockImplementation(async (sql: string) => {
if (sql.includes('TABLE_CONSTRAINTS')) {
throw networkError;
}
throw new Error(`Unexpected SQL: ${sql}`);
});
(driverFactory.createDriver as ReturnType<typeof vi.fn>).mockReturnValue(built);
const connector = new KtxSnowflakeScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'snowflake',
authMethod: 'password',
account: 'acct',
warehouse: 'WH',
database: 'ANALYTICS',
schema_name: 'PUBLIC',
username: 'reader',
password: 'fixture-pass', // pragma: allowlist secret
},
driverFactory,
});
await expect(
connector.introspect({ connectionId: 'warehouse', driver: 'snowflake' }, { runId: 'scan-run-snowflake-network' }),
).rejects.toBe(networkError);
});
it('limits introspection to tables in tableScope', async () => {
const queries: Array<{ sql: string; params?: unknown }> = [];
const getSchemaMetadata = vi.fn(async (_schemaName?: string, scopedNames?: readonly string[] | null) =>

View file

@ -3,8 +3,28 @@ import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableRef, type KtxTableSampleInput, type KtxTableListEntry, type KtxTableSampleResult } from '../../context/scan/types.js';
import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js';
import { scopedTableNames } from '../../context/scan/table-ref.js';
import {
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxScanWarning,
type KtxSchemaColumn,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '../../context/scan/types.js';
import snowflake from 'snowflake-sdk';
import type { Bind, Binds, Connection, ConnectionOptions } from 'snowflake-sdk';
import { KtxSnowflakeDialect } from './dialect.js';
@ -166,6 +186,13 @@ function firstNumber(value: unknown): number | null {
return Number.isFinite(numberValue) ? numberValue : null;
}
function isDeniedError(error: unknown): boolean {
if (error instanceof Error) {
return /insufficient privileges|does not exist or not authorized/i.test(error.message);
}
return false;
}
function normalizeSnowflakeValue(value: unknown, columnType?: string): unknown {
if (columnType && DATE_TYPES.some((type) => columnType.toUpperCase().includes(type))) {
if (typeof value === 'number') {
@ -544,13 +571,23 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const tables: KtxSchemaTable[] = [];
const snapshotWarnings: KtxScanWarning[] = [];
for (const schemaName of this.resolved.schemas) {
const scopedNames = input.tableScope
? scopedTableNames(input.tableScope, { catalog: this.resolved.database, db: schemaName })
: null;
if (scopedNames && scopedNames.length === 0) continue;
const rawTables = await this.getDriver().getSchemaMetadata(schemaName, scopedNames);
const primaryKeys = await this.primaryKeys(rawTables.map((table) => table.name), schemaName);
const primaryKeysResult = await tryConstraintQuery(
{ schema: schemaName, kind: 'primary_key', isDeniedError },
() => this.primaryKeys(rawTables.map((table) => table.name), schemaName),
);
const primaryKeys = primaryKeysResult.ok
? primaryKeysResult.value
: new Map(rawTables.map((table) => [table.name, new Set<string>()]));
if (!primaryKeysResult.ok) {
snapshotWarnings.push(primaryKeysResult.warning);
}
tables.push(...rawTables.map((table) => this.toSchemaTable(table, primaryKeys)));
}
return {
@ -567,6 +604,7 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
total_columns: tables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables,
warnings: snapshotWarnings,
};
}
@ -690,9 +728,8 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
return grouped;
}
const tableNamePlaceholders = tableNames.map(() => '?').join(', ');
try {
const result = await this.getDriver().query(
`
const result = await this.getDriver().query(
`
SELECT tc.TABLE_NAME, kcu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
@ -705,16 +742,12 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
AND tc.TABLE_NAME IN (${tableNamePlaceholders})
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
`,
[schemaName, this.resolved.database, ...tableNames],
);
for (const row of result.rows) {
const tableName = String(row[0]);
const columnName = String(row[1]);
grouped.get(tableName)?.add(columnName);
}
} catch {
// INFORMATION_SCHEMA.KEY_COLUMN_USAGE often isn't granted to read-only roles;
// continue with empty PK map and let FK inference + profiling carry the slack.
[schemaName, this.resolved.database, ...tableNames],
);
for (const row of result.rows) {
const tableName = String(row[0]);
const columnName = String(row[1]);
grouped.get(tableName)?.add(columnName);
}
return grouped;
}