mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-22 08:38:08 +02:00
Improve ingest progress updates
This commit is contained in:
parent
93f992ed5c
commit
a00285fd42
9 changed files with 361 additions and 25 deletions
|
|
@ -635,6 +635,117 @@ describe('runKtxIngest', () => {
|
|||
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('emits structured child ingest progress during Metabase fan-out', async () => {
|
||||
const projectDir = join(tempDir, 'project');
|
||||
await writeMetabaseConfig(projectDir);
|
||||
const io = makeIo();
|
||||
const progressEvents: Array<{ percent: number; message: string; transient?: boolean }> = [];
|
||||
|
||||
await expect(
|
||||
runKtxIngest(
|
||||
{
|
||||
command: 'run',
|
||||
projectDir,
|
||||
connectionId: 'prod-metabase',
|
||||
adapter: 'metabase',
|
||||
outputMode: 'json',
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
progress: (event) => progressEvents.push(event),
|
||||
runLocalMetabaseIngest: async (input) => {
|
||||
input.progress?.onMetabaseFanoutPlanned?.({
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
children: [{ metabaseDatabaseId: 1, targetConnectionId: 'warehouse_a' }],
|
||||
});
|
||||
input.progress?.onMetabaseChildStarted?.({
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
metabaseDatabaseId: 1,
|
||||
targetConnectionId: 'warehouse_a',
|
||||
jobId: 'metabase-child-1',
|
||||
});
|
||||
input.memoryFlow?.update({
|
||||
plannedWorkUnits: [
|
||||
{
|
||||
unitKey: 'metabase-col-6',
|
||||
rawFiles: ['cards/40.json'],
|
||||
peerFileCount: 0,
|
||||
dependencyCount: 0,
|
||||
},
|
||||
],
|
||||
});
|
||||
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
|
||||
input.memoryFlow?.emit({
|
||||
type: 'work_unit_started',
|
||||
unitKey: 'metabase-col-6',
|
||||
skills: ['sl_capture'],
|
||||
stepBudget: 40,
|
||||
});
|
||||
input.memoryFlow?.emit({
|
||||
type: 'work_unit_step',
|
||||
unitKey: 'metabase-col-6',
|
||||
stepIndex: 7,
|
||||
stepBudget: 40,
|
||||
});
|
||||
input.memoryFlow?.emit({
|
||||
type: 'stage_progress',
|
||||
stage: 'integration',
|
||||
percent: 81,
|
||||
message: 'Resolving text conflict for metabase-col-6',
|
||||
});
|
||||
input.memoryFlow?.emit({ type: 'work_unit_finished', unitKey: 'metabase-col-6', status: 'success' });
|
||||
input.memoryFlow?.update({
|
||||
plannedWorkUnits: [
|
||||
{
|
||||
unitKey: 'metabase-col-7',
|
||||
rawFiles: ['cards/48.json'],
|
||||
peerFileCount: 0,
|
||||
dependencyCount: 0,
|
||||
},
|
||||
],
|
||||
});
|
||||
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 });
|
||||
input.memoryFlow?.emit({
|
||||
type: 'work_unit_started',
|
||||
unitKey: 'metabase-col-7',
|
||||
skills: ['sl_capture'],
|
||||
stepBudget: 40,
|
||||
});
|
||||
input.progress?.onMetabaseChildCompleted?.({
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
metabaseDatabaseId: 1,
|
||||
targetConnectionId: 'warehouse_a',
|
||||
jobId: 'metabase-child-1',
|
||||
status: 'done',
|
||||
});
|
||||
return {
|
||||
metabaseConnectionId: 'prod-metabase',
|
||||
status: 'all_succeeded',
|
||||
totals: { workUnits: 1, failedWorkUnits: 0 },
|
||||
children: [],
|
||||
};
|
||||
},
|
||||
},
|
||||
),
|
||||
).resolves.toBe(0);
|
||||
|
||||
expect(progressEvents).toEqual(
|
||||
expect.arrayContaining([
|
||||
{ percent: 45, message: 'Planned 1 task' },
|
||||
{ percent: 55, message: 'Processing 1/1 tasks: metabase-col-6' },
|
||||
{
|
||||
percent: 60,
|
||||
message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 step 7/40',
|
||||
transient: true,
|
||||
},
|
||||
{ percent: 81, message: 'Resolving text conflict for metabase-col-6' },
|
||||
{ percent: 81, message: 'Processing 1/1 tasks: metabase-col-7' },
|
||||
]),
|
||||
);
|
||||
expect(io.stdout()).toContain('"status": "all_succeeded"');
|
||||
expect(io.stderr()).not.toContain('Metabase ingest: prod-metabase');
|
||||
});
|
||||
|
||||
it('runs Metabase scheduled ingest through the public CLI command path with real fan-out', async () => {
|
||||
const projectDir = join(tempDir, 'metabase-cli-project');
|
||||
await writeWarehouseConfig(projectDir);
|
||||
|
|
|
|||
|
|
@ -295,7 +295,11 @@ function formatDiffProgress(event: Extract<MemoryFlowEvent, { type: 'diff_comput
|
|||
}
|
||||
|
||||
function workUnitEventsThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): MemoryFlowEvent[] {
|
||||
return snapshot.events.slice(0, eventIndex + 1);
|
||||
const latestPlanIndex = snapshot.events
|
||||
.slice(0, eventIndex + 1)
|
||||
.findLastIndex((event) => event.type === 'chunks_planned');
|
||||
const startIndex = latestPlanIndex >= 0 ? latestPlanIndex + 1 : 0;
|
||||
return snapshot.events.slice(startIndex, eventIndex + 1);
|
||||
}
|
||||
|
||||
function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
|
||||
|
|
@ -319,7 +323,8 @@ function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex
|
|||
if (snapshot.plannedWorkUnits.length > 0) {
|
||||
return snapshot.plannedWorkUnits.length;
|
||||
}
|
||||
const planEvent = workUnitEventsThrough(snapshot, eventIndex)
|
||||
const planEvent = snapshot.events
|
||||
.slice(0, eventIndex + 1)
|
||||
.filter((event) => event.type === 'chunks_planned')
|
||||
.at(-1);
|
||||
return planEvent?.workUnitCount ?? completedWorkUnitCountThrough(snapshot, eventIndex);
|
||||
|
|
@ -365,6 +370,12 @@ function plainIngestEventProgress(
|
|||
};
|
||||
case 'stage_skipped':
|
||||
return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` };
|
||||
case 'stage_progress':
|
||||
return {
|
||||
percent: event.percent,
|
||||
message: event.message,
|
||||
...(event.transient !== undefined ? { transient: event.transient } : {}),
|
||||
};
|
||||
case 'work_unit_started': {
|
||||
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
|
||||
|
|
@ -711,6 +722,25 @@ export async function runKtxIngest(
|
|||
}
|
||||
if (args.adapter === 'metabase') {
|
||||
const executeMetabaseFanout = deps.runLocalMetabaseIngest ?? runLocalMetabaseIngest;
|
||||
const runOutputMode = effectiveIngestOutputMode(args.outputMode, io, env, {
|
||||
requireInput: (args.inputMode ?? 'auto') === 'auto',
|
||||
});
|
||||
const plainProgress = shouldWritePlainIngestProgress(runOutputMode, io, env)
|
||||
? createPlainIngestProgressRenderer(args, io)
|
||||
: null;
|
||||
const structuredProgress = deps.progress
|
||||
? createPlainIngestProgressObserver(args, deps.progress)
|
||||
: null;
|
||||
const initialMemoryFlow =
|
||||
plainProgress || structuredProgress ? initialRunMemoryFlowInput(args, 'pending') : undefined;
|
||||
const memoryFlow = initialMemoryFlow
|
||||
? createMemoryFlowLiveBuffer(initialMemoryFlow, {
|
||||
onChange: (snapshot) => {
|
||||
plainProgress?.update(snapshot);
|
||||
structuredProgress?.update(snapshot);
|
||||
},
|
||||
})
|
||||
: undefined;
|
||||
const progress =
|
||||
args.outputMode === 'json' && !deps.progress
|
||||
? undefined
|
||||
|
|
@ -721,20 +751,29 @@ export async function runKtxIngest(
|
|||
: io,
|
||||
deps.progress,
|
||||
);
|
||||
const result = await executeMetabaseFanout({
|
||||
project: ingestProject,
|
||||
adapters: createAdapters(ingestProject, adapterOptions),
|
||||
metabaseConnectionId: args.connectionId,
|
||||
...localIngestOptions,
|
||||
queryExecutor,
|
||||
trigger: 'manual_resync',
|
||||
jobIdFactory: deps.jobIdFactory,
|
||||
...(progress ? { progress } : {}),
|
||||
});
|
||||
if (args.outputMode === 'json') {
|
||||
io.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
|
||||
} else {
|
||||
writeMetabaseFanoutStatus(result, io);
|
||||
plainProgress?.start();
|
||||
structuredProgress?.start();
|
||||
let result: LocalMetabaseFanoutResult;
|
||||
try {
|
||||
result = await executeMetabaseFanout({
|
||||
project: ingestProject,
|
||||
adapters: createAdapters(ingestProject, adapterOptions),
|
||||
metabaseConnectionId: args.connectionId,
|
||||
...localIngestOptions,
|
||||
queryExecutor,
|
||||
trigger: 'manual_resync',
|
||||
jobIdFactory: deps.jobIdFactory,
|
||||
...(memoryFlow ? { memoryFlow } : {}),
|
||||
...(progress ? { progress } : {}),
|
||||
});
|
||||
plainProgress?.flush();
|
||||
if (args.outputMode === 'json') {
|
||||
io.stdout.write(`${JSON.stringify(result, null, 2)}\n`);
|
||||
} else {
|
||||
writeMetabaseFanoutStatus(result, io);
|
||||
}
|
||||
} finally {
|
||||
plainProgress?.flush();
|
||||
}
|
||||
return result.status === 'all_succeeded' ? 0 : 1;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue