chore(workspace): gate dead-code with knip production mode (#196)

* refactor(workspace): relocate @ktx/llm source into packages/cli/src/llm

* refactor(workspace): rewrite @ktx/llm imports to relative paths

* refactor(workspace): fold internal packages into cli

* chore(workspace): gate dead-code with knip production mode

Turn on production-mode knip plus an autofix run in pre-commit and the
`pnpm dead-code` script, document the `/** @internal */` convention for
test-only exports in AGENTS.md, annotate test-only exports across the
CLI with that JSDoc, and drop dead exports/wrappers the new gate
surfaced (e.g. `cli-project.ts`, `lookerRuntimeSourceToFileAdapterSource`,
`createLocalScanEnrichmentProvidersFromConfig`,
`PGLITE_OWNER_PROCESS_BACKEND_CAPABILITIES`, stale type re-exports).
Replace the loose `ignoreIssues` allowlist in `knip.json` with explicit
production entries so cross-package barrel leaks are caught.

* refactor(cli): delete internal barrel index.ts files

The 34 `index.ts` re-export barrels inside `packages/cli/src/` were
holdovers from the pre-fold multi-workspace structure. Post-fold-in they
served no production purpose: external consumers go through the single
package main entry, and in-repo callers mostly imported through them
only because the path was short. Internally, knip flagged most barrel
re-exports as production-dead (only reached via tests).

This change:
- Deletes every internal barrel except `packages/cli/src/index.ts`
  (the published package entry).
- Rewrites ~270 source/test files to import each name directly from
  the file that defines it.
- Moves `tools/warehouse-verification/index.ts` to
  `create-warehouse-verification-tools.ts` (the function it defined
  locally) and updates its single consumer.
- Renames `search/backend-conformance.ts` → `.test-utils.ts` to match
  the existing test-helper file convention.
- Deletes 13 dead test-only chains (dbt-descriptions/*,
  live-database/extracted-schema, live-database/structural-sync,
  relationship-* feedback/review chain) plus their tests and a
  cascading orphan integration test.
- Updates test mocks that pointed at deleted barrel paths
  (notion-client, connector barrels in scan/local-scan-connectors
  tests) to mock the source files instead.
- Points the maintainer benchmark script
  (`scripts/relationship-benchmark-report.mjs`) at source files
  instead of `dist/context/scan/index.js`.
- Drops the barrel `!` entries from `knip.json`; adds explicit
  production entries only for the benchmark code reached via dist by
  the maintainer script.

Net: 413 files changed, ~1.2k insertions, ~9.4k deletions.

`pnpm run dead-code` (Biome + knip default + knip production) and
`pnpm run type-check` are clean; 2277 tests pass.

* refactor(workspace): rename @ktx/cli to @kaelio/ktx and pack it directly

Promote the CLI workspace package to the public name `@kaelio/ktx` and
drop the separate `scripts/build-public-npm-package.mjs` wrapper. The
CLI package is now publishable in place (`publishConfig.access: public`,
`provenance: true`), so artifact packing uses `pnpm pack` against
`packages/cli/` instead of assembling a parallel package tree.

Updates all workspace filter invocations, docs, tests, and release
readiness checks to reference the new package name, and folds the
tarball-name helper into `scripts/public-npm-release-metadata.mjs`.

* docs: align "agent clients" and "data agents" terminology

Replace "client agents" with "agent clients" and "database agents" with
"data agents" across AGENTS.md, README.md, the docs-site copy, and the
matching setup-agents test description, matching the canonical
vocabulary in docs/terminology.md.

Also moves packages/cli/tsconfig.json's tsBuildInfoFile from
node_modules/.cache/ to dist/.tsbuildinfo so incremental builds survive
node_modules reinstalls.

* refactor(release): single source of truth for package version

Make packages/cli/package.json the single source of truth for the
@kaelio/ktx version. publicNpmPackageVersion() now reads it directly,
so artifact filenames, release-readiness checks, and the Python wheel
version all derive from one field. The duplicate
release-policy.json.publicNpmPackageVersion is removed.

Previously the two fields could drift: tarballs were named
kaelio-ktx-0.4.1.tgz while internally containing
@kaelio/ktx@0.0.0-private.

- update-public-release-version.mjs rewrites both Python pyproject.toml
  files (ktx-daemon, ktx-sl) alongside the npm package.jsons,
  normalizing the version for PEP 440 (e.g. 0.1.0-rc.2 -> 0.1.0rc2).
- semantic-release-config.cjs adds the two pyproject.toml files to
  @semantic-release/git assets so the release commit back to main
  carries every version source in lockstep.
- The six "?? '0.0.0-private'" fallback literals across the CLI are
  replaced with "?? getKtxCliPackageInfo().version", and
  createDefaultKtxMcpServer makes its version arg required.
- docs/release.md describes the actual commit-back model: the dev tree
  always reflects the most recent release; no sentinel pin to
  maintain.

Verified: pnpm run artifacts:build now produces
kaelio-ktx-0.4.1.tgz and kaelio_ktx-0.4.1-py3-none-any.whl with
@kaelio/ktx@0.4.1 inside. Full type-check, dead-code, and
2287 vitests + 173 script tests pass.

* refactor(cli): inject embedding provider resolution and detect sentence-transformers runtime

Make resolveProjectEmbeddingProvider and runtimeIo injectable in ingest and
scan command entrypoints so tests can stub them, and teach
resolvePublicIngestRuntimeRequirements to flag the local-embeddings runtime
feature when ktx.yaml selects sentence-transformers.

* chore(cli): mark buildLocalStatsStatus and LocalStatsStatus as @internal

Both symbols are consumed only by status-project.test.ts. Annotating with
/** @internal */ keeps knip's production-mode check clean without changing
runtime behavior.

* fix(cli): use real package metadata in print-command-tree

The stubbed package name embedded a forbidden product identifier that
tripped the boundary check in CI. Read the metadata from package.json
instead — keeps the rendered tree unchanged and removes a duplicate
source of truth.

* feat(cli): show embedding coverage in `ktx status`, drop duplicate disk counts

Inline `(N embedded)` next to the Wiki scope counts and Semantic-layer
source counts, computed with `SUM(embedding_json IS NOT NULL)` over
`knowledge_pages` and `local_sl_sources`. Rename the "Knowledge" label to
"Wiki" (canonical per `docs/terminology.md`) and rename the matching
`localStats.knowledgePages` field to `localStats.wikiPages`.

Drop `wiki=N md` and `semantic-layer=N yaml` from the Disk row — those
duplicated the per-surface rows above. Disk now reports only actual byte
usage (db, cache, raw-sources). The unused `wikiGlobalMarkdownCount` /
`semanticLayerYamlCount` fields, the `isMarkdownEntry` / `isYamlEntry`
helpers, and the `filter` arg on `summarizeDir` are removed.
This commit is contained in:
Andrey Avtomonov 2026-05-21 15:28:58 +02:00 committed by GitHub
parent a1cfb03d73
commit 2366b00301
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
1002 changed files with 2286 additions and 12051 deletions

View file

@ -0,0 +1,387 @@
import { describe, expect, it, vi } from 'vitest';
import { createPostgresLiveDatabaseIntrospection } from '../../connectors/postgres/live-database-introspection.js';
import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js';
interface FakeQueryResult {
rows: Record<string, unknown>[];
fields?: Array<{ name: string; dataTypeID: number }>;
}
function fakePoolFactory(results: Map<string, FakeQueryResult>): KtxPostgresPoolFactory {
const query = vi.fn(async (sql: string, params?: unknown[]) => {
const normalized = sql.replace(/\s+/g, ' ').trim();
for (const [key, value] of results.entries()) {
if (normalized.includes(key)) {
return value;
}
}
throw new Error(`Unexpected SQL: ${normalized} params=${JSON.stringify(params ?? [])}`);
});
return {
createPool() {
return {
async connect() {
return {
query,
release: vi.fn(),
};
},
end: vi.fn(async () => undefined),
};
},
};
}
function metadataResults(): Map<string, FakeQueryResult> {
return new Map<string, FakeQueryResult>([
[
'FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n',
{
rows: [
{ table_name: 'customers', table_kind: 'r', row_count: '2', table_comment: 'Customers' },
{ table_name: 'orders', table_kind: 'r', row_count: '3', table_comment: null },
{ table_name: 'recent_orders', table_kind: 'v', row_count: '0', table_comment: 'Recent orders' },
],
},
],
[
'FROM pg_catalog.pg_attribute a JOIN pg_catalog.pg_class c',
{
rows: [
{ table_name: 'customers', column_name: 'id', data_type: 'integer', is_nullable: false, column_comment: null },
{ table_name: 'customers', column_name: 'name', data_type: 'text', is_nullable: false, column_comment: 'Name' },
{ table_name: 'orders', column_name: 'id', data_type: 'integer', is_nullable: false, column_comment: null },
{ table_name: 'orders', column_name: 'customer_id', data_type: 'integer', is_nullable: false, column_comment: null },
{ table_name: 'orders', column_name: 'status', data_type: 'text', is_nullable: true, column_comment: null },
{ table_name: 'recent_orders', column_name: 'id', data_type: 'integer', is_nullable: true, column_comment: null },
],
},
],
[
"tc.constraint_type = 'FOREIGN KEY'",
{
rows: [
{
table_name: 'orders',
column_name: 'customer_id',
foreign_table_schema: 'public',
foreign_table_name: 'customers',
foreign_column_name: 'id',
constraint_name: 'orders_customer_id_fkey',
},
],
},
],
[
"tc.constraint_type = 'PRIMARY KEY'",
{
rows: [
{ table_name: 'customers', column_name: 'id' },
{ table_name: 'orders', column_name: 'id' },
],
},
],
['SELECT "id" FROM "public"."orders" LIMIT 1', { rows: [{ id: 10 }], fields: [{ name: 'id', dataTypeID: 23 }] }],
[
'SELECT "status" FROM "public"."orders" WHERE "status" IS NOT NULL',
{ rows: [{ status: 'paid' }, { status: 'open' }], fields: [{ name: 'status', dataTypeID: 25 }] },
],
['COUNT(DISTINCT val) AS cardinality', { rows: [{ cardinality: '2' }] }],
['SELECT DISTINCT "status"::text AS val', { rows: [{ val: 'open' }, { val: 'paid' }] }],
['SELECT COUNT(*) AS count FROM "public"."orders"', { rows: [{ count: '3' }] }],
['FROM pg_stats s', { rows: [{ column_name: 'status', estimated_cardinality: '2' }] }],
['SELECT 1', { rows: [{ '?column?': 1 }], fields: [{ name: '?column?', dataTypeID: 23 }] }],
['SELECT schema_name FROM information_schema.schemata', { rows: [{ schema_name: 'public' }] }],
]);
}
describe('KtxPostgresScanConnector', () => {
it('resolves configuration safely', () => {
expect(isKtxPostgresConnectionConfig({ driver: 'postgres', url: 'env:DATABASE_URL' })).toBe(true);
expect(isKtxPostgresConnectionConfig({ driver: 'postgresql', host: 'db', database: 'analytics' })).toBe(true);
expect(isKtxPostgresConnectionConfig({ driver: 'mysql', host: 'db' })).toBe(false);
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schemas: ['analytics', 'public'],
ssl: true,
rejectUnauthorized: false,
},
}),
).toMatchObject({
host: 'db.example.test',
port: 5432,
database: 'analytics',
user: 'reader',
password: 'test-password', // pragma: allowlist secret
options: '-c search_path=analytics,public',
ssl: { rejectUnauthorized: false },
});
const libpqPreferConfig = postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
url: 'env:DEMO_DATABASE_URL',
},
env: {
DEMO_DATABASE_URL: 'postgresql://reader@demo.example.test:5432/demo?sslmode=prefer',
},
});
expect(libpqPreferConfig).toMatchObject({
host: 'demo.example.test',
port: 5432,
database: 'demo',
user: 'reader',
});
expect(libpqPreferConfig).not.toHaveProperty('connectionString');
expect(libpqPreferConfig).not.toHaveProperty('ssl');
expect(
postgresPoolConfigFromConfig({
connectionId: 'warehouse',
connection: { driver: 'postgres', host: 'db.example.test', database: 'analytics', username: 'reader' },
}),
).toMatchObject({
host: 'db.example.test',
database: 'analytics',
user: 'reader',
});
});
it('introspects schemas, tables, views, primary keys, comments, row counts, and foreign keys', async () => {
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schema: 'public',
},
poolFactory: fakePoolFactory(metadataResults()),
now: () => new Date('2026-04-29T10:00:00.000Z'),
});
const snapshot = await connector.introspect(
{ connectionId: 'warehouse', driver: 'postgres' },
{ runId: 'scan-run-1' },
);
expect(snapshot).toMatchObject({
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: '2026-04-29T10:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {
database: 'analytics',
schemas: ['public'],
host: 'db.example.test',
table_count: 3,
total_columns: 6,
},
});
expect(snapshot.tables.map((table) => [table.db, table.name, table.kind, table.estimatedRows])).toEqual([
['public', 'customers', 'table', 2],
['public', 'orders', 'table', 3],
['public', 'recent_orders', 'view', null],
]);
expect(snapshot.tables.find((table) => table.name === 'customers')?.columns[0]).toMatchObject({
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
});
expect(snapshot.tables.find((table) => table.name === 'orders')?.foreignKeys).toEqual([
{
fromColumn: 'customer_id',
toCatalog: null,
toDb: 'public',
toTable: 'customers',
toColumn: 'id',
constraintName: 'orders_customer_id_fkey',
},
]);
});
it('runs samples, distinct values, statistics, read-only SQL, and schema listing', async () => {
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schema: 'public',
},
poolFactory: fakePoolFactory(metadataResults()),
});
await expect(
connector.sampleTable(
{ connectionId: 'warehouse', table: { catalog: null, db: 'public', name: 'orders' }, columns: ['id'], limit: 1 },
{ runId: 'scan-run-1' },
),
).resolves.toEqual({ headers: ['id'], headerTypes: ['integer'], rows: [[10]], totalRows: 1 });
await expect(
connector.sampleColumn(
{ connectionId: 'warehouse', table: { catalog: null, db: 'public', name: 'orders' }, column: 'status', limit: 5 },
{ runId: 'scan-run-1' },
),
).resolves.toMatchObject({ values: ['paid', 'open'], nullCount: null, distinctCount: null });
await expect(
connector.getColumnDistinctValues(
{ catalog: null, db: 'public', name: 'orders' },
'status',
{ maxCardinality: 5, limit: 10, sampleSize: 100 },
),
).resolves.toEqual({ values: ['open', 'paid'], cardinality: 2 });
await expect(connector.getColumnStatistics({ catalog: null, db: 'public', name: 'orders' })).resolves.toEqual({
cardinalityByColumn: new Map([['status', 2]]),
});
await expect(connector.getTableRowCount({ db: 'public', name: 'orders' })).resolves.toBe(3);
await expect(connector.listSchemas()).resolves.toEqual(['public']);
await expect(connector.testConnection()).resolves.toEqual({ success: true });
await expect(
connector.executeReadOnly({ connectionId: 'warehouse', sql: 'delete from orders' }, { runId: 'scan-run-1' }),
).rejects.toThrow('Only read-only SELECT/WITH queries can be executed locally');
});
it('adapts native PostgreSQL snapshots to live-database introspection for local ingest', async () => {
const introspection = createPostgresLiveDatabaseIntrospection({
connections: {
warehouse: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schema: 'public',
},
},
poolFactory: fakePoolFactory(metadataResults()),
now: () => new Date('2026-04-29T10:00:00.000Z'),
});
const snapshot = await introspection.extractSchema('warehouse');
expect(snapshot).toMatchObject({
connectionId: 'warehouse',
extractedAt: '2026-04-29T10:00:00.000Z',
});
expect(snapshot.tables.find((table) => table.name === 'customers')).toMatchObject({
name: 'customers',
catalog: null,
db: 'public',
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: null,
},
{
name: 'name',
nativeType: 'text',
normalizedType: 'text',
dimensionType: 'string',
nullable: false,
primaryKey: false,
comment: 'Name',
},
],
foreignKeys: [],
});
});
it('does not end the pool before introspection completes', async () => {
let endCalled = false;
const endAwarePoolFactory: KtxPostgresPoolFactory = {
createPool() {
const inner = fakePoolFactory(metadataResults()).createPool({
max: 1,
idleTimeoutMillis: 1,
connectionTimeoutMillis: 1,
});
return {
async connect() {
if (endCalled) {
throw new Error('Cannot use a pool after calling end on the pool');
}
return inner.connect();
},
async end() {
endCalled = true;
return inner.end();
},
};
},
};
const introspection = createPostgresLiveDatabaseIntrospection({
connections: {
warehouse: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
schema: 'public',
},
},
poolFactory: endAwarePoolFactory,
now: () => new Date('2026-04-29T10:00:00.000Z'),
});
const snapshot = await introspection.extractSchema('warehouse');
expect(snapshot.tables.length).toBeGreaterThan(0);
expect(endCalled).toBe(true);
});
it('attaches an error listener to the pg pool', async () => {
const on = vi.fn();
const poolFactory: KtxPostgresPoolFactory = {
createPool() {
return {
on,
async connect() {
return {
query: vi.fn(async () => ({ rows: [{ '?column?': 1 }], fields: [{ name: '?column?', dataTypeID: 23 }] })),
release: vi.fn(),
};
},
end: vi.fn(async () => undefined),
};
},
};
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
},
poolFactory,
});
await expect(connector.testConnection()).resolves.toEqual({ success: true });
expect(on).toHaveBeenCalledWith('error', expect.any(Function));
});
});

View file

@ -0,0 +1,742 @@
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { assertReadOnlySql, limitSqlForExecution } from '../../context/connections/read-only-sql.js';
import { createKtxConnectorCapabilities, type KtxColumnSampleInput, type KtxColumnSampleResult, type KtxColumnStatsInput, type KtxColumnStatsResult, type KtxQueryResult, type KtxReadOnlyQueryInput, type KtxScanConnector, type KtxScanContext, type KtxScanInput, type KtxSchemaColumn, type KtxSchemaForeignKey, type KtxSchemaSnapshot, type KtxSchemaTable, type KtxTableListEntry, type KtxTableRef, type KtxTableSampleInput, type KtxTableSampleResult } from '../../context/scan/types.js';
import { Pool } from 'pg';
import { KtxPostgresDialect } from './dialect.js';
const PG_OID_TYPE_MAP: Record<number, string> = {
16: 'boolean',
20: 'bigint',
21: 'smallint',
23: 'integer',
25: 'text',
700: 'real',
701: 'double precision',
1043: 'varchar',
1082: 'date',
1114: 'timestamp',
1184: 'timestamptz',
1700: 'numeric',
2950: 'uuid',
3802: 'jsonb',
114: 'json',
1009: 'text[]',
1007: 'integer[]',
1016: 'bigint[]',
};
export interface KtxPostgresConnectionConfig {
driver?: string;
host?: string;
port?: number;
database?: string;
username?: string;
user?: string;
password?: string;
url?: string;
schema?: string;
schemas?: string[];
ssl?: boolean;
sslmode?: string;
sslMode?: string;
rejectUnauthorized?: boolean;
[key: string]: unknown;
}
export interface KtxPostgresPoolConfig {
host?: string;
port?: number;
database?: string;
user?: string;
password?: string;
connectionString?: string;
max: number;
idleTimeoutMillis: number;
connectionTimeoutMillis: number;
options?: string;
ssl?: { rejectUnauthorized: boolean };
}
interface KtxPostgresQueryResult {
fields?: Array<{ name: string; dataTypeID: number }>;
rows: Record<string, unknown>[];
}
interface KtxPostgresClient {
query(sql: string, params?: unknown[]): Promise<KtxPostgresQueryResult>;
release(): void;
}
interface KtxPostgresPool {
connect(): Promise<KtxPostgresClient>;
end(): Promise<void>;
on?(event: 'error', listener: (error: Error) => void): void;
}
export interface KtxPostgresPoolFactory {
createPool(config: KtxPostgresPoolConfig): KtxPostgresPool;
}
interface KtxPostgresResolvedEndpoint {
host: string;
port: number;
close?: () => Promise<void>;
}
export interface KtxPostgresEndpointResolver {
resolve(input: {
host: string;
port: number;
connection: KtxPostgresConnectionConfig;
}): Promise<KtxPostgresResolvedEndpoint>;
}
export interface KtxPostgresScanConnectorOptions {
connectionId: string;
connection: KtxPostgresConnectionConfig | undefined;
poolFactory?: KtxPostgresPoolFactory;
endpointResolver?: KtxPostgresEndpointResolver;
env?: NodeJS.ProcessEnv;
now?: () => Date;
}
export interface KtxPostgresReadOnlyQueryInput extends KtxReadOnlyQueryInput {
params?: Record<string, unknown> | unknown[];
}
export interface KtxPostgresColumnDistinctValuesOptions {
maxCardinality: number;
limit: number;
sampleSize?: number;
}
export interface KtxPostgresColumnDistinctValuesResult {
values: string[] | null;
cardinality: number;
}
export interface KtxPostgresColumnStatisticsResult {
cardinalityByColumn: Map<string, number>;
}
export interface KtxPostgresTableSampleResult extends KtxTableSampleResult {
headerTypes?: string[];
}
type PostgresTableRef = Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>;
interface PostgresTableRow {
table_name: string;
table_kind: string;
row_count: unknown;
table_comment: string | null;
}
interface PostgresColumnRow {
table_name: string;
column_name: string;
data_type: string;
is_nullable: boolean;
column_comment: string | null;
}
interface PostgresPrimaryKeyRow {
table_name: string;
column_name: string;
}
interface PostgresForeignKeyRow {
table_name: string;
column_name: string;
foreign_table_schema: string | null;
foreign_table_name: string;
foreign_column_name: string;
constraint_name: string | null;
}
interface PostgresSchemaRow {
schema_name: string;
}
interface PostgresTableListRow {
schema_name: string;
table_name: string;
table_kind: string;
}
interface PostgresCountRow {
count?: unknown;
cardinality?: unknown;
}
interface PostgresDistinctValueRow {
val: unknown;
}
interface PostgresStatsRow {
column_name: string;
estimated_cardinality: unknown;
}
class DefaultPostgresPoolFactory implements KtxPostgresPoolFactory {
createPool(config: KtxPostgresPoolConfig): KtxPostgresPool {
return new Pool(config);
}
}
function groupByTable<T extends { table_name: string }>(rows: T[]): Map<string, T[]> {
const grouped = new Map<string, T[]>();
for (const row of rows) {
const tableRows = grouped.get(row.table_name) ?? [];
tableRows.push(row);
grouped.set(row.table_name, tableRows);
}
return grouped;
}
function primaryKeyMap(rows: PostgresPrimaryKeyRow[]): Map<string, Set<string>> {
const grouped = new Map<string, Set<string>>();
for (const row of rows) {
const columns = grouped.get(row.table_name) ?? new Set<string>();
columns.add(row.column_name);
grouped.set(row.table_name, columns);
}
return grouped;
}
function queryRows(result: KtxPostgresQueryResult): unknown[][] {
const headers = (result.fields ?? []).map((field) => field.name);
return result.rows.map((row) => headers.map((header) => row[header]));
}
function finiteNumber(value: unknown): number | null {
const parsed = Number(value);
return Number.isFinite(parsed) ? parsed : null;
}
function stringConfigValue(
connection: KtxPostgresConnectionConfig | undefined,
key: keyof KtxPostgresConnectionConfig,
env: NodeJS.ProcessEnv,
): string | undefined {
const value = connection?.[key];
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function resolveStringReference(value: string, env: NodeJS.ProcessEnv): string {
if (value.startsWith('env:')) {
return env[value.slice('env:'.length)] ?? '';
}
if (value.startsWith('file:')) {
const rawPath = value.slice('file:'.length);
const path = rawPath.startsWith('~') ? resolve(homedir(), rawPath.slice(1)) : rawPath;
return readFileSync(path, 'utf-8').trim();
}
return value;
}
function numberValue(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
function parsePostgresUrl(url: string): Partial<KtxPostgresConnectionConfig> {
const parsed = new URL(url);
const sslmode = parsed.searchParams.get('sslmode') ?? undefined;
return {
host: parsed.hostname,
port: parsed.port ? Number(parsed.port) : undefined,
database: parsed.pathname.replace(/^\/+/, '') || undefined,
username: parsed.username ? decodeURIComponent(parsed.username) : undefined,
password: parsed.password ? decodeURIComponent(parsed.password) : undefined,
...(sslmode ? { sslmode } : {}),
};
}
function normalizedSslMode(connection: KtxPostgresConnectionConfig): string | undefined {
const value = connection.sslmode ?? connection.sslMode;
return typeof value === 'string' && value.trim().length > 0 ? value.trim().toLowerCase() : undefined;
}
function schemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
if (Array.isArray(connection.schemas) && connection.schemas.length > 0) {
return connection.schemas.filter((schema): schema is string => typeof schema === 'string' && schema.length > 0);
}
return typeof connection.schema === 'string' && connection.schema.length > 0 ? [connection.schema] : ['public'];
}
function searchPathSchemasFromConnection(connection: KtxPostgresConnectionConfig): string[] {
const schemas = schemasFromConnection(connection);
return schemas.includes('public') ? schemas : [...schemas, 'public'];
}
export function isKtxPostgresConnectionConfig(
connection: KtxPostgresConnectionConfig | undefined,
): connection is KtxPostgresConnectionConfig {
const driver = String(connection?.driver ?? '').toLowerCase();
return driver === 'postgres' || driver === 'postgresql';
}
/** @internal */
export function postgresPoolConfigFromConfig(input: {
connectionId: string;
connection: KtxPostgresConnectionConfig | undefined;
env?: NodeJS.ProcessEnv;
}): KtxPostgresPoolConfig {
const inputDriver = input.connection?.driver ?? 'unknown';
if (!isKtxPostgresConnectionConfig(input.connection)) {
throw new Error(`Native PostgreSQL connector cannot run driver "${inputDriver}"`);
}
const env = input.env ?? process.env;
const referencedUrl = stringConfigValue(input.connection, 'url', env);
const urlConfig = referencedUrl ? parsePostgresUrl(referencedUrl) : {};
const merged: KtxPostgresConnectionConfig = { ...urlConfig, ...input.connection };
const host = stringConfigValue(merged, 'host', env);
const database = stringConfigValue(merged, 'database', env);
const user = stringConfigValue(merged, 'username', env) ?? stringConfigValue(merged, 'user', env);
const password = stringConfigValue(merged, 'password', env);
const sslmode = normalizedSslMode(merged);
if (!referencedUrl && !host) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.host or url`);
}
if (!database && !referencedUrl) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.database or url`);
}
if (!user && !referencedUrl) {
throw new Error(`Native PostgreSQL connector requires connections.${input.connectionId}.username, user, or url`);
}
const config: KtxPostgresPoolConfig = {
max: 10,
idleTimeoutMillis: 30_000,
connectionTimeoutMillis: 10_000,
...(referencedUrl && sslmode !== 'prefer' && sslmode !== 'disable'
? { connectionString: referencedUrl }
: { host, port: numberValue(merged.port) ?? 5432, database, user, password }),
};
const searchPathSchemas = searchPathSchemasFromConnection(merged);
if (searchPathSchemas.length > 0) {
config.options = `-c search_path=${searchPathSchemas.join(',')}`;
}
if (merged.ssl && sslmode !== 'prefer' && sslmode !== 'disable') {
config.ssl = { rejectUnauthorized: merged.rejectUnauthorized ?? true };
}
return config;
}
export class KtxPostgresScanConnector implements KtxScanConnector {
readonly id: string;
readonly driver = 'postgres' as const;
readonly capabilities = createKtxConnectorCapabilities({
tableSampling: true,
columnSampling: true,
columnStats: true,
readOnlySql: true,
nestedAnalysis: true,
formalForeignKeys: true,
estimatedRowCounts: true,
});
private readonly connectionId: string;
private readonly connection: KtxPostgresConnectionConfig;
private readonly poolConfig: KtxPostgresPoolConfig;
private readonly poolFactory: KtxPostgresPoolFactory;
private readonly endpointResolver?: KtxPostgresEndpointResolver;
private readonly now: () => Date;
private readonly dialect = new KtxPostgresDialect();
private pool: KtxPostgresPool | null = null;
private lastIdlePoolError: Error | null = null;
private resolvedEndpoint: KtxPostgresResolvedEndpoint | null = null;
constructor(options: KtxPostgresScanConnectorOptions) {
this.connectionId = options.connectionId;
this.connection = options.connection ?? {};
this.poolConfig = postgresPoolConfigFromConfig({
connectionId: options.connectionId,
connection: options.connection,
env: options.env,
});
this.poolFactory = options.poolFactory ?? new DefaultPostgresPoolFactory();
this.endpointResolver = options.endpointResolver;
this.now = options.now ?? (() => new Date());
this.id = `postgres:${options.connectionId}`;
}
async testConnection(): Promise<{ success: boolean; error?: string }> {
try {
await this.query('SELECT 1');
return { success: true };
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) };
}
}
async introspect(input: KtxScanInput, _ctx: KtxScanContext): Promise<KtxSchemaSnapshot> {
this.assertConnection(input.connectionId);
const schemas = schemasFromConnection(this.connection);
const allTables: KtxSchemaTable[] = [];
for (const schema of schemas) {
const tables = await this.loadSchemaTables(schema);
allTables.push(...tables);
}
return {
connectionId: this.connectionId,
driver: 'postgres',
extractedAt: this.now().toISOString(),
scope: { schemas },
metadata: {
database: this.poolConfig.database ?? this.connection.database ?? null,
schemas,
host: this.poolConfig.host ?? this.connection.host ?? null,
table_count: allTables.length,
total_columns: allTables.reduce((sum, table) => sum + table.columns.length, 0),
},
tables: allTables,
};
}
async sampleTable(input: KtxTableSampleInput, _ctx: KtxScanContext): Promise<KtxPostgresTableSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.query(this.dialect.generateSampleQuery(this.qTableName(input.table), input.limit, input.columns));
return {
headers: result.headers,
headerTypes: result.headerTypes,
rows: result.rows,
totalRows: result.totalRows,
};
}
async sampleColumn(input: KtxColumnSampleInput, _ctx: KtxScanContext): Promise<KtxColumnSampleResult> {
this.assertConnection(input.connectionId);
const result = await this.query(
this.dialect.generateColumnSampleQuery(this.qTableName(input.table), input.column, input.limit),
);
const values = result.rows.filter((row) => row.length > 0 && row[0] !== null).map((row) => row[0]);
return { values, nullCount: null, distinctCount: null };
}
async columnStats(input: KtxColumnStatsInput, _ctx: KtxScanContext): Promise<KtxColumnStatsResult | null> {
const stats = await this.getColumnStatistics(input.table);
const value = stats?.cardinalityByColumn.get(input.column);
return value === undefined
? null
: { min: null, max: null, average: null, nullCount: null, distinctCount: value };
}
async executeReadOnly(input: KtxPostgresReadOnlyQueryInput, _ctx: KtxScanContext): Promise<KtxQueryResult> {
this.assertConnection(input.connectionId);
const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows);
const prepared = Array.isArray(input.params)
? { sql: limitedSql, params: input.params }
: this.dialect.prepareQuery(limitedSql, input.params);
const result = await this.query(prepared.sql, prepared.params);
return { ...result, rowCount: result.rows.length };
}
async getColumnDistinctValues(
table: KtxTableRef,
columnName: string,
options: KtxPostgresColumnDistinctValuesOptions,
): Promise<KtxPostgresColumnDistinctValuesResult | null> {
const sampleSize = options.sampleSize ?? 10000;
const tableName = this.qTableName(table);
const quotedColumn = this.dialect.quoteIdentifier(columnName);
const cardinalityRows = await this.queryRaw<PostgresCountRow>(
this.dialect.generateCardinalitySampleQuery(tableName, quotedColumn, sampleSize),
);
const cardinality = finiteNumber(cardinalityRows[0]?.cardinality);
if (cardinality === null) {
return null;
}
if (cardinality === 0) {
return { values: [], cardinality: 0 };
}
if (cardinality > options.maxCardinality) {
return { values: null, cardinality };
}
const valuesRows = await this.queryRaw<PostgresDistinctValueRow>(
this.dialect.generateDistinctValuesQuery(tableName, quotedColumn, options.limit),
);
return {
values: valuesRows.filter((row) => row.val !== null).map((row) => String(row.val)),
cardinality,
};
}
async getColumnStatistics(table: KtxTableRef): Promise<KtxPostgresColumnStatisticsResult | null> {
const schema = table.db ?? schemasFromConnection(this.connection)[0] ?? 'public';
const sql = this.dialect.generateColumnStatisticsQuery(schema, table.name);
if (!sql) {
return null;
}
const rows = await this.queryRaw<PostgresStatsRow>(sql);
const cardinalityByColumn = new Map<string, number>();
for (const row of rows) {
const cardinality = finiteNumber(row.estimated_cardinality);
if (cardinality !== null) {
cardinalityByColumn.set(row.column_name, cardinality);
}
}
return cardinalityByColumn.size > 0 ? { cardinalityByColumn } : null;
}
async getTableRowCount(table: string | PostgresTableRef): Promise<number> {
const tableRef =
typeof table === 'string'
? { catalog: null, db: schemasFromConnection(this.connection)[0] ?? 'public', name: table }
: table;
const rows = await this.queryRaw<PostgresCountRow>(`SELECT COUNT(*) AS count FROM ${this.qTableName(tableRef)}`);
return finiteNumber(rows[0]?.count) ?? 0;
}
qTableName(table: PostgresTableRef): string {
return this.dialect.formatTableName(table);
}
quoteIdentifier(identifier: string): string {
return this.dialect.quoteIdentifier(identifier);
}
async listSchemas(): Promise<string[]> {
const rows = await this.queryRaw<PostgresSchemaRow>(`
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name <> 'information_schema'
AND schema_name NOT LIKE 'pg_%'
ORDER BY schema_name
`);
return rows.map((row) => row.schema_name);
}
async listTables(schemas?: string[]): Promise<KtxTableListEntry[]> {
const filterSchemas = schemas ?? (await this.listSchemas());
if (filterSchemas.length === 0) return [];
const rows = await this.queryRaw<PostgresTableListRow>(
`
SELECT n.nspname AS schema_name, c.relname AS table_name, c.relkind AS table_kind
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = ANY($1)
AND c.relkind IN ('r', 'v')
ORDER BY n.nspname, c.relname
`,
[filterSchemas],
);
return rows.map((row) => ({
schema: row.schema_name,
name: row.table_name,
kind: row.table_kind === 'v' ? ('view' as const) : ('table' as const),
}));
}
async cleanup(): Promise<void> {
if (this.pool) {
await this.pool.end();
this.pool = null;
}
if (this.resolvedEndpoint?.close) {
await this.resolvedEndpoint.close();
this.resolvedEndpoint = null;
}
}
private async loadSchemaTables(schema: string): Promise<KtxSchemaTable[]> {
const tables = await this.queryRaw<PostgresTableRow>(
`
SELECT
c.relname AS table_name,
c.relkind AS table_kind,
c.reltuples::bigint AS row_count,
d.description AS table_comment
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_catalog.pg_description d
ON d.objoid = c.oid AND d.objsubid = 0
WHERE n.nspname = $1
AND c.relkind IN ('r', 'v')
ORDER BY c.relname
`,
[schema],
);
const columns = await this.queryRaw<PostgresColumnRow>(
`
SELECT
c.relname AS table_name,
a.attname AS column_name,
format_type(a.atttypid, a.atttypmod) AS data_type,
NOT a.attnotnull AS is_nullable,
d.description AS column_comment
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_catalog.pg_description d
ON d.objoid = c.oid AND d.objsubid = a.attnum
WHERE n.nspname = $1
AND c.relkind IN ('r', 'v')
AND a.attnum > 0
AND NOT a.attisdropped
ORDER BY c.relname, a.attnum
`,
[schema],
);
const primaryKeys = await this.queryRaw<PostgresPrimaryKeyRow>(
`
SELECT tc.table_name, kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_schema = $1
ORDER BY tc.table_name, kcu.ordinal_position
`,
[schema],
);
const foreignKeys = await this.queryRaw<PostgresForeignKeyRow>(
`
SELECT
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name,
tc.constraint_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = $1
ORDER BY tc.table_name, kcu.column_name
`,
[schema],
);
const columnsByTable = groupByTable(columns);
const primaryKeysByTable = primaryKeyMap(primaryKeys);
const foreignKeysByTable = groupByTable(foreignKeys);
return tables.map((table) =>
this.toSchemaTable(
schema,
table,
columnsByTable.get(table.table_name) ?? [],
primaryKeysByTable.get(table.table_name) ?? new Set<string>(),
foreignKeysByTable.get(table.table_name) ?? [],
),
);
}
private toSchemaTable(
schema: string,
table: PostgresTableRow,
columns: PostgresColumnRow[],
primaryKeys: Set<string>,
foreignKeys: PostgresForeignKeyRow[],
): KtxSchemaTable {
const kind = table.table_kind === 'v' ? 'view' : 'table';
return {
catalog: null,
db: schema,
name: table.table_name,
kind,
comment: table.table_comment || null,
estimatedRows: kind === 'view' ? null : finiteNumber(table.row_count),
columns: columns.map((column) => this.toSchemaColumn(column, primaryKeys)),
foreignKeys: foreignKeys.map((foreignKey) => this.toSchemaForeignKey(foreignKey)),
};
}
private toSchemaColumn(column: PostgresColumnRow, primaryKeys: Set<string>): KtxSchemaColumn {
return {
name: column.column_name,
nativeType: column.data_type,
normalizedType: this.dialect.mapDataType(column.data_type),
dimensionType: this.dialect.mapToDimensionType(column.data_type),
nullable: column.is_nullable,
primaryKey: primaryKeys.has(column.column_name),
comment: column.column_comment || null,
};
}
private toSchemaForeignKey(row: PostgresForeignKeyRow): KtxSchemaForeignKey {
return {
fromColumn: row.column_name,
toCatalog: null,
toDb: row.foreign_table_schema,
toTable: row.foreign_table_name,
toColumn: row.foreign_column_name,
constraintName: row.constraint_name || null,
};
}
private async getPool(): Promise<KtxPostgresPool> {
if (!this.pool) {
let config = { ...this.poolConfig };
if (this.endpointResolver) {
const endpoint = await this.endpointResolver.resolve({
host: config.host ?? this.connection.host ?? 'localhost',
port: config.port ?? numberValue(this.connection.port) ?? 5432,
connection: this.connection,
});
this.resolvedEndpoint = endpoint;
config = { ...config, host: endpoint.host, port: endpoint.port };
}
this.pool = this.poolFactory.createPool(config);
this.pool.on?.('error', (error) => {
this.lastIdlePoolError = error;
});
}
return this.pool;
}
private async queryRaw<T>(sql: string, params?: unknown[]): Promise<T[]> {
this.throwIdlePoolErrorIfPresent();
const pool = await this.getPool();
const client = await pool.connect();
try {
const result = await client.query(sql, params);
return result.rows as T[];
} finally {
client.release();
}
}
private async query(sql: string, params?: Record<string, unknown> | unknown[]): Promise<KtxQueryResult> {
this.throwIdlePoolErrorIfPresent();
const pool = await this.getPool();
const client = await pool.connect();
try {
const result = await client.query(assertReadOnlySql(sql), Array.isArray(params) ? params : undefined);
return {
headers: (result.fields ?? []).map((field) => field.name),
headerTypes: (result.fields ?? []).map((field) => PG_OID_TYPE_MAP[field.dataTypeID] ?? `oid:${field.dataTypeID}`),
rows: queryRows(result),
totalRows: result.rows.length,
rowCount: result.rows.length,
};
} finally {
client.release();
}
}
private assertConnection(connectionId: string): void {
if (connectionId !== this.connectionId) {
throw new Error(`PostgreSQL connector ${this.connectionId} cannot run scan for ${connectionId}`);
}
}
private throwIdlePoolErrorIfPresent(): void {
if (!this.lastIdlePoolError) {
return;
}
const error = this.lastIdlePoolError;
this.lastIdlePoolError = null;
throw error;
}
}

