mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-25 08:48:08 +02:00
fix(cli): clarify historic SQL ingest progress
This commit is contained in:
parent
da108e556c
commit
e62ef65a64
3 changed files with 150 additions and 9 deletions
|
|
@ -176,6 +176,19 @@ function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventInd
|
|||
return workUnitEventsThrough(snapshot, eventIndex).filter((event) => event.type === 'work_unit_finished').length;
|
||||
}
|
||||
|
||||
function activeWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
|
||||
const active = new Set<string>();
|
||||
for (const event of workUnitEventsThrough(snapshot, eventIndex)) {
|
||||
if (event.type === 'work_unit_started') {
|
||||
active.add(event.unitKey);
|
||||
}
|
||||
if (event.type === 'work_unit_finished') {
|
||||
active.delete(event.unitKey);
|
||||
}
|
||||
}
|
||||
return active.size;
|
||||
}
|
||||
|
||||
function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
|
||||
if (snapshot.plannedWorkUnits.length > 0) {
|
||||
return snapshot.plannedWorkUnits.length;
|
||||
|
|
@ -199,7 +212,7 @@ function plainIngestEventProgress(
|
|||
event: MemoryFlowEvent,
|
||||
snapshot: MemoryFlowReplayInput,
|
||||
eventIndex: number,
|
||||
): { percent: number; message: string } | null {
|
||||
): { percent: number; message: string; transient?: boolean } | null {
|
||||
switch (event.type) {
|
||||
case 'source_acquired':
|
||||
return {
|
||||
|
|
@ -229,13 +242,14 @@ function plainIngestEventProgress(
|
|||
case 'work_unit_step': {
|
||||
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const completed = completedWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
|
||||
const active = activeWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0;
|
||||
const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55;
|
||||
const progress = total > 0 ? `${ordinal}/${total} work units: ` : '';
|
||||
const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`;
|
||||
return {
|
||||
percent,
|
||||
message: `Processing ${progress}${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`,
|
||||
message: `Processing work units: ${completed}/${total} complete, ${active} active; latest ${latest}`,
|
||||
transient: true,
|
||||
};
|
||||
}
|
||||
case 'work_unit_finished': {
|
||||
|
|
@ -281,15 +295,31 @@ function shouldWritePlainIngestProgress(
|
|||
function createPlainIngestProgressRenderer(
|
||||
args: Extract<KtxIngestArgs, { command: 'run' }>,
|
||||
io: KtxIngestIo,
|
||||
): { start(): void; update(snapshot: MemoryFlowReplayInput): void } {
|
||||
): { start(): void; update(snapshot: MemoryFlowReplayInput): void; flush(): void } {
|
||||
let printedEvents = 0;
|
||||
let lastPercent = 0;
|
||||
let printedCompletion = false;
|
||||
let hasPendingTransient = false;
|
||||
|
||||
const write = (percent: number, message: string) => {
|
||||
const flush = () => {
|
||||
if (!hasPendingTransient) {
|
||||
return;
|
||||
}
|
||||
io.stdout.write('\n');
|
||||
hasPendingTransient = false;
|
||||
};
|
||||
|
||||
const write = (percent: number, message: string, options?: { transient?: boolean }) => {
|
||||
const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent)));
|
||||
lastPercent = nextPercent;
|
||||
io.stdout.write(`[${nextPercent}%] ${message}\n`);
|
||||
const line = `[${nextPercent}%] ${message}`;
|
||||
if (options?.transient === true) {
|
||||
io.stdout.write(`\r${line}\u001b[K`);
|
||||
hasPendingTransient = true;
|
||||
return;
|
||||
}
|
||||
flush();
|
||||
io.stdout.write(`${line}\n`);
|
||||
};
|
||||
|
||||
return {
|
||||
|
|
@ -305,7 +335,7 @@ function createPlainIngestProgressRenderer(
|
|||
}
|
||||
const progress = plainIngestEventProgress(event, snapshot, eventIndex);
|
||||
if (progress) {
|
||||
write(progress.percent, progress.message);
|
||||
write(progress.percent, progress.message, progress.transient === true ? { transient: true } : undefined);
|
||||
}
|
||||
}
|
||||
if (!printedCompletion && snapshot.status !== 'running') {
|
||||
|
|
@ -313,6 +343,7 @@ function createPlainIngestProgressRenderer(
|
|||
write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed');
|
||||
}
|
||||
},
|
||||
flush,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -564,6 +595,7 @@ export async function runKtxIngest(
|
|||
io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot));
|
||||
return reportStatus(result.report) === 'done' ? 0 : 1;
|
||||
}
|
||||
plainProgress?.flush();
|
||||
await writeReportRecord(result.report, runOutputMode, io, {
|
||||
interactive: (args.inputMode ?? 'auto') === 'auto',
|
||||
renderStoredMemoryFlow: deps.renderStoredMemoryFlow,
|
||||
|
|
@ -571,6 +603,7 @@ export async function runKtxIngest(
|
|||
});
|
||||
return reportStatus(result.report) === 'done' ? 0 : 1;
|
||||
} finally {
|
||||
plainProgress?.flush();
|
||||
liveTui?.close();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue