diff --git a/packages/cli/src/connectors/bigquery/connector.test.ts b/packages/cli/src/connectors/bigquery/connector.test.ts index b9893ccf..65a3da68 100644 --- a/packages/cli/src/connectors/bigquery/connector.test.ts +++ b/packages/cli/src/connectors/bigquery/connector.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest'; -import { bigQueryConnectionConfigFromConfig, isKtxBigQueryConnectionConfig, type KtxBigQueryClient, KtxBigQueryScanConnector, type KtxBigQueryClientFactory, type KtxBigQueryDataset, type KtxBigQueryQueryJob, type KtxBigQueryTableRef } from '../../connectors/bigquery/connector.js'; +import { bigQueryConnectionConfigFromConfig, isKtxBigQueryConnectionConfig, type KtxBigQueryClient, KtxBigQueryScanConnector, type KtxBigQueryClientFactory, type KtxBigQueryDataset, type KtxBigQueryQueryJob, type KtxBigQueryTableRef, prepareBigQueryReadOnlyQuery } from '../../connectors/bigquery/connector.js'; import { createBigQueryLiveDatabaseIntrospection } from '../../connectors/bigquery/live-database-introspection.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; @@ -98,6 +98,17 @@ const connection = { } as const; describe('KtxBigQueryScanConnector', () => { + it('prepares read-only SQL parameters with BigQuery named placeholders', () => { + expect(prepareBigQueryReadOnlyQuery('SELECT * FROM orders WHERE id = :id AND id_2 = :id_2', { id: 1, id_2: 2 })).toEqual({ + sql: 'SELECT * FROM orders WHERE id = @id AND id_2 = @id_2', + params: { id: 1, id_2: 2 }, + }); + expect(prepareBigQueryReadOnlyQuery('SELECT * FROM orders')).toEqual({ + sql: 'SELECT * FROM orders', + params: undefined, + }); + }); + it('resolves configuration safely', () => { expect(isKtxBigQueryConnectionConfig(connection)).toBe(true); expect(isKtxBigQueryConnectionConfig({ driver: 'mysql' })).toBe(false); diff --git a/packages/cli/src/connectors/bigquery/connector.ts b/packages/cli/src/connectors/bigquery/connector.ts index 871f50f4..cd285d35 100644 --- a/packages/cli/src/connectors/bigquery/connector.ts +++ b/packages/cli/src/connectors/bigquery/connector.ts @@ -235,6 +235,23 @@ function normalizeValue(value: unknown): unknown { return value; } +/** @internal */ +export function prepareBigQueryReadOnlyQuery( + sql: string, + params?: Record, +): { sql: string; params?: Record } { + if (!params) { + return { sql, params: undefined }; + } + let processedSql = sql; + const processedParams: Record = {}; + for (const [key, value] of Object.entries(params)) { + processedSql = processedSql.replace(new RegExp(`:${key}\\b`, 'g'), `@${key}`); + processedParams[key] = value; + } + return { sql: processedSql, params: Object.keys(processedParams).length > 0 ? processedParams : undefined }; +} + export function isKtxBigQueryConnectionConfig( connection: KtxBigQueryConnectionConfig | undefined, ): connection is KtxBigQueryConnectionConfig { @@ -364,7 +381,7 @@ export class KtxBigQueryScanConnector implements KtxScanConnector { async executeReadOnly(input: KtxBigQueryReadOnlyQueryInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); - const prepared = this.dialect.prepareQuery(limitedSql, input.params); + const prepared = prepareBigQueryReadOnlyQuery(limitedSql, input.params); const result = await this.query(prepared.sql, prepared.params); return { ...result, rowCount: result.rows.length }; } diff --git a/packages/cli/src/connectors/clickhouse/connector.test.ts b/packages/cli/src/connectors/clickhouse/connector.test.ts index abc7cad5..bdf244b4 100644 --- a/packages/cli/src/connectors/clickhouse/connector.test.ts +++ b/packages/cli/src/connectors/clickhouse/connector.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest'; -import { clickHouseClientConfigFromConfig, isKtxClickHouseConnectionConfig, KtxClickHouseScanConnector, type KtxClickHouseClientFactory } from '../../connectors/clickhouse/connector.js'; +import { clickHouseClientConfigFromConfig, isKtxClickHouseConnectionConfig, KtxClickHouseScanConnector, prepareClickHouseReadOnlyQuery, type KtxClickHouseClientFactory } from '../../connectors/clickhouse/connector.js'; import { createClickHouseLiveDatabaseIntrospection } from '../../connectors/clickhouse/live-database-introspection.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; @@ -136,6 +136,33 @@ function multiDatabaseClickHouseClientFactory(): KtxClickHouseClientFactory { } describe('KtxClickHouseScanConnector', () => { + it('prepares read-only SQL parameters with ClickHouse typed placeholders', () => { + expect( + prepareClickHouseReadOnlyQuery('select * from events where id = :id and event_name = :name', { + id: 10, + name: 'signup', + }), + ).toEqual({ + sql: 'select * from events where id = {id:Int64} and event_name = {name:String}', + params: { id: 10, name: 'signup' }, + }); + expect( + prepareClickHouseReadOnlyQuery('select * from events where enabled = :enabled and ratio = :ratio and created_at = :created_at', { + enabled: true, + ratio: 1.5, + created_at: new Date('2026-05-25T00:00:00.000Z'), + }), + ).toEqual({ + sql: 'select * from events where enabled = {enabled:Bool} and ratio = {ratio:Float64} and created_at = {created_at:DateTime}', + params: { + enabled: true, + ratio: 1.5, + created_at: new Date('2026-05-25T00:00:00.000Z'), + }, + }); + expect(prepareClickHouseReadOnlyQuery('select 1')).toEqual({ sql: 'select 1', params: undefined }); + }); + it('resolves ClickHouse connection configuration safely', () => { expect(isKtxClickHouseConnectionConfig({ driver: 'clickhouse', host: 'localhost', database: 'analytics' })).toBe( true, diff --git a/packages/cli/src/connectors/clickhouse/connector.ts b/packages/cli/src/connectors/clickhouse/connector.ts index a2ee568c..cfced4fd 100644 --- a/packages/cli/src/connectors/clickhouse/connector.ts +++ b/packages/cli/src/connectors/clickhouse/connector.ts @@ -198,6 +198,49 @@ function clickHouseTableKey(database: string, table: string): string { return `${database}.${table}`; } +function inferClickHouseQueryParamType(value: unknown): string { + if (value === null || value === undefined) { + return 'String'; + } + if (typeof value === 'boolean') { + return 'Bool'; + } + if (typeof value === 'number') { + return Number.isInteger(value) ? 'Int64' : 'Float64'; + } + if (value instanceof Date) { + return 'DateTime'; + } + return 'String'; +} + +/** @internal */ +export function prepareClickHouseReadOnlyQuery( + sql: string, + params?: Record, +): { sql: string; params?: Record } { + if (!params) { + return { sql, params: undefined }; + } + + let parameterizedQuery = sql; + const queryParams: Record = {}; + const sortedKeys = Object.keys(params).sort((a, b) => b.length - a.length); + + for (const key of sortedKeys) { + const placeholder = `:${key}`; + if (parameterizedQuery.includes(placeholder)) { + parameterizedQuery = parameterizedQuery.replace( + new RegExp(`:${key}\\b`, 'g'), + `{${key}:${inferClickHouseQueryParamType(params[key])}}`, + ); + queryParams[key] = params[key]; + } + } + + return { sql: parameterizedQuery, params: Object.keys(queryParams).length > 0 ? queryParams : undefined }; +} + export function isKtxClickHouseConnectionConfig( connection: KtxClickHouseConnectionConfig | undefined, ): connection is KtxClickHouseConnectionConfig { @@ -408,7 +451,7 @@ export class KtxClickHouseScanConnector implements KtxScanConnector { async executeReadOnly(input: KtxClickHouseReadOnlyQueryInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); - const prepared = this.dialect.prepareQuery(limitedSql, input.params); + const prepared = prepareClickHouseReadOnlyQuery(limitedSql, input.params); const result = await this.query(prepared.sql, prepared.params); return { ...result, rowCount: result.rows.length }; } diff --git a/packages/cli/src/connectors/mysql/connector.test.ts b/packages/cli/src/connectors/mysql/connector.test.ts index 6c69ea3d..75c8d964 100644 --- a/packages/cli/src/connectors/mysql/connector.test.ts +++ b/packages/cli/src/connectors/mysql/connector.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it, vi } from 'vitest'; import type { FieldPacket, RowDataPacket } from 'mysql2/promise'; import { createMysqlLiveDatabaseIntrospection } from '../../connectors/mysql/live-database-introspection.js'; -import { isKtxMysqlConnectionConfig, KtxMysqlScanConnector, mysqlConnectionPoolConfigFromConfig, type KtxMysqlConnectionConfig, type KtxMysqlPoolFactory } from '../../connectors/mysql/connector.js'; +import { isKtxMysqlConnectionConfig, KtxMysqlScanConnector, mysqlConnectionPoolConfigFromConfig, prepareMysqlReadOnlyQuery, type KtxMysqlConnectionConfig, type KtxMysqlPoolFactory } from '../../connectors/mysql/connector.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; function mysqlResult(rows: Record[], fields: Array<{ name: string; type?: number }>): [RowDataPacket[], FieldPacket[]] { @@ -173,6 +173,19 @@ function multiSchemaMysqlPoolFactory( } describe('KtxMysqlScanConnector', () => { + it('prepares read-only SQL parameters with MySQL positional placeholders', () => { + expect( + prepareMysqlReadOnlyQuery('select * from orders where id = :id and status = :status', { + status: 'paid', + id: 10, + }), + ).toEqual({ + sql: 'select * from orders where id = ? and status = ?', + params: [10, 'paid'], + }); + expect(prepareMysqlReadOnlyQuery('select 1')).toEqual({ sql: 'select 1', params: undefined }); + }); + it('resolves MySQL connection configuration safely', () => { expect(isKtxMysqlConnectionConfig({ driver: 'mysql', host: 'localhost', database: 'analytics' })).toBe(true); expect(isKtxMysqlConnectionConfig({ driver: 'postgres', host: 'localhost', database: 'analytics' })).toBe(false); diff --git a/packages/cli/src/connectors/mysql/connector.ts b/packages/cli/src/connectors/mysql/connector.ts index 83c9712a..5a2ab614 100644 --- a/packages/cli/src/connectors/mysql/connector.ts +++ b/packages/cli/src/connectors/mysql/connector.ts @@ -303,6 +303,25 @@ function queryParams(params: Record | unknown[] | undefined): u return Array.isArray(params) ? params : Object.values(params); } +/** @internal */ +export function prepareMysqlReadOnlyQuery( + sql: string, + params?: Record, +): { sql: string; params?: unknown[] } { + if (!params) { + return { sql, params: undefined }; + } + const values: unknown[] = []; + const parameterizedQuery = sql.replace(/:([A-Za-z_][A-Za-z0-9_]*)\b/g, (placeholder, key: string) => { + if (!(key in params)) { + return placeholder; + } + values.push(params[key]); + return '?'; + }); + return { sql: parameterizedQuery, params: values }; +} + export function isKtxMysqlConnectionConfig( connection: KtxMysqlConnectionConfig | undefined, ): connection is KtxMysqlConnectionConfig { @@ -550,7 +569,7 @@ export class KtxMysqlScanConnector implements KtxScanConnector { const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); const prepared = Array.isArray(input.params) ? { sql: limitedSql, params: input.params } - : this.dialect.prepareQuery(limitedSql, input.params); + : prepareMysqlReadOnlyQuery(limitedSql, input.params); const result = await this.query(prepared.sql, prepared.params); return { ...result, rowCount: result.rows.length }; } diff --git a/packages/cli/src/connectors/postgres/connector.test.ts b/packages/cli/src/connectors/postgres/connector.test.ts index d9fa45cf..8c92ee5c 100644 --- a/packages/cli/src/connectors/postgres/connector.test.ts +++ b/packages/cli/src/connectors/postgres/connector.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it, vi } from 'vitest'; import { createPostgresLiveDatabaseIntrospection } from '../../connectors/postgres/live-database-introspection.js'; -import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, type KtxPostgresConnectionConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js'; +import { isKtxPostgresConnectionConfig, KtxPostgresScanConnector, postgresPoolConfigFromConfig, preparePostgresReadOnlyQuery, type KtxPostgresConnectionConfig, type KtxPostgresPoolFactory } from '../../connectors/postgres/connector.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; interface FakeQueryResult { @@ -102,6 +102,28 @@ function metadataResults(): Map { } describe('KtxPostgresScanConnector', () => { + it('prepares read-only SQL parameters with PostgreSQL positional placeholders', () => { + expect( + preparePostgresReadOnlyQuery('select * from orders where id = :id and status = :status', { + id: 1, + status: 'paid', + }), + ).toEqual({ + sql: 'select * from orders where id = $1 and status = $2', + params: [1, 'paid'], + }); + expect( + preparePostgresReadOnlyQuery('select :Client_Name_10, :Client_Name_1', { + Client_Name_1: 'short', + Client_Name_10: 'long', + }), + ).toEqual({ + sql: 'select $2, $1', + params: ['short', 'long'], + }); + expect(preparePostgresReadOnlyQuery('select 1')).toEqual({ sql: 'select 1', params: undefined }); + }); + it('resolves configuration safely', () => { expect(isKtxPostgresConnectionConfig({ driver: 'postgres', url: 'env:DATABASE_URL' })).toBe(true); expect(isKtxPostgresConnectionConfig({ driver: 'postgresql', host: 'db', database: 'analytics' })).toBe(false); diff --git a/packages/cli/src/connectors/postgres/connector.ts b/packages/cli/src/connectors/postgres/connector.ts index 1bab5e49..fc487bd5 100644 --- a/packages/cli/src/connectors/postgres/connector.ts +++ b/packages/cli/src/connectors/postgres/connector.ts @@ -219,6 +219,29 @@ function groupByTable(rows: T[]): Map, +): { sql: string; params?: unknown[] } { + if (!params) { + return { sql, params: undefined }; + } + const paramNames = Object.keys(params); + const values: unknown[] = new Array(paramNames.length); + const paramIndexMap = new Map(); + paramNames.forEach((name, index) => { + paramIndexMap.set(name, index + 1); + values[index] = params[name]; + }); + const sortedKeys = [...paramNames].sort((a, b) => b.length - a.length); + let parameterizedQuery = sql; + for (const name of sortedKeys) { + parameterizedQuery = parameterizedQuery.replace(new RegExp(`:${name}\\b`, 'g'), `$${paramIndexMap.get(name)}`); + } + return { sql: parameterizedQuery, params: values }; +} + function primaryKeyMap(rows: PostgresPrimaryKeyRow[]): Map> { const grouped = new Map>(); for (const row of rows) { @@ -489,7 +512,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector { const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); const prepared = Array.isArray(input.params) ? { sql: limitedSql, params: input.params } - : this.dialect.prepareQuery(limitedSql, input.params); + : preparePostgresReadOnlyQuery(limitedSql, input.params); const result = await this.query(prepared.sql, prepared.params); return { ...result, rowCount: result.rows.length }; } diff --git a/packages/cli/src/connectors/snowflake/connector.test.ts b/packages/cli/src/connectors/snowflake/connector.test.ts index 657dbaf1..51b3181a 100644 --- a/packages/cli/src/connectors/snowflake/connector.test.ts +++ b/packages/cli/src/connectors/snowflake/connector.test.ts @@ -8,7 +8,7 @@ vi.mock('snowflake-sdk', () => ({ })); import { createSnowflakeLiveDatabaseIntrospection } from '../../connectors/snowflake/live-database-introspection.js'; -import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeConnectionConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js'; +import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, prepareSnowflakeReadOnlyQuery, snowflakeConnectionConfigFromConfig, type KtxSnowflakeConnectionConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; function fakeDriverFactory(): KtxSnowflakeDriverFactory { @@ -105,6 +105,17 @@ function installSnowflakePoolMock() { } describe('KtxSnowflakeScanConnector', () => { + it('prepares read-only SQL parameters with Snowflake bind arrays', () => { + expect(prepareSnowflakeReadOnlyQuery('SELECT * FROM ORDERS WHERE ID = ? AND STATUS = ?', { id: 1, status: 'paid' })).toEqual({ + sql: 'SELECT * FROM ORDERS WHERE ID = ? AND STATUS = ?', + params: [1, 'paid'], + }); + expect(prepareSnowflakeReadOnlyQuery('SELECT * FROM ORDERS')).toEqual({ + sql: 'SELECT * FROM ORDERS', + params: undefined, + }); + }); + it('resolves Snowflake connection configuration safely', () => { expect( isKtxSnowflakeConnectionConfig({ diff --git a/packages/cli/src/connectors/snowflake/connector.ts b/packages/cli/src/connectors/snowflake/connector.ts index d8737559..2df2a85f 100644 --- a/packages/cli/src/connectors/snowflake/connector.ts +++ b/packages/cli/src/connectors/snowflake/connector.ts @@ -229,6 +229,14 @@ function toSnowflakeBinds(params: unknown[] | undefined): Binds | undefined { return params?.map((value) => toSnowflakeBind(value)); } +/** @internal */ +export function prepareSnowflakeReadOnlyQuery( + sql: string, + params?: Record, +): { sql: string; params?: unknown[] } { + return { sql, params: params ? Object.values(params) : undefined }; +} + export function isKtxSnowflakeConnectionConfig( connection: KtxSnowflakeConnectionConfig | undefined, ): connection is KtxSnowflakeConnectionConfig { @@ -635,7 +643,7 @@ export class KtxSnowflakeScanConnector implements KtxScanConnector { async executeReadOnly(input: KtxSnowflakeReadOnlyQueryInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const limitedSql = limitSqlForExecution(assertReadOnlySql(input.sql), input.maxRows); - const prepared = this.dialect.prepareQuery(limitedSql, input.params); + const prepared = prepareSnowflakeReadOnlyQuery(limitedSql, input.params); return this.getDriver().query(prepared.sql, prepared.params); } diff --git a/packages/cli/src/connectors/sqlserver/connector.test.ts b/packages/cli/src/connectors/sqlserver/connector.test.ts index 3a02130f..672b499c 100644 --- a/packages/cli/src/connectors/sqlserver/connector.test.ts +++ b/packages/cli/src/connectors/sqlserver/connector.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it, vi } from 'vitest'; import { createSqlServerLiveDatabaseIntrospection } from '../../connectors/sqlserver/live-database-introspection.js'; -import { isKtxSqlServerConnectionConfig, KtxSqlServerScanConnector, sqlServerConnectionPoolConfigFromConfig, type KtxSqlServerConnectionConfig, type KtxSqlServerPoolFactory, type KtxSqlServerQueryResult } from '../../connectors/sqlserver/connector.js'; +import { isKtxSqlServerConnectionConfig, KtxSqlServerScanConnector, prepareSqlServerReadOnlyQuery, sqlServerConnectionPoolConfigFromConfig, type KtxSqlServerConnectionConfig, type KtxSqlServerPoolFactory, type KtxSqlServerQueryResult } from '../../connectors/sqlserver/connector.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; function recordset>( @@ -140,6 +140,19 @@ function fakePoolFactory(options: { primaryKeyError?: Error; foreignKeyError?: E } describe('KtxSqlServerScanConnector', () => { + it('prepares read-only SQL parameters with SQL Server named placeholders', () => { + expect( + prepareSqlServerReadOnlyQuery('select * from events where id = :id and name = :name', { + id: 10, + name: 'signup', + }), + ).toEqual({ + sql: 'select * from events where id = @id and name = @name', + params: { id: 10, name: 'signup' }, + }); + expect(prepareSqlServerReadOnlyQuery('select 1')).toEqual({ sql: 'select 1', params: undefined }); + }); + it('resolves SQL Server connection configuration safely', () => { expect( isKtxSqlServerConnectionConfig({ diff --git a/packages/cli/src/connectors/sqlserver/connector.ts b/packages/cli/src/connectors/sqlserver/connector.ts index 9895027f..a165d228 100644 --- a/packages/cli/src/connectors/sqlserver/connector.ts +++ b/packages/cli/src/connectors/sqlserver/connector.ts @@ -158,6 +158,21 @@ function tableScopeSql( return { clause: `AND ${columnExpression} IN (${placeholders.join(', ')})`, params }; } +/** @internal */ +export function prepareSqlServerReadOnlyQuery( + sql: string, + params?: Record, +): { sql: string; params?: Record } { + if (!params) { + return { sql, params: undefined }; + } + let parameterizedQuery = sql; + for (const key of Object.keys(params)) { + parameterizedQuery = parameterizedQuery.replace(new RegExp(`:${key}\\b`, 'g'), `@${key}`); + } + return { sql: parameterizedQuery, params }; +} + class DefaultSqlServerPoolFactory implements KtxSqlServerPoolFactory { async createPool(config: KtxSqlServerPoolConfig): Promise { const pool = await new sql.ConnectionPool(config as sql.config).connect(); @@ -427,7 +442,7 @@ export class KtxSqlServerScanConnector implements KtxScanConnector { async executeReadOnly(input: KtxSqlServerReadOnlyQueryInput, _ctx: KtxScanContext): Promise { this.assertConnection(input.connectionId); const limitedSql = limitSqlForSqlServerExecution(input.sql, input.maxRows); - const prepared = this.dialect.prepareQuery(limitedSql, input.params); + const prepared = prepareSqlServerReadOnlyQuery(limitedSql, input.params); const result = await this.query(prepared.sql, prepared.params); return { ...result, rowCount: result.rows.length }; }