View file

@ -0,0 +1,52 @@
import { describe, expect, it } from 'vitest';
import { KtxPostgresDialect } from './dialect.js';
describe('KtxPostgresDialect', () => {
const dialect = new KtxPostgresDialect();
it('quotes identifiers and formats schema-qualified tables', () => {
expect(dialect.quoteIdentifier('order"items')).toBe('"order""items"');
expect(dialect.formatTableName({ catalog: null, db: 'public', name: 'orders' })).toBe('"public"."orders"');
expect(dialect.formatTableName({ catalog: null, db: null, name: 'orders' })).toBe('"orders"');
});
it('maps native PostgreSQL types to KTX dimension types', () => {
expect(dialect.mapToDimensionType('timestamp with time zone')).toBe('time');
expect(dialect.mapToDimensionType('numeric(12,2)')).toBe('number');
expect(dialect.mapToDimensionType('uuid')).toBe('string');
expect(dialect.mapToDimensionType('boolean')).toBe('boolean');
expect(dialect.mapToDimensionType('jsonb')).toBe('string');
});
it('generates sample, distinct-value, statistics, and time SQL', () => {
expect(dialect.generateSampleQuery('"public"."orders"', 5, ['id', 'status'])).toBe(
'SELECT "id", "status" FROM "public"."orders" LIMIT 5',
);
expect(dialect.generateColumnSampleQuery('"public"."orders"', 'status', 10)).toContain(
'TRIM(CAST("status" AS TEXT)) != \'\'',
);
expect(dialect.generateDistinctValuesQuery('"public"."orders"', '"status"', 20)).toContain(
'SELECT DISTINCT "status"::text AS val',
);
expect(dialect.generateColumnStatisticsQuery('public', 'orders')).toContain('FROM pg_stats s');
expect(dialect.getTimeTruncExpression('"created_at"', 'month')).toBe('DATE_TRUNC(\'month\', "created_at")');
});
it('prepares named parameters with PostgreSQL positional parameters', () => {
expect(
dialect.prepareQuery('select * from orders where id = :id and status = :status', { id: 1, status: 'paid' }),
).toEqual({
sql: 'select * from orders where id = $1 and status = $2',
params: [1, 'paid'],
});
expect(
dialect.prepareQuery('select :Client_Name_10, :Client_Name_1', {
Client_Name_1: 'short',
Client_Name_10: 'long',
}),
).toEqual({
sql: 'select $2, $1',
params: ['short', 'long'],
});
});
});

