feat(cli): execute public database ingest facets

This commit is contained in:
Andrey Avtomonov 2026-05-13 17:59:31 +02:00
parent fb06dc5b01
commit 7dd14bb333
3 changed files with 158 additions and 23 deletions

View file

@ -49,6 +49,8 @@ export type KtxIngestArgs =
cliVersion?: string;
runtimeInstallPolicy?: KtxManagedPythonInstallPolicy;
debugLlmRequestFile?: string;
allowImplicitAdapter?: boolean;
historicSqlPullConfigOverride?: Record<string, unknown>;
outputMode: KtxIngestOutputMode;
inputMode?: KtxIngestInputMode;
}
@ -571,6 +573,19 @@ export async function runKtxIngest(
const project = await loadKtxProject({ projectDir: args.projectDir });
const env = deps.env ?? process.env;
if (args.command === 'run') {
const ingestProject =
args.allowImplicitAdapter && !project.config.ingest.adapters.includes(args.adapter)
? {
...project,
config: {
...project.config,
ingest: {
...project.config.ingest,
adapters: [...project.config.ingest.adapters, args.adapter],
},
},
}
: project;
const createAdapters =
deps.createAdapters ??
(deps.runLocalIngest || deps.runLocalMetabaseIngest ? () => [] : createKtxCliLocalIngestAdapters);
@ -583,11 +598,14 @@ export async function runKtxIngest(
...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}),
...(managedDaemon ? { managedDaemon } : {}),
...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}),
...(args.historicSqlPullConfigOverride
? { historicSqlPullConfigOverride: args.historicSqlPullConfigOverride }
: {}),
logger: operationalLogger,
};
const queryExecutor =
localIngestOptions.queryExecutor ??
(deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(project);
(deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(ingestProject);
if (args.adapter === 'metabase' && args.sourceDir) {
throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter');
}
@ -604,8 +622,8 @@ export async function runKtxIngest(
deps.progress,
);
const result = await executeMetabaseFanout({
project,
adapters: createAdapters(project, adapterOptions),
project: ingestProject,
adapters: createAdapters(ingestProject, adapterOptions),
metabaseConnectionId: args.connectionId,
...localIngestOptions,
queryExecutor,
@ -668,8 +686,8 @@ export async function runKtxIngest(
try {
const result = await executeLocalIngest({
project,
adapters: createAdapters(project, adapterOptions),
project: ingestProject,
adapters: createAdapters(ingestProject, adapterOptions),
adapter: args.adapter,
connectionId: args.connectionId,
sourceDir: args.sourceDir,

View file

@ -161,6 +161,75 @@ describe('buildPublicIngestPlan', () => {
});
describe('runKtxPublicIngest', () => {
// Checks how database targets are translated into scan arguments: the
// connection declared with `context.depth: 'deep'` must be scanned in
// `enriched` mode with relationship detection enabled, while the connection
// without a depth setting must be scanned in `structural` mode without it.
it('maps fast and deep database targets to scan internals', async () => {
  const { io } = makeIo();
  const project = projectWithConnections({
    fast: { driver: 'postgres' },
    deep: { driver: 'postgres', context: { depth: 'deep' } },
  });
  const loadProject = vi.fn(async () => project);
  const scanMock = vi.fn(async () => 0);

  const exitCode = await runKtxPublicIngest(
    { command: 'run', projectDir: '/tmp/project', all: true, json: false, inputMode: 'disabled', queryHistory: 'default' },
    io,
    { loadProject, runScan: scanMock },
  );
  expect(exitCode).toBe(0);

  // The position in this list is the expected call order: 'deep' is asserted
  // as the first scan invocation and 'fast' as the second.
  const expectedScanArgs = [
    { connectionId: 'deep', mode: 'enriched', detectRelationships: true },
    { connectionId: 'fast', mode: 'structural', detectRelationships: false },
  ];
  expectedScanArgs.forEach((scanArgs, position) => {
    expect(scanMock).toHaveBeenNthCalledWith(position + 1, expect.objectContaining(scanArgs), expect.anything());
  });
});
// Checks the query-history facet of a single database target: with
// `queryHistory: 'enabled'` and `queryHistoryWindowDays: 30` on the command
// line, the run must invoke an enriched scan and then a `historic-sql` ingest
// whose pull-config override carries the 30-day window (the project config in
// this fixture says 90).
it('runs query history after schema ingest with current-run window override', async () => {
  const { io } = makeIo();
  const project = projectWithConnections({
    warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, windowDays: 90 } } },
  });
  const loadProject = vi.fn(async () => project);
  const scanMock = vi.fn(async () => 0);
  const ingestMock = vi.fn(async () => 0);

  const exitCode = await runKtxPublicIngest(
    {
      command: 'run',
      projectDir: '/tmp/project',
      targetConnectionId: 'warehouse',
      all: false,
      json: false,
      inputMode: 'disabled',
      queryHistory: 'enabled',
      queryHistoryWindowDays: 30,
    },
    io,
    { loadProject, runScan: scanMock, runIngest: ingestMock },
  );
  expect(exitCode).toBe(0);

  // Schema step: the warehouse connection is scanned in enriched mode.
  const expectedScanArgs = expect.objectContaining({ connectionId: 'warehouse', mode: 'enriched' });
  expect(scanMock).toHaveBeenCalledWith(expectedScanArgs, expect.anything());

  // Query-history step: the historic-sql adapter is requested implicitly and
  // receives the per-run window instead of the configured one.
  const expectedPullOverride = expect.objectContaining({ dialect: 'postgres', windowDays: 30 });
  const expectedIngestArgs = expect.objectContaining({
    command: 'run',
    connectionId: 'warehouse',
    adapter: 'historic-sql',
    allowImplicitAdapter: true,
    historicSqlPullConfigOverride: expectedPullOverride,
  });
  expect(ingestMock).toHaveBeenCalledWith(expectedIngestArgs, expect.anything());
});
it('runs all independent targets and reports partial failures', async () => {
const io = makeIo();
const project = projectWithConnections({
@ -205,8 +274,8 @@ describe('runKtxPublicIngest', () => {
expect.anything(),
);
expect(io.stdout()).toContain('Ingest finished with partial failures');
expect(io.stdout()).toContain('warehouse failed at scan.');
expect(io.stdout()).toContain('Debug: ktx scan warehouse --debug');
expect(io.stdout()).toContain('warehouse failed at database-schema.');
expect(io.stdout()).toContain('Debug: ktx ingest warehouse --debug');
});
it('can request enriched relationship scans for setup-managed context builds', async () => {

View file

@ -303,16 +303,20 @@ export function buildPublicIngestPlan(
function defaultSteps(target: KtxPublicIngestPlanTarget): KtxPublicIngestTargetResult['steps'] {
return [
{
operation: 'scan',
status: target.steps.includes('scan') ? 'not-run' : 'skipped',
...(target.operation === 'scan' ? { debugCommand: target.debugCommand } : {}),
operation: 'database-schema',
status: target.steps.includes('database-schema') ? 'not-run' : 'skipped',
...(target.operation === 'database-ingest' ? { debugCommand: target.debugCommand } : {}),
},
{
operation: 'query-history',
status: target.steps.includes('query-history') ? 'not-run' : 'skipped',
...(target.operation === 'database-ingest' ? { debugCommand: target.debugCommand } : {}),
},
{
operation: 'source-ingest',
status: target.steps.includes('source-ingest') ? 'not-run' : 'skipped',
...(target.operation === 'source-ingest' ? { debugCommand: target.debugCommand } : {}),
},
{ operation: 'enrich', status: 'skipped' },
{
operation: 'memory-update',
status: target.steps.includes('memory-update') ? 'not-run' : 'skipped',
@ -321,8 +325,13 @@ function defaultSteps(target: KtxPublicIngestPlanTarget): KtxPublicIngestTargetR
];
}
function markTargetResult(target: KtxPublicIngestPlanTarget, status: 'done' | 'failed'): KtxPublicIngestTargetResult {
const failedOperation = target.operation === 'scan' ? 'scan' : 'source-ingest';
function markTargetResult(
target: KtxPublicIngestPlanTarget,
status: 'done' | 'failed',
failedOperation?: KtxPublicIngestStepName,
): KtxPublicIngestTargetResult {
const selectedFailedOperation =
failedOperation ?? (target.operation === 'database-ingest' ? 'database-schema' : 'source-ingest');
return {
connectionId: target.connectionId,
driver: target.driver,
@ -333,8 +342,12 @@ function markTargetResult(target: KtxPublicIngestPlanTarget, status: 'done' | 'f
if (status === 'done') {
return { ...step, status: 'done' };
}
if (step.operation === failedOperation) {
return { ...step, status: 'failed', detail: `${target.connectionId} failed at ${failedOperation}.` };
if (step.operation === selectedFailedOperation) {
return {
...step,
status: 'failed',
detail: `${target.connectionId} failed at ${selectedFailedOperation}.`,
};
}
return { ...step, status: 'not-run' };
}),
@ -353,13 +366,16 @@ function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo
const failures = results.filter(resultFailed);
io.stdout.write(failures.length > 0 ? 'Ingest finished with partial failures\n' : 'Ingest finished\n');
io.stdout.write('\n');
io.stdout.write('Source Scan Source ingest Enrich Memory update\n');
io.stdout.write('Source Database schema Query history Source ingest Memory update\n');
for (const result of results) {
io.stdout.write(
`${result.connectionId.padEnd(14)} ${stepStatus(result, 'scan').padEnd(9)} ${stepStatus(
`${result.connectionId.padEnd(14)} ${stepStatus(result, 'database-schema').padEnd(16)} ${stepStatus(
result,
'query-history',
).padEnd(14)} ${stepStatus(
result,
'source-ingest',
).padEnd(14)} ${stepStatus(result, 'enrich').padEnd(8)} ${stepStatus(result, 'memory-update')}\n`,
).padEnd(14)} ${stepStatus(result, 'memory-update')}\n`,
);
}
@ -395,21 +411,47 @@ export async function executePublicIngestTarget(
io: KtxCliIo,
deps: KtxPublicIngestDeps,
): Promise<KtxPublicIngestTargetResult> {
if (target.operation === 'scan') {
if (target.operation === 'database-ingest') {
const { runKtxScan } = await import('./scan.js');
const scanArgs: KtxScanArgs = {
command: 'run',
projectDir: args.projectDir,
connectionId: target.connectionId,
mode: args.scanMode ?? 'structural',
detectRelationships: args.detectRelationships ?? false,
mode: target.databaseDepth === 'deep' ? 'enriched' : 'structural',
detectRelationships: target.databaseDepth === 'deep' ? true : false,
dryRun: false,
};
const runScan = deps.runScan ?? runKtxScan;
const exitCode = deps.scanProgress
const scanExitCode = deps.scanProgress
? await runScan(scanArgs, io, { progress: deps.scanProgress })
: await runScan(scanArgs, io);
return markTargetResult(target, exitCode === 0 ? 'done' : 'failed');
if (scanExitCode !== 0) {
return markTargetResult(target, 'failed', 'database-schema');
}
if (target.queryHistory?.enabled === true) {
const { runKtxIngest } = await import('./ingest.js');
const runIngest = deps.runIngest ?? runKtxIngest;
const ingestArgs: KtxIngestArgs = {
command: 'run',
projectDir: args.projectDir,
connectionId: target.connectionId,
adapter: 'historic-sql',
outputMode: sourceIngestOutputMode(args, io),
inputMode: args.inputMode,
allowImplicitAdapter: true,
historicSqlPullConfigOverride: {
dialect: target.queryHistory.dialect,
...(target.queryHistory.windowDays !== undefined ? { windowDays: target.queryHistory.windowDays } : {}),
},
};
const qhExitCode = await runIngest(ingestArgs, io);
if (qhExitCode !== 0) {
return markTargetResult(target, 'failed', 'query-history');
}
}
return markTargetResult(target, 'done');
}
const { runKtxIngest } = await import('./ingest.js');
@ -453,6 +495,12 @@ export async function runKtxPublicIngest(
const plan = buildPublicIngestPlan(project, args);
const results: KtxPublicIngestTargetResult[] = [];
if (!args.json && plan.warnings.length > 0) {
for (const warning of plan.warnings) {
io.stderr.write(`Warning: ${warning}\n`);
}
}
for (const target of plan.targets) {
results.push(await executePublicIngestTarget(target, args, io, deps));
}