ktx/packages/cli/src/connectors/postgres/connector.ts

743 lines
25 KiB
TypeScript
Raw Normal View History

2026-05-10 23:12:26 +02:00
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
chore(workspace): gate dead-code with knip production mode (#196) * refactor(workspace): relocate @ktx/llm source into packages/cli/src/llm * refactor(workspace): rewrite @ktx/llm imports to relative paths * refactor(workspace): fold internal packages into cli * chore(workspace): gate dead-code with knip production mode Turn on production-mode knip plus an autofix run in pre-commit and the `pnpm dead-code` script, document the `/** @internal */` convention for test-only exports in AGENTS.md, annotate test-only exports across the CLI with that JSDoc, and drop dead exports/wrappers the new gate surfaced (e.g. `cli-project.ts`, `lookerRuntimeSourceToFileAdapterSource`, `createLocalScanEnrichmentProvidersFromConfig`, `PGLITE_OWNER_PROCESS_BACKEND_CAPABILITIES`, stale type re-exports). Replace the loose `ignoreIssues` allowlist in `knip.json` with explicit production entries so cross-package barrel leaks are caught. * refactor(cli): delete internal barrel index.ts files The 34 `index.ts` re-export barrels inside `packages/cli/src/` were holdovers from the pre-fold multi-workspace structure. Post-fold-in they served no production purpose: external consumers go through the single package main entry, and in-repo callers mostly imported through them only because the path was short. Internally, knip flagged most barrel re-exports as production-dead (only reached via tests). This change: - Deletes every internal barrel except `packages/cli/src/index.ts` (the published package entry). - Rewrites ~270 source/test files to import each name directly from the file that defines it. - Moves `tools/warehouse-verification/index.ts` to `create-warehouse-verification-tools.ts` (the function it defined locally) and updates its single consumer. - Renames `search/backend-conformance.ts` → `.test-utils.ts` to match the existing test-helper file convention. 
- Deletes 13 dead test-only chains (dbt-descriptions/*, live-database/extracted-schema, live-database/structural-sync, relationship-* feedback/review chain) plus their tests and a cascading orphan integration test. - Updates test mocks that pointed at deleted barrel paths (notion-client, connector barrels in scan/local-scan-connectors tests) to mock the source files instead. - Points the maintainer benchmark script (`scripts/relationship-benchmark-report.mjs`) at source files instead of `dist/context/scan/index.js`. - Drops the barrel `!` entries from `knip.json`; adds explicit production entries only for the benchmark code reached via dist by the maintainer script. Net: 413 files changed, ~1.2k insertions, ~9.4k deletions. `pnpm run dead-code` (Biome + knip default + knip production) and `pnpm run type-check` are clean; 2277 tests pass. * refactor(workspace): rename @ktx/cli to @kaelio/ktx and pack it directly Promote the CLI workspace package to the public name `@kaelio/ktx` and drop the separate `scripts/build-public-npm-package.mjs` wrapper. The CLI package is now publishable in place (`publishConfig.access: public`, `provenance: true`), so artifact packing uses `pnpm pack` against `packages/cli/` instead of assembling a parallel package tree. Updates all workspace filter invocations, docs, tests, and release readiness checks to reference the new package name, and folds the tarball-name helper into `scripts/public-npm-release-metadata.mjs`. * docs: align "agent clients" and "data agents" terminology Replace "client agents" with "agent clients" and "database agents" with "data agents" across AGENTS.md, README.md, the docs-site copy, and the matching setup-agents test description, matching the canonical vocabulary in docs/terminology.md. Also moves packages/cli/tsconfig.json's tsBuildInfoFile from node_modules/.cache/ to dist/.tsbuildinfo so incremental builds survive node_modules reinstalls. 
* refactor(release): single source of truth for package version Make packages/cli/package.json the single source of truth for the @kaelio/ktx version. publicNpmPackageVersion() now reads it directly, so artifact filenames, release-readiness checks, and the Python wheel version all derive from one field. The duplicate release-policy.json.publicNpmPackageVersion is removed. Previously the two fields could drift: tarballs were named kaelio-ktx-0.4.1.tgz while internally containing @kaelio/ktx@0.0.0-private. - update-public-release-version.mjs rewrites both Python pyproject.toml files (ktx-daemon, ktx-sl) alongside the npm package.jsons, normalizing the version for PEP 440 (e.g. 0.1.0-rc.2 -> 0.1.0rc2). - semantic-release-config.cjs adds the two pyproject.toml files to @semantic-release/git assets so the release commit back to main carries every version source in lockstep. - The six "?? '0.0.0-private'" fallback literals across the CLI are replaced with "?? getKtxCliPackageInfo().version", and createDefaultKtxMcpServer makes its version arg required. - docs/release.md describes the actual commit-back model: the dev tree always reflects the most recent release; no sentinel pin to maintain. Verified: pnpm run artifacts:build now produces kaelio-ktx-0.4.1.tgz and kaelio_ktx-0.4.1-py3-none-any.whl with @kaelio/ktx@0.4.1 inside. Full type-check, dead-code, and 2287 vitests + 173 script tests pass. * refactor(cli): inject embedding provider resolution and detect sentence-transformers runtime Make resolveProjectEmbeddingProvider and runtimeIo injectable in ingest and scan command entrypoints so tests can stub them, and teach resolvePublicIngestRuntimeRequirements to flag the local-embeddings runtime feature when ktx.yaml selects sentence-transformers. * chore(cli): mark buildLocalStatsStatus and LocalStatsStatus as @internal Both symbols are consumed only by status-project.test.ts. 
Annotating with /** @internal */ keeps knip's production-mode check clean without changing runtime behavior. * fix(cli): use real package metadata in print-command-tree The stubbed package name embedded a forbidden product identifier that tripped the boundary check in CI. Read the metadata from package.json instead — keeps the rendered tree unchanged and removes a duplicate source of truth. * feat(cli): show embedding coverage in `ktx status`, drop duplicate disk counts Inline `(N embedded)` next to the Wiki scope counts and Semantic-layer source counts, computed with `SUM(embedding_json IS NOT NULL)` over `knowledge_pages` and `local_sl_sources`. Rename the "Knowledge" label to "Wiki" (canonical per `docs/terminology.md`) and rename the matching `localStats.knowledgePages` field to `localStats.wikiPages`. Drop `wiki=N md` and `semantic-layer=N yaml` from the Disk row — those duplicated the per-surface rows above. Disk now reports only actual byte usage (db, cache, raw-sources). The unused `wikiGlobalMarkdownCount` / `semanticLayerYamlCount` fields, the `isMarkdownEntry` / `isYamlEntry` helpers, and the `filter` arg on `summarizeDir` are removed.
2026-05-21 15:28:58 +02:00
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
2026-05-10 23:12:26 +02:00
import { Pool } from 'pg';
2026-05-10 23:51:24 +02:00
import { KtxPostgresDialect } from './dialect.js';
2026-05-10 23:12:26 +02:00
/**
 * Lookup from a PostgreSQL type OID to a short type name.
 *
 * NOTE(review): presumably consulted for `fields[].dataTypeID` of query
 * results when building header types; the lookup site is outside this view.
 * OIDs that are not listed simply have no entry.
 */
const PG_OID_TYPE_MAP: Record<number, string> = {
  // scalar types
  16: 'boolean',
  20: 'bigint',
  21: 'smallint',
  23: 'integer',
  25: 'text',
  700: 'real',
  701: 'double precision',
  1043: 'varchar',
  1700: 'numeric',
  2950: 'uuid',
  // date / time types
  1082: 'date',
  1114: 'timestamp',
  1184: 'timestamptz',
  // JSON types
  114: 'json',
  3802: 'jsonb',
  // array types
  1007: 'integer[]',
  1009: 'text[]',
  1016: 'bigint[]',
};
2026-05-10 23:51:24 +02:00
/**
 * Connection settings accepted by the native PostgreSQL connector.
 *
 * `url`, `host`, `database`, `username`/`user` and `password` may be literal
 * values or `env:NAME` / `file:PATH` references; they are resolved when the
 * pool configuration is built. Fields set here override the parts parsed
 * from `url`.
 */
export interface KtxPostgresConnectionConfig {
  /** Selects this connector when it is `postgres` or `postgresql` (any case). */
  driver?: string;
  /** Full connection URL; explicit fields below override its parts. */
  url?: string;
  host?: string;
  /** Defaults to 5432 when connecting through discrete fields. */
  port?: number;
  database?: string;
  /** Login role; preferred over `user` when both are set. */
  username?: string;
  /** Alias for `username`. */
  user?: string;
  password?: string;
  /** Single schema to scan; ignored when `schemas` is a non-empty list. */
  schema?: string;
  /** Schemas to scan; `public` is appended to the session search_path when missing. */
  schemas?: string[];
  /** Requests TLS; has no effect when `sslmode` is `prefer` or `disable`. */
  ssl?: boolean;
  /** libpq-style SSL mode, compared case-insensitively. */
  sslmode?: string;
  /** camelCase alias for `sslmode`; `sslmode` wins when both are set. */
  sslMode?: string;
  /** TLS certificate verification flag; defaults to `true` when TLS is enabled. */
  rejectUnauthorized?: boolean;
  /** Additional keys are permitted by this index signature. */
  [key: string]: unknown;
}
2026-05-10 23:51:24 +02:00
/**
 * Options handed to the pool factory; the default factory passes this object
 * straight to the `pg` `Pool` constructor. Either `connectionString` or the
 * discrete host/port/database/user/password fields are populated.
 */
export interface KtxPostgresPoolConfig {
  /** Full connection URL, used instead of the discrete fields when present. */
  connectionString?: string;
  host?: string;
  port?: number;
  database?: string;
  user?: string;
  password?: string;
  /** `pg` pool option: upper bound on pooled clients. */
  max: number;
  /** `pg` pool option: idle time (ms) before a pooled client is closed. */
  idleTimeoutMillis: number;
  /** `pg` pool option: time (ms) allowed to establish a connection. */
  connectionTimeoutMillis: number;
  /** Server startup options, e.g. a `-c search_path=...` setting. */
  options?: string;
  /** TLS settings; left unset when TLS is not enabled. */
  ssl?: { rejectUnauthorized: boolean };
}
2026-05-10 23:51:24 +02:00
/** Minimal view of a `pg` query result consumed by this connector. */
interface KtxPostgresQueryResult {
  /** Column descriptors in result order; may be absent (treated as no columns). */
  fields?: Array<{ name: string; dataTypeID: number }>;
  /** One object per row, keyed by column name. */
  rows: Record<string, unknown>[];
}

/** A client checked out of the pool; handed back via `release()`. */
interface KtxPostgresClient {
  query(sql: string, params?: unknown[]): Promise<KtxPostgresQueryResult>;
  release(): void;
}

/** The subset of the `pg` `Pool` API this connector relies on. */
interface KtxPostgresPool {
  connect(): Promise<KtxPostgresClient>;
  end(): Promise<void>;
  /** Optional error hook; NOTE(review): presumably wired to track idle-client errors. */
  on?(event: 'error', listener: (error: Error) => void): void;
}

/** Creates pools; injectable through the connector options (defaults to a `pg`-backed factory). */
export interface KtxPostgresPoolFactory {
  createPool(config: KtxPostgresPoolConfig): KtxPostgresPool;
}
2026-05-10 23:51:24 +02:00
/** Endpoint the pool should dial, plus an optional teardown hook. */
interface KtxPostgresResolvedEndpoint {
  host: string;
  port: number;
  /** Awaited by the connector's `cleanup()`. */
  close?: () => Promise<void>;
}

/**
 * Optional hook that receives the configured host/port (and the raw
 * connection) and returns the endpoint to use.
 * NOTE(review): where `resolve` is invoked is outside this view.
 */
export interface KtxPostgresEndpointResolver {
  resolve(input: {
    host: string;
    port: number;
    connection: KtxPostgresConnectionConfig;
  }): Promise<KtxPostgresResolvedEndpoint>;
}

/** Construction options for the PostgreSQL scan connector. */
export interface KtxPostgresScanConnectorOptions {
  /** Connection key; appears in error messages and in the connector id `postgres:<id>`. */
  connectionId: string;
  /** Raw connection settings; must use the `postgres`/`postgresql` driver. */
  connection: KtxPostgresConnectionConfig | undefined;
  /** Pool factory override; defaults to a factory backed by `pg.Pool`. */
  poolFactory?: KtxPostgresPoolFactory;
  /** Optional endpoint resolver. */
  endpointResolver?: KtxPostgresEndpointResolver;
  /** Environment used for `env:` references; defaults to `process.env`. */
  env?: NodeJS.ProcessEnv;
  /** Clock for snapshot timestamps; defaults to `new Date()`. */
  now?: () => Date;
}
2026-05-10 23:51:24 +02:00
/**
 * Read-only query input with optional bind parameters: a positional array is
 * bound as-is, while a named map goes through the dialect's `prepareQuery`.
 */
export interface KtxPostgresReadOnlyQueryInput extends KtxReadOnlyQueryInput {
  params?: Record<string, unknown> | unknown[];
}

/** Limits for fetching a column's distinct values. */
export interface KtxPostgresColumnDistinctValuesOptions {
  /** When the sampled cardinality exceeds this, values are not fetched. */
  maxCardinality: number;
  /** Passed as the limit of the distinct-values query. */
  limit: number;
  /** Passed to the cardinality sample query; defaults to 10000. */
  sampleSize?: number;
}

/** Outcome of a distinct-values lookup. */
export interface KtxPostgresColumnDistinctValuesResult {
  /** Stringified non-null values; `[]` for zero cardinality; `null` when over `maxCardinality`. */
  values: string[] | null;
  /** Cardinality measured on the sample. */
  cardinality: number;
}

/** Estimated distinct counts keyed by column name. */
export interface KtxPostgresColumnStatisticsResult {
  cardinalityByColumn: Map<string, number>;
}

/** Table sample that may also carry per-column type names. */
export interface KtxPostgresTableSampleResult extends KtxTableSampleResult {
  headerTypes?: string[];
}

/** Table reference where only `name` is mandatory. */
type PostgresTableRef = Partial<Pick<KtxTableRef, 'catalog' | 'db'>> & Pick<KtxTableRef, 'name'>;
2026-05-10 23:12:26 +02:00
// Row shapes of the SQL queries this connector issues. Property names mirror
// the SQL column aliases, hence snake_case.

/** A table entry of the per-schema table query (issued outside this view). */
interface PostgresTableRow {
  table_name: string;
  table_kind: string;
  /** Raw driver value; typed `unknown` and converted by the caller. */
  row_count: unknown;
  table_comment: string | null;
}

/** A column entry of the per-schema column query (issued outside this view). */
interface PostgresColumnRow {
  table_name: string;
  column_name: string;
  data_type: string;
  is_nullable: boolean;
  column_comment: string | null;
}

/** One primary-key column; grouped per table by `primaryKeyMap`. */
interface PostgresPrimaryKeyRow {
  table_name: string;
  column_name: string;
}

/** One foreign-key column pairing. */
interface PostgresForeignKeyRow {
  table_name: string;
  column_name: string;
  foreign_table_schema: string | null;
  foreign_table_name: string;
  foreign_column_name: string;
  constraint_name: string | null;
}

/** A row of the `listSchemas` query. */
interface PostgresSchemaRow {
  schema_name: string;
}

/** A row of the `listTables` query; `table_kind` is `pg_class.relkind`. */
interface PostgresTableListRow {
  schema_name: string;
  table_name: string;
  table_kind: string;
}

/** Result of a counting query: `count` for row counts, `cardinality` for sampled distinct counts. */
interface PostgresCountRow {
  count?: unknown;
  cardinality?: unknown;
}

/** A row of the distinct-values query. */
interface PostgresDistinctValueRow {
  val: unknown;
}

/** A row of the column-statistics query. */
interface PostgresStatsRow {
  column_name: string;
  estimated_cardinality: unknown;
}
2026-05-10 23:51:24 +02:00
/** Production pool factory: every call constructs a fresh `pg` `Pool` from the given options. */
class DefaultPostgresPoolFactory implements KtxPostgresPoolFactory {
  createPool(config: KtxPostgresPoolConfig): KtxPostgresPool {
    const pool = new Pool(config);
    return pool;
  }
}
/**
 * Buckets rows by `table_name`. Tables appear in first-seen order, and rows
 * keep their original order within each bucket.
 */
function groupByTable<T extends { table_name: string }>(rows: T[]): Map<string, T[]> {
  const byTable = new Map<string, T[]>();
  rows.forEach((row) => {
    const bucket = byTable.get(row.table_name);
    if (bucket) {
      bucket.push(row);
    } else {
      byTable.set(row.table_name, [row]);
    }
  });
  return byTable;
}
/** Collects primary-key column names into a set per table name (first-seen table order). */
function primaryKeyMap(rows: PostgresPrimaryKeyRow[]): Map<string, Set<string>> {
  const keysByTable = new Map<string, Set<string>>();
  for (const { table_name: tableName, column_name: columnName } of rows) {
    let columns = keysByTable.get(tableName);
    if (columns === undefined) {
      columns = new Set<string>();
      keysByTable.set(tableName, columns);
    }
    columns.add(columnName);
  }
  return keysByTable;
}
2026-05-10 23:51:24 +02:00
/**
 * Converts object-shaped rows into positional arrays ordered like
 * `result.fields`; without field descriptors every row becomes `[]`.
 */
function queryRows(result: KtxPostgresQueryResult): unknown[][] {
  const columnNames = result.fields?.map(({ name }) => name) ?? [];
  return result.rows.map((row) => columnNames.map((column) => row[column]));
}
/**
 * Coerces a driver-returned value to a finite number. Accepted inputs are a
 * number, a bigint, or a numeric string such as a `COUNT(*)` result, which
 * `pg` returns as text.
 *
 * Returns `null` for missing or non-numeric input, so callers can tell "no
 * value" apart from a genuine zero. This covers SQL `NULL`, `undefined`,
 * blank strings, booleans, objects, and non-finite numbers.
 */
function finiteNumber(value: unknown): number | null {
  let parsed: number;
  if (typeof value === 'number') {
    parsed = value;
  } else if (typeof value === 'bigint') {
    parsed = Number(value);
  } else if (typeof value === 'string' && value.trim().length > 0) {
    parsed = Number(value);
  } else {
    // Number() would silently turn null, '' and false into 0 here.
    return null;
  }
  return Number.isFinite(parsed) ? parsed : null;
}
/**
 * Reads `connection[key]` as a trimmed string and resolves any `env:`/`file:`
 * reference in it. Returns `undefined` when the key is missing, is not a
 * string, or is blank.
 */
function stringConfigValue(
  connection: KtxPostgresConnectionConfig | undefined,
  key: keyof KtxPostgresConnectionConfig,
  env: NodeJS.ProcessEnv,
): string | undefined {
  const raw = connection?.[key];
  if (typeof raw !== 'string') {
    return undefined;
  }
  const trimmed = raw.trim();
  if (trimmed.length === 0) {
    return undefined;
  }
  return resolveStringReference(trimmed, env);
}
/**
 * Expands a configuration string that may be a secret reference.
 *
 * - `env:NAME` gives `env[NAME]`, or `''` when the variable is unset.
 * - `file:PATH` gives the trimmed UTF-8 contents of PATH. A leading `~` (as in
 *   `file:~/secrets/pg`) expands to the current user's home directory.
 * - Any other value is returned unchanged.
 *
 * @throws when a `file:` reference points at a file that cannot be read.
 */
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
  if (value.startsWith('env:')) {
    return env[value.slice('env:'.length)] ?? '';
  }
  if (value.startsWith('file:')) {
    const rawPath = value.slice('file:'.length);
    // Drop the separator after `~`. path.resolve() treats "/x" as absolute and
    // would discard the home directory, turning `~/x` into `/x`.
    const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1).replace(/^[\\/]+/, '')) : rawPath;
    return readFileSync(path, 'utf-8').trim();
  }
  return value;
}
/**
 * Returns `value` as a finite number. Both numbers and numeric strings are
 * accepted (e.g. a quoted `port: "6543"` in YAML).
 *
 * Anything else yields `undefined`, so the caller applies its default. That
 * includes blank or non-numeric strings and non-finite numbers.
 */
