diff --git a/AGENTS.md b/AGENTS.md index e8062dcb..2e5a684a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -156,6 +156,19 @@ pnpm run test 2>&1 | tee /tmp/ktx-test-output.log - Do not manually edit generated or built output under `dist/`; edit source and rebuild. +### CLI Standards + +- Use Commander for CLI command trees, arguments, options, help text, custom + parsers, and async action dispatch. Prefer `@commander-js/extra-typings` for + typed command definitions, use `InvalidArgumentError` for parse failures, and + call `parseAsync` when actions await asynchronous work. +- Use `@clack/prompts` for interactive flows. Always handle cancellation with + `isCancel` plus `cancel`, stop active spinners before exiting, and keep prompts + grouped or factored so multi-step setup flows share cancellation behavior. +- Keep command behavior scriptable: prefer flags and config over prompts when + values are supplied, and reserve prompts for interactive missing input or + explicit setup flows. + ### Zod Naming Convention ```typescript diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index 59df5e86..0307ca9e 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -1075,10 +1075,105 @@ describe('runKtxIngest', () => { const stdout = io.stdout(); expect(stdout).toContain('[45%] Planned 2 work units'); expect(stdout).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders'); - expect(stdout).toContain('[58%] Processing 1/2 work units: historic-sql-table-public-orders step 7/40'); + expect(stdout).toContain( + '\r[58%] Processing work units: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K', + ); expect(stdout).toContain('[68%] Processed 1/2 work units'); }); + it('renders concurrent WorkUnit step progress as transient aggregate status', async () => { + const projectDir = join(tempDir, 'historic-sql-concurrent-progress-project'); + await mkdir(projectDir, { recursive: true }); + await writeFile( + join(projectDir, 'ktx.yaml'), + [ + 'project: historic-sql-concurrent-progress-project', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' historicSql:', + ' enabled: true', + ' dialect: postgres', + ' minExecutions: 2', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + 'utf-8', + ); + const createdAdapters: SourceAdapter[] = [ + { source: 'historic-sql', skillNames: [], detect: async () => true, chunk: async () => ({ workUnits: [] }) }, + ]; + const workUnitKeys = [ + 'historic-sql-table-public-orders', + 'historic-sql-table-public-customers', + 'historic-sql-table-public-line-items', + 'historic-sql-table-public-payments', + 'historic-sql-table-public-products', + 'historic-sql-table-public-suppliers', + ]; + const runLocal = vi.fn(async (input: RunLocalIngestOptions) => { + input.memoryFlow?.update({ + plannedWorkUnits: workUnitKeys.map((unitKey) => ({ + unitKey, + rawFiles: [`tables/${unitKey}.json`], + peerFileCount: 0, + dependencyCount: 0, + })), + }); + input.memoryFlow?.emit({ + type: 'chunks_planned', + chunkCount: workUnitKeys.length, + workUnitCount: workUnitKeys.length, + evictionCount: 0, + }); + for (const unitKey of workUnitKeys) { + input.memoryFlow?.emit({ + type: 'work_unit_started', + unitKey, + skills: ['historic_sql_table_digest'], + stepBudget: 40, + }); + } + for (const unitKey of workUnitKeys) { + input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, stepIndex: 1, stepBudget: 40 }); + } + input.memoryFlow?.finish('done'); + return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job'); + }); + const io = makeIo({ isTTY: true }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'warehouse', + adapter: 'historic-sql', + outputMode: 'plain', + }, + io.io, + { + env: interactiveEnv(), + createAdapters: vi.fn(() => createdAdapters as never), + runLocalIngest: runLocal, + jobIdFactory: () => 'historic-concurrent-progress-job', + }, + ), + ).resolves.toBe(0); + + const stdout = io.stdout(); + expect(stdout).toContain( + '\r[56%] Processing work units: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K', + ); + expect(stdout).not.toContain( + '\n[56%] Processing 6/6 work units: historic-sql-table-public-suppliers step 1/40\n', + ); + expect(stdout).toContain('\n[100%] Ingest completed\n'); + }); + it('passes local Looker pull-config options and agent runner into scheduled ingest for Looker scheduled ingest', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 39bf21bb..5eadce29 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -176,6 +176,19 @@ function completedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventInd return workUnitEventsThrough(snapshot, eventIndex).filter((event) => event.type === 'work_unit_finished').length; } +function activeWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { + const active = new Set(); + for (const event of workUnitEventsThrough(snapshot, eventIndex)) { + if (event.type === 'work_unit_started') { + active.add(event.unitKey); + } + if (event.type === 'work_unit_finished') { + active.delete(event.unitKey); + } + } + return active.size; +} + function plannedWorkUnitCountThrough(snapshot: MemoryFlowReplayInput, eventIndex: number): number { if (snapshot.plannedWorkUnits.length > 0) { return snapshot.plannedWorkUnits.length; @@ -199,7 +212,7 @@ function plainIngestEventProgress( event: MemoryFlowEvent, snapshot: MemoryFlowReplayInput, eventIndex: number, -): { percent: number; message: string } | null { +): { percent: number; message: string; transient?: boolean } | null { switch (event.type) { case 'source_acquired': return { @@ -229,13 +242,14 @@ function plainIngestEventProgress( case 'work_unit_step': { const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const completed = completedWorkUnitCountThrough(snapshot, eventIndex); - const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); + const active = activeWorkUnitCountThrough(snapshot, eventIndex); const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0; const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55; - const progress = total > 0 ? `${ordinal}/${total} work units: ` : ''; + const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`; return { percent, - message: `Processing ${progress}${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`, + message: `Processing work units: ${completed}/${total} complete, ${active} active; latest ${latest}`, + transient: true, }; } case 'work_unit_finished': { @@ -281,15 +295,31 @@ function shouldWritePlainIngestProgress( function createPlainIngestProgressRenderer( args: Extract, io: KtxIngestIo, -): { start(): void; update(snapshot: MemoryFlowReplayInput): void } { +): { start(): void; update(snapshot: MemoryFlowReplayInput): void; flush(): void } { let printedEvents = 0; let lastPercent = 0; let printedCompletion = false; + let hasPendingTransient = false; - const write = (percent: number, message: string) => { + const flush = () => { + if (!hasPendingTransient) { + return; + } + io.stdout.write('\n'); + hasPendingTransient = false; + }; + + const write = (percent: number, message: string, options?: { transient?: boolean }) => { const nextPercent = Math.max(lastPercent, Math.max(0, Math.min(100, percent))); lastPercent = nextPercent; - io.stdout.write(`[${nextPercent}%] ${message}\n`); + const line = `[${nextPercent}%] ${message}`; + if (options?.transient === true) { + io.stdout.write(`\r${line}\u001b[K`); + hasPendingTransient = true; + return; + } + flush(); + io.stdout.write(`${line}\n`); }; return { @@ -305,7 +335,7 @@ function createPlainIngestProgressRenderer( } const progress = plainIngestEventProgress(event, snapshot, eventIndex); if (progress) { - write(progress.percent, progress.message); + write(progress.percent, progress.message, progress.transient === true ? { transient: true } : undefined); } } if (!printedCompletion && snapshot.status !== 'running') { @@ -313,6 +343,7 @@ function createPlainIngestProgressRenderer( write(100, snapshot.status === 'done' ? 'Ingest completed' : 'Ingest failed'); } }, + flush, }; } @@ -564,6 +595,7 @@ export async function runKtxIngest( io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot)); return reportStatus(result.report) === 'done' ? 0 : 1; } + plainProgress?.flush(); await writeReportRecord(result.report, runOutputMode, io, { interactive: (args.inputMode ?? 'auto') === 'auto', renderStoredMemoryFlow: deps.renderStoredMemoryFlow, @@ -571,6 +603,7 @@ export async function runKtxIngest( }); return reportStatus(result.report) === 'done' ? 0 : 1; } finally { + plainProgress?.flush(); liveTui?.close(); } }