diff --git a/packages/cli/src/ingest-query-executor.test.ts b/packages/cli/src/ingest-query-executor.test.ts new file mode 100644 index 00000000..343202a1 --- /dev/null +++ b/packages/cli/src/ingest-query-executor.test.ts @@ -0,0 +1,86 @@ +import type { KtxLocalProject } from '@ktx/context/project'; +import { createKtxConnectorCapabilities, type KtxScanConnector } from '@ktx/context/scan'; +import { describe, expect, it, vi } from 'vitest'; +import { createKtxCliIngestQueryExecutor } from './ingest-query-executor.js'; + +function project(): KtxLocalProject { + return { + projectDir: '/tmp/ktx-query-project', + config: { + project: 'warehouse', + connections: { + warehouse: { driver: 'postgres', url: 'postgresql://readonly@example.test/db' }, + }, + }, + } as unknown as KtxLocalProject; +} + +function connector(overrides: Partial = {}): KtxScanConnector { + return { + id: 'warehouse', + driver: 'postgres', + capabilities: createKtxConnectorCapabilities({ readOnlySql: true }), + async introspect() { + throw new Error('introspect is not used by this test'); + }, + executeReadOnly: vi.fn(async () => ({ + headers: ['answer'], + rows: [[1]], + totalRows: 1, + rowCount: 1, + })), + cleanup: vi.fn(async () => {}), + ...overrides, + }; +} + +describe('createKtxCliIngestQueryExecutor', () => { + it('executes read-only SQL through the scan connector and cleans it up', async () => { + const scanConnector = connector(); + const createConnector = vi.fn(async () => scanConnector); + const executor = createKtxCliIngestQueryExecutor(project(), { createConnector }); + + await expect( + executor.execute({ + connectionId: 'warehouse', + connection: { driver: 'postgres', url: 'postgresql://readonly@example.test/db' }, + projectDir: '/tmp/ktx-query-project', + sql: 'select 1', + maxRows: 5, + }), + ).resolves.toMatchObject({ + headers: ['answer'], + rows: [[1]], + totalRows: 1, + command: 'SELECT', + rowCount: 1, + }); + + expect(createConnector).toHaveBeenCalledWith(project(), 'warehouse'); + expect(scanConnector.executeReadOnly).toHaveBeenCalledWith( + { connectionId: 'warehouse', sql: 'select 1', maxRows: 5 }, + { runId: 'ingest-sql-execution' }, + ); + expect(scanConnector.cleanup).toHaveBeenCalledTimes(1); + }); + + it('rejects connectors without read-only SQL support', async () => { + const scanConnector = connector({ + capabilities: createKtxConnectorCapabilities({ readOnlySql: false }), + executeReadOnly: undefined, + }); + const executor = createKtxCliIngestQueryExecutor(project(), { + createConnector: vi.fn(async () => scanConnector), + }); + + await expect( + executor.execute({ + connectionId: 'warehouse', + connection: { driver: 'postgres' }, + projectDir: '/tmp/ktx-query-project', + sql: 'select 1', + }), + ).rejects.toThrow('Connection "warehouse" driver "postgres" does not support read-only SQL execution.'); + expect(scanConnector.cleanup).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/cli/src/ingest-query-executor.ts b/packages/cli/src/ingest-query-executor.ts new file mode 100644 index 00000000..197119be --- /dev/null +++ b/packages/cli/src/ingest-query-executor.ts @@ -0,0 +1,49 @@ +import type { KtxSqlQueryExecutionInput, KtxSqlQueryExecutorPort } from '@ktx/context/connections'; +import type { KtxLocalProject } from '@ktx/context/project'; +import type { KtxScanConnector, KtxScanContext } from '@ktx/context/scan'; +import { createKtxCliScanConnector } from './local-scan-connectors.js'; + +type CreateConnector = typeof createKtxCliScanConnector; + +export interface KtxCliIngestQueryExecutorDeps { + createConnector?: CreateConnector; +} + +async function cleanupConnector(connector: KtxScanConnector | null): Promise { + await connector?.cleanup?.(); +} + +export function createKtxCliIngestQueryExecutor( + project: KtxLocalProject, + deps: KtxCliIngestQueryExecutorDeps = {}, +): KtxSqlQueryExecutorPort { + const createConnector = deps.createConnector ?? createKtxCliScanConnector; + return { + async execute(input: KtxSqlQueryExecutionInput) { + let connector: KtxScanConnector | null = null; + try { + connector = await createConnector(project, input.connectionId); + if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) { + throw new Error( + `Connection "${input.connectionId}" driver "${connector.driver}" does not support read-only SQL execution.`, + ); + } + + const ctx: KtxScanContext = { runId: 'ingest-sql-execution' }; + const result = await connector.executeReadOnly( + { connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows }, + ctx, + ); + return { + headers: result.headers, + rows: result.rows, + totalRows: result.totalRows, + command: 'SELECT', + rowCount: result.rowCount, + }; + } finally { + await cleanupConnector(connector); + } + }, + }; +} diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 4f49216c..5bd62619 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -815,6 +815,44 @@ describe('runKtxIngest', () => { expect(runLocalIngest).toHaveBeenCalledWith(expect.objectContaining({ llmDebugRequestFile: debugFile })); }); + it('supplies a scan-connector query executor to local ingest runs', async () => { + const io = makeIo(); + const projectDir = join(tempDir, 'query-executor-project'); + await writeWarehouseConfig(projectDir); + const queryExecutor = { + execute: vi.fn(async () => ({ + headers: [], + rows: [], + totalRows: 0, + command: 'SELECT', + rowCount: 0, + })), + }; + const runLocalIngest = vi.fn(async (input: RunLocalIngestOptions): Promise => + completedLocalBundleRun(input, 'query-executor-run'), + ); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'fake', + outputMode: 'json', + }, + io.io, + { + runLocalIngest, + createAdapters: () => [], + createQueryExecutor: () => queryExecutor, + }, + ), + ).resolves.toBe(0); + + expect(runLocalIngest).toHaveBeenCalledWith(expect.objectContaining({ queryExecutor })); + }); + it('passes daemon database introspection URL to default local ingest adapters', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index cf7a7aff..91e163de 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -16,7 +16,9 @@ import { runLocalMetabaseIngest, savedMemoryCountsForReport, } from '@ktx/context/ingest'; -import { loadKtxProject } from '@ktx/context/project'; +import type { KtxSqlQueryExecutorPort } from '@ktx/context/connections'; +import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project'; +import { createKtxCliIngestQueryExecutor } from './ingest-query-executor.js'; import { readIngestReportSnapshotFile } from './ingest-report-file.js'; import { createCliOperationalLogger } from './io/logger.js'; import { createKtxCliLocalIngestAdapters } from './local-adapters.js'; @@ -69,6 +71,7 @@ interface KtxIngestDeps { jobIdFactory?: () => string; now?: () => Date; createAdapters?: typeof createKtxCliLocalIngestAdapters; + createQueryExecutor?: (project: KtxLocalProject) => KtxSqlQueryExecutorPort; runLocalIngest?: typeof runLocalIngest; runLocalMetabaseIngest?: typeof runLocalMetabaseIngest; readReportFile?: typeof readIngestReportSnapshotFile; @@ -530,6 +533,9 @@ export async function runKtxIngest( ...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}), logger: operationalLogger, }; + const queryExecutor = + localIngestOptions.queryExecutor ?? + (deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(project); if (args.adapter === 'metabase' && args.sourceDir) { throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter'); } @@ -542,6 +548,7 @@ export async function runKtxIngest( adapters: createAdapters(project, adapterOptions), metabaseConnectionId: args.connectionId, ...localIngestOptions, + queryExecutor, trigger: 'manual_resync', jobIdFactory: deps.jobIdFactory, ...(progress ? { progress } : {}), @@ -602,6 +609,7 @@ export async function runKtxIngest( trigger: 'manual_resync', jobId, ...localIngestOptions, + queryExecutor, pullConfigOptions: adapterOptions, ...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}), ...(memoryFlow ? { memoryFlow } : {}),