diff --git a/packages/cli/src/context-build-view.test.ts b/packages/cli/src/context-build-view.test.ts index a84264fc..03b7b940 100644 --- a/packages/cli/src/context-build-view.test.ts +++ b/packages/cli/src/context-build-view.test.ts @@ -2,6 +2,7 @@ import { buildDefaultKtxProjectConfig, type KtxProjectConfig } from '@ktx/contex import { describe, expect, it, vi } from 'vitest'; import type { KtxPublicIngestProject, KtxPublicIngestTargetResult } from './public-ingest.js'; import { + type ContextBuildTargetState, extractProgressMessage, createRepainter, initViewState, @@ -112,15 +113,19 @@ describe('parseScanSummary', () => { }); describe('parseIngestSummary', () => { - it('extracts work units and saved memory', () => { - expect(parseIngestSummary('Work units: 5\nSaved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL'); + it('extracts task count and saved memory', () => { + expect(parseIngestSummary('Tasks: 5\nSaved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL'); }); - it('extracts work units alone when no saved memory', () => { - expect(parseIngestSummary('Work units: 5\nStatus: done')).toBe('5 work units'); + it('extracts task count alone when no saved memory', () => { + expect(parseIngestSummary('Tasks: 5\nStatus: done')).toBe('5 tasks'); }); - it('extracts saved memory alone when no work units', () => { + it('still parses the legacy "Work units:" wording for backward compat', () => { + expect(parseIngestSummary('Work units: 7\nStatus: done')).toBe('7 tasks'); + }); + + it('extracts saved memory alone when no task count', () => { expect(parseIngestSummary('Saved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL'); }); @@ -297,11 +302,11 @@ describe('renderContextBuildView', () => { state.contextSources[0].startedAt = 1_000; state.contextSources[0].elapsedMs = 113_000; state.contextSources[0].progressUpdatedAtMs = 46_000; - state.contextSources[0].detailLine = '[45%] No work units to process; finalizing ingest'; + state.contextSources[0].detailLine = '[45%] No tasks to process; finalizing ingest'; const output = renderContextBuildView(state, { styled: false }); - expect(output).toContain('No work units to process; finalizing ingest'); + expect(output).toContain('No tasks to process; finalizing ingest'); expect(output).toContain('last update 1m08s ago'); expect(output).toContain('(1m53s)'); }); @@ -314,7 +319,7 @@ describe('renderContextBuildView', () => { state.contextSources[0].startedAt = 1_000; state.contextSources[0].elapsedMs = 40_000; state.contextSources[0].progressUpdatedAtMs = 25_000; - state.contextSources[0].detailLine = '[45%] Planning work units'; + state.contextSources[0].detailLine = '[45%] Planning tasks'; const output = renderContextBuildView(state, { styled: false }); @@ -414,6 +419,142 @@ describe('renderContextBuildView', () => { }); }); +describe('renderContextBuildView phase rows', () => { + function dbTarget(connectionId: string, queryHistoryEnabled = false) { + return { + connectionId, + driver: 'postgres', + operation: 'database-ingest' as const, + debugCommand: '', + steps: queryHistoryEnabled + ? (['database-schema', 'query-history'] as ('database-schema' | 'query-history')[]) + : (['database-schema'] as ('database-schema' | 'query-history')[]), + ...(queryHistoryEnabled ? { queryHistory: { enabled: true, dialect: 'postgres' as const } } : {}), + }; + } + + function sourceTarget(connectionId: string) { + return { + connectionId, + driver: 'dbt', + operation: 'source-ingest' as const, + adapter: 'dbt', + debugCommand: '', + steps: ['source-ingest', 'memory-update'] as ('source-ingest' | 'memory-update')[], + }; + } + + function setPhase( + state: ReturnType, + connectionId: string, + phaseKey: 'database-schema' | 'query-history' | 'source-ingest', + patch: Partial, + ): void { + const target = [...state.primarySources, ...state.contextSources].find((t) => t.target.connectionId === connectionId); + const phase = target?.phases.find((p) => p.key === phaseKey); + if (!phase) throw new Error(`No phase ${phaseKey} on ${connectionId}`); + Object.assign(phase, patch); + } + + it('renders two phase rows for a database-ingest target with query history', () => { + const state = initViewState([dbTarget('warehouse', true)]); + state.primarySources[0].status = 'running'; + setPhase(state, 'warehouse', 'database-schema', { + status: 'done', + percent: 100, + summary: '172 tables', + elapsedMs: 52_000, + }); + setPhase(state, 'warehouse', 'query-history', { + status: 'running', + percent: 7, + detail: '12/172 · arr-movements', + elapsedMs: 36_000, + }); + + const output = renderContextBuildView(state, { styled: false }); + expect(output).toContain('Schema'); + expect(output).toContain('100%'); + expect(output).toContain('172 tables'); + expect(output).toContain('(52s)'); + expect(output).toContain('Query history'); + expect(output).toContain('7%'); + expect(output).toContain('12/172 · arr-movements'); + expect(output).toContain('(36s)'); + }); + + it('renders a single Schema phase row when query history is disabled', () => { + const state = initViewState([dbTarget('warehouse', false)]); + state.primarySources[0].status = 'running'; + setPhase(state, 'warehouse', 'database-schema', { + status: 'running', + percent: 42, + detail: 'Profiling 73/172 tables', + }); + + const output = renderContextBuildView(state, { styled: false }); + expect(output).toContain('Schema'); + expect(output).toContain('42%'); + expect(output).toContain('Profiling 73/172 tables'); + expect(output).not.toContain('Query history'); + }); + + it('renders Source ingest phase row for a source-ingest target', () => { + const state = initViewState([sourceTarget('dbt-main')]); + state.contextSources[0].status = 'running'; + setPhase(state, 'dbt-main', 'source-ingest', { + status: 'running', + percent: 25, + detail: 'Reading models', + }); + + const output = renderContextBuildView(state, { styled: false }); + expect(output).toContain('Source ingest'); + expect(output).toContain('25%'); + expect(output).toContain('Reading models'); + expect(output).not.toContain('Schema '); + }); + + it('renders skipped Query history when schema phase fails', () => { + const state = initViewState([dbTarget('warehouse', true)]); + state.primarySources[0].status = 'running'; + setPhase(state, 'warehouse', 'database-schema', { status: 'failed', percent: 30 }); + setPhase(state, 'warehouse', 'query-history', { status: 'skipped' }); + + const output = renderContextBuildView(state, { styled: false }); + expect(output).toContain('Schema'); + expect(output).toContain('failed'); + expect(output).toContain('Query history'); + expect(output).toContain('skipped'); + }); + + it('renders queued Query history with an em-dash and empty bar', () => { + const state = initViewState([dbTarget('warehouse', true)]); + state.primarySources[0].status = 'running'; + setPhase(state, 'warehouse', 'database-schema', { + status: 'running', + percent: 12, + detail: 'Introspecting', + }); + + const output = renderContextBuildView(state, { styled: false }); + expect(output).toContain('Query history'); + expect(output).toContain('queued'); + expect(output).toContain('—'); + }); + + it('falls back to single-line legacy detail when no phase has started yet', () => { + const state = initViewState([dbTarget('warehouse', false)]); + state.primarySources[0].status = 'running'; + state.primarySources[0].detailLine = '[5%] Preparing database ingest'; + + const output = renderContextBuildView(state, { styled: false }); + expect(output).toContain('Preparing database ingest'); + expect(output).toContain('5%'); + expect(output).not.toContain('○ Schema'); + }); +}); + describe('createRepainter', () => { it('moves up visual rows, not just newline count, when content wraps', () => { const io = makeIo({ isTTY: true, columns: 5 }); @@ -953,7 +1094,7 @@ describe('runContextBuild', () => { } targetIo.stdout.write('Report: report-dbt-failed\n'); - targetIo.stdout.write('Work units: 3\n'); + targetIo.stdout.write('Tasks: 3\n'); return failedResult(target.connectionId, target.driver, target.operation); }); diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 1267bafa..05f73c40 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -19,6 +19,21 @@ profileMark('module:context-build-view'); const SPINNER_FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] as const; const ESC = String.fromCharCode(0x1b); +type PhaseKey = 'database-schema' | 'query-history' | 'source-ingest'; +type PhaseStatus = 'queued' | 'running' | 'done' | 'failed' | 'skipped'; + +interface PhaseState { + key: PhaseKey; + name: string; + status: PhaseStatus; + percent: number; + detail: string | null; + summary: string | null; + startedAt: number | null; + elapsedMs: number; + progressUpdatedAtMs: number | null; +} + export interface ContextBuildTargetState { target: KtxPublicIngestPlanTarget; status: 'queued' | 'running' | 'done' | 'failed'; @@ -28,6 +43,35 @@ export interface ContextBuildTargetState { startedAt: number | null; elapsedMs: number; progressUpdatedAtMs: number | null; + phases: PhaseState[]; +} + +const PHASE_LABELS: Record = { + 'database-schema': 'Schema', + 'query-history': 'Query history', + 'source-ingest': 'Source ingest', +}; + +function makePhasesForTarget(target: KtxPublicIngestPlanTarget): PhaseState[] { + const make = (key: PhaseKey): PhaseState => ({ + key, + name: PHASE_LABELS[key], + status: 'queued', + percent: 0, + detail: null, + summary: null, + startedAt: null, + elapsedMs: 0, + progressUpdatedAtMs: null, + }); + if (target.operation === 'database-ingest') { + const phases: PhaseState[] = [make('database-schema')]; + if (target.queryHistory?.enabled === true) { + phases.push(make('query-history')); + } + return phases; + } + return [make('source-ingest')]; } export interface ContextBuildViewState { @@ -121,6 +165,34 @@ function statusIcon(status: ContextBuildTargetState['status'], frame: number, st } } +function phaseStatusIcon(status: PhaseStatus, frame: number, styled: boolean): string { + const raw = (() => { + switch (status) { + case 'done': + return '✓'; + case 'failed': + return '✗'; + case 'running': + return SPINNER_FRAMES[frame % SPINNER_FRAMES.length] ?? '⠋'; + case 'skipped': + return '·'; + default: + return '○'; + } + })(); + if (!styled) return raw; + switch (status) { + case 'done': + return green(raw); + case 'failed': + return red(raw); + case 'running': + return cyan(raw); + default: + return dim(raw); + } +} + function extractPercent(detailLine: string | null): number | null { if (!detailLine) return null; const match = detailLine.match(/^\[(\d+)%\]/); @@ -182,13 +254,70 @@ function targetDetail(target: ContextBuildTargetState, styled: boolean): string return styled ? dim('queued') : 'queued'; } +const PHASE_NAME_WIDTH = 14; + +function renderRunningTargetHeaderDetail(target: ContextBuildTargetState, styled: boolean): string { + const elapsed = target.elapsedMs > 0 ? `(${formatDuration(target.elapsedMs)})` : ''; + if (!elapsed) return ''; + return styled ? dim(elapsed) : elapsed; +} + +function renderPhaseRow(phase: PhaseState, frame: number, styled: boolean): string { + const icon = phaseStatusIcon(phase.status, frame, styled); + const name = phase.name.padEnd(PHASE_NAME_WIDTH); + const segments: string[] = []; + if (phase.status === 'queued' || phase.status === 'skipped') { + const emptyBar = BAR_EMPTY.repeat(BAR_WIDTH); + segments.push(styled ? dim(emptyBar) : emptyBar); + segments.push(styled ? dim(' —') : ' —'); + } else { + const pct = Math.max(0, Math.min(100, Math.round(phase.percent))); + segments.push(renderProgressBar(pct, styled)); + segments.push(`${String(pct).padStart(3)}%`); + } + let trailing = ''; + if (phase.status === 'done') { + const parts: string[] = []; + if (phase.summary) parts.push(phase.summary); + if (phase.elapsedMs > 0) { + const elapsed = `(${formatDuration(phase.elapsedMs)})`; + parts.push(styled ? dim(elapsed) : elapsed); + } + trailing = parts.join(' '); + } else if (phase.status === 'running') { + const parts: string[] = []; + if (phase.detail) parts.push(phase.detail); + if (phase.elapsedMs > 0) { + const elapsed = `(${formatDuration(phase.elapsedMs)})`; + parts.push(styled ? dim(elapsed) : elapsed); + } + trailing = parts.join(' '); + } else if (phase.status === 'queued') { + trailing = styled ? dim('queued') : 'queued'; + } else if (phase.status === 'skipped') { + trailing = styled ? dim('skipped') : 'skipped'; + } else if (phase.status === 'failed') { + trailing = styled ? red('failed') : 'failed'; + } + const bar = `${segments.join(' ')} ${trailing}`.trimEnd(); + return ` ${icon} ${name} ${bar}`; +} + function columnWidth(state: ContextBuildViewState): number { const all = [...state.primarySources, ...state.contextSources]; return Math.max(12, ...all.map((t) => t.target.connectionId.length)) + 2; } -function renderTargetLine(target: ContextBuildTargetState, frame: number, styled: boolean, width: number): string { - return ` ${statusIcon(target.status, frame, styled)} ${target.target.connectionId.padEnd(width)} ${targetDetail(target, styled)}`; +function renderTargetRows(target: ContextBuildTargetState, frame: number, styled: boolean, width: number): string[] { + const icon = statusIcon(target.status, frame, styled); + const name = target.target.connectionId.padEnd(width); + const anyPhaseStarted = target.phases.some((p) => p.status !== 'queued'); + if (target.status === 'running' && target.phases.length > 0 && anyPhaseStarted) { + const headerDetail = renderRunningTargetHeaderDetail(target, styled); + const headerLine = ` ${icon} ${name} ${headerDetail}`.trimEnd(); + return [headerLine, ...target.phases.map((phase) => renderPhaseRow(phase, frame, styled))]; + } + return [` ${icon} ${name} ${targetDetail(target, styled)}`]; } function renderTargetGroup( @@ -199,7 +328,7 @@ function renderTargetGroup( width: number, ): string[] { if (targets.length === 0) return []; - return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width))]; + return ['', ` ${label}:`, ...targets.flatMap((t) => renderTargetRows(t, frame, styled, width))]; } function renderMessageGroup(label: string, messages: string[], styled: boolean): string[] { @@ -306,8 +435,8 @@ export function parseScanSummary(output: string): string | null { export function parseIngestSummary(output: string): string | null { const savedMemory = output.match(/Saved memory: (.+)/); if (savedMemory) return savedMemory[1]; - const workUnits = output.match(/Work units: (\d+)/); - if (workUnits) return `${workUnits[1]} work units`; + const tasks = output.match(/(?:Tasks|Work units): (\d+)/); + if (tasks) return `${tasks[1]} tasks`; return null; } @@ -422,6 +551,7 @@ export function viewStateFromSourceProgress( startedAt: s.startedAtMs ?? null, elapsedMs: s.status === 'running' && s.startedAtMs ? now - s.startedAtMs : (s.elapsedMs ?? 0), progressUpdatedAtMs: s.updatedAtMs ?? null, + phases: [], }); return { @@ -492,6 +622,7 @@ function makeTargetState(target: KtxPublicIngestPlanTarget): ContextBuildTargetS startedAt: null, elapsedMs: 0, progressUpdatedAtMs: null, + phases: makePhasesForTarget(target), }; } @@ -550,7 +681,7 @@ function failedStepDetail(result: KtxPublicIngestTargetResult): string | null { } const INTERNAL_FAILURE_LINE_RE = - /^(Report|Run|Job|Status|Adapter|Connection|Sync|Mode|Dry run|Diff|Work units|Saved memory|Provenance rows):\s*/; + /^(Report|Run|Job|Status|Adapter|Connection|Sync|Mode|Dry run|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/; const ACTIONABLE_FAILURE_LINE_RE = /^(Missing bundled Python runtime manifest|KTX Python runtime is required|KTX managed daemon|Error:|Failed\b|Could not\b|Cannot\b)/; @@ -736,6 +867,11 @@ export async function runContextBuild( if (t.status === 'running' && t.startedAt !== null) { t.elapsedMs = nowFn() - t.startedAt; } + for (const phase of t.phases) { + if (phase.status === 'running' && phase.startedAt !== null) { + phase.elapsedMs = nowFn() - phase.startedAt; + } + } } paint(true); }, 140); @@ -784,8 +920,21 @@ export async function runContextBuild( paint(true); publishSourceProgress(true); let hasPendingProgressPublish = false; + const ingestPhaseKeyForTarget: PhaseKey = + targetState.target.operation === 'database-ingest' ? 'query-history' : 'source-ingest'; - const updateTargetProgress = (update: KtxIngestProgressUpdate) => { + const updateNamedPhase = (key: PhaseKey, update: KtxIngestProgressUpdate): void => { + const phase = targetState.phases.find((p) => p.key === key); + if (phase) { + if (phase.status === 'queued') { + phase.status = 'running'; + phase.startedAt = nowFn(); + } + const sanitizedMessage = update.message.replace(/^\[\d+%\]\s*/, ''); + phase.detail = publicProgressMessage(sanitizedMessage, targetState.target); + phase.percent = Math.max(phase.percent, Math.max(0, Math.min(100, Math.round(update.percent)))); + phase.progressUpdatedAtMs = nowFn(); + } targetState.detailLine = formatProgressDetail(update, targetState.target); targetState.progressUpdatedAtMs = nowFn(); if (!repainter) { @@ -795,6 +944,9 @@ export async function runContextBuild( hasPendingProgressPublish = !publishSourceProgress(false); }; + const updateSchemaPhase = (update: KtxIngestProgressUpdate): void => updateNamedPhase('database-schema', update); + const updateIngestPhase = (update: KtxIngestProgressUpdate): void => updateNamedPhase(ingestPhaseKeyForTarget, update); + const capture = createCaptureIo( (message) => { targetState.detailLine = publicProgressMessage(message, targetState.target); @@ -807,9 +959,48 @@ export async function runContextBuild( }, false, ); + + const onPhaseStart = (key: PhaseKey): void => { + const phase = targetState.phases.find((p) => p.key === key); + if (!phase) return; + phase.status = 'running'; + if (phase.startedAt === null) phase.startedAt = nowFn(); + phase.progressUpdatedAtMs = nowFn(); + paint(true); + hasPendingProgressPublish = !publishSourceProgress(false); + }; + + const onPhaseEnd = (key: PhaseKey, status: 'done' | 'failed' | 'skipped', summary?: string): void => { + const phase = targetState.phases.find((p) => p.key === key); + if (!phase) return; + phase.status = status; + if (phase.startedAt !== null) { + phase.elapsedMs = nowFn() - phase.startedAt; + } + if (status === 'done') { + phase.percent = 100; + } + let resolvedSummary = summary; + if (status === 'done' && !resolvedSummary) { + const captured = capture.captured(); + if (key === 'database-schema') { + resolvedSummary = parseScanSummary(captured) ?? undefined; + } else if (key === 'query-history' || key === 'source-ingest') { + resolvedSummary = parseIngestSummary(captured) ?? undefined; + } + } + if (resolvedSummary) { + phase.summary = resolvedSummary; + } + paint(true); + hasPendingProgressPublish = !publishSourceProgress(false); + }; + const progressDeps: KtxPublicIngestDeps = { - scanProgress: createContextBuildProgressPort(updateTargetProgress), - ingestProgress: updateTargetProgress, + scanProgress: createContextBuildProgressPort(updateSchemaPhase), + ingestProgress: updateIngestPhase, + onPhaseStart, + onPhaseEnd, }; let result: KtxPublicIngestTargetResult | null = null; diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index eb41ea04..5384ef78 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -202,9 +202,9 @@ describe('runKtxIngest', () => { expect.arrayContaining([ { percent: 5, message: 'Fetching source files for warehouse/fake' }, { percent: 15, message: 'Fetched 2 source files from fake' }, - { percent: 45, message: 'Planned 2 work units' }, + { percent: 45, message: 'Planned 2 tasks' }, expect.objectContaining({ - message: 'Processing work units: 0/2 complete, 1 active; latest orders step 2/4', + message: 'Processing tasks: 0/2 complete, 1 active; latest orders step 2/4', transient: true, }), ]), @@ -243,10 +243,10 @@ describe('runKtxIngest', () => { expect(progressEvents).toEqual( expect.arrayContaining([ - { percent: 80, message: 'No work units to process; finalizing ingest' }, + { percent: 80, message: 'No tasks to process; finalizing ingest' }, ]), ); - expect(progressEvents).not.toContainEqual({ percent: 45, message: 'Planned 0 work units' }); + expect(progressEvents).not.toContainEqual({ percent: 45, message: 'Planned 0 tasks' }); }); it('prints provider setup guidance when a skip-llm setup project runs ingest', async () => { @@ -440,7 +440,7 @@ describe('runKtxIngest', () => { ).resolves.toBe(1); expect(io.stdout()).toContain('Metabase fan-out: partial_failure'); - expect(io.stdout()).toContain('Failed work units: 1'); + expect(io.stdout()).toContain('Failed tasks: 1'); expect(io.stdout()).toContain('status=error'); expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); @@ -1307,8 +1307,8 @@ describe('runKtxIngest', () => { const stderr = io.stderr(); expect(stderr).toContain('[5%] Fetching source files for warehouse/historic-sql'); expect(stderr).toContain('[15%] Fetched 3 source files from historic-sql'); - expect(stderr).toContain('[45%] Planned 1 work unit'); - expect(stderr).toContain('[80%] Processed 1/1 work units'); + expect(stderr).toContain('[45%] Planned 1 task'); + expect(stderr).toContain('[80%] Processed 1/1 tasks'); expect(stderr).toContain('[100%] Ingest completed'); expect(stdout).toContain('Report: report-live-1'); expect(stdout).not.toContain('[5%]'); @@ -1431,12 +1431,12 @@ describe('runKtxIngest', () => { ).resolves.toBe(0); const stderr = io.stderr(); - expect(stderr).toContain('[45%] Planned 2 work units'); - expect(stderr).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders'); + expect(stderr).toContain('[45%] Planned 2 tasks'); + expect(stderr).toContain('[55%] Processing 1/2 tasks: historic-sql-table-public-orders'); expect(stderr).toContain( - '\r[58%] Processing work units: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K', + '\r[58%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K', ); - expect(stderr).toContain('[68%] Processed 1/2 work units'); + expect(stderr).toContain('[68%] Processed 1/2 tasks'); }); it('renders concurrent WorkUnit step progress as transient aggregate status', async () => { @@ -1524,10 +1524,10 @@ describe('runKtxIngest', () => { const stderr = io.stderr(); expect(stderr).toContain( - '\r[56%] Processing work units: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K', + '\r[56%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K', ); expect(stderr).not.toContain( - '\n[56%] Processing 6/6 work units: historic-sql-table-public-suppliers step 1/40\n', + '\n[56%] Processing 6/6 tasks: historic-sql-table-public-suppliers step 1/40\n', ); expect(stderr).toContain('\n[100%] Ingest completed\n'); }); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 4718a926..c508c5cf 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -138,7 +138,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void io.stdout.write( `Diff: +${report.body.diffSummary.added}/~${report.body.diffSummary.modified}/-${report.body.diffSummary.deleted}/=${report.body.diffSummary.unchanged}\n`, ); - io.stdout.write(`Work units: ${report.body.workUnits.length}\n`); + io.stdout.write(`Tasks: ${report.body.workUnits.length}\n`); io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`); io.stdout.write(`Provenance rows: ${report.body.provenanceRows.length}\n`); } @@ -158,8 +158,8 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng io.stdout.write(`Source: ${result.metabaseConnectionId}\n`); io.stdout.write(`Children: ${result.children.length}\n`); if (result.totals) { - io.stdout.write(`Work units: ${result.totals.workUnits}\n`); - io.stdout.write(`Failed work units: ${result.totals.failedWorkUnits}\n`); + io.stdout.write(`Tasks: ${result.totals.workUnits}\n`); + io.stdout.write(`Failed tasks: ${result.totals.failedWorkUnits}\n`); } io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`); for (const child of result.children) { @@ -280,19 +280,19 @@ function plainIngestEventProgress( if (event.workUnitCount === 0) { return { percent: 80, - message: 'No work units to process; finalizing ingest', + message: 'No tasks to process; finalizing ingest', }; } return { percent: 45, - message: `Planned ${pluralize(event.workUnitCount, 'work unit')}`, + message: `Planned ${pluralize(event.workUnitCount, 'task')}`, }; case 'stage_skipped': return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` }; case 'work_unit_started': { const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); - const progress = total > 0 ? `${ordinal}/${total} work units: ` : ''; + const progress = total > 0 ? `${ordinal}/${total} tasks: ` : ''; return { percent: 55, message: `Processing ${progress}${event.unitKey}` }; } case 'work_unit_step': { @@ -304,7 +304,7 @@ function plainIngestEventProgress( const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`; return { percent, - message: `Processing work units: ${completed}/${total} complete, ${active} active; latest ${latest}`, + message: `Processing tasks: ${completed}/${total} complete, ${active} active; latest ${latest}`, transient: true, }; } @@ -314,7 +314,7 @@ function plainIngestEventProgress( const percent = total > 0 ? 55 + Math.round((completed / total) * 25) : 80; return { percent, - message: `Processed ${completed}/${total} work units`, + message: `Processed ${completed}/${total} tasks`, }; } case 'reconciliation_finished': diff --git a/packages/cli/src/public-ingest-copy.test.ts b/packages/cli/src/public-ingest-copy.test.ts index d46713c2..d13696df 100644 --- a/packages/cli/src/public-ingest-copy.test.ts +++ b/packages/cli/src/public-ingest-copy.test.ts @@ -31,8 +31,8 @@ describe('public ingest copy sanitizers', () => { expect(publicQueryHistoryMessage('Fetching source files for warehouse/historic-sql', 'warehouse')).toBe( 'Fetching query history for warehouse', ); - expect(publicQueryHistoryMessage('Curating warehouse/historic-sql work units', 'warehouse')).toBe( - 'Curating warehouse query history work units', + expect(publicQueryHistoryMessage('Curating warehouse/historic-sql tasks', 'warehouse')).toBe( + 'Curating warehouse query history tasks', ); expect(publicQueryHistoryMessage('historic SQL local ingest failed', 'warehouse')).toBe( 'query history local ingest failed', diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index a92a6222..7916a711 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -81,6 +81,8 @@ export interface KtxPublicIngestTargetResult { export type KtxPublicIngestProject = Pick; +type KtxPublicIngestPhaseKey = 'database-schema' | 'query-history' | 'source-ingest'; + export interface KtxPublicIngestDeps { loadProject?: (options: Parameters[0]) => Promise; runScan?: (args: KtxScanArgs, io: KtxCliIo, deps?: KtxScanDeps) => Promise; @@ -92,6 +94,8 @@ export interface KtxPublicIngestDeps { ) => Promise<{ exitCode: number }>; scanProgress?: KtxProgressPort; ingestProgress?: (update: KtxIngestProgressUpdate) => void; + onPhaseStart?: (phaseKey: KtxPublicIngestPhaseKey) => void; + onPhaseEnd?: (phaseKey: KtxPublicIngestPhaseKey, status: 'done' | 'failed' | 'skipped', summary?: string) => void; } interface KtxPublicContextBuildArgs { @@ -657,7 +661,7 @@ function createCapturedPublicIngestIo(): CapturedPublicIngestIo { } const INTERNAL_STATUS_LINE_RE = - /^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Work units|Saved memory|Provenance rows):\s*/; + /^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/; function firstCapturedFailureLine(output: string): string | undefined { return output @@ -677,6 +681,14 @@ export async function executePublicIngestTarget( deps: KtxPublicIngestDeps, ): Promise { if (target.preflightFailure) { + if (target.operation === 'database-ingest') { + deps.onPhaseEnd?.('database-schema', 'failed', target.preflightFailure); + if (target.queryHistory?.enabled === true) { + deps.onPhaseEnd?.('query-history', 'skipped'); + } + } else { + deps.onPhaseEnd?.('source-ingest', 'failed', target.preflightFailure); + } return { connectionId: target.connectionId, driver: target.driver, @@ -707,10 +719,15 @@ export async function executePublicIngestTarget( const runScan = deps.runScan ?? runKtxScan; const capturedScanIo = deps.scanProgress ? null : createCapturedPublicIngestIo(); const scanIo = capturedScanIo ?? io; + deps.onPhaseStart?.('database-schema'); const scanExitCode = deps.scanProgress ? await runScan(scanArgs, scanIo, { progress: deps.scanProgress }) : await runScan(scanArgs, scanIo); if (scanExitCode !== 0) { + deps.onPhaseEnd?.('database-schema', 'failed'); + if (target.queryHistory?.enabled === true) { + deps.onPhaseEnd?.('query-history', 'skipped'); + } return markTargetResult( target, args, @@ -719,6 +736,7 @@ export async function executePublicIngestTarget( capturedScanIo ? firstCapturedFailureLine(capturedScanIo.capturedOutput()) : undefined, ); } + deps.onPhaseEnd?.('database-schema', 'done'); if (target.queryHistory?.enabled === true) { const { runKtxIngest } = await import('./ingest.js'); @@ -741,10 +759,12 @@ export async function executePublicIngestTarget( }; const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo(); const ingestIo = capturedIngestIo ?? io; + deps.onPhaseStart?.('query-history'); const qhExitCode = deps.ingestProgress ? await runIngest(ingestArgs, ingestIo, { progress: deps.ingestProgress }) : await runIngest(ingestArgs, ingestIo); if (qhExitCode !== 0) { + deps.onPhaseEnd?.('query-history', 'failed'); return markTargetResult( target, args, @@ -753,6 +773,7 @@ export async function executePublicIngestTarget( capturedIngestIo ? firstCapturedFailureLine(capturedIngestIo.capturedOutput()) : undefined, ); } + deps.onPhaseEnd?.('query-history', 'done'); } return markTargetResult(target, args, 'done'); @@ -774,9 +795,11 @@ export async function executePublicIngestTarget( const runIngest = deps.runIngest ?? runKtxIngest; const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo(); const ingestIo = capturedIngestIo ?? io; + deps.onPhaseStart?.('source-ingest'); const exitCode = deps.ingestProgress ? await runIngest(ingestArgs, ingestIo, { progress: deps.ingestProgress }) : await runIngest(ingestArgs, ingestIo); + deps.onPhaseEnd?.('source-ingest', exitCode === 0 ? 'done' : 'failed'); return markTargetResult( target, args, diff --git a/packages/cli/src/setup-demo-tour.ts b/packages/cli/src/setup-demo-tour.ts index f280f053..35640026 100644 --- a/packages/cli/src/setup-demo-tour.ts +++ b/packages/cli/src/setup-demo-tour.ts @@ -56,6 +56,7 @@ function createTargetState(target: KtxPublicIngestPlanTarget): ContextBuildTarge startedAt: null, elapsedMs: 0, progressUpdatedAtMs: null, + phases: [], }; }