feat(cli): use managed daemon for ingest helpers

This commit is contained in:
Andrey Avtomonov 2026-05-11 12:34:23 +02:00
parent c4bfac4506
commit d24415413f
5 changed files with 135 additions and 0 deletions

View file

@ -3,6 +3,7 @@ import { type Command, Option } from '@commander-js/extra-typings';
import { type KtxCliCommandContext, type OutputModeOptions, resolveCommandProjectDir } from '../cli-program.js';
import type { KtxCliDeps, KtxCliIo } from '../index.js';
import type { KtxIngestArgs, KtxIngestOutputMode } from '../ingest.js';
import { runtimeInstallPolicyFromFlags } from '../managed-python-command.js';
import { profileMark } from '../startup-profile.js';
profileMark('module:commands/ingest-commands');
@ -75,6 +76,7 @@ export function registerIngestCommands(
.addOption(new Option('--plain', 'Print plain text output').conflicts(['json', 'viz']))
.addOption(new Option('--json', 'Print JSON output').conflicts(['plain', 'viz']))
.addOption(new Option('--viz', 'Render memory-flow TUI output').conflicts(['plain', 'json']))
.option('--yes', 'Install the managed Python runtime without prompting when required', false)
.option('--no-input', 'Disable interactive terminal input for visualization')
.action(async (options, command) => {
if (options.reportFile) {
@ -89,6 +91,8 @@ export function registerIngestCommands(
adapter: options.adapter,
sourceDir: options.sourceDir ? resolve(options.sourceDir) : undefined,
databaseIntrospectionUrl: options.databaseIntrospectionUrl || undefined,
cliVersion: context.packageInfo.version,
runtimeInstallPolicy: runtimeInstallPolicyFromFlags(options),
...(options.debugLlmRequestFile ? { debugLlmRequestFile: resolve(options.debugLlmRequestFile) } : {}),
outputMode: outputMode(options),
...inputMode(options),

View file

@ -661,6 +661,8 @@ describe('dev Commander tree', () => {
adapter: 'metabase',
sourceDir: undefined,
databaseIntrospectionUrl: undefined,
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'prompt',
outputMode: 'json',
},
io.io,

View file

@ -919,6 +919,8 @@ describe('runKtxCli', () => {
adapter: 'fake',
sourceDir: tempDir,
databaseIntrospectionUrl: undefined,
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'never',
debugLlmRequestFile: `${tempDir}/debug.jsonl`,
outputMode: 'json',
inputMode: 'disabled',
@ -932,6 +934,60 @@ describe('runKtxCli', () => {
expect(ingestReplayHelpIo.stderr()).toBe('');
});
it('routes ingest managed runtime install policies', async () => {
const autoIo = makeIo();
const conflictIo = makeIo();
const ingest = vi.fn(async () => 0);
await expect(
runKtxCli(
[
'dev',
'ingest',
'run',
'--project-dir',
tempDir,
'--connection-id',
'warehouse',
'--adapter',
'looker',
'--yes',
],
autoIo.io,
{ ingest },
),
).resolves.toBe(0);
await expect(
runKtxCli(
[
'dev',
'ingest',
'run',
'--project-dir',
tempDir,
'--connection-id',
'warehouse',
'--adapter',
'looker',
'--yes',
'--no-input',
],
conflictIo.io,
{ ingest },
),
).resolves.toBe(1);
expect(ingest).toHaveBeenCalledWith(
expect.objectContaining({
command: 'run',
cliVersion: '0.0.0-private',
runtimeInstallPolicy: 'auto',
}),
autoIo.io,
);
expect(conflictIo.stderr()).toContain('Choose only one runtime install mode: --yes or --no-input');
});
it('dispatches public connection through the existing connection implementation', async () => {
const testIo = makeIo();
const connection = vi.fn(async () => 0);

View file

@ -1214,6 +1214,59 @@ describe('runKtxIngest', () => {
adapters: createdAdapters,
adapter: 'fake',
connectionId: 'warehouse',
pullConfigOptions: {
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
},
}),
);
});
it('passes managed daemon options to adapters and pull-config options when no explicit daemon URL is set', async () => {
const projectDir = join(tempDir, 'managed-daemon-ingest-project');
await initKtxProject({ projectDir, projectName: 'managed-daemon-ingest-project' });
await writeWarehouseConfig(projectDir);
const createdAdapters: SourceAdapter[] = [
{ source: 'fake', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
];
const createAdapters = vi.fn(() => createdAdapters as never);
const runLocal = vi.fn(async (input: RunLocalIngestOptions) =>
completedLocalBundleRun(input, input.jobId ?? 'local-job-1'),
);
const io = makeIo();
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'fake',
cliVersion: '0.2.0',
runtimeInstallPolicy: 'auto',
outputMode: 'plain',
} satisfies KtxIngestArgs,
io.io,
{
createAdapters,
runLocalIngest: runLocal,
jobIdFactory: () => 'local-job-1',
},
),
).resolves.toBe(0);
const expectedManagedDaemon = {
cliVersion: '0.2.0',
installPolicy: 'auto',
io: io.io,
};
expect(createAdapters).toHaveBeenCalledWith(expect.objectContaining({ projectDir }), {
managedDaemon: expectedManagedDaemon,
});
expect(runLocal).toHaveBeenCalledWith(
expect.objectContaining({
pullConfigOptions: {
managedDaemon: expectedManagedDaemon,
},
}),
);
});

View file

@ -17,6 +17,7 @@ import {
import { loadKtxProject } from '@ktx/context/project';
import { readIngestReportSnapshotFile } from './ingest-report-file.js';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { type KtxMemoryFlowStdin, renderMemoryFlowInteractively } from './memory-flow-interactive.js';
import {
type KtxMemoryFlowTuiIo,
@ -40,6 +41,8 @@ export type KtxIngestArgs =
adapter: string;
sourceDir?: string;
databaseIntrospectionUrl?: string;
cliVersion?: string;
runtimeInstallPolicy?: KtxManagedPythonInstallPolicy;
debugLlmRequestFile?: string;
outputMode: KtxIngestOutputMode;
inputMode?: KtxIngestInputMode;
@ -245,6 +248,20 @@ function initialRunMemoryFlowInput(
};
}
function managedDaemonOptionsForIngestRun(
args: Extract<KtxIngestArgs, { command: 'run' }>,
io: KtxIngestIo,
) {
if (args.databaseIntrospectionUrl || !args.cliVersion || !args.runtimeInstallPolicy) {
return undefined;
}
return {
cliVersion: args.cliVersion,
installPolicy: args.runtimeInstallPolicy,
io,
};
}
async function writeReportRecord(
report: IngestReportSnapshot,
outputMode: KtxIngestOutputMode,
@ -300,9 +317,11 @@ export async function runKtxIngest(
const createAdapters = deps.createAdapters ?? createKtxCliLocalIngestAdapters;
const executeLocalIngest = deps.runLocalIngest ?? runLocalIngest;
const localIngestOptions = deps.localIngestOptions ?? {};
const managedDaemon = managedDaemonOptionsForIngestRun(args, io);
const adapterOptions = {
...(localIngestOptions.pullConfigOptions ?? {}),
...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}),
...(managedDaemon ? { managedDaemon } : {}),
...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}),
};
if (args.adapter === 'metabase' && args.sourceDir) {
@ -369,6 +388,7 @@ export async function runKtxIngest(
trigger: 'manual_resync',
jobId,
...localIngestOptions,
pullConfigOptions: adapterOptions,
...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}),
...(memoryFlow ? { memoryFlow } : {}),
});