ktx/packages/connector-postgres/src/connector.ts

762 lines
25 KiB
TypeScript
Raw Normal View History

2026-05-10 23:12:26 +02:00
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
2026-05-10 23:51:24 +02:00
import { assertReadOnlySql, limitSqlForExecution } from '@ktx/context/connections';
2026-05-10 23:12:26 +02:00
import {
2026-05-10 23:51:24 +02:00
createKtxConnectorCapabilities,
type KtxColumnSampleInput,
type KtxColumnSampleResult,
type KtxColumnStatsInput,
type KtxColumnStatsResult,
type KtxQueryResult,
type KtxReadOnlyQueryInput,
type KtxScanConnector,
type KtxScanContext,
type KtxScanInput,
type KtxSchemaColumn,
type KtxSchemaForeignKey,
type KtxSchemaSnapshot,
type KtxSchemaTable,
2026-05-12 18:22:05 -07:00
type KtxTableListEntry,
2026-05-10 23:51:24 +02:00
type KtxTableRef,
type KtxTableSampleInput,
type KtxTableSampleResult,
} from '@ktx/context/scan';
2026-05-10 23:12:26 +02:00
import { Pool } from 'pg';
2026-05-10 23:51:24 +02:00
import { KtxPostgresDialect } from './dialect.js';
2026-05-10 23:12:26 +02:00
const PG_OID_TYPE_MAP: Record<number, string> = {
16: 'boolean',
20: 'bigint',
21: 'smallint',
23: 'integer',
25: 'text',
700: 'real',
701: 'double precision',
1043: 'varchar',
1082: 'date',
1114: 'timestamp',
1184: 'timestamptz',
1700: 'numeric',
2950: 'uuid',
3802: 'jsonb',
114: 'json',
1009: 'text[]',
1007: 'integer[]',
1016: 'bigint[]',
};
2026-05-10 23:51:24 +02:00
export interface KtxPostgresConnectionConfig {
2026-05-10 23:12:26 +02:00
driver?: string;
host?: string;
port?: number;
database?: string;
username?: string;
user?: string;
password?: string;
url?: string;
schema?: string;
schemas?: string[];
ssl?: boolean;
sslmode?: string;
sslMode?: string;
2026-05-10 23:12:26 +02:00
rejectUnauthorized?: boolean;
readonly?: boolean;
[key: string]: unknown;
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresPoolConfig {
2026-05-10 23:12:26 +02:00
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
connectionString?: string;
max: number;
idleTimeoutMillis: number;
connectionTimeoutMillis: number;
options?: string;
ssl?: { rejectUnauthorized: boolean };
}
2026-05-10 23:51:24 +02:00
interface KtxPostgresQueryResult {
2026-05-10 23:12:26 +02:00
fields?: Array<{ name: string; dataTypeID: number }>;
rows: Record<string, unknown>[];
}
2026-05-10 23:51:24 +02:00
interface KtxPostgresClient {
query(sql: string, params?: unknown[]): Promise<KtxPostgresQueryResult>;
2026-05-10 23:12:26 +02:00
release(): void;
}
2026-05-10 23:51:24 +02:00
interface KtxPostgresPool {
connect(): Promise<KtxPostgresClient>;
2026-05-10 23:12:26 +02:00
end(): Promise<void>;
on?(event: 'error', listener: (error: Error) => void): void;
2026-05-10 23:12:26 +02:00
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresPoolFactory {
createPool(config: KtxPostgresPoolConfig): KtxPostgresPool;
2026-05-10 23:12:26 +02:00
}
2026-05-10 23:51:24 +02:00
interface KtxPostgresResolvedEndpoint {
2026-05-10 23:12:26 +02:00
host: string;
port: number;
close?: () => Promise<void>;
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresEndpointResolver {
2026-05-10 23:12:26 +02:00
resolve(input: {
host: string;
port: number;
2026-05-10 23:51:24 +02:00
connection: KtxPostgresConnectionConfig;
}): Promise<KtxPostgresResolvedEndpoint>;
2026-05-10 23:12:26 +02:00
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresScanConnectorOptions {
2026-05-10 23:12:26 +02:00
connectionId: string;
2026-05-10 23:51:24 +02:00
connection: KtxPostgresConnectionConfig | undefined;
poolFactory?: KtxPostgresPoolFactory;
endpointResolver?: KtxPostgresEndpointResolver;
2026-05-10 23:12:26 +02:00
env?: NodeJS.ProcessEnv;
now?: () => Date;
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresReadOnlyQueryInput extends KtxReadOnlyQueryInput {
2026-05-10 23:12:26 +02:00
params?: Record<string, unknown> | unknown[];
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresColumnDistinctValuesOptions {
2026-05-10 23:12:26 +02:00
maxCardinality: number;
limit: number;
sampleSize?: number;
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresColumnDistinctValuesResult {
2026-05-10 23:12:26 +02:00
values: string[] | null;
cardinality: number;
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresColumnStatisticsResult {
2026-05-10 23:12:26 +02:00
cardinalityByColumn: Map<string, number>;
}
2026-05-10 23:51:24 +02:00
export interface KtxPostgresTableSampleResult extends KtxTableSampleResult {
2026-05-10 23:12:26 +02:00
headerTypes?: string[];
}
2026-05-10 23:51:24 +02:00
type PostgresTableRef = Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>;
2026-05-10 23:12:26 +02:00
interface PostgresTableRow {
table_name: string;
table_kind: string;
row_count: unknown;
table_comment: string | null;
}
interface PostgresColumnRow {
table_name: string;
column_name: string;
data_type: string;
is_nullable: boolean;
column_comment: string | null;
}
interface PostgresPrimaryKeyRow {
table_name: string;
column_name: string;
}
interface PostgresForeignKeyRow {
table_name: string;
column_name: string;
foreign_table_schema: string | null;
foreign_table_name: string;
foreign_column_name: string;
constraint_name: string | null;
}
interface PostgresSchemaRow {
schema_name: string;
}
2026-05-12 18:22:05 -07:00
interface PostgresTableListRow {
schema_name: string;
table_name: string;
table_kind: string;
}
2026-05-10 23:12:26 +02:00
interface PostgresCountRow {
count?: unknown;
cardinality?: unknown;
}
interface PostgresDistinctValueRow {
val: unknown;
}
interface PostgresStatsRow {
column_name: string;
estimated_cardinality: unknown;
}
2026-05-10 23:51:24 +02:00
class DefaultPostgresPoolFactory implements KtxPostgresPoolFactory {
createPool(config: KtxPostgresPoolConfig): KtxPostgresPool {
2026-05-10 23:12:26 +02:00
return new Pool(config);
}
}
function groupByTable<T extends { table_name: string }>(rows: T[]): Map<string, T[]> {
const grouped = new Map<string, T[]>();
for (const row of rows) {
const tableRows = grouped.get(row.table_name) ?? [];
tableRows.push(row);
grouped.set(row.table_name, tableRows);
}
return grouped;
}
function primaryKeyMap(rows: PostgresPrimaryKeyRow[]): Map<string, Set<string>> {
const grouped = new Map<string, Set<string>>();
for (const row of rows) {
const columns = grouped.get(row.table_name) ?? new Set<string>();
columns.add(row.column_name);
grouped.set(row.table_name, columns);
}
return grouped;
}
2026-05-10 23:51:24 +02:00
function queryRows(result: KtxPostgresQueryResult): unknown[][] {
2026-05-10 23:12:26 +02:00
const headers = (result.fields ?? []).map((field) => field.name);
return result.rows.map((row) => headers.map((header) => row[header]));
}
function finiteNumber(value: unknown): number | null {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : null;
}
function stringConfigValue(
2026-05-10 23:51:24 +02:00
connection: KtxPostgresConnectionConfig | undefined,
key: keyof KtxPostgresConnectionConfig,
2026-05-10 23:12:26 +02:00
env: NodeJS.ProcessEnv,
): string | undefined {
const value = connection?.[key];
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function numberValue(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
2026-05-10 23:51:24 +02:00
function parsePostgresUrl(url: string): Partial<KtxPostgresConnectionConfig> {
2026-05-10 23:12:26 +02:00
const parsed = new URL(url);
const sslmode = parsed.searchParams.get('sslmode') ?? undefined;
2026-05-10 23:12:26 +02:00
return {
host: parsed.hostname,
port: parsed.port ? Number(parsed.port) : undefined,
database: parsed.pathname.replace(/^\/+/, '') || undefined,
username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
...(sslmode ? { sslmode } : {}),
2026-05-10 23:12:26 +02:00
};
}
function normalizedSslMode(connection: KtxPostgresConnectionConfig): string | undefined {
const value = connection.sslmode ?? connection.sslMode;
return typeof value === 'string' && value.trim().length > 0 ? value.trim().toLowerCase() : undefined;
}
2026-05-10 23:51:24 +02:00
function schemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
2026-05-10 23:12:26 +02:00
if (Array.isArray(connection.schemas) && connection.schemas.length > 0) {
return connection.schemas.filter((schema): schema is string => typeof schema === 'string' && schema.length > 0);
}
return typeof connection.schema === 'string' && connection.schema.length > 0 ? [connection.schema] : ['public'];
}
2026-05-10 23:51:24 +02:00
function searchPathSchemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
2026-05-10 23:12:26 +02:00
const schemas = schemasFromConnection(connection);
return schemas.includes('public') ? schemas : [...schemas, 'public'];
}
2026-05-10 23:51:24 +02:00
export function isKtxPostgresConnectionConfig(connection: KtxPostgresConnectionConfig | undefined): boolean {
2026-05-10 23:12:26 +02:00
const driver = String(connection?.driver ?? '').toLowerCase();
return driver === 'postgres' || driver === 'postgresql';
}
export function postgresPoolConfigFromConfig(input: {
connectionId: string;
2026-05-10 23:51:24 +02:00
connection: KtxPostgresConnectionConfig | undefined;
2026-05-10 23:12:26 +02:00
env?: NodeJS.ProcessEnv;
2026-05-10 23:51:24 +02:00
}): KtxPostgresPoolConfig {
if (!isKtxPostgresConnectionConfig(input.connection)) {
2026-05-10 23:12:26 +02:00
throw new Error(`Native PostgreSQL connector cannot run driver "${input.connection?.driver ?? 'unknown'}"`);
}
if (input.connection?.readonly !== true) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.readonly: true`);
}
const env = input.env ?? process.env;
const referencedUrl = stringConfigValue(input.connection, 'url', env);
const urlConfig = referencedUrl ? parsePostgresUrl(referencedUrl) : {};
2026-05-10 23:51:24 +02:00
const merged: KtxPostgresConnectionConfig = { ...urlConfig, ...input.connection };
2026-05-10 23:12:26 +02:00
const host = stringConfigValue(merged, 'host', env);
const database = stringConfigValue(merged, 'database', env);
const user = stringConfigValue(merged, 'username', env) ?? stringConfigValue(merged, 'user', env);
const password = stringConfigValue(merged, 'password', env);
const sslmode = normalizedSslMode(merged);
2026-05-10 23:12:26 +02:00
if (!referencedUrl && !host) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.host or url`);
}
if (!database && !referencedUrl) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.database or url`);
}
if (!user && !referencedUrl) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.username, user, or url`);
}
2026-05-10 23:51:24 +02:00
const config: KtxPostgresPoolConfig = {
2026-05-10 23:12:26 +02:00
max: 10,
idleTimeoutMillis: 30_000,
connectionTimeoutMillis: 10_000,
...(referencedUrl && sslmode !== 'prefer' && sslmode !== 'disable'
2026-05-10 23:12:26 +02:00
? { connectionString: referencedUrl }
: { host, port: numberValue(merged.port) ?? 5432, database, user, password }),
};
const searchPathSchemas = searchPathSchemasFromConnection(merged);
if (searchPathSchemas.length > 0) {
config.options = `-c search_path=${searchPathSchemas.join(',')}`;
}
if (merged.ssl && sslmode !== 'prefer' && sslmode !== 'disable') {
2026-05-10 23:12:26 +02:00
config.ssl = { rejectUnauthorized: merged.rejectUnauthorized ?? true };
}
return config;
}
2026-05-10 23:51:24 +02:00
export class KtxPostgresScanConnector implements KtxScanConnector {
2026-05-10 23:12:26 +02:00
readonly id: string;
readonly driver = 'postgres' as const;
2026-05-10 23:51:24 +02:00
readonly capabilities = createKtxConnectorCapabilities({
2026-05-10 23:12:26 +02:00
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
nestedAnalysis: true,
formalForeignKeys: true,
estimatedRowCounts: true,
});
private readonly connectionId: string;
2026-05-10 23:51:24 +02:00
private readonly connection: KtxPostgresConnectionConfig;
private readonly poolConfig: KtxPostgresPoolConfig;
private readonly poolFactory: KtxPostgresPoolFactory;
private readonly endpointResolver?: KtxPostgresEndpointResolver;
2026-05-10 23:12:26 +02:00
private readonly now: () => Date;
2026-05-10 23:51:24 +02:00
private readonly dialect = new KtxPostgresDialect();
private pool: KtxPostgresPool | null = null;
private lastIdlePoolError: Error | null = null;
2026-05-10 23:51:24 +02:00
private resolvedEndpoint: KtxPostgresResolvedEndpoint | null = null;
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
constructor(options: KtxPostgresScanConnectorOptions) {
2026-05-10 23:12:26 +02:00
this.connectionId = options.connectionId;
this.connection = options.connection ?? {};
this.poolConfig = postgresPoolConfigFromConfig({
connectionId: options.connectionId,
connection: options.connection,
env: options.env,
});
this.poolFactory = options.poolFactory ?? new DefaultPostgresPoolFactory();
this.endpointResolver = options.endpointResolver;
this.now = options.now ?? (() => new Date());
this.id = `postgres:${options.connectionId}`;
}
async testConnection(): Promise<{ success: boolean; error?: string }> {
try {
await this.query('SELECT 1');
return { success: true };
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) };
}
}
2026-05-10 23:51:24 +02:00
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
2026-05-10 23:12:26 +02:00
this.assertConnection(input.connectionId);
const schemas = schemasFromConnection(this.connection);
2026-05-10 23:51:24 +02:00
const allTables: KtxSchemaTable[] = [];
2026-05-10 23:12:26 +02:00
for (const schema of schemas) {
const tables = await this.loadSchemaTables(schema);
allTables.push(...tables);
}
return {
connectionId: this.connectionId,
driver: 'postgres',
extractedAt: this.now().toISOString(),
scope: { schemas },
metadata: {
database: this.poolConfig.database ?? this.connection.database ?? null,
schemas,
host: this.poolConfig.host ?? this.connection.host ?? null,
table_count: allTables.length,
total_columns: allTables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables: allTables,
};
}
2026-05-10 23:51:24 +02:00
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxPostgresTableSampleResult> {
2026-05-10 23:12:26 +02:00
this.assertConnection(input.connectionId);
const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns));
return {
headers: result.headers,
headerTypes: result.headerTypes,
rows: result.rows,
totalRows: result.totalRows,
};
}
2026-05-10 23:51:24 +02:00
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
2026-05-10 23:12:26 +02:00
this.assertConnection(input.connectionId);
const result = await this.query(
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
);
const values = result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]);
return { values, nullCount: null, distinctCount: null };
}
2026-05-10 23:51:24 +02:00
async columnStats(input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
2026-05-10 23:12:26 +02:00
const stats = await this.getColumnStatistics(input.table);
const value = stats?.cardinalityByColumn.get(input.column);
return value === undefined
? null
: { min: null, max: null, average: null, nullCount: null, distinctCount: value };
}
2026-05-10 23:51:24 +02:00
async executeReadOnly(input: KtxPostgresReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
2026-05-10 23:12:26 +02:00
this.assertConnection(input.connectionId);
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
const prepared = Array.isArray(input.params)
? { sql: limitedSql, params: input.params }
: this.dialect.prepareQuery(limitedSql, input.params);
const result = await this.query(prepared.sql, prepared.params);
return { ...result, rowCount: result.rows.length };
}
async getColumnDistinctValues(
2026-05-10 23:51:24 +02:00
table: KtxTableRef,
2026-05-10 23:12:26 +02:00
columnName: string,
2026-05-10 23:51:24 +02:00
options: KtxPostgresColumnDistinctValuesOptions,
): Promise<KtxPostgresColumnDistinctValuesResult | null> {
2026-05-10 23:12:26 +02:00
const sampleSize = options.sampleSize ?? 10000;
const tableName = this.qTableName(table);
const quotedColumn = this.dialect.quoteIdentifier(columnName);
const cardinalityRows = await this.queryRaw<PostgresCountRow>(
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, sampleSize),
);
const cardinality = finiteNumber(cardinalityRows[0]?.cardinality);
if (cardinality === null) {
return null;
}
if (cardinality === 0) {
return { values: [], cardinality: 0 };
}
if (cardinality > options.maxCardinality) {
return { values: null, cardinality };
}
const valuesRows = await this.queryRaw<PostgresDistinctValueRow>(
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
);
return {
values: valuesRows.filter((row) => row.val !== null).map((row) => String(row.val)),
cardinality,
};
}
2026-05-10 23:51:24 +02:00
async getColumnStatistics(table: KtxTableRef): Promise<KtxPostgresColumnStatisticsResult | null> {
2026-05-10 23:12:26 +02:00
const schema = table.db ?? schemasFromConnection(this.connection)[0] ?? 'public';
const sql = this.dialect.generateColumnStatisticsQuery(schema, table.name);
if (!sql) {
return null;
}
const rows = await this.queryRaw<PostgresStatsRow>(sql);
const cardinalityByColumn = new Map<string, number>();
for (const row of rows) {
const cardinality = finiteNumber(row.estimated_cardinality);
if (cardinality !== null) {
cardinalityByColumn.set(row.column_name, cardinality);
}
}
return cardinalityByColumn.size > 0 ? { cardinalityByColumn } : null;
}
async getTableRowCount(table: string | PostgresTableRef): Promise<number> {
const tableRef =
typeof table === 'string'
? { catalog: null, db: schemasFromConnection(this.connection)[0] ?? 'public', name: table }
: table;
const rows = await this.queryRaw<PostgresCountRow>(`SELECT COUNT(*) AS count FROM ${this.qTableName(tableRef)}`);
return finiteNumber(rows[0]?.count) ?? 0;
}
qTableName(table: PostgresTableRef): string {
return this.dialect.formatTableName(table);
}
quoteIdentifier(identifier: string): string {
return this.dialect.quoteIdentifier(identifier);
}
async listSchemas(): Promise<string[]> {
const rows = await this.queryRaw<PostgresSchemaRow>(`
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name <> 'information_schema'
AND schema_name NOT LIKE 'pg_%'
ORDER BY schema_name
`);
return rows.map((row) => row.schema_name);
}
2026-05-12 18:22:05 -07:00
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const rows = await this.queryRaw<PostgresTableListRow>(
`
SELECT n.nspname AS schema_name, c.relname AS table_name, c.relkind AS table_kind
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ANY($1)
AND c.relkind IN ('r', 'v')
ORDER BY n.nspname, c.relname
`,
[filterSchemas],
);
return rows.map((row) => ({
schema: row.schema_name,
name: row.table_name,
kind: row.table_kind === 'v' ? ('view' as const) : ('table' as const),
}));
}
2026-05-10 23:12:26 +02:00
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.end();
this.pool = null;
}
if (this.resolvedEndpoint?.close) {
await this.resolvedEndpoint.close();
this.resolvedEndpoint = null;
}
}
2026-05-10 23:51:24 +02:00
private async loadSchemaTables(schema: string): Promise<KtxSchemaTable[]> {
2026-05-10 23:12:26 +02:00
const tables = await this.queryRaw<PostgresTableRow>(
`
SELECT
c.relname AS table_name,
c.relkind AS table_kind,
c.reltuples::bigint AS row_count,
d.description AS table_comment
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_catalog.pg_description d
ON d.objoid = c.oid AND d.objsubid = 0
WHERE n.nspname = $1
AND c.relkind IN ('r', 'v')
ORDER BY c.relname
`,
[schema],
);
const columns = await this.queryRaw<PostgresColumnRow>(
`
SELECT
c.relname AS table_name,
a.attname AS column_name,
format_type(a.atttypid, a.atttypmod) AS data_type,
NOT a.attnotnull AS is_nullable,
d.description AS column_comment
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_catalog.pg_description d
ON d.objoid = c.oid AND d.objsubid = a.attnum
WHERE n.nspname = $1
AND c.relkind IN ('r', 'v')
AND a.attnum > 0
AND NOT a.attisdropped
ORDER BY c.relname, a.attnum
`,
[schema],
);
const primaryKeys = await this.queryRaw<PostgresPrimaryKeyRow>(
`
SELECT tc.table_name, kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_schema = $1
ORDER BY tc.table_name, kcu.ordinal_position
`,
[schema],
);
const foreignKeys = await this.queryRaw<PostgresForeignKeyRow>(
`
SELECT
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name,
tc.constraint_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = $1
ORDER BY tc.table_name, kcu.column_name
`,
[schema],
);
const columnsByTable = groupByTable(columns);
const primaryKeysByTable = primaryKeyMap(primaryKeys);
const foreignKeysByTable = groupByTable(foreignKeys);
return tables.map((table) =>
this.toSchemaTable(
schema,
table,
columnsByTable.get(table.table_name) ?? [],
primaryKeysByTable.get(table.table_name) ?? new Set<string>(),
foreignKeysByTable.get(table.table_name) ?? [],
),
);
}
private toSchemaTable(
schema: string,
table: PostgresTableRow,
columns: PostgresColumnRow[],
primaryKeys: Set<string>,
foreignKeys: PostgresForeignKeyRow[],
2026-05-10 23:51:24 +02:00
): KtxSchemaTable {
2026-05-10 23:12:26 +02:00
const kind = table.table_kind === 'v' ? 'view' : 'table';
return {
catalog: null,
db: schema,
name: table.table_name,
kind,
comment: table.table_comment || null,
estimatedRows: kind === 'view' ? null : finiteNumber(table.row_count),
columns: columns.map((column) => this.toSchemaColumn(column, primaryKeys)),
foreignKeys: foreignKeys.map((foreignKey) => this.toSchemaForeignKey(foreignKey)),
};
}
2026-05-10 23:51:24 +02:00
private toSchemaColumn(column: PostgresColumnRow, primaryKeys: Set<string>): KtxSchemaColumn {
2026-05-10 23:12:26 +02:00
return {
name: column.column_name,
nativeType: column.data_type,
normalizedType: this.dialect.mapDataType(column.data_type),
dimensionType: this.dialect.mapToDimensionType(column.data_type),
nullable: column.is_nullable,
primaryKey: primaryKeys.has(column.column_name),
comment: column.column_comment || null,
};
}
2026-05-10 23:51:24 +02:00
private toSchemaForeignKey(row: PostgresForeignKeyRow): KtxSchemaForeignKey {
2026-05-10 23:12:26 +02:00
return {
fromColumn: row.column_name,
toCatalog: null,
toDb: row.foreign_table_schema,
toTable: row.foreign_table_name,
toColumn: row.foreign_column_name,
constraintName: row.constraint_name || null,
};
}
2026-05-10 23:51:24 +02:00
private async getPool(): Promise<KtxPostgresPool> {
2026-05-10 23:12:26 +02:00
if (!this.pool) {
let config = { ...this.poolConfig };
if (this.endpointResolver) {
const endpoint = await this.endpointResolver.resolve({
host: config.host ?? this.connection.host ?? 'localhost',
port: config.port ?? numberValue(this.connection.port) ?? 5432,
connection: this.connection,
});
this.resolvedEndpoint = endpoint;
config = { ...config, host: endpoint.host, port: endpoint.port };
}
this.pool = this.poolFactory.createPool(config);
this.pool.on?.('error', (error) => {
this.lastIdlePoolError = error;
});
2026-05-10 23:12:26 +02:00
}
return this.pool;
}
private async queryRaw<T>(sql: string, params?: unknown[]): Promise<T[]> {
this.throwIdlePoolErrorIfPresent();
2026-05-10 23:12:26 +02:00
const pool = await this.getPool();
const client = await pool.connect();
try {
const result = await client.query(sql, params);
return result.rows as T[];
} finally {
client.release();
}
}
2026-05-10 23:51:24 +02:00
private async query(sql: string, params?: Record<string, unknown> | unknown[]): Promise<KtxQueryResult> {
this.throwIdlePoolErrorIfPresent();
2026-05-10 23:12:26 +02:00
const pool = await this.getPool();
const client = await pool.connect();
try {
const result = await client.query(assertReadOnlySql(sql), Array.isArray(params) ? params : undefined);
return {
headers: (result.fields ?? []).map((field) => field.name),
headerTypes: (result.fields ?? []).map((field) => PG_OID_TYPE_MAP[field.dataTypeID] ?? `oid:${field.dataTypeID}`),
rows: queryRows(result),
totalRows: result.rows.length,
rowCount: result.rows.length,
};
} finally {
client.release();
}
}
private assertConnection(connectionId: string): void {
if (connectionId !== this.connectionId) {
throw new Error(`PostgreSQL connector ${this.connectionId} cannot run scan for ${connectionId}`);
}
}
private throwIdlePoolErrorIfPresent(): void {
if (!this.lastIdlePoolError) {
return;
}
const error = this.lastIdlePoolError;
this.lastIdlePoolError = null;
throw error;
}
2026-05-10 23:12:26 +02:00
}