ktx/packages/cli/src/context/connections/postgres-query-executor.ts

79 lines
2.6 KiB
TypeScript
Raw Normal View History

2026-05-10 23:12:26 +02:00
import { Client, type ClientConfig } from 'pg';
import type {
2026-05-10 23:51:24 +02:00
KtxSqlQueryExecutionInput,
KtxSqlQueryExecutionResult,
KtxSqlQueryExecutorPort,
2026-05-10 23:12:26 +02:00
} from './query-executor.js';
import { limitSqlForExecution } from './read-only-sql.js';
interface PgClientLike {
connect(): Promise<unknown>;
query(input: string | { text: string; rowMode: 'array' }): Promise<{
fields: Array<{ name: string }>;
rows: unknown[][];
command: string;
rowCount: number | null;
}>;
end(): Promise<void>;
}
interface PostgresQueryExecutorOptions {
statementTimeoutMs?: number;
queryTimeoutMs?: number;
connectionTimeoutMs?: number;
clientFactory?: (config: ClientConfig) => PgClientLike;
}
2026-05-10 23:51:24 +02:00
function connectionDriver(input: KtxSqlQueryExecutionInput): string {
2026-05-10 23:12:26 +02:00
return String(input.connection?.driver ?? '').toLowerCase();
}
function createDefaultClient(config: ClientConfig): PgClientLike {
return new Client(config);
}
2026-05-10 23:51:24 +02:00
export function createPostgresQueryExecutor(options: PostgresQueryExecutorOptions = {}): KtxSqlQueryExecutorPort {
2026-05-10 23:12:26 +02:00
const clientFactory = options.clientFactory ?? createDefaultClient;
return {
2026-05-10 23:51:24 +02:00
async execute(input: KtxSqlQueryExecutionInput): Promise<KtxSqlQueryExecutionResult> {
2026-05-10 23:12:26 +02:00
const driver = connectionDriver(input);
const connection = input.connection;
2026-05-10 23:12:26 +02:00
if (driver !== 'postgres' && driver !== 'postgresql') {
throw new Error(`Local Postgres execution cannot run driver "${connection?.driver ?? 'unknown'}".`);
2026-05-10 23:12:26 +02:00
}
if (typeof connection?.url !== 'string' || connection.url.trim().length === 0) {
2026-05-10 23:12:26 +02:00
throw new Error(`Local Postgres execution requires connections.${input.connectionId}.url.`);
}
const client = clientFactory({
connectionString: connection.url,
2026-05-10 23:12:26 +02:00
statement_timeout: options.statementTimeoutMs ?? 30_000,
query_timeout: options.queryTimeoutMs ?? 35_000,
connectionTimeoutMillis: options.connectionTimeoutMs ?? 5_000,
2026-05-10 23:51:24 +02:00
application_name: 'ktx-local-query',
2026-05-10 23:12:26 +02:00
});
await client.connect();
try {
await client.query('BEGIN READ ONLY');
const result = await client.query({
text: limitSqlForExecution(input.sql, input.maxRows),
rowMode: 'array',
});
await client.query('COMMIT');
return {
headers: result.fields.map((field) => field.name),
rows: result.rows,
totalRows: result.rows.length,
command: result.command,
rowCount: result.rowCount,
};
} catch (error) {
await client.query('ROLLBACK').catch(() => undefined);
throw error;
} finally {
await client.end();
}
},
};
}