feat(cli): stream plain ktx ingest progress to stderr (KLO-726) (#251)

* feat(cli): share public ingest progress adapter

* feat(cli): stream plain public ingest progress

* test(cli): update plain ingest progress assertions

* chore(cli): satisfy plain ingest progress checks

* fix(artifacts): expect plain ingest stderr progress in installed-CLI smoke

* ci(coverage): make Codecov upload non-fatal and fix repo slug

The Coverage job failed because the Codecov upload returned
'Repository not found' while fail_ci_if_error was true, turning a
Codecov-side issue into a hard CI failure even though all tests pass.

- Set fail_ci_if_error: false on both uploads so Codecov outages or an
  unlinked repo no longer break CI (upload stays best-effort).
- Correct the stale slug Kaelio/ktx -> Kaelio/ktx-ai-data-agents-context
  to match the actual GitHub repo (aligns with main).

* fix(cli): isolate query-history failure capture from scan output

The plain public-ingest progress path passes one captured IO as the
target-level `io`. With progress deps set, both the schema scan and the
query-history ingest resolved their capture to that same shared buffer,
so a non-actionable query-history failure surfaced leftover scan report
text (e.g. "Mode: enriched") as the skipped-facet detail instead of the
real query-history message.

Give the query-history ingest a phase-local capture while preserving the
flow-to-io branch the foreground context-build view relies on.

---------

Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
Andrey Avtomonov 2026-06-01 23:31:31 +02:00 committed by GitHub
parent d01abe6f3c
commit 13774bfcef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 445 additions and 49 deletions

View file

@ -1,8 +1,6 @@
import type { KtxProgressPort, KtxProgressUpdateOptions } from './context/scan/types.js';
import type { KtxCliIo } from './index.js';
import type { KtxIngestProgressUpdate } from './ingest.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { publicDatabaseIngestMessage, publicQueryHistoryMessage } from './public-ingest-copy.js';
import type {
KtxPublicIngestArgs,
KtxPublicIngestDeps,
@ -10,7 +8,8 @@ import type {
KtxPublicIngestProject,
KtxPublicIngestTargetResult,
} from './public-ingest.js';
import { buildPublicIngestPlan, executePublicIngestTarget } from './public-ingest.js';
import { buildPublicIngestPlan, executePublicIngestTarget, publicProgressMessage } from './public-ingest.js';
import { createAggregateProgressPort } from './progress-port-adapter.js';
import { formatDuration } from './demo-metrics.js';
import { profileMark } from './startup-profile.js';
@ -810,17 +809,6 @@ export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuil
};
}
function publicProgressMessage(message: string, target: KtxPublicIngestPlanTarget): string {
let current = message;
if (target.operation === 'database-ingest') {
current = publicDatabaseIngestMessage(current);
}
if (target.steps.includes('query-history')) {
current = publicQueryHistoryMessage(current, target.connectionId);
}
return current;
}
function formatProgressDetail(
update: Pick<KtxIngestProgressUpdate, 'percent' | 'message'>,
target: KtxPublicIngestPlanTarget,
@ -829,29 +817,6 @@ function formatProgressDetail(
return `[${percent}%] ${publicProgressMessage(update.message, target)}`;
}
function createContextBuildProgressPort(
onProgress: (update: KtxIngestProgressUpdate) => void,
state: { progress: number } = { progress: 0 },
start = 0,
weight = 1,
): KtxProgressPort {
return {
async update(value: number, message?: string, options?: KtxProgressUpdateOptions): Promise<void> {
const absoluteValue = start + Math.max(0, Math.min(1, value)) * weight;
state.progress = Math.max(state.progress, Math.min(1, absoluteValue));
if (!message) return;
onProgress({
percent: Math.max(0, Math.min(100, Math.round(state.progress * 100))),
message,
...(options?.transient !== undefined ? { transient: options.transient } : {}),
});
},
startPhase(phaseWeight: number): KtxProgressPort {
return createContextBuildProgressPort(onProgress, state, state.progress, weight * phaseWeight);
},
};
}
export async function runContextBuild(
project: KtxPublicIngestProject,
args: ContextBuildArgs,
@ -1022,7 +987,7 @@ export async function runContextBuild(
};
const progressDeps: KtxPublicIngestDeps = {
scanProgress: createContextBuildProgressPort(updateSchemaPhase),
scanProgress: createAggregateProgressPort(updateSchemaPhase),
ingestProgress: updateIngestPhase,
runtimeIo: io,
onPhaseStart,

View file

@ -0,0 +1,29 @@
import type { KtxProgressPort, KtxProgressUpdateOptions } from './context/scan/types.js';
import type { KtxIngestProgressUpdate } from './ingest.js';
export interface AggregateProgressState {
progress: number;
}
export function createAggregateProgressPort(
onProgress: (update: KtxIngestProgressUpdate) => void,
state: AggregateProgressState = { progress: 0 },
start = 0,
weight = 1,
): KtxProgressPort {
return {
async update(value: number, message?: string, options?: KtxProgressUpdateOptions): Promise<void> {
const absoluteValue = start + Math.max(0, Math.min(1, value)) * weight;
state.progress = Math.max(state.progress, Math.min(1, absoluteValue));
if (!message) return;
onProgress({
percent: Math.max(0, Math.min(100, Math.round(state.progress * 100))),
message,
...(options?.transient !== undefined ? { transient: options.transient } : {}),
});
},
startPhase(phaseWeight: number): KtxProgressPort {
return createAggregateProgressPort(onProgress, state, state.progress, weight * phaseWeight);
},
};
}

View file

@ -11,7 +11,12 @@ import {
type ManagedPythonCommandRuntime,
} from './managed-python-command.js';
import type { KtxRuntimeFeature } from './managed-python-runtime.js';
import { publicIngestOutputLine } from './public-ingest-copy.js';
import {
publicDatabaseIngestMessage,
publicIngestOutputLine,
publicQueryHistoryMessage,
} from './public-ingest-copy.js';
import { createAggregateProgressPort } from './progress-port-adapter.js';
import { resolvePublicIngestRuntimeRequirements } from './runtime-requirements.js';
import type { KtxScanArgs, KtxScanDeps } from './scan.js';
import { profileMark } from './startup-profile.js';
@ -129,6 +134,17 @@ const sourceAdapterByDriver = new Map<string, string>([
['lookml', 'lookml'],
]);
export function publicProgressMessage(message: string, target: KtxPublicIngestPlanTarget): string {
let current = message;
if (target.operation === 'database-ingest') {
current = publicDatabaseIngestMessage(current);
}
if (target.steps.includes('query-history')) {
current = publicQueryHistoryMessage(current, target.connectionId);
}
return current;
}
const queryHistoryDialectByDriver = new Map<string, HistoricSqlDialect>([
['postgres', 'postgres'],
['bigquery', 'bigquery'],
@ -729,6 +745,80 @@ function createCapturedPublicIngestIo(): CapturedPublicIngestIo {
};
}
function isCapturedPublicIngestIo(io: KtxCliIo): io is CapturedPublicIngestIo {
return typeof (io as Partial<CapturedPublicIngestIo>).capturedOutput === 'function';
}
const PLAIN_PUBLIC_INGEST_PHASE_LABELS: Record<KtxPublicIngestPhaseKey, string> = {
'database-schema': 'database schema',
'query-history': 'query history',
'source-ingest': 'source ingest',
};
interface PlainPublicIngestProgressOptions {
target: KtxPublicIngestPlanTarget;
index: number;
total: number;
}
function firstSummaryLine(summary: string | undefined): string | undefined {
if (!summary) return undefined;
return summary.split(/\r?\n/).find((line) => line.trim().length > 0)?.trim();
}
function plainPhaseHeader(options: PlainPublicIngestProgressOptions, phaseKey: KtxPublicIngestPhaseKey): string {
const prefix = options.total > 1 ? `[${options.index + 1}/${options.total}] ` : '';
return `${prefix}${options.target.connectionId} · ${PLAIN_PUBLIC_INGEST_PHASE_LABELS[phaseKey]}`;
}
function plainPhaseEndLine(status: 'done' | 'failed' | 'skipped', summary?: string): string {
const firstLine = firstSummaryLine(summary);
return firstLine ? ` ${status} · ${firstLine}` : ` ${status}`;
}
function createPlainPublicIngestProgress(io: KtxCliIo, options: PlainPublicIngestProgressOptions): Required<
Pick<KtxPublicIngestDeps, 'scanProgress' | 'ingestProgress' | 'onPhaseStart' | 'onPhaseEnd'>
> {
let currentPhase: KtxPublicIngestPhaseKey | null = null;
const startedPhases = new Set<KtxPublicIngestPhaseKey>();
const lastPercentByPhase = new Map<KtxPublicIngestPhaseKey, number>();
const startPhase = (phaseKey: KtxPublicIngestPhaseKey): void => {
currentPhase = phaseKey;
startedPhases.add(phaseKey);
lastPercentByPhase.set(phaseKey, -1);
io.stderr.write(`${plainPhaseHeader(options, phaseKey)}\n`);
};
const ensurePhaseStarted = (phaseKey: KtxPublicIngestPhaseKey): void => {
if (!startedPhases.has(phaseKey)) {
startPhase(phaseKey);
return;
}
currentPhase = phaseKey;
};
const emitProgress = (update: KtxIngestProgressUpdate): void => {
if (currentPhase === null) return;
const rounded = Math.max(0, Math.min(100, Math.round(update.percent)));
const lastPercent = lastPercentByPhase.get(currentPhase) ?? -1;
if (rounded <= lastPercent) return;
lastPercentByPhase.set(currentPhase, rounded);
io.stderr.write(` [${rounded}%] ${publicProgressMessage(update.message, options.target)}\n`);
};
return {
onPhaseStart: startPhase,
onPhaseEnd(phaseKey, status, summary) {
ensurePhaseStarted(phaseKey);
io.stderr.write(`${plainPhaseEndLine(status, summary)}\n`);
currentPhase = null;
},
scanProgress: createAggregateProgressPort(emitProgress),
ingestProgress: emitProgress,
};
}
const INTERNAL_STATUS_LINE_RE =
/^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/;
const ACTIONABLE_FAILURE_LINE_RE =
@ -790,7 +880,7 @@ export async function executePublicIngestTarget(
? {
...step,
status: 'failed',
detail: target.preflightFailure,
detail: `${target.connectionId} failed: ${target.preflightFailure}`,
}
: step,
),
@ -810,7 +900,11 @@ export async function executePublicIngestTarget(
...(args.runtimeInstallPolicy ? { runtimeInstallPolicy: args.runtimeInstallPolicy } : {}),
};
const runScan = deps.runScan ?? runKtxScan;
const capturedScanIo = deps.scanProgress ? null : createCapturedPublicIngestIo();
const capturedScanIo = deps.scanProgress
? isCapturedPublicIngestIo(io)
? io
: null
: createCapturedPublicIngestIo();
const scanIo = capturedScanIo ?? io;
const scanDeps = {
...(deps.scanProgress ? { progress: deps.scanProgress } : {}),
@ -853,7 +947,13 @@ export async function executePublicIngestTarget(
...(target.queryHistory.windowDays !== undefined ? { windowDays: target.queryHistory.windowDays } : {}),
},
};
const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo();
// Query history runs after the schema scan has already written its report
// into the shared target io, so it needs a phase-local capture. Reusing
// `io` here would let leftover scan text (e.g. "Mode: enriched") surface as
// the query-history failure detail. Only skip capture when progress is
// active and the caller manages its own buffer (io is not a capture).
const capturedIngestIo =
deps.ingestProgress && !isCapturedPublicIngestIo(io) ? null : createCapturedPublicIngestIo();
const ingestIo = capturedIngestIo ?? io;
const ingestDeps = {
...(deps.ingestProgress ? { progress: deps.ingestProgress } : {}),
@ -893,7 +993,11 @@ export async function executePublicIngestTarget(
allowImplicitAdapter: true,
};
const runIngest = deps.runIngest ?? runKtxIngest;
const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo();
const capturedIngestIo = deps.ingestProgress
? isCapturedPublicIngestIo(io)
? io
: null
: createCapturedPublicIngestIo();
const ingestIo = capturedIngestIo ?? io;
const ingestDeps = {
...(deps.ingestProgress ? { progress: deps.ingestProgress } : {}),
@ -976,9 +1080,30 @@ export async function runKtxPublicIngest(
}
}
for (const target of plan.targets) {
for (const [index, target] of plan.targets.entries()) {
const startedAt = performance.now();
const result = await executePublicIngestTarget(target, args, io, deps);
if (args.json) {
const result = await executePublicIngestTarget(target, args, io, deps);
results.push(result);
await emitIngestCompleted({ args, project, target, result, startedAt, io });
continue;
}
const capture = createCapturedPublicIngestIo();
const progress = createPlainPublicIngestProgress(io, {
target,
index,
total: plan.targets.length,
});
const targetDeps: KtxPublicIngestDeps = {
...deps,
scanProgress: progress.scanProgress,
ingestProgress: progress.ingestProgress,
onPhaseStart: progress.onPhaseStart,
onPhaseEnd: progress.onPhaseEnd,
runtimeIo: deps.runtimeIo ?? io,
};
const result = await executePublicIngestTarget(target, args, capture, targetDeps);
results.push(result);
await emitIngestCompleted({ args, project, target, result, startedAt, io });
}