View file

@ -0,0 +1,213 @@
import type { KtxSchemaDimensionType, KtxTableRef } from '../../context/scan/types.js';
type PostgresTableNameRef = Pick<KtxTableRef, 'name'> & Partial<Pick<KtxTableRef, 'catalog' | 'db'>>;
export class KtxPostgresDialect {
readonly type = 'postgresql';
private readonly typeMappings: Record<string, KtxSchemaDimensionType> = {
timestamp: 'time',
'timestamp without time zone': 'time',
'timestamp with time zone': 'time',
timestamptz: 'time',
datetime: 'time',
date: 'time',
time: 'time',
integer: 'number',
int: 'number',
int2: 'number',
int4: 'number',
int8: 'number',
bigint: 'number',
smallint: 'number',
decimal: 'number',
numeric: 'number',
float: 'number',
float4: 'number',
float8: 'number',
'double precision': 'number',
real: 'number',
money: 'number',
text: 'string',
varchar: 'string',
'character varying': 'string',
char: 'string',
character: 'string',
uuid: 'string',
json: 'string',
jsonb: 'string',
boolean: 'boolean',
bool: 'boolean',
};
quoteIdentifier(identifier: string): string {
return `"${identifier.replace(/"/g, '""')}"`;
}
formatTableName(table: PostgresTableNameRef): string {
return table.db
? `${this.quoteIdentifier(table.db)}.${this.quoteIdentifier(table.name)}`
: this.quoteIdentifier(table.name);
}
mapDataType(nativeType: string): string {
return nativeType;
}
mapToDimensionType(nativeType: string): KtxSchemaDimensionType {
if (!nativeType) {
return 'string';
}
const lower = nativeType.toLowerCase().trim();
const normalized = lower.includes('(') ? lower.split('(')[0]!.trim() : lower;
if (this.typeMappings[normalized]) {
return this.typeMappings[normalized];
}
if (normalized.includes('time') || normalized.includes('date')) {
return 'time';
}
if (
normalized.includes('int') ||
normalized.includes('num') ||
normalized.includes('dec') ||
normalized.includes('float') ||
normalized.includes('double')
) {
return 'number';
}
if (normalized.includes('bool')) {
return 'boolean';
}
return 'string';
}
generateSampleQuery(tableName: string, limit: number, columns?: string[]): string {
const columnList =
columns && columns.length > 0 ? columns.map((column) => this.quoteIdentifier(column)).join(', ') : '*';
return `SELECT ${columnList} FROM ${tableName} LIMIT ${limit}`;
}
generateColumnSampleQuery(tableName: string, columnName: string, limit: number): string {
const quotedColumn = this.quoteIdentifier(columnName);
return `SELECT ${quotedColumn} FROM ${tableName} WHERE ${quotedColumn} IS NOT NULL AND TRIM(CAST(${quotedColumn} AS TEXT)) != '' LIMIT ${limit}`;
}
prepareQuery(sql: string, params?: Record<string, unknown>): { sql: string; params?: unknown[] } {
if (!params) {
return { sql, params: undefined };
}
const paramNames = Object.keys(params);
const values: unknown[] = new Array(paramNames.length);
const paramIndexMap = new Map<string, number>();
paramNames.forEach((name, index) => {
paramIndexMap.set(name, index + 1);
values[index] = params[name];
});
const sortedKeys = [...paramNames].sort((a, b) => b.length - a.length);
let parameterizedQuery = sql;
for (const name of sortedKeys) {
parameterizedQuery = parameterizedQuery.replace(new RegExp(`:${name}\\b`, 'g'), `$${paramIndexMap.get(name)}`);
}
return { sql: parameterizedQuery, params: values };
}
getRandomSampleFilter(samplePct: number): string {
if (samplePct <= 0 || samplePct >= 1) {
return '';
}
return `RANDOM() < ${samplePct}`;
}
getTableSampleClause(samplePct: number): string {
if (samplePct <= 0 || samplePct >= 1) {
return '';
}
return `TABLESAMPLE SYSTEM (${samplePct * 100})`;
}
getLimitOffsetClause(limit: number, offset?: number): string {
return offset !== undefined && offset > 0 ? `LIMIT ${limit} OFFSET ${offset}` : `LIMIT ${limit}`;
}
getNullCountExpression(column: string): string {
return `COUNT(*) FILTER (WHERE ${column} IS NULL)`;
}
getDistinctCountExpression(column: string): string {
return `COUNT(DISTINCT ${column})`;
}
generateCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
return `
WITH sampled AS (
SELECT ${columnName} AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
LIMIT ${sampleSize}
)
SELECT COUNT(DISTINCT val) AS cardinality
FROM sampled
`;
}
generateDistinctValuesQuery(tableName: string, columnName: string, limit: number): string {
return `
SELECT DISTINCT ${columnName}::text AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
ORDER BY val
LIMIT ${limit}
`;
}
generateColumnStatisticsQuery(schemaName: string, tableName: string): string | null {
return `
SELECT
s.attname AS column_name,
CASE
WHEN s.n_distinct > 0 THEN s.n_distinct::bigint
WHEN s.n_distinct < 0 THEN (-s.n_distinct * c.reltuples)::bigint
ELSE NULL
END AS estimated_cardinality
FROM pg_stats s
JOIN pg_class c ON c.relname = s.tablename
JOIN pg_namespace n ON c.relnamespace = n.oid AND n.nspname = s.schemaname
WHERE s.schemaname = '${schemaName.replace(/'/g, "''")}'
AND s.tablename = '${tableName.replace(/'/g, "''")}'
AND s.n_distinct IS NOT NULL
`;
}
generateRandomizedCardinalitySampleQuery(tableName: string, columnName: string, sampleSize: number): string {
return `
WITH sampled AS (
SELECT ${columnName} AS val
FROM ${tableName}
WHERE ${columnName} IS NOT NULL
ORDER BY RANDOM()
LIMIT ${sampleSize}
)
SELECT COUNT(DISTINCT val) AS cardinality
FROM sampled
`;
}
getTimeTruncExpression(
column: string,
granularity: 'day' | 'week' | 'month' | 'quarter' | 'year',
timezone?: string,
): string {
const col = timezone ? `(${column} AT TIME ZONE '${timezone.replace(/'/g, "''")}')` : column;
return `DATE_TRUNC('${granularity}', ${col})`;
}
getCustomTimeTruncExpression(column: string, interval: string, origin?: string, timezone?: string): string {
const col = timezone ? `(${column} AT TIME ZONE '${timezone.replace(/'/g, "''")}')` : column;
const originExpr = origin ? `TIMESTAMP '${origin.replace(/'/g, "''")}'` : "TIMESTAMP '1970-01-01'";
return `${originExpr} + FLOOR(EXTRACT(EPOCH FROM (${col} - ${originExpr})) / EXTRACT(EPOCH FROM INTERVAL '${interval.replace(/'/g, "''")}')) * INTERVAL '${interval.replace(/'/g, "''")}'`;
}
parseIntervalToSql(interval: string): string {
return `INTERVAL '${interval.replace(/'/g, "''")}'`;
}
}

