mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat(postgres): soft-fail denied constraint discovery
This commit is contained in:
parent
d7d4a57859
commit
75e61486b5
2 changed files with 137 additions and 12 deletions
|
|
@ -8,11 +8,16 @@ interface FakeQueryResult {
|
|||
fields?: Array<{ name: string; dataTypeID: number }>;
|
||||
}
|
||||
|
||||
function fakePoolFactory(results: Map<string, FakeQueryResult>): KtxPostgresPoolFactory {
|
||||
type FakeQueryResponse = FakeQueryResult | Error;
|
||||
|
||||
function fakePoolFactory(results: Map<string, FakeQueryResponse>): KtxPostgresPoolFactory {
|
||||
const query = vi.fn(async (sql: string, params?: unknown[]) => {
|
||||
const normalized = sql.replace(/\s+/g, ' ').trim();
|
||||
for (const [key, value] of results.entries()) {
|
||||
if (normalized.includes(key)) {
|
||||
if (value instanceof Error) {
|
||||
throw value;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
|
@ -33,8 +38,8 @@ function fakePoolFactory(results: Map<string, FakeQueryResult>): KtxPostgresPool
|
|||
};
|
||||
}
|
||||
|
||||
function metadataResults(): Map<string, FakeQueryResult> {
|
||||
return new Map<string, FakeQueryResult>([
|
||||
function metadataResults(): Map<string, FakeQueryResponse> {
|
||||
return new Map<string, FakeQueryResponse>([
|
||||
[
|
||||
'FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n',
|
||||
{
|
||||
|
|
@ -252,6 +257,75 @@ describe('KtxPostgresScanConnector', () => {
|
|||
]);
|
||||
});
|
||||
|
||||
it('soft-fails denied Postgres constraint discovery with scan warnings', async () => {
|
||||
const results = metadataResults();
|
||||
results.set(
|
||||
"tc.constraint_type = 'PRIMARY KEY'",
|
||||
Object.assign(new Error('permission denied for information_schema'), { code: '42501' }),
|
||||
);
|
||||
results.set(
|
||||
"tc.constraint_type = 'FOREIGN KEY'",
|
||||
Object.assign(new Error('relation information_schema.key_column_usage does not exist'), { code: '42P01' }),
|
||||
);
|
||||
const connector = new KtxPostgresScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'postgres',
|
||||
host: 'db.example.test',
|
||||
database: 'analytics',
|
||||
username: 'reader',
|
||||
password: 'test-password', // pragma: allowlist secret
|
||||
schema: 'public',
|
||||
},
|
||||
poolFactory: fakePoolFactory(results),
|
||||
now: () => new Date('2026-04-29T10:00:00.000Z'),
|
||||
});
|
||||
|
||||
const snapshot = await connector.introspect(
|
||||
{ connectionId: 'warehouse', driver: 'postgres' },
|
||||
{ runId: 'scan-run-denied-constraints' },
|
||||
);
|
||||
|
||||
expect(snapshot.warnings).toEqual([
|
||||
{
|
||||
code: 'constraint_discovery_unauthorized',
|
||||
message: 'Skipped primary-key discovery in public (insufficient grants on system catalogs)',
|
||||
recoverable: true,
|
||||
metadata: { schema: 'public', kind: 'primary_key' },
|
||||
},
|
||||
{
|
||||
code: 'constraint_discovery_unauthorized',
|
||||
message: 'Skipped foreign-key discovery in public (insufficient grants on system catalogs)',
|
||||
recoverable: true,
|
||||
metadata: { schema: 'public', kind: 'foreign_key' },
|
||||
},
|
||||
]);
|
||||
expect(snapshot.tables.every((table) => table.columns.every((column) => column.primaryKey === false))).toBe(true);
|
||||
expect(snapshot.tables.every((table) => table.foreignKeys.length === 0)).toBe(true);
|
||||
});
|
||||
|
||||
it('propagates non-denial Postgres constraint discovery errors', async () => {
|
||||
const results = metadataResults();
|
||||
const resetError = Object.assign(new Error('connection reset'), { code: 'ECONNRESET' });
|
||||
results.set("tc.constraint_type = 'PRIMARY KEY'", resetError);
|
||||
const connector = new KtxPostgresScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'postgres',
|
||||
host: 'db.example.test',
|
||||
database: 'analytics',
|
||||
username: 'reader',
|
||||
password: 'test-password', // pragma: allowlist secret
|
||||
schema: 'public',
|
||||
},
|
||||
poolFactory: fakePoolFactory(results),
|
||||
});
|
||||
|
||||
await expect(
|
||||
connector.introspect({ connectionId: 'warehouse', driver: 'postgres' }, { runId: 'scan-run-network-error' }),
|
||||
).rejects.toBe(resetError);
|
||||
});
|
||||
|
||||
it('runs samples, distinct values, statistics, read-only SQL, and schema listing', async () => {
|
||||
const connector = new KtxPostgresScanConnector({
|
||||
connectionId: 'warehouse',
|
||||
|
|
|
|||
|
|
@ -2,8 +2,29 @@ import { readFileSync } from 'node:fs';
|
|||
import { homedir } from 'node:os';
|
||||
import { resolve } from 'node:path';
|
||||
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
|
||||
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
|
||||
import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js';
|
||||
import { scopedTableNames } from '../../context/scan/table-ref.js';
|
||||
import {
|
||||
createKtxConnectorCapabilities,
|
||||
type KtxColumnSampleInput,
|
||||
type KtxColumnSampleResult,
|
||||
type KtxColumnStatsInput,
|
||||
type KtxColumnStatsResult,
|
||||
type KtxQueryResult,
|
||||
type KtxReadOnlyQueryInput,
|
||||
type KtxScanConnector,
|
||||
type KtxScanContext,
|
||||
type KtxScanInput,
|
||||
type KtxScanWarning,
|
||||
type KtxSchemaColumn,
|
||||
type KtxSchemaForeignKey,
|
||||
type KtxSchemaSnapshot,
|
||||
type KtxSchemaTable,
|
||||
type KtxTableListEntry,
|
||||
type KtxTableRef,
|
||||
type KtxTableSampleInput,
|
||||
type KtxTableSampleResult,
|
||||
} from '../../context/scan/types.js';
|
||||
import { Pool } from 'pg';
|
||||
import { KtxPostgresDialect } from './dialect.js';
|
||||
|
||||
|
|
@ -208,6 +229,14 @@ function primaryKeyMap(rows: PostgresPrimaryKeyRow[]): Map<string, Set<string>>
|
|||
return grouped;
|
||||
}
|
||||
|
||||
function isDeniedError(error: unknown): boolean {
|
||||
if (!error || typeof error !== 'object') {
|
||||
return false;
|
||||
}
|
||||
const code = (error as { code?: unknown }).code;
|
||||
return code === '42501' || code === '42P01';
|
||||
}
|
||||
|
||||
function queryRows(result: KtxPostgresQueryResult): unknown[][] {
|
||||
const headers = (result.fields ?? []).map((field) => field.name);
|
||||
return result.rows.map((row) => headers.map((header) => row[header]));
|
||||
|
|
@ -403,10 +432,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
this.assertConnection(input.connectionId);
|
||||
const schemas = schemasFromConnection(this.connection);
|
||||
const allTables: KtxSchemaTable[] = [];
|
||||
const snapshotWarnings: KtxScanWarning[] = [];
|
||||
for (const schema of schemas) {
|
||||
const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: schema }) : null;
|
||||
if (scopedNames && scopedNames.length === 0) continue;
|
||||
const tables = await this.loadSchemaTables(schema, scopedNames);
|
||||
const tables = await this.loadSchemaTables(schema, scopedNames, snapshotWarnings);
|
||||
allTables.push(...tables);
|
||||
}
|
||||
return {
|
||||
|
|
@ -422,6 +452,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
total_columns: allTables.reduce((sum, table) => sum + table.columns.length, 0),
|
||||
},
|
||||
tables: allTables,
|
||||
warnings: snapshotWarnings,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -570,7 +601,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
}
|
||||
}
|
||||
|
||||
private async loadSchemaTables(schema: string, scopedNames: readonly string[] | null): Promise<KtxSchemaTable[]> {
|
||||
private async loadSchemaTables(
|
||||
schema: string,
|
||||
scopedNames: readonly string[] | null,
|
||||
snapshotWarnings: KtxScanWarning[],
|
||||
): Promise<KtxSchemaTable[]> {
|
||||
if (scopedNames && scopedNames.length === 0) return [];
|
||||
const pgCatalogScopeClause = scopedNames ? 'AND c.relname = ANY($2)' : '';
|
||||
const tableConstraintScopeClause = scopedNames ? 'AND tc.table_name = ANY($2)' : '';
|
||||
|
|
@ -615,8 +650,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
`,
|
||||
[schema, ...scopeValues],
|
||||
);
|
||||
const primaryKeys = await this.queryRaw<PostgresPrimaryKeyRow>(
|
||||
`
|
||||
const primaryKeysResult = await tryConstraintQuery(
|
||||
{ schema, kind: 'primary_key', isDeniedError },
|
||||
() =>
|
||||
this.queryRaw<PostgresPrimaryKeyRow>(
|
||||
`
|
||||
SELECT tc.table_name, kcu.column_name
|
||||
FROM information_schema.table_constraints tc
|
||||
JOIN information_schema.key_column_usage kcu
|
||||
|
|
@ -627,10 +665,18 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
${tableConstraintScopeClause}
|
||||
ORDER BY tc.table_name, kcu.ordinal_position
|
||||
`,
|
||||
[schema, ...scopeValues],
|
||||
[schema, ...scopeValues],
|
||||
),
|
||||
);
|
||||
const foreignKeys = await this.queryRaw<PostgresForeignKeyRow>(
|
||||
`
|
||||
const primaryKeys = primaryKeysResult.ok ? primaryKeysResult.value : [];
|
||||
if (!primaryKeysResult.ok) {
|
||||
snapshotWarnings.push(primaryKeysResult.warning);
|
||||
}
|
||||
const foreignKeysResult = await tryConstraintQuery(
|
||||
{ schema, kind: 'foreign_key', isDeniedError },
|
||||
() =>
|
||||
this.queryRaw<PostgresForeignKeyRow>(
|
||||
`
|
||||
SELECT
|
||||
tc.table_name,
|
||||
kcu.column_name,
|
||||
|
|
@ -650,8 +696,13 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
|
|||
${tableConstraintScopeClause}
|
||||
ORDER BY tc.table_name, kcu.column_name
|
||||
`,
|
||||
[schema, ...scopeValues],
|
||||
[schema, ...scopeValues],
|
||||
),
|
||||
);
|
||||
const foreignKeys = foreignKeysResult.ok ? foreignKeysResult.value : [];
|
||||
if (!foreignKeysResult.ok) {
|
||||
snapshotWarnings.push(foreignKeysResult.warning);
|
||||
}
|
||||
|
||||
const columnsByTable = groupByTable(columns);
|
||||
const primaryKeysByTable = primaryKeyMap(primaryKeys);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue