feat(connectors): add postgres maxConnections

This commit is contained in:
Andrey Avtomonov 2026-05-24 00:57:11 +02:00
parent 394a985d2a
commit 54b65446ec
2 changed files with 66 additions and 2 deletions

View file

@ -1,6 +1,6 @@
import { describe, expect, it, vi } from 'vitest';
import { createPostgresLiveDatabaseIntrospection } from '../../connectors/postgres/live-database-introspection.js';
import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js';
import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresConnectionConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js';
import { tableRefSet } from '../../context/scan/table-ref.js';
interface FakeQueryResult {
@ -154,6 +154,46 @@ describe('KtxPostgresScanConnector', () => {
});
});
it('defaults and validates Postgres maxConnections', () => {
const baseConnection: KtxPostgresConnectionConfig = {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
};
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: baseConnection,
}),
).toMatchObject({ max: 10 });
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxConnections: 50 },
}),
).toMatchObject({ max: 50 });
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxConnections: '12' as never },
}),
).toMatchObject({ max: 12 });
for (const maxConnections of [0, -1, 1.5, Number.NaN, 'abc' as never]) {
expect(() =>
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxConnections },
}),
).toThrow('connections.warehouse.maxConnections must be a positive integer');
}
});
it('introspects schemas, tables, views, primary keys, comments, row counts, and foreign keys', async () => {
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',

View file

@ -43,6 +43,7 @@ export interface KtxPostgresConnectionConfig {
sslmode?: string;
sslMode?: string;
rejectUnauthorized?: boolean;
maxConnections?: number;
[key: string]: unknown;
}
@ -242,6 +243,23 @@ function numberValue(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
function positiveIntegerConfigValue(input: {
connection: KtxPostgresConnectionConfig;
key: keyof KtxPostgresConnectionConfig;
connectionId: string;
defaultValue: number;
}): number {
const value = input.connection[input.key];
if (value === undefined) {
return input.defaultValue;
}
const numberValue = Number(value);
if (!Number.isInteger(numberValue) || numberValue < 1) {
throw new Error(`connections.${input.connectionId}.${String(input.key)} must be a positive integer`);
}
return numberValue;
}
function parsePostgresUrl(url: string): Partial<KtxPostgresConnectionConfig> {
const parsed = new URL(url);
const sslmode = parsed.searchParams.get('sslmode') ?? undefined;
@ -299,6 +317,12 @@ export function postgresPoolConfigFromConfig(input: {
const user = stringConfigValue(merged, 'username', env) ?? stringConfigValue(merged, 'user', env);
const password = stringConfigValue(merged, 'password', env);
const sslmode = normalizedSslMode(merged);
const maxConnections = positiveIntegerConfigValue({
connection: merged,
key: 'maxConnections',
connectionId: input.connectionId,
defaultValue: 10,
});
if (!referencedUrl && !host) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.host or url`);
@ -311,7 +335,7 @@ export function postgresPoolConfigFromConfig(input: {
}
const config: KtxPostgresPoolConfig = {
max: 10,
max: maxConnections,
idleTimeoutMillis: 30_000,
connectionTimeoutMillis: 10_000,
...(referencedUrl && sslmode !== 'prefer' && sslmode !== 'disable'