View file

@ -0,0 +1,49 @@
import { describe, expect, it, vi } from 'vitest';
import { KtxPostgresHistoricSqlQueryClient } from './historic-sql-query-client.js';
import type { KtxPostgresPoolConfig, KtxPostgresPoolFactory } from './connector.js';
describe('KtxPostgresHistoricSqlQueryClient', () => {
it('executes parameterized read-only SQL through the native Postgres connector pool', async () => {
const queryCalls: Array<{ sql: string; params?: unknown[] }> = [];
const release = vi.fn();
const end = vi.fn(async () => {});
const poolFactory: KtxPostgresPoolFactory = {
createPool(_config: KtxPostgresPoolConfig) {
return {
async connect() {
return {
async query(sql: string, params?: unknown[]) {
queryCalls.push({ sql, params });
return {
fields: [{ name: 'answer', dataTypeID: 23 }],
rows: [{ answer: 42 }],
};
},
release,
};
},
end,
};
},
};
const client = new KtxPostgresHistoricSqlQueryClient({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
url: 'postgresql://readonly:secret@pg.example.test/warehouse', // pragma: allowlist secret
},
poolFactory,
});
await expect(client.executeQuery('SELECT $1::int AS answer', [42])).resolves.toEqual({
headers: ['answer'],
rows: [[42]],
totalRows: 1,
});
expect(queryCalls).toEqual([{ sql: 'SELECT $1::int AS answer', params: [42] }]);
await client.cleanup();
expect(release).toHaveBeenCalledTimes(1);
expect(end).toHaveBeenCalledTimes(1);
});
});

View file

@ -0,0 +1,37 @@
import type { KtxPostgresQueryClient } from '../../context/ingest/adapters/historic-sql/types.js';
import { KtxPostgresScanConnector, type KtxPostgresScanConnectorOptions } from './connector.js';
export type KtxPostgresHistoricSqlQueryClientOptions = KtxPostgresScanConnectorOptions;
export class KtxPostgresHistoricSqlQueryClient implements KtxPostgresQueryClient {
private readonly connectionId: string;
private readonly connector: KtxPostgresScanConnector;
constructor(options: KtxPostgresHistoricSqlQueryClientOptions) {
this.connectionId = options.connectionId;
this.connector = new KtxPostgresScanConnector(options);
}
async executeQuery(
sql: string,
params?: unknown[],
): Promise<{ headers: string[]; rows: unknown[][]; totalRows: number }> {
const result = await this.connector.executeReadOnly(
{
connectionId: this.connectionId,
sql,
params,
},
{} as never,
);
return {
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
};
}
async cleanup(): Promise<void> {
await this.connector.cleanup();
}
}

View file

@ -0,0 +1,37 @@
import type { LiveDatabaseIntrospectionPort } from '../../context/ingest/adapters/live-database/types.js';
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
import {
KtxPostgresScanConnector,
type KtxPostgresConnectionConfig,
type KtxPostgresEndpointResolver,
type KtxPostgresPoolFactory,
} from './connector.js';
interface CreatePostgresLiveDatabaseIntrospectionOptions {
connections: Record<string, KtxProjectConnectionConfig>;
poolFactory?: KtxPostgresPoolFactory;
endpointResolver?: KtxPostgresEndpointResolver;
now?: () => Date;
}
export function createPostgresLiveDatabaseIntrospection(
options: CreatePostgresLiveDatabaseIntrospectionOptions,
): LiveDatabaseIntrospectionPort {
return {
async extractSchema(connectionId: string) {
const connection = options.connections[connectionId] as KtxPostgresConnectionConfig | undefined;
const connector = new KtxPostgresScanConnector({
connectionId,
connection,
poolFactory: options.poolFactory,
endpointResolver: options.endpointResolver,
now: options.now,
});
try {
return await connector.introspect({ connectionId, driver: 'postgres' }, { runId: `postgres-${connectionId}` });
} finally {
await connector.cleanup();
}
},
};
}