fix(cli): clarify historic SQL ingest progress (#36)

This commit is contained in:
Andrey Avtomonov 2026-05-12 14:21:57 +02:00 committed by GitHub
parent e1129dd6a9
commit f422facf10
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 150 additions and 9 deletions

View file

@ -156,6 +156,19 @@ pnpm run test 2>&1 | tee /tmp/ktx-test-output.log
- Do not manually edit generated or built output under `dist/`; edit source and
rebuild.
### CLI Standards
- Use Commander for CLI command trees, arguments, options, help text, custom
parsers, and async action dispatch. Prefer `@commander-js/extra-typings` for
typed command definitions, use `InvalidArgumentError` for parse failures, and
call `parseAsync` when actions await asynchronous work.
- Use `@clack/prompts` for interactive flows. Always handle cancellation with
`isCancel` plus `cancel`, stop active spinners before exiting, and keep prompts
grouped or factored so multi-step setup flows share cancellation behavior.
- Keep command behavior scriptable: prefer flags and config over prompts when
values are supplied, and reserve prompts for interactive missing input or
explicit setup flows.
### Zod Naming Convention
```typescript

View file

@ -1075,10 +1075,105 @@ describe('runKtxIngest', () => {
const stdout = io.stdout();
expect(stdout).toContain('[45%] Planned 2 work units');
expect(stdout).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders');
expect(stdout).toContain('[58%] Processing 1/2 work units: historic-sql-table-public-orders step 7/40');
expect(stdout).toContain(
'\r[58%] Processing work units: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K',
);
expect(stdout).toContain('[68%] Processed 1/2 work units');
});
it('renders concurrent WorkUnit step progress as transient aggregate status', async () => {
const projectDir = join(tempDir, 'historic-sql-concurrent-progress-project');
await mkdir(projectDir, { recursive: true });
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: historic-sql-concurrent-progress-project',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' historicSql:',
' enabled: true',
' dialect: postgres',
' minExecutions: 2',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
'utf-8',
);
const createdAdapters: SourceAdapter[] = [
{ source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) },
];
const workUnitKeys = [
'historic-sql-table-public-orders',
'historic-sql-table-public-customers',
'historic-sql-table-public-line-items',
'historic-sql-table-public-payments',
'historic-sql-table-public-products',
'historic-sql-table-public-suppliers',
];
const runLocal = vi.fn(async (input: RunLocalIngestOptions) => {
input.memoryFlow?.update({
plannedWorkUnits: workUnitKeys.map((unitKey) => ({
unitKey,
rawFiles: [`tables/${unitKey}.json`],
peerFileCount: 0,
dependencyCount: 0,
})),
});
input.memoryFlow?.emit({
type: 'chunks_planned',
chunkCount: workUnitKeys.length,
workUnitCount: workUnitKeys.length,
evictionCount: 0,
});
for (const unitKey of workUnitKeys) {
input.memoryFlow?.emit({
type: 'work_unit_started',
unitKey,
skills: ['historic_sql_table_digest'],
stepBudget: 40,
});
}
for (const unitKey of workUnitKeys) {
input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, stepIndex: 1, stepBudget: 40 });
}
input.memoryFlow?.finish('done');
return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job');
});
const io = makeIo({ isTTY: true });
await expect(
runKtxIngest(
{
command: 'run',
projectDir,
connectionId: 'warehouse',
adapter: 'historic-sql',
outputMode: 'plain',
},
io.io,
{
env: interactiveEnv(),
createAdapters: vi.fn(() => createdAdapters as never),
runLocalIngest: runLocal,
jobIdFactory: () => 'historic-concurrent-progress-job',
},
),
).resolves.toBe(0);
const stdout = io.stdout();
expect(stdout).toContain(
'\r[56%] Processing work units: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K',
);
expect(stdout).not.toContain(
'\n[56%] Processing 6/6 work units: historic-sql-table-public-suppliers step 1/40\n',
);
expect(stdout).toContain('\n[100%] Ingest completed\n');
});
it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -176,6 +176,19 @@ function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventInd
return workUnitEventsThrough(snapshot, eventIndex).filter((event) => event.type === 'work_unit_finished').length;
}
function activeWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
const active = new Set<string>();
for (const event of workUnitEventsThrough(snapshot, eventIndex)) {
if (event.type === 'work_unit_started') {
active.add(event.unitKey);
}
if (event.type === 'work_unit_finished') {
active.delete(event.unitKey);
}
}
return active.size;
}
function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number {
if (snapshot.plannedWorkUnits.length > 0) {
return snapshot.plannedWorkUnits.length;
@ -199,7 +212,7 @@ function plainIngestEventProgress(
event: MemoryFlowEvent,
snapshot: MemoryFlowReplayInput,
eventIndex: number,
): { percent: number; message: string } | null {
): { percent: number; message: string; transient?: boolean } | null {
switch (event.type) {
case 'source_acquired':
return {
@ -229,13 +242,14 @@ function plainIngestEventProgress(
case 'work_unit_step': {
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const completed = completedWorkUnitCountThrough(snapshot, eventIndex);
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
const active = activeWorkUnitCountThrough(snapshot, eventIndex);
const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0;
const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55;
const progress = total > 0 ? `${ordinal}/${total} work units: ` : '';
const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`;
return {
percent,
message: `Processing ${progress}${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`,
message: `Processing work units: ${completed}/${total} complete, ${active} active; latest ${latest}`,
transient: true,
};
}
case 'work_unit_finished': {
@ -281,15 +295,31 @@ function shouldWritePlainIngestProgress(
function createPlainIngestProgressRenderer(
args: Extract<KtxIngestArgs, { command: 'run' }>,
io: KtxIngestIo,
): { start(): void; update(snapshot: MemoryFlowReplayInput): void } {
): { start(): void; update(snapshot: MemoryFlowReplayInput): void; flush(): void } {
let printedEvents = 0;
let lastPercent = 0;
let printedCompletion = false;
let hasPendingTransient = false;
const write = (percent: number, message: string) => {
const flush = () => {
if (!hasPendingTransient) {
return;
}
io.stdout.write('\n');
hasPendingTransient = false;
};
const write = (percent: number, message: string, options?: { transient?: boolean }) => {
const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent)));
lastPercent = nextPercent;
io.stdout.write(`[${nextPercent}%] ${message}\n`);
const line = `[${nextPercent}%] ${message}`;
if (options?.transient === true) {
io.stdout.write(`\r${line}\u001b[K`);
hasPendingTransient = true;
return;
}
flush();
io.stdout.write(`${line}\n`);
};
return {
@ -305,7 +335,7 @@ function createPlainIngestProgressRenderer(
}
const progress = plainIngestEventProgress(event, snapshot, eventIndex);
if (progress) {
write(progress.percent, progress.message);
write(progress.percent, progress.message, progress.transient === true ? { transient: true } : undefined);
}
}
if (!printedCompletion && snapshot.status !== 'running') {
@ -313,6 +343,7 @@ function createPlainIngestProgressRenderer(
write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed');
}
},
flush,
};
}
@ -564,6 +595,7 @@ export async function runKtxIngest(
io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot));
return reportStatus(result.report) === 'done' ? 0 : 1;
}
plainProgress?.flush();
await writeReportRecord(result.report, runOutputMode, io, {
interactive: (args.inputMode ?? 'auto') === 'auto',
renderStoredMemoryFlow: deps.renderStoredMemoryFlow,
@ -571,6 +603,7 @@ export async function runKtxIngest(
});
return reportStatus(result.report) === 'done' ? 0 : 1;
} finally {
plainProgress?.flush();
liveTui?.close();
}
}