fix(snowflake): keep introspecting when primary-key discovery is denied

The PK query joins INFORMATION_SCHEMA.TABLE_CONSTRAINTS and
INFORMATION_SCHEMA.KEY_COLUMN_USAGE, which require grants the
connection role may not have. Previously a 'SQL compilation error:
Object ANALYTICS.INFORMATION_SCHEMA.KEY_COLUMN_USAGE does not exist
or not authorized' aborted the entire introspect — schemas, columns,
and row counts were all discarded over a missing nice-to-have.

Wrap the constraint query in try/catch, log a one-line warning per
schema, and return an empty PK map. Columns end up with
primaryKey=false; relationship inference still has FK and profiling
to fall back on.
This commit is contained in:
Andrey Avtomonov 2026-05-22 12:05:05 +02:00
parent 70f47e8b54
commit 1349af1702
2 changed files with 78 additions and 22 deletions

View file

@ -157,6 +157,55 @@ describe('KtxSnowflakeScanConnector', () => {
]);
});
it('continues introspection when primary-key discovery is not authorized', async () => {
const driverFactory = fakeDriverFactory();
const driver = (driverFactory.createDriver as ReturnType<typeof vi.fn>).getMockImplementation() as
| (() => KtxSnowflakeDriver)
| undefined;
if (!driver) throw new Error('driver mock missing');
const built = driver();
(built.query as ReturnType<typeof vi.fn>).mockImplementation(async (sql: string) => {
if (sql.includes('TABLE_CONSTRAINTS')) {
throw new Error(
"SQL compilation error: Object 'ANALYTICS.INFORMATION_SCHEMA.KEY_COLUMN_USAGE' does not exist or not authorized.",
);
}
throw new Error(`Unexpected SQL: ${sql}`);
});
(driverFactory.createDriver as ReturnType<typeof vi.fn>).mockReturnValue(built);
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
try {
const connector = new KtxSnowflakeScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'snowflake',
authMethod: 'password',
account: 'acct',
warehouse: 'WH',
database: 'ANALYTICS',
schema_name: 'PUBLIC',
username: 'reader',
password: 'fixture-pass', // pragma: allowlist secret
},
driverFactory,
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'snowflake' },
{ runId: 'scan-run-pk-skip' },
);
expect(snapshot.tables.map((table) => table.name).sort()).toEqual(['ORDERS', 'ORDER_SUMMARY']);
expect(snapshot.tables.every((table) => table.columns.every((column) => column.primaryKey === false))).toBe(true);
expect(warn).toHaveBeenCalledWith(
expect.stringContaining('Snowflake primary-key discovery skipped for ANALYTICS.PUBLIC'),
);
} finally {
warn.mockRestore();
}
});
it('supports read-only query, sampling, distinct values, row counts, schema listing, and cleanup', async () => {
const driverFactory = fakeDriverFactory();
const connector = new KtxSnowflakeScanConnector({

View file

@ -626,32 +626,39 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector {
}
private async primaryKeys(tableNames: string[], schemaName: string): Promise<Map<string, Set<string>>> {
if (tableNames.length === 0) {
return new Map();
}
const result = await this.getDriver().query(
`
SELECT tc.TABLE_NAME, kcu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
AND tc.TABLE_CATALOG = kcu.TABLE_CATALOG
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND tc.TABLE_SCHEMA = ?
AND tc.TABLE_CATALOG = ?
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
`,
[schemaName, this.resolved.database],
);
const grouped = new Map<string, Set<string>>();
for (const tableName of tableNames) {
grouped.set(tableName, new Set());
}
for (const row of result.rows) {
const tableName = String(row[0]);
const columnName = String(row[1]);
grouped.get(tableName)?.add(columnName);
if (tableNames.length === 0) {
return grouped;
}
try {
const result = await this.getDriver().query(
`
SELECT tc.TABLE_NAME, kcu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
AND tc.TABLE_CATALOG = kcu.TABLE_CATALOG
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
AND tc.TABLE_SCHEMA = ?
AND tc.TABLE_CATALOG = ?
ORDER BY tc.TABLE_NAME, kcu.ORDINAL_POSITION
`,
[schemaName, this.resolved.database],
);
for (const row of result.rows) {
const tableName = String(row[0]);
const columnName = String(row[1]);
grouped.get(tableName)?.add(columnName);
}
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
console.warn(
`Snowflake primary-key discovery skipped for ${this.resolved.database}.${schemaName}: ${detail.replace(/\s+/g, ' ').trim()}`,
);
}
return grouped;
}