feat(cli): route public connection ingest command

This commit is contained in:
Andrey Avtomonov 2026-05-13 17:56:23 +02:00
parent 9ad7ad257c
commit 3371d41157
4 changed files with 175 additions and 36 deletions

View file

@ -4,6 +4,7 @@ import type { KtxConnectionArgs } from './connection.js';
import type { KtxDoctorArgs } from './doctor.js';
import type { KtxIngestArgs } from './ingest.js';
import type { KtxKnowledgeArgs } from './knowledge.js';
import type { KtxPublicIngestArgs } from './public-ingest.js';
import type { KtxRuntimeArgs } from './runtime.js';
import type { KtxScanArgs } from './scan.js';
import type { KtxSetupArgs } from './setup.js';
@ -30,6 +31,7 @@ export interface KtxCliDeps {
connection?: (args: KtxConnectionArgs, io: KtxCliIo) => Promise<number>;
doctor?: (args: KtxDoctorArgs, io: KtxCliIo) => Promise<number>;
ingest?: (args: KtxIngestArgs, io: KtxCliIo) => Promise<number>;
publicIngest?: (args: KtxPublicIngestArgs, io: KtxCliIo) => Promise<number>;
runtime?: (args: KtxRuntimeArgs, io: KtxCliIo) => Promise<number>;
scan?: (args: KtxScanArgs, io: KtxCliIo) => Promise<number>;
knowledge?: (args: KtxKnowledgeArgs, io: KtxCliIo) => Promise<number>;

View file

@ -1,9 +1,15 @@
import { resolve } from 'node:path';
import { type Command, Option } from '@commander-js/extra-typings';
import { type KtxCliCommandContext, type OutputModeOptions, resolveCommandProjectDir } from '../cli-program.js';
import {
type KtxCliCommandContext,
type OutputModeOptions,
parsePositiveIntegerOption,
resolveCommandProjectDir,
} from '../cli-program.js';
import type { KtxCliDeps, KtxCliIo } from '../index.js';
import type { KtxIngestArgs, KtxIngestOutputMode } from '../ingest.js';
import { runtimeInstallPolicyFromFlags } from '../managed-python-command.js';
import type { KtxPublicIngestArgs } from '../public-ingest.js';
import { profileMark } from '../startup-profile.js';
profileMark('module:commands/ingest-commands');
@ -41,6 +47,21 @@ function inputMode(options: OutputModeOptions): Pick<KtxIngestArgs, 'inputMode'>
return options.input === false ? { inputMode: 'disabled' } : {};
}
function resolvedOptions<T extends object>(command: Command, fallback: T): T {
return (command.optsWithGlobals ? command.optsWithGlobals() : fallback) as T;
}
function assertOutputModeCompatible(options: OutputModeOptions): void {
const requested = [
options.plain === true ? '--plain' : undefined,
options.json === true ? '--json' : undefined,
options.viz === true ? '--viz' : undefined,
].filter((option): option is string => option !== undefined);
if (requested.length > 1) {
throw new Error(`Output mode options cannot be used together: ${requested.join(', ')}`);
}
}
async function runIngestArgs(
context: KtxCliCommandContext,
args: KtxIngestArgs,
@ -57,15 +78,45 @@ export function registerIngestCommands(
): void {
const ingest = program
.command('ingest')
.description('Run or inspect local ingest memory-flow output')
.description('Build or inspect KTX context')
.usage('[options] [connectionId]')
.argument('[connectionId]', 'Configured connection id to ingest')
.option('--all', 'Ingest all configured connections', false)
.addOption(new Option('--fast', 'Use deterministic database schema ingest').conflicts('deep'))
.addOption(new Option('--deep', 'Use AI-enriched database ingest').conflicts('fast'))
.addOption(new Option('--query-history', 'Include database query-history usage patterns').conflicts('noQueryHistory'))
.addOption(new Option('--no-query-history', 'Skip database query-history usage patterns'))
.option('--query-history-window-days <days>', 'Query-history lookback window for this run', parsePositiveIntegerOption)
.addOption(new Option('--plain', 'Print plain text output').conflicts(['json']))
.addOption(new Option('--json', 'Print JSON output').conflicts(['plain']))
.option('--no-input', 'Disable interactive terminal input')
.showHelpAfterError();
ingest.action(async (connectionId: string | undefined, options, command) => {
const { runKtxPublicIngest } = await import('../public-ingest.js');
const queryHistory =
options.queryHistory === true ? 'enabled' : options.queryHistory === false ? 'disabled' : 'default';
const args: KtxPublicIngestArgs = {
command: 'run',
projectDir: resolveCommandProjectDir(command),
...(connectionId ? { targetConnectionId: connectionId } : {}),
all: options.all === true,
json: options.json === true,
inputMode: options.input === false ? 'disabled' : 'auto',
...(options.fast === true ? { depth: 'fast' as const } : {}),
...(options.deep === true ? { depth: 'deep' as const } : {}),
queryHistory,
...(options.queryHistoryWindowDays !== undefined ? { queryHistoryWindowDays: options.queryHistoryWindowDays } : {}),
};
context.setExitCode(await (context.deps.publicIngest ?? runKtxPublicIngest)(args, context.io));
});
ingest.hook('preAction', (_thisCommand, actionCommand) => {
context.writeDebug?.('ingest', actionCommand);
});
ingest
.command('run')
.command('run', { hidden: true })
.description('Run local ingest for one configured connection and source adapter')
.requiredOption('--connection-id <connectionId>', 'KTX connection id')
.requiredOption('--adapter <adapter>', 'Ingest source adapter name')
@ -79,6 +130,8 @@ export function registerIngestCommands(
.option('--yes', 'Install the managed Python runtime without prompting when required', false)
.option('--no-input', 'Disable interactive terminal input for visualization')
.action(async (options, command) => {
const commandOptionsWithGlobals = resolvedOptions(command, options);
assertOutputModeCompatible(commandOptionsWithGlobals);
if (options.reportFile) {
throw new Error('--report-file is only supported for ingest status/watch');
}
@ -87,15 +140,17 @@ export function registerIngestCommands(
{
command: 'run',
projectDir: resolveCommandProjectDir(command),
connectionId: options.connectionId,
adapter: options.adapter,
sourceDir: options.sourceDir ? resolve(options.sourceDir) : undefined,
databaseIntrospectionUrl: options.databaseIntrospectionUrl || undefined,
connectionId: commandOptionsWithGlobals.connectionId,
adapter: commandOptionsWithGlobals.adapter,
sourceDir: commandOptionsWithGlobals.sourceDir ? resolve(commandOptionsWithGlobals.sourceDir) : undefined,
databaseIntrospectionUrl: commandOptionsWithGlobals.databaseIntrospectionUrl || undefined,
cliVersion: context.packageInfo.version,
runtimeInstallPolicy: runtimeInstallPolicyFromFlags({ yes: options.yes }),
...(options.debugLlmRequestFile ? { debugLlmRequestFile: resolve(options.debugLlmRequestFile) } : {}),
outputMode: outputMode(options),
...inputMode(options),
runtimeInstallPolicy: runtimeInstallPolicyFromFlags({ yes: commandOptionsWithGlobals.yes }),
...(commandOptionsWithGlobals.debugLlmRequestFile
? { debugLlmRequestFile: resolve(commandOptionsWithGlobals.debugLlmRequestFile) }
: {}),
outputMode: outputMode(commandOptionsWithGlobals),
...inputMode(commandOptionsWithGlobals),
},
commandOptions,
);
@ -111,22 +166,24 @@ export function registerIngestCommands(
.addOption(new Option('--viz', 'Render memory-flow TUI output').conflicts(['plain', 'json']))
.option('--no-input', 'Disable interactive terminal input for visualization')
.action(async (runId: string | undefined, options, command) => {
const commandOptionsWithGlobals = resolvedOptions(command, options);
assertOutputModeCompatible(commandOptionsWithGlobals);
await runIngestArgs(
context,
{
command: 'status',
projectDir: resolveCommandProjectDir(command),
...(runId ? { runId } : {}),
...(options.reportFile ? { reportFile: resolve(options.reportFile) } : {}),
outputMode: outputMode(options),
...inputMode(options),
...(commandOptionsWithGlobals.reportFile ? { reportFile: resolve(commandOptionsWithGlobals.reportFile) } : {}),
outputMode: outputMode(commandOptionsWithGlobals),
...inputMode(commandOptionsWithGlobals),
},
commandOptions,
);
});
ingest
.command('watch')
.command('watch', { hidden: true })
.description('Open the latest or selected stored ingest visual report')
.argument('[runId]', 'Local ingest run id, report id, run id, or job id')
.option('--report-file <path>', 'Bundle ingest report JSON file to render')
@ -135,15 +192,17 @@ export function registerIngestCommands(
.addOption(new Option('--viz', 'Render memory-flow TUI output').conflicts(['plain', 'json']))
.option('--no-input', 'Disable interactive terminal input for visualization')
.action(async (runId: string | undefined, options, command) => {
const commandOptionsWithGlobals = resolvedOptions(command, options);
assertOutputModeCompatible(commandOptionsWithGlobals);
await runIngestArgs(
context,
{
command: 'watch',
projectDir: resolveCommandProjectDir(command),
...(runId ? { runId } : {}),
...(options.reportFile ? { reportFile: resolve(options.reportFile) } : {}),
outputMode: watchOutputMode(options),
...inputMode(options),
...(commandOptionsWithGlobals.reportFile ? { reportFile: resolve(commandOptionsWithGlobals.reportFile) } : {}),
outputMode: watchOutputMode(commandOptionsWithGlobals),
...inputMode(commandOptionsWithGlobals),
},
commandOptions,
);
@ -159,15 +218,17 @@ export function registerIngestCommands(
.addOption(new Option('--viz', 'Render memory-flow TUI output').conflicts(['plain', 'json']))
.option('--no-input', 'Disable interactive terminal input for visualization')
.action(async (runId: string, options, command) => {
const commandOptionsWithGlobals = resolvedOptions(command, options);
assertOutputModeCompatible(commandOptionsWithGlobals);
await runIngestArgs(
context,
{
command: 'replay',
projectDir: resolveCommandProjectDir(command),
runId,
...(options.reportFile ? { reportFile: resolve(options.reportFile) } : {}),
outputMode: outputMode(options),
...inputMode(options),
...(commandOptionsWithGlobals.reportFile ? { reportFile: resolve(commandOptionsWithGlobals.reportFile) } : {}),
outputMode: outputMode(commandOptionsWithGlobals),
...inputMode(commandOptionsWithGlobals),
},
commandOptions,
);

View file

@ -133,10 +133,6 @@ describe('dev Commander tree', () => {
argv: ['scan', '--help'],
expected: ['Usage: ktx scan [options] <connectionId>', '--mode <mode>', 'structural', 'relationships', '--dry-run'],
},
{
argv: ['ingest', 'run', '--help'],
expected: ['Usage: ktx ingest run [options]', '--connection-id <connectionId>', '--adapter <adapter>'],
},
])('prints generated nested help for $argv', async ({ argv, expected }) => {
const io = makeIo();
const doctor = vi.fn(async () => 0);
@ -158,6 +154,27 @@ describe('dev Commander tree', () => {
expect(scan).not.toHaveBeenCalled();
});
it('keeps legacy adapter-backed ingest run callable but hidden from ingest help', async () => {
const helpIo = makeIo();
const runIo = makeIo();
const ingest = vi.fn(async () => 0);
await expect(runKtxCli(['ingest', '--help'], helpIo.io, { ingest })).resolves.toBe(0);
await expect(
runKtxCli(
['ingest', 'run', '--connection-id', 'warehouse', '--adapter', 'metabase', '--project-dir', '/tmp/project'],
runIo.io,
{ ingest },
),
).resolves.toBe(0);
expect(helpIo.stdout()).not.toMatch(/^ run\s/m);
expect(ingest).toHaveBeenCalledWith(
expect.objectContaining({ command: 'run', connectionId: 'warehouse', adapter: 'metabase' }),
runIo.io,
);
});
it('dispatches top-level scan through Commander with injected dependencies', async () => {
const scanIo = makeIo();
const scan = vi.fn(async () => 0);

View file

@ -611,15 +611,69 @@ describe('runKtxCli', () => {
expect(testIo.stderr()).toMatch(/unknown command|error:/);
});
it('rejects removed public ingest shorthand', async () => {
it('routes public connection-centric ingest shorthand', async () => {
const testIo = makeIo();
const ingest = vi.fn().mockResolvedValue(0);
const publicIngest = vi.fn().mockResolvedValue(0);
await expect(runKtxCli(['--project-dir', '/tmp/project', 'ingest', 'warehouse'], testIo.io, { ingest }))
.resolves.toBe(1);
await expect(
runKtxCli(['--project-dir', '/tmp/project', 'ingest', 'warehouse', '--fast', '--no-input'], testIo.io, {
publicIngest,
}),
).resolves.toBe(0);
expect(ingest).not.toHaveBeenCalled();
expect(testIo.stderr()).toMatch(/unknown command|error:/);
expect(publicIngest).toHaveBeenCalledWith(
{
command: 'run',
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
json: false,
inputMode: 'disabled',
depth: 'fast',
queryHistory: 'default',
},
testIo.io,
);
expect(testIo.stderr()).toBe('Project: /tmp/project\n');
});
it('routes public ingest --all --deep with JSON output', async () => {
const testIo = makeIo();
const publicIngest = vi.fn().mockResolvedValue(0);
await expect(
runKtxCli(['--project-dir', '/tmp/project', 'ingest', '--all', '--deep', '--json'], testIo.io, {
publicIngest,
}),
).resolves.toBe(0);
expect(publicIngest).toHaveBeenCalledWith(
{
command: 'run',
projectDir: '/tmp/project',
all: true,
json: true,
inputMode: 'auto',
depth: 'deep',
queryHistory: 'default',
},
testIo.io,
);
expect(testIo.stderr()).toBe('');
});
it('rejects mutually exclusive public ingest depth flags before dispatch', async () => {
const testIo = makeIo();
const publicIngest = vi.fn().mockResolvedValue(0);
await expect(
runKtxCli(['--project-dir', '/tmp/project', 'ingest', 'warehouse', '--fast', '--deep'], testIo.io, {
publicIngest,
}),
).resolves.toBe(1);
expect(publicIngest).not.toHaveBeenCalled();
expect(testIo.stderr()).toMatch(/option '--(deep|fast)' cannot be used with option '--(fast|deep)'/);
});
it('prints ingest watch help from Commander', async () => {
@ -715,13 +769,18 @@ describe('runKtxCli', () => {
await expect(runKtxCli(['ingest', '--help'], testIo.io, { ingest })).resolves.toBe(0);
expect(testIo.stdout()).toContain('Usage: ktx ingest [options] [command]');
expect(testIo.stdout()).toContain('Run or inspect local ingest memory-flow output');
expect(testIo.stdout()).toContain('run');
expect(testIo.stdout()).toContain('Usage: ktx ingest [options] [connectionId]');
expect(testIo.stdout()).toContain('Build or inspect KTX context');
expect(testIo.stdout()).toContain('--all');
expect(testIo.stdout()).toContain('--fast');
expect(testIo.stdout()).toContain('--deep');
expect(testIo.stdout()).toContain('--query-history');
expect(testIo.stdout()).toContain('--no-query-history');
expect(testIo.stdout()).toContain('--query-history-window-days <days>');
expect(testIo.stdout()).toContain('status');
expect(testIo.stdout()).toContain('watch');
expect(testIo.stdout()).toContain('replay');
expect(testIo.stdout()).not.toContain('--all');
expect(testIo.stdout()).not.toMatch(/^ run\s/m);
expect(testIo.stdout()).not.toMatch(/^ watch\s/m);
expect(testIo.stderr()).toBe('');
expect(ingest).not.toHaveBeenCalled();
});