fix(cli): enable read-only SQL probes for local ingest

This commit is contained in:
Andrey Avtomonov 2026-05-13 00:03:31 +02:00
parent 6036019a7b
commit 7e84b6092e
4 changed files with 182 additions and 1 deletions

View file

@ -0,0 +1,86 @@
import type { KtxLocalProject } from '@ktx/context/project';
import { createKtxConnectorCapabilities, type KtxScanConnector } from '@ktx/context/scan';
import { describe, expect, it, vi } from 'vitest';
import { createKtxCliIngestQueryExecutor } from './ingest-query-executor.js';
function project(): KtxLocalProject {
return {
projectDir: '/tmp/ktx-query-project',
config: {
project: 'warehouse',
connections: {
warehouse: { driver: 'postgres', url: 'postgresql://readonly@example.test/db' },
},
},
} as unknown as KtxLocalProject;
}
function connector(overrides: Partial<KtxScanConnector> = {}): KtxScanConnector {
return {
id: 'warehouse',
driver: 'postgres',
capabilities: createKtxConnectorCapabilities({ readOnlySql: true }),
async introspect() {
throw new Error('introspect is not used by this test');
},
executeReadOnly: vi.fn(async () => ({
headers: ['answer'],
rows: [[1]],
totalRows: 1,
rowCount: 1,
})),
cleanup: vi.fn(async () => {}),
...overrides,
};
}
describe('createKtxCliIngestQueryExecutor', () => {
it('executes read-only SQL through the scan connector and cleans it up', async () => {
const scanConnector = connector();
const createConnector = vi.fn(async () => scanConnector);
const executor = createKtxCliIngestQueryExecutor(project(), { createConnector });
await expect(
executor.execute({
connectionId: 'warehouse',
connection: { driver: 'postgres', url: 'postgresql://readonly@example.test/db' },
projectDir: '/tmp/ktx-query-project',
sql: 'select 1',
maxRows: 5,
}),
).resolves.toMatchObject({
headers: ['answer'],
rows: [[1]],
totalRows: 1,
command: 'SELECT',
rowCount: 1,
});
expect(createConnector).toHaveBeenCalledWith(project(), 'warehouse');
expect(scanConnector.executeReadOnly).toHaveBeenCalledWith(
{ connectionId: 'warehouse', sql: 'select 1', maxRows: 5 },
{ runId: 'ingest-sql-execution' },
);
expect(scanConnector.cleanup).toHaveBeenCalledTimes(1);
});
it('rejects connectors without read-only SQL support', async () => {
const scanConnector = connector({
capabilities: createKtxConnectorCapabilities({ readOnlySql: false }),
executeReadOnly: undefined,
});
const executor = createKtxCliIngestQueryExecutor(project(), {
createConnector: vi.fn(async () => scanConnector),
});
await expect(
executor.execute({
connectionId: 'warehouse',
connection: { driver: 'postgres' },
projectDir: '/tmp/ktx-query-project',
sql: 'select 1',
}),
).rejects.toThrow('Connection "warehouse" driver "postgres" does not support read-only SQL execution.');
expect(scanConnector.cleanup).toHaveBeenCalledTimes(1);
});
});

View file