function numberValue(value: unknown): number | undefined {
  if (typeof value === 'number') {
    return Number.isFinite(value) ? value : undefined;
  }
  if (typeof value === 'string' && value.trim().length > 0) {
    const parsed = Number(value.trim());
    return Number.isFinite(parsed) ? parsed : undefined;
  }
  return undefined;
}
2026-05-10 23:51:24 +02:00
/**
 * Splits a PostgreSQL connection URL into connection-config fields.
 *
 * - The percent-encoded user name, password and database name are decoded.
 * - WHATWG `URL` keeps square brackets around IPv6 literals; these are
 *   stripped so the host can be handed to the driver unchanged.
 * - Of the query parameters, only `sslmode` is carried over.
 *
 * @throws TypeError when `url` is not a valid URL, or URIError when a
 *   percent-encoded component is malformed.
 */
function parsePostgresUrl(url: string): Partial<KtxPostgresConnectionConfig> {
  const parsed = new URL(url);
  const sslmode = parsed.searchParams.get('sslmode') ?? undefined;
  const database = parsed.pathname.replace(/^\/+/, '');
  return {
    host: parsed.hostname.replace(/^\[(.*)\]$/, '$1'),
    port: parsed.port ? Number(parsed.port) : undefined,
    database: database ? decodeURIComponent(database) : undefined,
    username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
    password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
    ...(sslmode ? { sslmode } : {}),
  };
}
/**
 * Returns the trimmed, lower-cased SSL mode, or `undefined` when it is
 * unset, not a string, or blank. `sslmode` takes precedence over `sslMode`.
 */
function normalizedSslMode(connection: KtxPostgresConnectionConfig): string | undefined {
  const raw = connection.sslmode ?? connection.sslMode;
  if (typeof raw !== 'string') {
    return undefined;
  }
  const mode = raw.trim();
  return mode.length === 0 ? undefined : mode.toLowerCase();
}
2026-05-10 23:51:24 +02:00
/**
 * Schemas to introspect. Uses the non-empty strings in `schemas` if there
 * are any, otherwise `schema`, otherwise `['public']`.
 *
 * The result is never empty, so callers may rely on element 0. A `schemas`
 * list that contains only blank or invalid entries falls back rather than
 * yielding zero schemas.
 */
function schemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
  const listed = Array.isArray(connection.schemas)
    ? connection.schemas.filter((schema): schema is string => typeof schema === 'string' && schema.length > 0)
    : [];
  if (listed.length > 0) {
    return listed;
  }
  return typeof connection.schema === 'string' && connection.schema.length > 0 ? [connection.schema] : ['public'];
}
2026-05-10 23:51:24 +02:00
/** Schemas for the session `search_path`: the scanned schemas, with `public` appended when absent. */
function searchPathSchemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
  const scanned = schemasFromConnection(connection);
  if (scanned.includes('public')) {
    return scanned;
  }
  return scanned.concat('public');
}
/** Type guard: true when `driver` is `postgres` or `postgresql`, compared case-insensitively. */
export function isKtxPostgresConnectionConfig(
  connection: KtxPostgresConnectionConfig | undefined,
): connection is KtxPostgresConnectionConfig {
  const normalized = String(connection?.driver ?? '').toLowerCase();
  switch (normalized) {
    case 'postgres':
    case 'postgresql':
      return true;
    default:
      return false;
  }
}
chore(workspace): gate dead-code with knip production mode (#196) * refactor(workspace): relocate @ktx/llm source into packages/cli/src/llm * refactor(workspace): rewrite @ktx/llm imports to relative paths * refactor(workspace): fold internal packages into cli * chore(workspace): gate dead-code with knip production mode Turn on production-mode knip plus an autofix run in pre-commit and the `pnpm dead-code` script, document the `/** @internal */` convention for test-only exports in AGENTS.md, annotate test-only exports across the CLI with that JSDoc, and drop dead exports/wrappers the new gate surfaced (e.g. `cli-project.ts`, `lookerRuntimeSourceToFileAdapterSource`, `createLocalScanEnrichmentProvidersFromConfig`, `PGLITE_OWNER_PROCESS_BACKEND_CAPABILITIES`, stale type re-exports). Replace the loose `ignoreIssues` allowlist in `knip.json` with explicit production entries so cross-package barrel leaks are caught. * refactor(cli): delete internal barrel index.ts files The 34 `index.ts` re-export barrels inside `packages/cli/src/` were holdovers from the pre-fold multi-workspace structure. Post-fold-in they served no production purpose: external consumers go through the single package main entry, and in-repo callers mostly imported through them only because the path was short. Internally, knip flagged most barrel re-exports as production-dead (only reached via tests). This change: - Deletes every internal barrel except `packages/cli/src/index.ts` (the published package entry). - Rewrites ~270 source/test files to import each name directly from the file that defines it. - Moves `tools/warehouse-verification/index.ts` to `create-warehouse-verification-tools.ts` (the function it defined locally) and updates its single consumer. - Renames `search/backend-conformance.ts` → `.test-utils.ts` to match the existing test-helper file convention. 
- Deletes 13 dead test-only chains (dbt-descriptions/*, live-database/extracted-schema, live-database/structural-sync, relationship-* feedback/review chain) plus their tests and a cascading orphan integration test. - Updates test mocks that pointed at deleted barrel paths (notion-client, connector barrels in scan/local-scan-connectors tests) to mock the source files instead. - Points the maintainer benchmark script (`scripts/relationship-benchmark-report.mjs`) at source files instead of `dist/context/scan/index.js`. - Drops the barrel `!` entries from `knip.json`; adds explicit production entries only for the benchmark code reached via dist by the maintainer script. Net: 413 files changed, ~1.2k insertions, ~9.4k deletions. `pnpm run dead-code` (Biome + knip default + knip production) and `pnpm run type-check` are clean; 2277 tests pass. * refactor(workspace): rename @ktx/cli to @kaelio/ktx and pack it directly Promote the CLI workspace package to the public name `@kaelio/ktx` and drop the separate `scripts/build-public-npm-package.mjs` wrapper. The CLI package is now publishable in place (`publishConfig.access: public`, `provenance: true`), so artifact packing uses `pnpm pack` against `packages/cli/` instead of assembling a parallel package tree. Updates all workspace filter invocations, docs, tests, and release readiness checks to reference the new package name, and folds the tarball-name helper into `scripts/public-npm-release-metadata.mjs`. * docs: align "agent clients" and "data agents" terminology Replace "client agents" with "agent clients" and "database agents" with "data agents" across AGENTS.md, README.md, the docs-site copy, and the matching setup-agents test description, matching the canonical vocabulary in docs/terminology.md. Also moves packages/cli/tsconfig.json's tsBuildInfoFile from node_modules/.cache/ to dist/.tsbuildinfo so incremental builds survive node_modules reinstalls. 
* refactor(release): single source of truth for package version Make packages/cli/package.json the single source of truth for the @kaelio/ktx version. publicNpmPackageVersion() now reads it directly, so artifact filenames, release-readiness checks, and the Python wheel version all derive from one field. The duplicate release-policy.json.publicNpmPackageVersion is removed. Previously the two fields could drift: tarballs were named kaelio-ktx-0.4.1.tgz while internally containing @kaelio/ktx@0.0.0-private. - update-public-release-version.mjs rewrites both Python pyproject.toml files (ktx-daemon, ktx-sl) alongside the npm package.jsons, normalizing the version for PEP 440 (e.g. 0.1.0-rc.2 -> 0.1.0rc2). - semantic-release-config.cjs adds the two pyproject.toml files to @semantic-release/git assets so the release commit back to main carries every version source in lockstep. - The six "?? '0.0.0-private'" fallback literals across the CLI are replaced with "?? getKtxCliPackageInfo().version", and createDefaultKtxMcpServer makes its version arg required. - docs/release.md describes the actual commit-back model: the dev tree always reflects the most recent release; no sentinel pin to maintain. Verified: pnpm run artifacts:build now produces kaelio-ktx-0.4.1.tgz and kaelio_ktx-0.4.1-py3-none-any.whl with @kaelio/ktx@0.4.1 inside. Full type-check, dead-code, and 2287 vitests + 173 script tests pass. * refactor(cli): inject embedding provider resolution and detect sentence-transformers runtime Make resolveProjectEmbeddingProvider and runtimeIo injectable in ingest and scan command entrypoints so tests can stub them, and teach resolvePublicIngestRuntimeRequirements to flag the local-embeddings runtime feature when ktx.yaml selects sentence-transformers. * chore(cli): mark buildLocalStatsStatus and LocalStatsStatus as @internal Both symbols are consumed only by status-project.test.ts. 
Annotating with /** @internal */ keeps knip's production-mode check clean without changing runtime behavior. * fix(cli): use real package metadata in print-command-tree The stubbed package name embedded a forbidden product identifier that tripped the boundary check in CI. Read the metadata from package.json instead — keeps the rendered tree unchanged and removes a duplicate source of truth. * feat(cli): show embedding coverage in `ktx status`, drop duplicate disk counts Inline `(N embedded)` next to the Wiki scope counts and Semantic-layer source counts, computed with `SUM(embedding_json IS NOT NULL)` over `knowledge_pages` and `local_sl_sources`. Rename the "Knowledge" label to "Wiki" (canonical per `docs/terminology.md`) and rename the matching `localStats.knowledgePages` field to `localStats.wikiPages`. Drop `wiki=N md` and `semantic-layer=N yaml` from the Disk row — those duplicated the per-surface rows above. Disk now reports only actual byte usage (db, cache, raw-sources). The unused `wikiGlobalMarkdownCount` / `semanticLayerYamlCount` fields, the `isMarkdownEntry` / `isYamlEntry` helpers, and the `filter` arg on `summarizeDir` are removed.
2026-05-21 15:28:58 +02:00
/** libpq SSL modes that demand an encrypted connection. */
const POSTGRES_SSL_REQUIRED_MODES = new Set(['require', 'verify-ca', 'verify-full']);

/** Schema names that PostgreSQL reads back unchanged without quoting. */
const PLAIN_SEARCH_PATH_SCHEMA = /^[a-z_][a-z0-9_$]*$/;

/**
 * Renders `schemas` as a `-c search_path=...` startup option.
 *
 * Names that are not plain lower-case identifiers are double-quoted (embedded
 * quotes doubled) so the server does not case-fold them. Backslashes and
 * whitespace are then backslash-escaped, because the startup `options` string
 * is split on unescaped whitespace.
 */
function searchPathStartupOption(schemas: string[]): string {
  const searchPath = schemas
    .map((schema) => (PLAIN_SEARCH_PATH_SCHEMA.test(schema) ? schema : `"${schema.replace(/"/g, '""')}"`))
    .join(',');
  return `-c search_path=${searchPath.replace(/[\\\s]/g, (char) => `\\${char}`)}`;
}

/**
 * Builds the `pg` pool options for one PostgreSQL connection entry.
 *
 * Resolution order:
 * - `url`, if set, is parsed first; explicit fields override its parts.
 * - The URL is handed to `pg` as `connectionString`, unless `sslmode` is
 *   `prefer` or `disable`; then the discrete host/port/database/user/password
 *   fields are used instead.
 *
 * TLS is enabled when `ssl` is truthy or `sslmode` is `require`, `verify-ca`
 * or `verify-full`. It is never enabled for `prefer`/`disable`.
 *
 * @throws Error when the driver is not PostgreSQL or host/database/user
 *   (or `url`) are missing.
 * @internal
 */
export function postgresPoolConfigFromConfig(input: {
  connectionId: string;
  connection: KtxPostgresConnectionConfig | undefined;
  env?: NodeJS.ProcessEnv;
}): KtxPostgresPoolConfig {
  const inputDriver = input.connection?.driver ?? 'unknown';
  if (!isKtxPostgresConnectionConfig(input.connection)) {
    throw new Error(`Native PostgreSQL connector cannot run driver "${inputDriver}"`);
  }
  const env = input.env ?? process.env;
  const referencedUrl = stringConfigValue(input.connection, 'url', env);
  const urlConfig = referencedUrl ? parsePostgresUrl(referencedUrl) : {};
  const merged: KtxPostgresConnectionConfig = { ...urlConfig, ...input.connection };
  const host = stringConfigValue(merged, 'host', env);
  const database = stringConfigValue(merged, 'database', env);
  const user = stringConfigValue(merged, 'username', env) ?? stringConfigValue(merged, 'user', env);
  const password = stringConfigValue(merged, 'password', env);
  const sslmode = normalizedSslMode(merged);
  const sslDisabled = sslmode === 'prefer' || sslmode === 'disable';
  if (!referencedUrl && !host) {
    throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.host or url`);
  }
  if (!database && !referencedUrl) {
    throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.database or url`);
  }
  if (!user && !referencedUrl) {
    throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.username, user, or url`);
  }
  const config: KtxPostgresPoolConfig = {
    max: 10,
    idleTimeoutMillis: 30_000,
    connectionTimeoutMillis: 10_000,
    ...(referencedUrl && !sslDisabled
      ? { connectionString: referencedUrl }
      : { host, port: numberValue(merged.port) ?? 5432, database, user, password }),
  };
  const searchPathSchemas = searchPathSchemasFromConnection(merged);
  if (searchPathSchemas.length > 0) {
    config.options = searchPathStartupOption(searchPathSchemas);
  }
  // A verifying/required sslmode must encrypt even without an explicit `ssl: true`.
  const sslRequested = Boolean(merged.ssl) || (sslmode !== undefined && POSTGRES_SSL_REQUIRED_MODES.has(sslmode));
  if (sslRequested && !sslDisabled) {
    config.ssl = { rejectUnauthorized: merged.rejectUnauthorized ?? true };
  }
  return config;
}
2026-05-10 23:51:24 +02:00
export class KtxPostgresScanConnector implements KtxScanConnector {
2026-05-10 23:12:26 +02:00
/** Connector id of the form `postgres:<connectionId>`. */
readonly id: string;
/** Driver tag reported by this connector. */
readonly driver: 'postgres' = 'postgres';
/** Feature flags advertised to the scan pipeline. */
readonly capabilities = createKtxConnectorCapabilities({
  tableSampling: true,
  columnSampling: true,
  columnStats: true,
  readOnlySql: true,
  nestedAnalysis: true,
  formalForeignKeys: true,
  estimatedRowCounts: true,
});
/** Connection key this connector was built for. */
private readonly connectionId: string;
/** Raw connection settings (`{}` when none were supplied). */
private readonly connection: KtxPostgresConnectionConfig;
/** Pool options computed once in the constructor. */
private readonly poolConfig: KtxPostgresPoolConfig;
/** Factory used to create the pool. */
private readonly poolFactory: KtxPostgresPoolFactory;
/** Optional endpoint resolver supplied by the caller. */
private readonly endpointResolver?: KtxPostgresEndpointResolver;
/** Clock used for snapshot timestamps. */
private readonly now: () => Date;
/** Dialect helper for identifier quoting and query generation. */
private readonly dialect = new KtxPostgresDialect();
/** Active pool; `null` before one is created (outside this view) and after `cleanup()`. */
private pool: KtxPostgresPool | null = null;
/** NOTE(review): presumably the last error reported by an idle pooled client; written outside this view. */
private lastIdlePoolError: Error | null = null;
/** Endpoint from the resolver, if any; its `close` hook runs during `cleanup()`. */
private resolvedEndpoint: KtxPostgresResolvedEndpoint | null = null;
2026-05-10 23:12:26 +02:00
2026-05-10 23:51:24 +02:00
/**
 * Validates the connection settings and precomputes the pool options.
 * No database connection is opened here.
 *
 * @throws Error when the connection is missing, uses another driver, or lacks
 *   the host/database/user fields (and no url).
 */
constructor(options: KtxPostgresScanConnectorOptions) {
  const { connectionId, connection, env, poolFactory, endpointResolver, now } = options;
  this.connectionId = connectionId;
  this.id = `postgres:${connectionId}`;
  this.connection = connection ?? {};
  this.poolConfig = postgresPoolConfigFromConfig({ connectionId, connection, env });
  this.poolFactory = poolFactory ?? new DefaultPostgresPoolFactory();
  this.endpointResolver = endpointResolver;
  this.now = now ?? (() => new Date());
}
/** Runs `SELECT 1` and reports failures as `{ success: false, error }` instead of throwing. */
async testConnection(): Promise<{ success: boolean; error?: string }> {
  try {
    await this.query('SELECT 1');
  } catch (failure) {
    const message = failure instanceof Error ? failure.message : String(failure);
    return { success: false, error: message };
  }
  return { success: true };
}
2026-05-10 23:51:24 +02:00
/**
 * Builds a schema snapshot that covers every configured schema.
 * Schemas are loaded one after another.
 */
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
  this.assertConnection(input.connectionId);
  const schemas = schemasFromConnection(this.connection);
  const tables: KtxSchemaTable[] = [];
  for (const schema of schemas) {
    const schemaTables = await this.loadSchemaTables(schema);
    tables.push(...schemaTables);
  }
  let columnCount = 0;
  for (const table of tables) {
    columnCount += table.columns.length;
  }
  return {
    connectionId: this.connectionId,
    driver: 'postgres',
    extractedAt: this.now().toISOString(),
    scope: { schemas },
    metadata: {
      database: this.poolConfig.database ?? this.connection.database ?? null,
      schemas,
      host: this.poolConfig.host ?? this.connection.host ?? null,
      table_count: tables.length,
      total_columns: columnCount,
    },
    tables,
  };
}
2026-05-10 23:51:24 +02:00
/** Samples rows with the dialect's sample query, returning headers, header types, rows and totals. */
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxPostgresTableSampleResult> {
  this.assertConnection(input.connectionId);
  const sql = this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns);
  const { headers, headerTypes, rows, totalRows } = await this.query(sql);
  return { headers, headerTypes, rows, totalRows };
}
2026-05-10 23:51:24 +02:00
/** Samples one column, keeping the first cell of each non-empty row unless it is `null`. */
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
  this.assertConnection(input.connectionId);
  const tableName = this.qTableName(input.table);
  const { rows } = await this.query(this.dialect.generateColumnSampleQuery(tableName, input.column, input.limit));
  const values = rows.flatMap((row) => (row.length > 0 && row[0] !== null ? [row[0]] : []));
  return { values, nullCount: null, distinctCount: null };
}
2026-05-10 23:51:24 +02:00
/**
 * Column statistics derived from `getColumnStatistics`. Only `distinctCount`
 * is filled in; returns `null` when no estimate exists for the column.
 *
 * @throws when `input.connectionId` does not match this connector (via `assertConnection`).
 */
async columnStats(input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
  // Every other input-driven entry point validates the connection id first; do the same here.
  this.assertConnection(input.connectionId);
  const stats = await this.getColumnStatistics(input.table);
  const value = stats?.cardinalityByColumn.get(input.column);
  return value === undefined
    ? null
    : { min: null, max: null, average: null, nullCount: null, distinctCount: value };
}
2026-05-10 23:51:24 +02:00
/**
 * Runs caller SQL after `assertReadOnlySql` validation and the `maxRows` cap
 * from `limitSqlForExecution`. A positional `params` array is bound as-is;
 * a named map is converted by the dialect's `prepareQuery`.
 */
async executeReadOnly(input: KtxPostgresReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
  this.assertConnection(input.connectionId);
  const checkedSql = assertReadOnlySql(input.sql);
  const limitedSql = limitSqlForExecution(checkedSql, input.maxRows);
  const { sql, params } = Array.isArray(input.params)
    ? { sql: limitedSql, params: input.params }
    : this.dialect.prepareQuery(limitedSql, input.params);
  const result = await this.query(sql, params);
  return { ...result, rowCount: result.rows.length };
}
/**
 * Fetches a column's distinct values, guarded by a sampled cardinality check.
 *
 * @returns `null` when the sampled cardinality is not a finite number;
 *   `{ values: [], cardinality: 0 }` for an empty sample;
 *   `{ values: null, cardinality }` when above `options.maxCardinality`;
 *   otherwise the stringified non-null distinct values.
 */
async getColumnDistinctValues(
  table: KtxTableRef,
  columnName: string,
  options: KtxPostgresColumnDistinctValuesOptions,
): Promise<KtxPostgresColumnDistinctValuesResult | null> {
  const tableName = this.qTableName(table);
  const quotedColumn = this.dialect.quoteIdentifier(columnName);
  const [countRow] = await this.queryRaw<PostgresCountRow>(
    this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, options.sampleSize ?? 10000),
  );
  const cardinality = finiteNumber(countRow?.cardinality);
  if (cardinality === null) {
    return null;
  }
  if (cardinality === 0) {
    return { values: [], cardinality: 0 };
  }
  if (cardinality > options.maxCardinality) {
    return { values: null, cardinality };
  }
  const distinctRows = await this.queryRaw<PostgresDistinctValueRow>(
    this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
  );
  const values: string[] = [];
  for (const { val } of distinctRows) {
    if (val !== null) {
      values.push(String(val));
    }
  }
  return { values, cardinality };
}
2026-05-10 23:51:24 +02:00
/**
 * Reads the dialect's column-statistics query for a table.
 * The schema defaults to the first configured schema, then `public`.
 *
 * @returns estimated cardinalities keyed by column, or `null` when the dialect
 *   produced no query or no row carried a finite estimate.
 */
async getColumnStatistics(table: KtxTableRef): Promise<KtxPostgresColumnStatisticsResult | null> {
  const schema = table.db ?? schemasFromConnection(this.connection)[0] ?? 'public';
  const sql = this.dialect.generateColumnStatisticsQuery(schema, table.name);
  if (!sql) {
    return null;
  }
  const cardinalityByColumn = new Map<string, number>();
  const statRows = await this.queryRaw<PostgresStatsRow>(sql);
  for (const { column_name: columnName, estimated_cardinality: rawEstimate } of statRows) {
    const estimate = finiteNumber(rawEstimate);
    if (estimate !== null) {
      cardinalityByColumn.set(columnName, estimate);
    }
  }
  return cardinalityByColumn.size === 0 ? null : { cardinalityByColumn };
}
/**
 * Counts the rows of a table with an exact `COUNT(*)`.
 *
 * A bare string is treated as a table name in the connection's first configured schema
 * (or `'public'` when none is configured).
 *
 * @returns The row count, or `0` when the result cannot be read as a finite number.
 */
async getTableRowCount(table: string | PostgresTableRef): Promise<number> {
  const ref =
    typeof table !== 'string'
      ? table
      : { catalog: null, db: schemasFromConnection(this.connection)[0] ?? 'public', name: table };
  const [first] = await this.queryRaw<PostgresCountRow>(`SELECT COUNT(*) AS count FROM ${this.qTableName(ref)}`);
  return finiteNumber(first?.count) ?? 0;
}
/** Formats a table reference as SQL table-name text using the dialect's formatting rules. */
qTableName(table: PostgresTableRef): string {
  const { dialect } = this;
  return dialect.formatTableName(table);
}
/** Quotes a single SQL identifier (e.g. a column name) using the dialect's quoting rules. */
quoteIdentifier(identifier: string): string {
  const { dialect } = this;
  return dialect.quoteIdentifier(identifier);
}
/**
 * Lists schemas from `information_schema.schemata`, excluding `information_schema` itself and
 * PostgreSQL system schemas.
 *
 * System schemas are recognised by the literal `pg_` prefix, which PostgreSQL reserves for them.
 * The prefix is compared with `left(...)` rather than `LIKE 'pg_%'`, because in `LIKE` the `_`
 * is a single-character wildcard. That pattern would also hide user schemas such as `pgsales`.
 *
 * @returns Schema names in ascending order.
 */
async listSchemas(): Promise<string[]> {
  const rows = await this.queryRaw<PostgresSchemaRow>(`
    SELECT schema_name
    FROM information_schema.schemata
    WHERE schema_name <> 'information_schema'
      AND left(schema_name, 3) <> 'pg_'
    ORDER BY schema_name
  `);
  return rows.map((row) => row.schema_name);
}
2026-05-12 18:22:05 -07:00
/**
 * Lists ordinary tables and views in the given schemas.
 *
 * @param schemas - Schemas to search; when omitted, every schema returned by `listSchemas()`.
 * @returns Entries ordered by schema, then name. Relkind `'v'` becomes `'view'`; the only
 *   other relkind the query admits (`'r'`) becomes `'table'`. An empty schema list yields
 *   `[]` without issuing a query.
 */
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
  const targetSchemas = schemas ?? (await this.listSchemas());
  if (!targetSchemas.length) {
    return [];
  }

  const sql = `
SELECT n.nspname AS schema_name, c.relname AS table_name, c.relkind AS table_kind
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ANY($1)
AND c.relkind IN ('r', 'v')
ORDER BY n.nspname, c.relname
`;
  const tableRows = await this.queryRaw<PostgresTableListRow>(sql, [targetSchemas]);

  return tableRows.map(({ schema_name: schema, table_name: name, table_kind: relkind }) => ({
    schema,
    name,
    kind: relkind === 'v' ? ('view' as const) : ('table' as const),
  }));
}
2026-05-10 23:12:26 +02:00
/**
 * Releases the connection pool and any endpoint opened by the endpoint resolver.
 *
 * Both resources are detached from the connector before they are closed. As a result:
 * - a repeated `cleanup()` never calls `end()` on the same pool twice;
 * - the endpoint is still closed when ending the pool rejects (that rejection still propagates);
 * - a later `getPool()` starts from a clean state.
 *
 * Any idle-pool error captured from the discarded pool is cleared, so it cannot be rethrown
 * against a pool created afterwards.
 */
async cleanup(): Promise<void> {
  const pool = this.pool;
  const endpoint = this.resolvedEndpoint;
  this.pool = null;
  this.resolvedEndpoint = null;
  this.lastIdlePoolError = null;
  try {
    if (pool) {
      await pool.end();
    }
  } finally {
    // Runs even if pool.end() rejected, so the endpoint is not leaked.
    if (endpoint?.close) {
      await endpoint.close();
    }
  }
}
2026-05-10 23:51:24 +02:00
/**
 * Loads every ordinary table and view in `schema`, together with its columns, primary-key
 * columns and foreign keys, and converts them into `KtxSchemaTable` records.
 *
 * All metadata is read from `pg_catalog`. The `information_schema` constraint views are
 * avoided on purpose:
 * - `table_constraints` only lists constraints on tables the current role owns or holds a
 *   non-SELECT privilege on.
 * - `constraint_column_usage` only lists columns of tables owned by an enabled role.
 *
 * A read-only scanning role would therefore see no keys at all. Reading `pg_constraint`
 * directly also has these benefits:
 * - Composite keys stay paired column by column (`unnest(conkey, confkey) WITH ORDINALITY`)
 *   instead of forming a cross product.
 * - The referenced table's real schema is reported, so cross-schema foreign keys are kept.
 * - Rows of same-named constraints on different tables are never mixed; constraint names are
 *   only unique per table.
 *
 * @param schema - Schema (namespace) name to load.
 * @returns One entry per table/view, in table-name order.
 */
private async loadSchemaTables(schema: string): Promise<KtxSchemaTable[]> {
  const tables = await this.queryRaw<PostgresTableRow>(
    `
    SELECT
      c.relname AS table_name,
      c.relkind AS table_kind,
      c.reltuples::bigint AS row_count,
      d.description AS table_comment
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    LEFT JOIN pg_catalog.pg_description d
      ON d.objoid = c.oid AND d.objsubid = 0
    WHERE n.nspname = $1
      AND c.relkind IN ('r', 'v')
    ORDER BY c.relname
    `,
    [schema],
  );
  const columns = await this.queryRaw<PostgresColumnRow>(
    `
    SELECT
      c.relname AS table_name,
      a.attname AS column_name,
      format_type(a.atttypid, a.atttypmod) AS data_type,
      NOT a.attnotnull AS is_nullable,
      d.description AS column_comment
    FROM pg_catalog.pg_attribute a
    JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    LEFT JOIN pg_catalog.pg_description d
      ON d.objoid = c.oid AND d.objsubid = a.attnum
    WHERE n.nspname = $1
      AND c.relkind IN ('r', 'v')
      AND a.attnum > 0
      AND NOT a.attisdropped
    ORDER BY c.relname, a.attnum
    `,
    [schema],
  );
  // Primary-key columns in key order (key_cols.ord follows conkey order).
  const primaryKeys = await this.queryRaw<PostgresPrimaryKeyRow>(
    `
    SELECT c.relname AS table_name, a.attname AS column_name
    FROM pg_catalog.pg_constraint con
    JOIN pg_catalog.pg_class c ON c.oid = con.conrelid
    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
    CROSS JOIN LATERAL unnest(con.conkey) WITH ORDINALITY AS key_cols(attnum, ord)
    JOIN pg_catalog.pg_attribute a
      ON a.attrelid = con.conrelid AND a.attnum = key_cols.attnum
    WHERE con.contype = 'p'
      AND n.nspname = $1
    ORDER BY c.relname, key_cols.ord
    `,
    [schema],
  );
  // One row per (constraint, column pair). conkey/confkey are unnested in lockstep, so the
  // Nth local column is matched with the Nth referenced column.
  const foreignKeys = await this.queryRaw<PostgresForeignKeyRow>(
    `
    SELECT
      src.relname AS table_name,
      src_att.attname AS column_name,
      tgt_ns.nspname AS foreign_table_schema,
      tgt.relname AS foreign_table_name,
      tgt_att.attname AS foreign_column_name,
      con.conname AS constraint_name
    FROM pg_catalog.pg_constraint con
    JOIN pg_catalog.pg_class src ON src.oid = con.conrelid
    JOIN pg_catalog.pg_namespace src_ns ON src_ns.oid = src.relnamespace
    JOIN pg_catalog.pg_class tgt ON tgt.oid = con.confrelid
    JOIN pg_catalog.pg_namespace tgt_ns ON tgt_ns.oid = tgt.relnamespace
    CROSS JOIN LATERAL unnest(con.conkey, con.confkey) WITH ORDINALITY AS key_cols(src_attnum, tgt_attnum, ord)
    JOIN pg_catalog.pg_attribute src_att
      ON src_att.attrelid = con.conrelid AND src_att.attnum = key_cols.src_attnum
    JOIN pg_catalog.pg_attribute tgt_att
      ON tgt_att.attrelid = con.confrelid AND tgt_att.attnum = key_cols.tgt_attnum
    WHERE con.contype = 'f'
      AND src_ns.nspname = $1
    ORDER BY src.relname, src_att.attname, con.conname, key_cols.ord
    `,
    [schema],
  );
  const columnsByTable = groupByTable(columns);
  const primaryKeysByTable = primaryKeyMap(primaryKeys);
  const foreignKeysByTable = groupByTable(foreignKeys);
  return tables.map((table) =>
    this.toSchemaTable(
      schema,
      table,
      columnsByTable.get(table.table_name) ?? [],
      primaryKeysByTable.get(table.table_name) ?? new Set<string>(),
      foreignKeysByTable.get(table.table_name) ?? [],
    ),
  );
}
/**
 * Assembles a `KtxSchemaTable` from one catalog table row and that table's already-grouped
 * column rows, primary-key column names and foreign-key rows.
 *
 * Relkind `'v'` is reported as a view and anything else as a table. Views never carry a row
 * estimate. For tables, `row_count` is used as the estimate; it is `pg_class.reltuples`
 * (see `loadSchemaTables`). A negative value means "unknown": PostgreSQL 14+ stores -1 for a
 * table that has never been vacuumed or analysed. Such values are reported as `null` rather
 * than as a negative row count.
 */
private toSchemaTable(
  schema: string,
  table: PostgresTableRow,
  columns: PostgresColumnRow[],
  primaryKeys: Set<string>,
  foreignKeys: PostgresForeignKeyRow[],
): KtxSchemaTable {
  const kind = table.table_kind === 'v' ? 'view' : 'table';
  const rowEstimate = kind === 'view' ? null : finiteNumber(table.row_count);
  return {
    catalog: null,
    db: schema,
    name: table.table_name,
    kind,
    comment: table.table_comment || null,
    estimatedRows: rowEstimate !== null && rowEstimate >= 0 ? rowEstimate : null,
    columns: columns.map((column) => this.toSchemaColumn(column, primaryKeys)),
    foreignKeys: foreignKeys.map((foreignKey) => this.toSchemaForeignKey(foreignKey)),
  };
}
2026-05-10 23:51:24 +02:00
/**
 * Converts one catalog column row into the connector-neutral column shape.
 *
 * The native type string is mapped through the dialect to both a normalised type and a
 * dimension type. An empty or missing comment becomes `null`.
 *
 * @param column - Column row for a single table.
 * @param primaryKeys - Names of that table's primary-key columns.
 */
private toSchemaColumn(column: PostgresColumnRow, primaryKeys: Set<string>): KtxSchemaColumn {
  const { column_name: name, data_type: nativeType, is_nullable: nullable, column_comment: comment } = column;
  const { dialect } = this;
  return {
    name,
    nativeType,
    normalizedType: dialect.mapDataType(nativeType),
    dimensionType: dialect.mapToDimensionType(nativeType),
    nullable,
    primaryKey: primaryKeys.has(name),
    comment: comment || null,
  };
}
2026-05-10 23:51:24 +02:00
/**
 * Converts one foreign-key column-pair row into the connector-neutral foreign-key shape.
 *
 * Catalogs are not used for PostgreSQL, so `toCatalog` is always `null`. An empty or missing
 * constraint name becomes `null`.
 */
private toSchemaForeignKey(row: PostgresForeignKeyRow): KtxSchemaForeignKey {
  const {
    column_name: fromColumn,
    foreign_table_schema: toDb,
    foreign_table_name: toTable,
    foreign_column_name: toColumn,
    constraint_name: constraintName,
  } = row;
  return {
    fromColumn,
    toCatalog: null,
    toDb,
    toTable,
    toColumn,
    constraintName: constraintName || null,
  };
}
2026-05-10 23:51:24 +02:00
/**
 * Returns the shared connection pool, creating it on first use.
 *
 * When an endpoint resolver is configured, the pool connects to the endpoint it resolves.
 * Resolution starts from the configured host/port, falling back to the connection's values
 * and then `localhost:5432`. The resolved endpoint is retained so `cleanup()` can close it.
 * Errors emitted by idle pool clients are recorded and rethrown by the next query.
 *
 * Endpoint resolution is asynchronous, so concurrent first calls can race. Without a
 * re-check, every racer would create its own pool and the last assignment would silently
 * leak the others (and their endpoints). A caller that finds a pool already created after its
 * resolution finishes therefore closes the endpoint it just opened and returns the existing pool.
 */
private async getPool(): Promise<KtxPostgresPool> {
  const existing = this.pool;
  if (existing) {
    return existing;
  }
  let config = { ...this.poolConfig };
  if (this.endpointResolver) {
    const endpoint = await this.endpointResolver.resolve({
      host: config.host ?? this.connection.host ?? 'localhost',
      port: config.port ?? numberValue(this.connection.port) ?? 5432,
      connection: this.connection,
    });
    // Another caller may have finished creating the pool while we awaited resolution.
    const racedPool = this.pool;
    if (racedPool) {
      await endpoint.close?.();
      return racedPool;
    }
    this.resolvedEndpoint = endpoint;
    config = { ...config, host: endpoint.host, port: endpoint.port };
  }
  // No await between the re-check above and this assignment, so it cannot be interleaved.
  const pool = this.poolFactory.createPool(config);
  this.pool = pool;
  pool.on?.('error', (error) => {
    this.lastIdlePoolError = error;
  });
  return pool;
}
/**
 * Runs SQL on a pooled client and returns the driver's result rows cast to `T`.
 *
 * Any error previously captured from an idle pool client is rethrown first. The client is
 * released whether or not the query succeeds. Unlike `query`, the SQL is not passed through
 * `assertReadOnlySql`.
 *
 * @param sql - Statement text.
 * @param params - Positional parameters forwarded to the driver.
 */
private async queryRaw<T>(sql: string, params?: unknown[]): Promise<T[]> {
  this.throwIdlePoolErrorIfPresent();
  const client = await (await this.getPool()).connect();
  try {
    const { rows } = await client.query(sql, params);
    return rows as T[];
  } finally {
    client.release();
  }
}
2026-05-10 23:51:24 +02:00
/**
 * Runs a read-only statement and shapes the driver result into a `KtxQueryResult`.
 *
 * The SQL is checked with `assertReadOnlySql` before execution. Only array params are
 * forwarded; a named-parameter record is ignored here (callers are expected to have prepared
 * positional params already). Column types come from `PG_OID_TYPE_MAP`, falling back to
 * `oid:<id>` for unmapped type OIDs.
 */
private async query(sql: string, params?: Record<string, unknown> | unknown[]): Promise<KtxQueryResult> {
  this.throwIdlePoolErrorIfPresent();
  const pool = await this.getPool();
  const client = await pool.connect();
  try {
    const checkedSql = assertReadOnlySql(sql);
    const positionalParams = Array.isArray(params) ? params : undefined;
    const result = await client.query(checkedSql, positionalParams);
    const fields = result.fields ?? [];
    const rowTotal = result.rows.length;
    return {
      headers: fields.map(({ name }) => name),
      headerTypes: fields.map(({ dataTypeID }) => PG_OID_TYPE_MAP[dataTypeID] ?? `oid:${dataTypeID}`),
      rows: queryRows(result),
      totalRows: rowTotal,
      rowCount: rowTotal,
    };
  } finally {
    client.release();
  }
}
/**
 * Guards against serving a request addressed to a different connection.
 *
 * @throws Error when `connectionId` differs from this connector's connection id.
 */
private assertConnection(connectionId: string): void {
  if (connectionId === this.connectionId) {
    return;
  }
  throw new Error(`PostgreSQL connector ${this.connectionId} cannot run scan for ${connectionId}`);
}
/**
 * Rethrows the last error reported by an idle pool client, if one is pending.
 *
 * The stored error is cleared before it is thrown, so it surfaces exactly once and the next
 * call proceeds normally.
 */
private throwIdlePoolErrorIfPresent(): void {
  const pending = this.lastIdlePoolError;
  if (pending) {
    this.lastIdlePoolError = null;
    throw pending;
  }
}
2026-05-10 23:12:26 +02:00
}