feat(cli): improve setup progress UX (#69)

This commit is contained in:
Andrey Avtomonov 2026-05-13 17:01:48 +02:00 committed by GitHub
parent d7147f9ca1
commit 754e4a9039
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 1125 additions and 346 deletions

View file

@ -67,7 +67,13 @@ interface KtxIngestIo {
stderr: { write(chunk: string): void };
}
interface KtxIngestDeps {
export interface KtxIngestProgressUpdate {
percent: number;
message: string;
transient?: boolean;
}
export interface KtxIngestDeps {
jobIdFactory?: () => string;
now?: () => Date;
createAdapters?: typeof createKtxCliLocalIngestAdapters;
@ -88,6 +94,7 @@ interface KtxIngestDeps {
| 'logger'
| 'pullConfigOptions'
>;
progress?: (update: KtxIngestProgressUpdate) => void;
}
function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
@ -145,12 +152,18 @@ function pluralize(count: number, singular: string, plural = `${singular}s`): st
function createMetabaseFanoutProgress(
connectionId: string,
io: KtxIngestIo,
onProgress?: (update: KtxIngestProgressUpdate) => void,
): LocalMetabaseFanoutProgress {
io.stderr.write(`Metabase ingest: ${connectionId}\n`);
io.stderr.write('Checking mappings and scheduled-pull targets...\n');
onProgress?.({ percent: 5, message: `Checking Metabase mappings for ${connectionId}` });
return {
onMetabaseFanoutPlanned(event) {
io.stderr.write(`Targets: ${pluralize(event.children.length, 'mapped database')}\n`);
onProgress?.({
percent: 10,
message: `Metabase ${event.metabaseConnectionId}: ${pluralize(event.children.length, 'mapped database')}`,
});
for (const child of event.children) {
io.stderr.write(`- database=${child.metabaseDatabaseId} target=${child.targetConnectionId} status=queued\n`);
}
@ -159,11 +172,19 @@ function createMetabaseFanoutProgress(
io.stderr.write(
`- database=${event.metabaseDatabaseId} target=${event.targetConnectionId} status=running job=${event.jobId}\n`,
);
onProgress?.({
percent: 25,
message: `Metabase database ${event.metabaseDatabaseId} -> ${event.targetConnectionId} running`,
});
},
onMetabaseChildCompleted(event) {
io.stderr.write(
`- database=${event.metabaseDatabaseId} target=${event.targetConnectionId} status=${event.status} job=${event.jobId}\n`,
);
onProgress?.({
percent: 90,
message: `Metabase database ${event.metabaseDatabaseId} -> ${event.targetConnectionId} ${event.status}`,
});
},
};
}
@ -231,6 +252,12 @@ function plainIngestEventProgress(
case 'diff_computed':
return { percent: 35, message: `Computed source diff ${formatDiffProgress(event)}` };
case 'chunks_planned':
if (event.workUnitCount === 0) {
return {
percent: 80,
message: 'No work units to process; finalizing ingest',
};
}
return {
percent: 45,
message: `Planned ${pluralize(event.workUnitCount, 'work unit')}`,
@ -296,34 +323,22 @@ function shouldWritePlainIngestProgress(
return outputMode === 'plain' && io.stdout.isTTY === true && env.CI !== 'true';
}
function createPlainIngestProgressRenderer(
function createPlainIngestProgressObserver(
args: Extract<KtxIngestArgs, { command: 'run' }>,
io: KtxIngestIo,
): { start(): void; update(snapshot: MemoryFlowReplayInput): void; flush(): void } {
onProgress: (update: KtxIngestProgressUpdate) => void,
): { start(): void; update(snapshot: MemoryFlowReplayInput): void } {
let printedEvents = 0;
let lastPercent = 0;
let printedCompletion = false;
let hasPendingTransient = false;
const flush = () => {
if (!hasPendingTransient) {
return;
}
io.stderr.write('\n');
hasPendingTransient = false;
};
const write = (percent: number, message: string, options?: { transient?: boolean }) => {
const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent)));
lastPercent = nextPercent;
const line = `[${nextPercent}%] ${message}`;
if (options?.transient === true) {
io.stderr.write(`\r${line}\u001b[K`);
hasPendingTransient = true;
return;
}
flush();
io.stderr.write(`${line}\n`);
onProgress({
percent: nextPercent,
message,
...(options?.transient !== undefined ? { transient: options.transient } : {}),
});
};
return {
@ -347,6 +362,41 @@ function createPlainIngestProgressRenderer(
write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed');
}
},
};
}
function createPlainIngestProgressRenderer(
args: Extract<KtxIngestArgs, { command: 'run' }>,
io: KtxIngestIo,
): { start(): void; update(snapshot: MemoryFlowReplayInput): void; flush(): void } {
let hasPendingTransient = false;
const flush = () => {
if (!hasPendingTransient) {
return;
}
io.stderr.write('\n');
hasPendingTransient = false;
};
const observer = createPlainIngestProgressObserver(args, (update) => {
const line = `[${update.percent}%] ${update.message}`;
if (update.transient === true) {
io.stderr.write(`\r${line}\u001b[K`);
hasPendingTransient = true;
return;
}
flush();
io.stderr.write(`${line}\n`);
});
return {
start() {
observer.start();
},
update(snapshot) {
observer.update(snapshot);
},
flush,
};
}
@ -544,7 +594,15 @@ export async function runKtxIngest(
if (args.adapter === 'metabase') {
const executeMetabaseFanout = deps.runLocalMetabaseIngest ?? runLocalMetabaseIngest;
const progress =
args.outputMode === 'json' ? undefined : createMetabaseFanoutProgress(args.connectionId, io);
args.outputMode === 'json' && !deps.progress
? undefined
: createMetabaseFanoutProgress(
args.connectionId,
args.outputMode === 'json'
? { ...io, stderr: { write: () => undefined } }
: io,
deps.progress,
);
const result = await executeMetabaseFanout({
project,
adapters: createAdapters(project, adapterOptions),
@ -573,8 +631,13 @@ export async function runKtxIngest(
const plainProgress = shouldWritePlainIngestProgress(runOutputMode, io, env)
? createPlainIngestProgressRenderer(args, io)
: null;
const structuredProgress = deps.progress
? createPlainIngestProgressObserver(args, deps.progress)
: null;
const initialMemoryFlow =
shouldUseLiveViz || plainProgress ? initialRunMemoryFlowInput(args, jobId ?? 'pending') : undefined;
shouldUseLiveViz || plainProgress || structuredProgress
? initialRunMemoryFlowInput(args, jobId ?? 'pending')
: undefined;
let latestMemoryFlowSnapshot: MemoryFlowReplayInput | null = initialMemoryFlow ?? null;
if (shouldUseLiveViz && initialMemoryFlow && isTuiCapableIo(io)) {
@ -595,11 +658,13 @@ export async function runKtxIngest(
return;
}
plainProgress?.update(snapshot);
structuredProgress?.update(snapshot);
},
})
: undefined;
plainProgress?.start();
structuredProgress?.start();
try {
const result = await executeLocalIngest({