@ -0,0 +1,49 @@
import type { KtxSqlQueryExecutionInput, KtxSqlQueryExecutorPort } from '@ktx/context/connections';
import type { KtxLocalProject } from '@ktx/context/project';
import type { KtxScanConnector, KtxScanContext } from '@ktx/context/scan';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
type CreateConnector = typeof createKtxCliScanConnector;
export interface KtxCliIngestQueryExecutorDeps {
createConnector?: CreateConnector;
}
async function cleanupConnector(connector: KtxScanConnector | null): Promise<void> {
await connector?.cleanup?.();
}
export function createKtxCliIngestQueryExecutor(
project: KtxLocalProject,
deps: KtxCliIngestQueryExecutorDeps = {},
): KtxSqlQueryExecutorPort {
const createConnector = deps.createConnector ?? createKtxCliScanConnector;
return {
async execute(input: KtxSqlQueryExecutionInput) {
let connector: KtxScanConnector | null = null;
try {
connector = await createConnector(project, input.connectionId);
if (!connector.capabilities.readOnlySql || !connector.executeReadOnly) {
throw new Error(
`Connection "${input.connectionId}" driver "${connector.driver}" does not support read-only SQL execution.`,
);
}
const ctx: KtxScanContext = { runId: 'ingest-sql-execution' };
const result = await connector.executeReadOnly(
{ connectionId: input.connectionId, sql: input.sql, maxRows: input.maxRows },
ctx,
);
return {
headers: result.headers,
rows: result.rows,
totalRows: result.totalRows,
command: 'SELECT',
rowCount: result.rowCount,
};
} finally {
await cleanupConnector(connector);
}
},
};
}

View file

@ -815,6 +815,44 @@ describe('runKtxIngest', () => {
expect(runLocalIngest).toHaveBeenCalledWith(expect.objectContaining({ llmDebugRequestFile: debugFile }));
});
it('supplies a scan-connector query executor to local ingest runs', async () => {
const io = makeIo();
const projectDir = join(tempDir, 'query-executor-project');
await writeWarehouseConfig(projectDir);
const queryExecutor = {
execute: vi.fn(async () => ({
headers: [],
rows: [],
totalRows: 0,
command: 'SELECT',
rowCount: 0,
})),
};
const runLocalIngest = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> =>
completedLocalBundleRun(input, 'query-executor-run'),
);
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
outputMode: 'json',
},
io.io,
{
runLocalIngest,
createAdapters: () => [],
createQueryExecutor: () => queryExecutor,
},
),
).resolves.toBe(0);
expect(runLocalIngest).toHaveBeenCalledWith(expect.objectContaining({ queryExecutor }));
});
it('passes daemon database introspection URL to default local ingest adapters', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -16,7 +16,9 @@ import {
runLocalMetabaseIngest,
savedMemoryCountsForReport,
} from '@ktx/context/ingest';
import { loadKtxProject } from '@ktx/context/project';
import type { KtxSqlQueryExecutorPort } from '@ktx/context/connections';
import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project';
import { createKtxCliIngestQueryExecutor } from './ingest-query-executor.js';
import { readIngestReportSnapshotFile } from './ingest-report-file.js';
import { createCliOperationalLogger } from './io/logger.js';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
@ -69,6 +71,7 @@ interface KtxIngestDeps {
jobIdFactory?: () => string;
now?: () => Date;
createAdapters?: typeof createKtxCliLocalIngestAdapters;
createQueryExecutor?: (project: KtxLocalProject) => KtxSqlQueryExecutorPort;
runLocalIngest?: typeof runLocalIngest;
runLocalMetabaseIngest?: typeof runLocalMetabaseIngest;
readReportFile?: typeof readIngestReportSnapshotFile;
@ -530,6 +533,9 @@ export async function runKtxIngest(
...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}),
logger: operationalLogger,
};
const queryExecutor =
localIngestOptions.queryExecutor ??
(deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(project);
if (args.adapter === 'metabase' && args.sourceDir) {
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');
}
@ -542,6 +548,7 @@ export async function runKtxIngest(
adapters: createAdapters(project, adapterOptions),
metabaseConnectionId: args.connectionId,
...localIngestOptions,
queryExecutor,
trigger: 'manual_resync',
jobIdFactory: deps.jobIdFactory,
...(progress ? { progress } : {}),
@ -602,6 +609,7 @@ export async function runKtxIngest(
trigger: 'manual_resync',
jobId,
...localIngestOptions,
queryExecutor,
pullConfigOptions: adapterOptions,
...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}),
...(memoryFlow ? { memoryFlow } : {}),