feat(cli): split ingest progress into per-phase rows, rename work units to tasks

Each database target in the unified ingest dashboard now renders one row per
real subprocess (Schema, then Query history when enabled) instead of a single
combined bar. Each phase has its own monotonic 0-100% bar so the progress
never snaps back to zero when historic-sql starts after scan completes.
Completed phases keep their final bar, summary, and elapsed time visible as
an inline audit trail; queued and skipped phases are shown explicitly.

Also rename user-facing "work units" / "Failed work units" to "tasks" /
"Failed tasks" in ingest output and parseIngestSummary. The parser still
accepts the legacy "Work units:" wording in captured output for backward
compat. Internal memory-flow event names and type fields are left alone.
This commit is contained in:
Andrey Avtomonov 2026-05-14 00:32:18 +02:00
parent ecebc018b9
commit 3b98faaf83
7 changed files with 398 additions and 42 deletions

View file

@ -2,6 +2,7 @@ import { buildDefaultKtxProjectConfig, type KtxProjectConfig } from '@ktx/contex
import { describe, expect, it, vi } from 'vitest';
import type { KtxPublicIngestProject, KtxPublicIngestTargetResult } from './public-ingest.js';
import {
type ContextBuildTargetState,
extractProgressMessage,
createRepainter,
initViewState,
@ -112,15 +113,19 @@ describe('parseScanSummary', () => {
});
describe('parseIngestSummary', () => {
it('extracts work units and saved memory', () => {
expect(parseIngestSummary('Work units: 5\nSaved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL');
it('extracts task count and saved memory', () => {
expect(parseIngestSummary('Tasks: 5\nSaved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL');
});
it('extracts work units alone when no saved memory', () => {
expect(parseIngestSummary('Work units: 5\nStatus: done')).toBe('5 work units');
it('extracts task count alone when no saved memory', () => {
expect(parseIngestSummary('Tasks: 5\nStatus: done')).toBe('5 tasks');
});
it('extracts saved memory alone when no work units', () => {
it('still parses the legacy "Work units:" wording for backward compat', () => {
expect(parseIngestSummary('Work units: 7\nStatus: done')).toBe('7 tasks');
});
it('extracts saved memory alone when no task count', () => {
expect(parseIngestSummary('Saved memory: 3 wiki, 2 SL')).toBe('3 wiki, 2 SL');
});
@ -297,11 +302,11 @@ describe('renderContextBuildView', () => {
state.contextSources[0].startedAt = 1_000;
state.contextSources[0].elapsedMs = 113_000;
state.contextSources[0].progressUpdatedAtMs = 46_000;
state.contextSources[0].detailLine = '[45%] No work units to process; finalizing ingest';
state.contextSources[0].detailLine = '[45%] No tasks to process; finalizing ingest';
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('No work units to process; finalizing ingest');
expect(output).toContain('No tasks to process; finalizing ingest');
expect(output).toContain('last update 1m08s ago');
expect(output).toContain('(1m53s)');
});
@ -314,7 +319,7 @@ describe('renderContextBuildView', () => {
state.contextSources[0].startedAt = 1_000;
state.contextSources[0].elapsedMs = 40_000;
state.contextSources[0].progressUpdatedAtMs = 25_000;
state.contextSources[0].detailLine = '[45%] Planning work units';
state.contextSources[0].detailLine = '[45%] Planning tasks';
const output = renderContextBuildView(state, { styled: false });
@ -414,6 +419,142 @@ describe('renderContextBuildView', () => {
});
});
describe('renderContextBuildView phase rows', () => {
function dbTarget(connectionId: string, queryHistoryEnabled = false) {
return {
connectionId,
driver: 'postgres',
operation: 'database-ingest' as const,
debugCommand: '',
steps: queryHistoryEnabled
? (['database-schema', 'query-history'] as ('database-schema' | 'query-history')[])
: (['database-schema'] as ('database-schema' | 'query-history')[]),
...(queryHistoryEnabled ? { queryHistory: { enabled: true, dialect: 'postgres' as const } } : {}),
};
}
function sourceTarget(connectionId: string) {
return {
connectionId,
driver: 'dbt',
operation: 'source-ingest' as const,
adapter: 'dbt',
debugCommand: '',
steps: ['source-ingest', 'memory-update'] as ('source-ingest' | 'memory-update')[],
};
}
function setPhase(
state: ReturnType<typeof initViewState>,
connectionId: string,
phaseKey: 'database-schema' | 'query-history' | 'source-ingest',
patch: Partial<ContextBuildTargetState['phases'][number]>,
): void {
const target = [...state.primarySources, ...state.contextSources].find((t) => t.target.connectionId === connectionId);
const phase = target?.phases.find((p) => p.key === phaseKey);
if (!phase) throw new Error(`No phase ${phaseKey} on ${connectionId}`);
Object.assign(phase, patch);
}
it('renders two phase rows for a database-ingest target with query history', () => {
const state = initViewState([dbTarget('warehouse', true)]);
state.primarySources[0].status = 'running';
setPhase(state, 'warehouse', 'database-schema', {
status: 'done',
percent: 100,
summary: '172 tables',
elapsedMs: 52_000,
});
setPhase(state, 'warehouse', 'query-history', {
status: 'running',
percent: 7,
detail: '12/172 · arr-movements',
elapsedMs: 36_000,
});
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('Schema');
expect(output).toContain('100%');
expect(output).toContain('172 tables');
expect(output).toContain('(52s)');
expect(output).toContain('Query history');
expect(output).toContain('7%');
expect(output).toContain('12/172 · arr-movements');
expect(output).toContain('(36s)');
});
it('renders a single Schema phase row when query history is disabled', () => {
const state = initViewState([dbTarget('warehouse', false)]);
state.primarySources[0].status = 'running';
setPhase(state, 'warehouse', 'database-schema', {
status: 'running',
percent: 42,
detail: 'Profiling 73/172 tables',
});
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('Schema');
expect(output).toContain('42%');
expect(output).toContain('Profiling 73/172 tables');
expect(output).not.toContain('Query history');
});
it('renders Source ingest phase row for a source-ingest target', () => {
const state = initViewState([sourceTarget('dbt-main')]);
state.contextSources[0].status = 'running';
setPhase(state, 'dbt-main', 'source-ingest', {
status: 'running',
percent: 25,
detail: 'Reading models',
});
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('Source ingest');
expect(output).toContain('25%');
expect(output).toContain('Reading models');
expect(output).not.toContain('Schema ');
});
it('renders skipped Query history when schema phase fails', () => {
const state = initViewState([dbTarget('warehouse', true)]);
state.primarySources[0].status = 'running';
setPhase(state, 'warehouse', 'database-schema', { status: 'failed', percent: 30 });
setPhase(state, 'warehouse', 'query-history', { status: 'skipped' });
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('Schema');
expect(output).toContain('failed');
expect(output).toContain('Query history');
expect(output).toContain('skipped');
});
it('renders queued Query history with an em-dash and empty bar', () => {
const state = initViewState([dbTarget('warehouse', true)]);
state.primarySources[0].status = 'running';
setPhase(state, 'warehouse', 'database-schema', {
status: 'running',
percent: 12,
detail: 'Introspecting',
});
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('Query history');
expect(output).toContain('queued');
expect(output).toContain('—');
});
it('falls back to single-line legacy detail when no phase has started yet', () => {
const state = initViewState([dbTarget('warehouse', false)]);
state.primarySources[0].status = 'running';
state.primarySources[0].detailLine = '[5%] Preparing database ingest';
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('Preparing database ingest');
expect(output).toContain('5%');
expect(output).not.toContain('○ Schema');
});
});
describe('createRepainter', () => {
it('moves up visual rows, not just newline count, when content wraps', () => {
const io = makeIo({ isTTY: true, columns: 5 });
@ -953,7 +1094,7 @@ describe('runContextBuild', () => {
}
targetIo.stdout.write('Report: report-dbt-failed\n');
targetIo.stdout.write('Work units: 3\n');
targetIo.stdout.write('Tasks: 3\n');
return failedResult(target.connectionId, target.driver, target.operation);
});

View file

@ -19,6 +19,21 @@ profileMark('module:context-build-view');
const SPINNER_FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] as const;
const ESC = String.fromCharCode(0x1b);
type PhaseKey = 'database-schema' | 'query-history' | 'source-ingest';
type PhaseStatus = 'queued' | 'running' | 'done' | 'failed' | 'skipped';
interface PhaseState {
key: PhaseKey;
name: string;
status: PhaseStatus;
percent: number;
detail: string | null;
summary: string | null;
startedAt: number | null;
elapsedMs: number;
progressUpdatedAtMs: number | null;
}
export interface ContextBuildTargetState {
target: KtxPublicIngestPlanTarget;
status: 'queued' | 'running' | 'done' | 'failed';
@ -28,6 +43,35 @@ export interface ContextBuildTargetState {
startedAt: number | null;
elapsedMs: number;
progressUpdatedAtMs: number | null;
phases: PhaseState[];
}
const PHASE_LABELS: Record<PhaseKey, string> = {
'database-schema': 'Schema',
'query-history': 'Query history',
'source-ingest': 'Source ingest',
};
function makePhasesForTarget(target: KtxPublicIngestPlanTarget): PhaseState[] {
const make = (key: PhaseKey): PhaseState => ({
key,
name: PHASE_LABELS[key],
status: 'queued',
percent: 0,
detail: null,
summary: null,
startedAt: null,
elapsedMs: 0,
progressUpdatedAtMs: null,
});
if (target.operation === 'database-ingest') {
const phases: PhaseState[] = [make('database-schema')];
if (target.queryHistory?.enabled === true) {
phases.push(make('query-history'));
}
return phases;
}
return [make('source-ingest')];
}
export interface ContextBuildViewState {
@ -121,6 +165,34 @@ function statusIcon(status: ContextBuildTargetState['status'], frame: number, st
}
}
function phaseStatusIcon(status: PhaseStatus, frame: number, styled: boolean): string {
const raw = (() => {
switch (status) {
case 'done':
return '✓';
case 'failed':
return '✗';
case 'running':
return SPINNER_FRAMES[frame % SPINNER_FRAMES.length] ?? '⠋';
case 'skipped':
return '·';
default:
return '○';
}
})();
if (!styled) return raw;
switch (status) {
case 'done':
return green(raw);
case 'failed':
return red(raw);
case 'running':
return cyan(raw);
default:
return dim(raw);
}
}
function extractPercent(detailLine: string | null): number | null {
if (!detailLine) return null;
const match = detailLine.match(/^\[(\d+)%\]/);
@ -182,13 +254,70 @@ function targetDetail(target: ContextBuildTargetState, styled: boolean): string
return styled ? dim('queued') : 'queued';
}
const PHASE_NAME_WIDTH = 14;
function renderRunningTargetHeaderDetail(target: ContextBuildTargetState, styled: boolean): string {
const elapsed = target.elapsedMs > 0 ? `(${formatDuration(target.elapsedMs)})` : '';
if (!elapsed) return '';
return styled ? dim(elapsed) : elapsed;
}
function renderPhaseRow(phase: PhaseState, frame: number, styled: boolean): string {
const icon = phaseStatusIcon(phase.status, frame, styled);
const name = phase.name.padEnd(PHASE_NAME_WIDTH);
const segments: string[] = [];
if (phase.status === 'queued' || phase.status === 'skipped') {
const emptyBar = BAR_EMPTY.repeat(BAR_WIDTH);
segments.push(styled ? dim(emptyBar) : emptyBar);
segments.push(styled ? dim(' —') : ' —');
} else {
const pct = Math.max(0, Math.min(100, Math.round(phase.percent)));
segments.push(renderProgressBar(pct, styled));
segments.push(`${String(pct).padStart(3)}%`);
}
let trailing = '';
if (phase.status === 'done') {
const parts: string[] = [];
if (phase.summary) parts.push(phase.summary);
if (phase.elapsedMs > 0) {
const elapsed = `(${formatDuration(phase.elapsedMs)})`;
parts.push(styled ? dim(elapsed) : elapsed);
}
trailing = parts.join(' ');
} else if (phase.status === 'running') {
const parts: string[] = [];
if (phase.detail) parts.push(phase.detail);
if (phase.elapsedMs > 0) {
const elapsed = `(${formatDuration(phase.elapsedMs)})`;
parts.push(styled ? dim(elapsed) : elapsed);
}
trailing = parts.join(' ');
} else if (phase.status === 'queued') {
trailing = styled ? dim('queued') : 'queued';
} else if (phase.status === 'skipped') {
trailing = styled ? dim('skipped') : 'skipped';
} else if (phase.status === 'failed') {
trailing = styled ? red('failed') : 'failed';
}
const bar = `${segments.join(' ')} ${trailing}`.trimEnd();
return ` ${icon} ${name} ${bar}`;
}
function columnWidth(state: ContextBuildViewState): number {
const all = [...state.primarySources, ...state.contextSources];
return Math.max(12, ...all.map((t) => t.target.connectionId.length)) + 2;
}
function renderTargetLine(target: ContextBuildTargetState, frame: number, styled: boolean, width: number): string {
return ` ${statusIcon(target.status, frame, styled)} ${target.target.connectionId.padEnd(width)} ${targetDetail(target, styled)}`;
function renderTargetRows(target: ContextBuildTargetState, frame: number, styled: boolean, width: number): string[] {
const icon = statusIcon(target.status, frame, styled);
const name = target.target.connectionId.padEnd(width);
const anyPhaseStarted = target.phases.some((p) => p.status !== 'queued');
if (target.status === 'running' && target.phases.length > 0 && anyPhaseStarted) {
const headerDetail = renderRunningTargetHeaderDetail(target, styled);
const headerLine = ` ${icon} ${name} ${headerDetail}`.trimEnd();
return [headerLine, ...target.phases.map((phase) => renderPhaseRow(phase, frame, styled))];
}
return [` ${icon} ${name} ${targetDetail(target, styled)}`];
}
function renderTargetGroup(
@ -199,7 +328,7 @@ function renderTargetGroup(
width: number,
): string[] {
if (targets.length === 0) return [];
return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width))];
return ['', ` ${label}:`, ...targets.flatMap((t) => renderTargetRows(t, frame, styled, width))];
}
function renderMessageGroup(label: string, messages: string[], styled: boolean): string[] {
@ -306,8 +435,8 @@ export function parseScanSummary(output: string): string | null {
export function parseIngestSummary(output: string): string | null {
const savedMemory = output.match(/Saved memory: (.+)/);
if (savedMemory) return savedMemory[1];
const workUnits = output.match(/Work units: (\d+)/);
if (workUnits) return `${workUnits[1]} work units`;
const tasks = output.match(/(?:Tasks|Work units): (\d+)/);
if (tasks) return `${tasks[1]} tasks`;
return null;
}
@ -422,6 +551,7 @@ export function viewStateFromSourceProgress(
startedAt: s.startedAtMs ?? null,
elapsedMs: s.status === 'running' && s.startedAtMs ? now - s.startedAtMs : (s.elapsedMs ?? 0),
progressUpdatedAtMs: s.updatedAtMs ?? null,
phases: [],
});
return {
@ -492,6 +622,7 @@ function makeTargetState(target: KtxPublicIngestPlanTarget): ContextBuildTargetS
startedAt: null,
elapsedMs: 0,
progressUpdatedAtMs: null,
phases: makePhasesForTarget(target),
};
}
@ -550,7 +681,7 @@ function failedStepDetail(result: KtxPublicIngestTargetResult): string | null {
}
const INTERNAL_FAILURE_LINE_RE =
/^(Report|Run|Job|Status|Adapter|Connection|Sync|Mode|Dry run|Diff|Work units|Saved memory|Provenance rows):\s*/;
/^(Report|Run|Job|Status|Adapter|Connection|Sync|Mode|Dry run|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/;
const ACTIONABLE_FAILURE_LINE_RE =
/^(Missing bundled Python runtime manifest|KTX Python runtime is required|KTX managed daemon|Error:|Failed\b|Could not\b|Cannot\b)/;
@ -736,6 +867,11 @@ export async function runContextBuild(
if (t.status === 'running' && t.startedAt !== null) {
t.elapsedMs = nowFn() - t.startedAt;
}
for (const phase of t.phases) {
if (phase.status === 'running' && phase.startedAt !== null) {
phase.elapsedMs = nowFn() - phase.startedAt;
}
}
}
paint(true);
}, 140);
@ -784,8 +920,21 @@ export async function runContextBuild(
paint(true);
publishSourceProgress(true);
let hasPendingProgressPublish = false;
const ingestPhaseKeyForTarget: PhaseKey =
targetState.target.operation === 'database-ingest' ? 'query-history' : 'source-ingest';
const updateTargetProgress = (update: KtxIngestProgressUpdate) => {
const updateNamedPhase = (key: PhaseKey, update: KtxIngestProgressUpdate): void => {
const phase = targetState.phases.find((p) => p.key === key);
if (phase) {
if (phase.status === 'queued') {
phase.status = 'running';
phase.startedAt = nowFn();
}
const sanitizedMessage = update.message.replace(/^\[\d+%\]\s*/, '');
phase.detail = publicProgressMessage(sanitizedMessage, targetState.target);
phase.percent = Math.max(phase.percent, Math.max(0, Math.min(100, Math.round(update.percent))));
phase.progressUpdatedAtMs = nowFn();
}
targetState.detailLine = formatProgressDetail(update, targetState.target);
targetState.progressUpdatedAtMs = nowFn();
if (!repainter) {
@ -795,6 +944,9 @@ export async function runContextBuild(
hasPendingProgressPublish = !publishSourceProgress(false);
};
const updateSchemaPhase = (update: KtxIngestProgressUpdate): void => updateNamedPhase('database-schema', update);
const updateIngestPhase = (update: KtxIngestProgressUpdate): void => updateNamedPhase(ingestPhaseKeyForTarget, update);
const capture = createCaptureIo(
(message) => {
targetState.detailLine = publicProgressMessage(message, targetState.target);
@ -807,9 +959,48 @@ export async function runContextBuild(
},
false,
);
const onPhaseStart = (key: PhaseKey): void => {
const phase = targetState.phases.find((p) => p.key === key);
if (!phase) return;
phase.status = 'running';
if (phase.startedAt === null) phase.startedAt = nowFn();
phase.progressUpdatedAtMs = nowFn();
paint(true);
hasPendingProgressPublish = !publishSourceProgress(false);
};
const onPhaseEnd = (key: PhaseKey, status: 'done' | 'failed' | 'skipped', summary?: string): void => {
const phase = targetState.phases.find((p) => p.key === key);
if (!phase) return;
phase.status = status;
if (phase.startedAt !== null) {
phase.elapsedMs = nowFn() - phase.startedAt;
}
if (status === 'done') {
phase.percent = 100;
}
let resolvedSummary = summary;
if (status === 'done' && !resolvedSummary) {
const captured = capture.captured();
if (key === 'database-schema') {
resolvedSummary = parseScanSummary(captured) ?? undefined;
} else if (key === 'query-history' || key === 'source-ingest') {
resolvedSummary = parseIngestSummary(captured) ?? undefined;
}
}
if (resolvedSummary) {
phase.summary = resolvedSummary;
}
paint(true);
hasPendingProgressPublish = !publishSourceProgress(false);
};
const progressDeps: KtxPublicIngestDeps = {
scanProgress: createContextBuildProgressPort(updateTargetProgress),
ingestProgress: updateTargetProgress,
scanProgress: createContextBuildProgressPort(updateSchemaPhase),
ingestProgress: updateIngestPhase,
onPhaseStart,
onPhaseEnd,
};
let result: KtxPublicIngestTargetResult | null = null;

View file

@ -202,9 +202,9 @@ describe('runKtxIngest', () => {
expect.arrayContaining([
{ percent: 5, message: 'Fetching source files for warehouse/fake' },
{ percent: 15, message: 'Fetched 2 source files from fake' },
{ percent: 45, message: 'Planned 2 work units' },
{ percent: 45, message: 'Planned 2 tasks' },
expect.objectContaining({
message: 'Processing work units: 0/2 complete, 1 active; latest orders step 2/4',
message: 'Processing tasks: 0/2 complete, 1 active; latest orders step 2/4',
transient: true,
}),
]),
@ -243,10 +243,10 @@ describe('runKtxIngest', () => {
expect(progressEvents).toEqual(
expect.arrayContaining([
{ percent: 80, message: 'No work units to process; finalizing ingest' },
{ percent: 80, message: 'No tasks to process; finalizing ingest' },
]),
);
expect(progressEvents).not.toContainEqual({ percent: 45, message: 'Planned 0 work units' });
expect(progressEvents).not.toContainEqual({ percent: 45, message: 'Planned 0 tasks' });
});
it('prints provider setup guidance when a skip-llm setup project runs ingest', async () => {
@ -440,7 +440,7 @@ describe('runKtxIngest', () => {
).resolves.toBe(1);
expect(io.stdout()).toContain('Metabase fan-out: partial_failure');
expect(io.stdout()).toContain('Failed work units: 1');
expect(io.stdout()).toContain('Failed tasks: 1');
expect(io.stdout()).toContain('status=error');
expect(io.stderr()).toContain('Metabase ingest: prod-metabase');
});
@ -1307,8 +1307,8 @@ describe('runKtxIngest', () => {
const stderr = io.stderr();
expect(stderr).toContain('[5%] Fetching source files for warehouse/historic-sql');
expect(stderr).toContain('[15%] Fetched 3 source files from historic-sql');
expect(stderr).toContain('[45%] Planned 1 work unit');
expect(stderr).toContain('[80%] Processed 1/1 work units');
expect(stderr).toContain('[45%] Planned 1 task');
expect(stderr).toContain('[80%] Processed 1/1 tasks');
expect(stderr).toContain('[100%] Ingest completed');
expect(stdout).toContain('Report: report-live-1');
expect(stdout).not.toContain('[5%]');
@ -1431,12 +1431,12 @@ describe('runKtxIngest', () => {
).resolves.toBe(0);
const stderr = io.stderr();
expect(stderr).toContain('[45%] Planned 2 work units');
expect(stderr).toContain('[55%] Processing 1/2 work units: historic-sql-table-public-orders');
expect(stderr).toContain('[45%] Planned 2 tasks');
expect(stderr).toContain('[55%] Processing 1/2 tasks: historic-sql-table-public-orders');
expect(stderr).toContain(
'\r[58%] Processing work units: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K',
'\r[58%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K',
);
expect(stderr).toContain('[68%] Processed 1/2 work units');
expect(stderr).toContain('[68%] Processed 1/2 tasks');
});
it('renders concurrent WorkUnit step progress as transient aggregate status', async () => {
@ -1524,10 +1524,10 @@ describe('runKtxIngest', () => {
const stderr = io.stderr();
expect(stderr).toContain(
'\r[56%] Processing work units: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K',
'\r[56%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K',
);
expect(stderr).not.toContain(
'\n[56%] Processing 6/6 work units: historic-sql-table-public-suppliers step 1/40\n',
'\n[56%] Processing 6/6 tasks: historic-sql-table-public-suppliers step 1/40\n',
);
expect(stderr).toContain('\n[100%] Ingest completed\n');
});

View file

@ -138,7 +138,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
io.stdout.write(
`Diff: +${report.body.diffSummary.added}/~${report.body.diffSummary.modified}/-${report.body.diffSummary.deleted}/=${report.body.diffSummary.unchanged}\n`,
);
io.stdout.write(`Work units: ${report.body.workUnits.length}\n`);
io.stdout.write(`Tasks: ${report.body.workUnits.length}\n`);
io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`);
io.stdout.write(`Provenance rows: ${report.body.provenanceRows.length}\n`);
}
@ -158,8 +158,8 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng
io.stdout.write(`Source: ${result.metabaseConnectionId}\n`);
io.stdout.write(`Children: ${result.children.length}\n`);
if (result.totals) {
io.stdout.write(`Work units: ${result.totals.workUnits}\n`);
io.stdout.write(`Failed work units: ${result.totals.failedWorkUnits}\n`);
io.stdout.write(`Tasks: ${result.totals.workUnits}\n`);
io.stdout.write(`Failed tasks: ${result.totals.failedWorkUnits}\n`);
}
io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`);
for (const child of result.children) {
@ -280,19 +280,19 @@ function plainIngestEventProgress(
if (event.workUnitCount === 0) {
return {
percent: 80,
message: 'No work units to process; finalizing ingest',
message: 'No tasks to process; finalizing ingest',
};
}
return {
percent: 45,
message: `Planned ${pluralize(event.workUnitCount, 'work unit')}`,
message: `Planned ${pluralize(event.workUnitCount, 'task')}`,
};
case 'stage_skipped':
return { percent: 45, message: `Skipped ${event.stage}: ${event.reason}` };
case 'work_unit_started': {
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
const progress = total > 0 ? `${ordinal}/${total} work units: ` : '';
const progress = total > 0 ? `${ordinal}/${total} tasks: ` : '';
return { percent: 55, message: `Processing ${progress}${event.unitKey}` };
}
case 'work_unit_step': {
@ -304,7 +304,7 @@ function plainIngestEventProgress(
const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`;
return {
percent,
message: `Processing work units: ${completed}/${total} complete, ${active} active; latest ${latest}`,
message: `Processing tasks: ${completed}/${total} complete, ${active} active; latest ${latest}`,
transient: true,
};
}
@ -314,7 +314,7 @@ function plainIngestEventProgress(
const percent = total > 0 ? 55 + Math.round((completed / total) * 25) : 80;
return {
percent,
message: `Processed ${completed}/${total} work units`,
message: `Processed ${completed}/${total} tasks`,
};
}
case 'reconciliation_finished':

View file

@ -31,8 +31,8 @@ describe('public ingest copy sanitizers', () => {
expect(publicQueryHistoryMessage('Fetching source files for warehouse/historic-sql', 'warehouse')).toBe(
'Fetching query history for warehouse',
);
expect(publicQueryHistoryMessage('Curating warehouse/historic-sql work units', 'warehouse')).toBe(
'Curating warehouse query history work units',
expect(publicQueryHistoryMessage('Curating warehouse/historic-sql tasks', 'warehouse')).toBe(
'Curating warehouse query history tasks',
);
expect(publicQueryHistoryMessage('historic SQL local ingest failed', 'warehouse')).toBe(
'query history local ingest failed',

View file

@ -81,6 +81,8 @@ export interface KtxPublicIngestTargetResult {
export type KtxPublicIngestProject = Pick<KtxLocalProject, 'projectDir' | 'config'>;
type KtxPublicIngestPhaseKey = 'database-schema' | 'query-history' | 'source-ingest';
export interface KtxPublicIngestDeps {
loadProject?: (options: Parameters<typeof loadKtxProject>[0]) => Promise<KtxPublicIngestProject>;
runScan?: (args: KtxScanArgs, io: KtxCliIo, deps?: KtxScanDeps) => Promise<number>;
@ -92,6 +94,8 @@ export interface KtxPublicIngestDeps {
) => Promise<{ exitCode: number }>;
scanProgress?: KtxProgressPort;
ingestProgress?: (update: KtxIngestProgressUpdate) => void;
onPhaseStart?: (phaseKey: KtxPublicIngestPhaseKey) => void;
onPhaseEnd?: (phaseKey: KtxPublicIngestPhaseKey, status: 'done' | 'failed' | 'skipped', summary?: string) => void;
}
interface KtxPublicContextBuildArgs {
@ -657,7 +661,7 @@ function createCapturedPublicIngestIo(): CapturedPublicIngestIo {
}
const INTERNAL_STATUS_LINE_RE =
/^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Work units|Saved memory|Provenance rows):\s*/;
/^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/;
function firstCapturedFailureLine(output: string): string | undefined {
return output
@ -677,6 +681,14 @@ export async function executePublicIngestTarget(
deps: KtxPublicIngestDeps,
): Promise<KtxPublicIngestTargetResult> {
if (target.preflightFailure) {
if (target.operation === 'database-ingest') {
deps.onPhaseEnd?.('database-schema', 'failed', target.preflightFailure);
if (target.queryHistory?.enabled === true) {
deps.onPhaseEnd?.('query-history', 'skipped');
}
} else {
deps.onPhaseEnd?.('source-ingest', 'failed', target.preflightFailure);
}
return {
connectionId: target.connectionId,
driver: target.driver,
@ -707,10 +719,15 @@ export async function executePublicIngestTarget(
const runScan = deps.runScan ?? runKtxScan;
const capturedScanIo = deps.scanProgress ? null : createCapturedPublicIngestIo();
const scanIo = capturedScanIo ?? io;
deps.onPhaseStart?.('database-schema');
const scanExitCode = deps.scanProgress
? await runScan(scanArgs, scanIo, { progress: deps.scanProgress })
: await runScan(scanArgs, scanIo);
if (scanExitCode !== 0) {
deps.onPhaseEnd?.('database-schema', 'failed');
if (target.queryHistory?.enabled === true) {
deps.onPhaseEnd?.('query-history', 'skipped');
}
return markTargetResult(
target,
args,
@ -719,6 +736,7 @@ export async function executePublicIngestTarget(
capturedScanIo ? firstCapturedFailureLine(capturedScanIo.capturedOutput()) : undefined,
);
}
deps.onPhaseEnd?.('database-schema', 'done');
if (target.queryHistory?.enabled === true) {
const { runKtxIngest } = await import('./ingest.js');
@ -741,10 +759,12 @@ export async function executePublicIngestTarget(
};
const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo();
const ingestIo = capturedIngestIo ?? io;
deps.onPhaseStart?.('query-history');
const qhExitCode = deps.ingestProgress
? await runIngest(ingestArgs, ingestIo, { progress: deps.ingestProgress })
: await runIngest(ingestArgs, ingestIo);
if (qhExitCode !== 0) {
deps.onPhaseEnd?.('query-history', 'failed');
return markTargetResult(
target,
args,
@ -753,6 +773,7 @@ export async function executePublicIngestTarget(
capturedIngestIo ? firstCapturedFailureLine(capturedIngestIo.capturedOutput()) : undefined,
);
}
deps.onPhaseEnd?.('query-history', 'done');
}
return markTargetResult(target, args, 'done');
@ -774,9 +795,11 @@ export async function executePublicIngestTarget(
const runIngest = deps.runIngest ?? runKtxIngest;
const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo();
const ingestIo = capturedIngestIo ?? io;
deps.onPhaseStart?.('source-ingest');
const exitCode = deps.ingestProgress
? await runIngest(ingestArgs, ingestIo, { progress: deps.ingestProgress })
: await runIngest(ingestArgs, ingestIo);
deps.onPhaseEnd?.('source-ingest', exitCode === 0 ? 'done' : 'failed');
return markTargetResult(
target,
args,

View file

@ -56,6 +56,7 @@ function createTargetState(target: KtxPublicIngestPlanTarget): ContextBuildTarge
startedAt: null,
elapsedMs: 0,
progressUpdatedAtMs: null,
phases: [],
};
}