feat(connectors): generalize readiness and constraint handling (#212)

* feat(connectors): add postgres maxConnections

* feat(connectors): add mysql maxConnections

* feat(connectors): add sqlserver maxConnections

* feat(connectors): rename snowflake pool config

* docs: document connector maxConnections

* feat(scan): add constraint discovery warning helper

* feat(scan): carry structural warnings through reports

* feat(postgres): soft-fail denied constraint discovery

* feat(mysql): soft-fail denied constraint discovery

* feat(sqlserver): soft-fail denied constraint discovery

* feat(bigquery): soft-fail denied primary key discovery

* feat(snowflake): report denied primary key discovery

* test(scan): verify constraint discovery warnings

* feat(historic-sql): use shared readiness probes

* docs: document query history readiness probes

* test(historic-sql): verify readiness probe registry

* test(ingest): account for live database warnings artifact

* Add skip option for agent setup
This commit is contained in:
Andrey Avtomonov 2026-05-24 19:30:06 +02:00 committed by GitHub
parent cfd1749ab9
commit 78b8a0c025
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
42 changed files with 2763 additions and 554 deletions

View file

@ -1,6 +1,6 @@
import { describe, expect, it, vi } from 'vitest';
import { createPostgresLiveDatabaseIntrospection } from '../../connectors/postgres/live-database-introspection.js';
import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js';
import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresConnectionConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js';
import { tableRefSet } from '../../context/scan/table-ref.js';
interface FakeQueryResult {
@ -8,11 +8,16 @@ interface FakeQueryResult {
fields?: Array<{ name: string; dataTypeID: number }>;
}
function fakePoolFactory(results: Map<string, FakeQueryResult>): KtxPostgresPoolFactory {
type FakeQueryResponse = FakeQueryResult | Error;
function fakePoolFactory(results: Map<string, FakeQueryResponse>): KtxPostgresPoolFactory {
const query = vi.fn(async (sql: string, params?: unknown[]) => {
const normalized = sql.replace(/\s+/g, ' ').trim();
for (const [key, value] of results.entries()) {
if (normalized.includes(key)) {
if (value instanceof Error) {
throw value;
}
return value;
}
}
@ -33,8 +38,8 @@ function fakePoolFactory(results: Map<string, FakeQueryResult>): KtxPostgresPool
};
}
function metadataResults(): Map<string, FakeQueryResult> {
return new Map<string, FakeQueryResult>([
function metadataResults(): Map<string, FakeQueryResponse> {
return new Map<string, FakeQueryResponse>([
[
'FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n',
{
@ -154,6 +159,46 @@ describe('KtxPostgresScanConnector', () => {
});
});
it('defaults and validates Postgres maxConnections', () => {
const baseConnection: KtxPostgresConnectionConfig = {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
};
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: baseConnection,
}),
).toMatchObject({ max: 10 });
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxConnections: 50 },
}),
).toMatchObject({ max: 50 });
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxConnections: '12' as never },
}),
).toMatchObject({ max: 12 });
for (const maxConnections of [0, -1, 1.5, Number.NaN, 'abc' as never]) {
expect(() =>
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxConnections },
}),
).toThrow('connections.warehouse.maxConnections must be a positive integer');
}
});
it('introspects schemas, tables, views, primary keys, comments, row counts, and foreign keys', async () => {
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
@ -212,6 +257,75 @@ describe('KtxPostgresScanConnector', () => {
]);
});
it('soft-fails denied Postgres constraint discovery with scan warnings', async () => {
const results = metadataResults();
results.set(
"tc.constraint_type = 'PRIMARY KEY'",
Object.assign(new Error('permission denied for information_schema'), { code: '42501' }),
);
results.set(
"tc.constraint_type = 'FOREIGN KEY'",
Object.assign(new Error('relation information_schema.key_column_usage does not exist'), { code: '42P01' }),
);
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schema: 'public',
},
poolFactory: fakePoolFactory(results),
now: () => new Date('2026-04-29T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'postgres' },
{ runId: 'scan-run-denied-constraints' },
);
expect(snapshot.warnings).toEqual([
{
code: 'constraint_discovery_unauthorized',
message: 'Skipped primary-key discovery in public (insufficient grants on system catalogs)',
recoverable: true,
metadata: { schema: 'public', kind: 'primary_key' },
},
{
code: 'constraint_discovery_unauthorized',
message: 'Skipped foreign-key discovery in public (insufficient grants on system catalogs)',
recoverable: true,
metadata: { schema: 'public', kind: 'foreign_key' },
},
]);
expect(snapshot.tables.every((table) => table.columns.every((column) => column.primaryKey === false))).toBe(true);
expect(snapshot.tables.every((table) => table.foreignKeys.length === 0)).toBe(true);
});
it('propagates non-denial Postgres constraint discovery errors', async () => {
const results = metadataResults();
const resetError = Object.assign(new Error('connection reset'), { code: 'ECONNRESET' });
results.set("tc.constraint_type = 'PRIMARY KEY'", resetError);
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schema: 'public',
},
poolFactory: fakePoolFactory(results),
});
await expect(
connector.introspect({ connectionId: 'warehouse', driver: 'postgres' }, { runId: 'scan-run-network-error' }),
).rejects.toBe(resetError);
});
it('runs samples, distinct values, statistics, read-only SQL, and schema listing', async () => {
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',

View file

@ -2,8 +2,29 @@ import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
import { tryConstraintQuery } from '../../context/scan/constraint-discovery.js';
import { scopedTableNames } from '../../context/scan/table-ref.js';
import {
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxScanWarning,
type KtxSchemaColumn,
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
type KtxTableListEntry,
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '../../context/scan/types.js';
import { Pool } from 'pg';
import { KtxPostgresDialect } from './dialect.js';
@ -43,6 +64,7 @@ export interface KtxPostgresConnectionConfig {
sslmode?: string;
sslMode?: string;
rejectUnauthorized?: boolean;
maxConnections?: number;
[key: string]: unknown;
}
@ -207,6 +229,14 @@ function primaryKeyMap(rows: PostgresPrimaryKeyRow[]): Map<string, Set<string>>
return grouped;
}
function isDeniedError(error: unknown): boolean {
if (!error || typeof error !== 'object') {
return false;
}
const code = (error as { code?: unknown }).code;
return code === '42501' || code === '42P01';
}
function queryRows(result: KtxPostgresQueryResult): unknown[][] {
const headers = (result.fields ?? []).map((field) => field.name);
return result.rows.map((row) => headers.map((header) => row[header]));
@ -242,6 +272,23 @@ function numberValue(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
function positiveIntegerConfigValue(input: {
connection: KtxPostgresConnectionConfig;
key: keyof KtxPostgresConnectionConfig;
connectionId: string;
defaultValue: number;
}): number {
const value = input.connection[input.key];
if (value === undefined) {
return input.defaultValue;
}
const numberValue = Number(value);
if (!Number.isInteger(numberValue) || numberValue < 1) {
throw new Error(`connections.${input.connectionId}.${String(input.key)} must be a positive integer`);
}
return numberValue;
}
function parsePostgresUrl(url: string): Partial<KtxPostgresConnectionConfig> {
const parsed = new URL(url);
const sslmode = parsed.searchParams.get('sslmode') ?? undefined;
@ -299,6 +346,12 @@ export function postgresPoolConfigFromConfig(input: {
const user = stringConfigValue(merged, 'username', env) ?? stringConfigValue(merged, 'user', env);
const password = stringConfigValue(merged, 'password', env);
const sslmode = normalizedSslMode(merged);
const maxConnections = positiveIntegerConfigValue({
connection: merged,
key: 'maxConnections',
connectionId: input.connectionId,
defaultValue: 10,
});
if (!referencedUrl && !host) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.host or url`);
@ -311,7 +364,7 @@ export function postgresPoolConfigFromConfig(input: {
}
const config: KtxPostgresPoolConfig = {
max: 10,
max: maxConnections,
idleTimeoutMillis: 30_000,
connectionTimeoutMillis: 10_000,
...(referencedUrl && sslmode !== 'prefer' && sslmode !== 'disable'
@ -379,10 +432,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
this.assertConnection(input.connectionId);
const schemas = schemasFromConnection(this.connection);
const allTables: KtxSchemaTable[] = [];
const snapshotWarnings: KtxScanWarning[] = [];
for (const schema of schemas) {
const scopedNames = input.tableScope ? scopedTableNames(input.tableScope, { catalog: null, db: schema }) : null;
if (scopedNames && scopedNames.length === 0) continue;
const tables = await this.loadSchemaTables(schema, scopedNames);
const tables = await this.loadSchemaTables(schema, scopedNames, snapshotWarnings);
allTables.push(...tables);
}
return {
@ -398,6 +452,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
total_columns: allTables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables: allTables,
warnings: snapshotWarnings,
};
}
@ -546,7 +601,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
}
}
private async loadSchemaTables(schema: string, scopedNames: readonly string[] | null): Promise<KtxSchemaTable[]> {
private async loadSchemaTables(
schema: string,
scopedNames: readonly string[] | null,
snapshotWarnings: KtxScanWarning[],
): Promise<KtxSchemaTable[]> {
if (scopedNames && scopedNames.length === 0) return [];
const pgCatalogScopeClause = scopedNames ? 'AND c.relname = ANY($2)' : '';
const tableConstraintScopeClause = scopedNames ? 'AND tc.table_name = ANY($2)' : '';
@ -591,8 +650,11 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
`,
[schema, ...scopeValues],
);
const primaryKeys = await this.queryRaw<PostgresPrimaryKeyRow>(
`
const primaryKeysResult = await tryConstraintQuery(
{ schema, kind: 'primary_key', isDeniedError },
() =>
this.queryRaw<PostgresPrimaryKeyRow>(
`
SELECT tc.table_name, kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
@ -603,10 +665,18 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
${tableConstraintScopeClause}
ORDER BY tc.table_name, kcu.ordinal_position
`,
[schema, ...scopeValues],
[schema, ...scopeValues],
),
);
const foreignKeys = await this.queryRaw<PostgresForeignKeyRow>(
`
const primaryKeys = primaryKeysResult.ok ? primaryKeysResult.value : [];
if (!primaryKeysResult.ok) {
snapshotWarnings.push(primaryKeysResult.warning);
}
const foreignKeysResult = await tryConstraintQuery(
{ schema, kind: 'foreign_key', isDeniedError },
() =>
this.queryRaw<PostgresForeignKeyRow>(
`
SELECT
tc.table_name,
kcu.column_name,
@ -626,8 +696,13 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
${tableConstraintScopeClause}
ORDER BY tc.table_name, kcu.column_name
`,
[schema, ...scopeValues],
[schema, ...scopeValues],
),
);
const foreignKeys = foreignKeysResult.ok ? foreignKeysResult.value : [];
if (!foreignKeysResult.ok) {
snapshotWarnings.push(foreignKeysResult.warning);
}
const columnsByTable = groupByTable(columns);
const primaryKeysByTable = primaryKeyMap(primaryKeys);