From 13774bfcef1622a83e29f27042bde1bcdd97beb2 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 1 Jun 2026 23:31:31 +0200 Subject: [PATCH] feat(cli): stream plain ktx ingest progress to stderr (KLO-726) (#251) * feat(cli): share public ingest progress adapter * feat(cli): stream plain public ingest progress * test(cli): update plain ingest progress assertions * chore(cli): satisfy plain ingest progress checks * fix(artifacts): expect plain ingest stderr progress in installed-CLI smoke * ci(coverage): make Codecov upload non-fatal and fix repo slug The Coverage job failed because the Codecov upload returned 'Repository not found' while fail_ci_if_error was true, turning a Codecov-side issue into a hard CI failure even though all tests pass. - Set fail_ci_if_error: false on both uploads so Codecov outages or an unlinked repo no longer break CI (upload stays best-effort). - Correct the stale slug Kaelio/ktx -> Kaelio/ktx-ai-data-agents-context to match the actual GitHub repo (aligns with main). * fix(cli): isolate query-history failure capture from scan output The plain public-ingest progress path passes one captured IO as the target-level `io`. With progress deps set, both the schema scan and the query-history ingest resolved their capture to that same shared buffer, so a non-actionable query-history failure surfaced leftover scan report text (e.g. "Mode: enriched") as the skipped-facet detail instead of the real query-history message. Give the query-history ingest a phase-local capture while preserving the flow-to-io branch the foreground context-build view relies on. --------- Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> --- .github/workflows/ci.yml | 4 +- packages/cli/src/context-build-view.ts | 41 +-- packages/cli/src/progress-port-adapter.ts | 29 +++ packages/cli/src/public-ingest.ts | 139 ++++++++++- .../cli/test/progress-port-adapter.test.ts | 35 +++ packages/cli/test/public-ingest.test.ts | 235 +++++++++++++++++- scripts/package-artifacts.mjs | 10 +- scripts/package-artifacts.test.mjs | 1 + 8 files changed, 445 insertions(+), 49 deletions(-) create mode 100644 packages/cli/src/progress-port-adapter.ts create mode 100644 packages/cli/test/progress-port-adapter.test.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c981e98..23e4d668 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -217,7 +217,7 @@ jobs: flags: typescript name: typescript disable_search: true - fail_ci_if_error: true + fail_ci_if_error: false - name: Warn when Codecov token is missing for TypeScript if: env.CODECOV_TOKEN_CONFIGURED != 'true' @@ -236,7 +236,7 @@ jobs: flags: python name: python disable_search: true - fail_ci_if_error: true + fail_ci_if_error: false - name: Warn when Codecov token is missing for Python if: env.CODECOV_TOKEN_CONFIGURED != 'true' diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 4b5be38b..f088097d 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -1,8 +1,6 @@ -import type { KtxProgressPort, KtxProgressUpdateOptions } from './context/scan/types.js'; import type { KtxCliIo } from './index.js'; import type { KtxIngestProgressUpdate } from './ingest.js'; import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js'; -import { publicDatabaseIngestMessage, publicQueryHistoryMessage } from './public-ingest-copy.js'; import type { KtxPublicIngestArgs, KtxPublicIngestDeps, @@ -10,7 +8,8 @@ import type { KtxPublicIngestProject, KtxPublicIngestTargetResult, } from './public-ingest.js'; -import { buildPublicIngestPlan, executePublicIngestTarget } from './public-ingest.js'; +import { buildPublicIngestPlan, executePublicIngestTarget, publicProgressMessage } from './public-ingest.js'; +import { createAggregateProgressPort } from './progress-port-adapter.js'; import { formatDuration } from './demo-metrics.js'; import { profileMark } from './startup-profile.js'; @@ -810,17 +809,6 @@ export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuil }; } -function publicProgressMessage(message: string, target: KtxPublicIngestPlanTarget): string { - let current = message; - if (target.operation === 'database-ingest') { - current = publicDatabaseIngestMessage(current); - } - if (target.steps.includes('query-history')) { - current = publicQueryHistoryMessage(current, target.connectionId); - } - return current; -} - function formatProgressDetail( update: Pick, target: KtxPublicIngestPlanTarget, @@ -829,29 +817,6 @@ function formatProgressDetail( return `[${percent}%] ${publicProgressMessage(update.message, target)}`; } -function createContextBuildProgressPort( - onProgress: (update: KtxIngestProgressUpdate) => void, - state: { progress: number } = { progress: 0 }, - start = 0, - weight = 1, -): KtxProgressPort { - return { - async update(value: number, message?: string, options?: KtxProgressUpdateOptions): Promise { - const absoluteValue = start + Math.max(0, Math.min(1, value)) * weight; - state.progress = Math.max(state.progress, Math.min(1, absoluteValue)); - if (!message) return; - onProgress({ - percent: Math.max(0, Math.min(100, Math.round(state.progress * 100))), - message, - ...(options?.transient !== undefined ? { transient: options.transient } : {}), - }); - }, - startPhase(phaseWeight: number): KtxProgressPort { - return createContextBuildProgressPort(onProgress, state, state.progress, weight * phaseWeight); - }, - }; -} - export async function runContextBuild( project: KtxPublicIngestProject, args: ContextBuildArgs, @@ -1022,7 +987,7 @@ export async function runContextBuild( }; const progressDeps: KtxPublicIngestDeps = { - scanProgress: createContextBuildProgressPort(updateSchemaPhase), + scanProgress: createAggregateProgressPort(updateSchemaPhase), ingestProgress: updateIngestPhase, runtimeIo: io, onPhaseStart, diff --git a/packages/cli/src/progress-port-adapter.ts b/packages/cli/src/progress-port-adapter.ts new file mode 100644 index 00000000..1f73636b --- /dev/null +++ b/packages/cli/src/progress-port-adapter.ts @@ -0,0 +1,29 @@ +import type { KtxProgressPort, KtxProgressUpdateOptions } from './context/scan/types.js'; +import type { KtxIngestProgressUpdate } from './ingest.js'; + +export interface AggregateProgressState { + progress: number; +} + +export function createAggregateProgressPort( + onProgress: (update: KtxIngestProgressUpdate) => void, + state: AggregateProgressState = { progress: 0 }, + start = 0, + weight = 1, +): KtxProgressPort { + return { + async update(value: number, message?: string, options?: KtxProgressUpdateOptions): Promise { + const absoluteValue = start + Math.max(0, Math.min(1, value)) * weight; + state.progress = Math.max(state.progress, Math.min(1, absoluteValue)); + if (!message) return; + onProgress({ + percent: Math.max(0, Math.min(100, Math.round(state.progress * 100))), + message, + ...(options?.transient !== undefined ? { transient: options.transient } : {}), + }); + }, + startPhase(phaseWeight: number): KtxProgressPort { + return createAggregateProgressPort(onProgress, state, state.progress, weight * phaseWeight); + }, + }; +} diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 25fe30dd..f2b8cdd4 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -11,7 +11,12 @@ import { type ManagedPythonCommandRuntime, } from './managed-python-command.js'; import type { KtxRuntimeFeature } from './managed-python-runtime.js'; -import { publicIngestOutputLine } from './public-ingest-copy.js'; +import { + publicDatabaseIngestMessage, + publicIngestOutputLine, + publicQueryHistoryMessage, +} from './public-ingest-copy.js'; +import { createAggregateProgressPort } from './progress-port-adapter.js'; import { resolvePublicIngestRuntimeRequirements } from './runtime-requirements.js'; import type { KtxScanArgs, KtxScanDeps } from './scan.js'; import { profileMark } from './startup-profile.js'; @@ -129,6 +134,17 @@ const sourceAdapterByDriver = new Map([ ['lookml', 'lookml'], ]); +export function publicProgressMessage(message: string, target: KtxPublicIngestPlanTarget): string { + let current = message; + if (target.operation === 'database-ingest') { + current = publicDatabaseIngestMessage(current); + } + if (target.steps.includes('query-history')) { + current = publicQueryHistoryMessage(current, target.connectionId); + } + return current; +} + const queryHistoryDialectByDriver = new Map([ ['postgres', 'postgres'], ['bigquery', 'bigquery'], @@ -729,6 +745,80 @@ function createCapturedPublicIngestIo(): CapturedPublicIngestIo { }; } +function isCapturedPublicIngestIo(io: KtxCliIo): io is CapturedPublicIngestIo { + return typeof (io as Partial).capturedOutput === 'function'; +} + +const PLAIN_PUBLIC_INGEST_PHASE_LABELS: Record = { + 'database-schema': 'database schema', + 'query-history': 'query history', + 'source-ingest': 'source ingest', +}; + +interface PlainPublicIngestProgressOptions { + target: KtxPublicIngestPlanTarget; + index: number; + total: number; +} + +function firstSummaryLine(summary: string | undefined): string | undefined { + if (!summary) return undefined; + return summary.split(/\r?\n/).find((line) => line.trim().length > 0)?.trim(); +} + +function plainPhaseHeader(options: PlainPublicIngestProgressOptions, phaseKey: KtxPublicIngestPhaseKey): string { + const prefix = options.total > 1 ? `[${options.index + 1}/${options.total}] ` : ''; + return `${prefix}${options.target.connectionId} · ${PLAIN_PUBLIC_INGEST_PHASE_LABELS[phaseKey]}`; +} + +function plainPhaseEndLine(status: 'done' | 'failed' | 'skipped', summary?: string): string { + const firstLine = firstSummaryLine(summary); + return firstLine ? ` ${status} · ${firstLine}` : ` ${status}`; +} + +function createPlainPublicIngestProgress(io: KtxCliIo, options: PlainPublicIngestProgressOptions): Required< + Pick +> { + let currentPhase: KtxPublicIngestPhaseKey | null = null; + const startedPhases = new Set(); + const lastPercentByPhase = new Map(); + + const startPhase = (phaseKey: KtxPublicIngestPhaseKey): void => { + currentPhase = phaseKey; + startedPhases.add(phaseKey); + lastPercentByPhase.set(phaseKey, -1); + io.stderr.write(`${plainPhaseHeader(options, phaseKey)}\n`); + }; + + const ensurePhaseStarted = (phaseKey: KtxPublicIngestPhaseKey): void => { + if (!startedPhases.has(phaseKey)) { + startPhase(phaseKey); + return; + } + currentPhase = phaseKey; + }; + + const emitProgress = (update: KtxIngestProgressUpdate): void => { + if (currentPhase === null) return; + const rounded = Math.max(0, Math.min(100, Math.round(update.percent))); + const lastPercent = lastPercentByPhase.get(currentPhase) ?? -1; + if (rounded <= lastPercent) return; + lastPercentByPhase.set(currentPhase, rounded); + io.stderr.write(` [${rounded}%] ${publicProgressMessage(update.message, options.target)}\n`); + }; + + return { + onPhaseStart: startPhase, + onPhaseEnd(phaseKey, status, summary) { + ensurePhaseStarted(phaseKey); + io.stderr.write(`${plainPhaseEndLine(status, summary)}\n`); + currentPhase = null; + }, + scanProgress: createAggregateProgressPort(emitProgress), + ingestProgress: emitProgress, + }; +} + const INTERNAL_STATUS_LINE_RE = /^(Report|Run|Job|Status|Adapter|Connection|Sync|Diff|Tasks|Work units|Failed tasks|Saved memory|Provenance rows):\s*/; const ACTIONABLE_FAILURE_LINE_RE = @@ -790,7 +880,7 @@ export async function executePublicIngestTarget( ? { ...step, status: 'failed', - detail: target.preflightFailure, + detail: `${target.connectionId} failed: ${target.preflightFailure}`, } : step, ), @@ -810,7 +900,11 @@ export async function executePublicIngestTarget( ...(args.runtimeInstallPolicy ? { runtimeInstallPolicy: args.runtimeInstallPolicy } : {}), }; const runScan = deps.runScan ?? runKtxScan; - const capturedScanIo = deps.scanProgress ? null : createCapturedPublicIngestIo(); + const capturedScanIo = deps.scanProgress + ? isCapturedPublicIngestIo(io) + ? io + : null + : createCapturedPublicIngestIo(); const scanIo = capturedScanIo ?? io; const scanDeps = { ...(deps.scanProgress ? { progress: deps.scanProgress } : {}), @@ -853,7 +947,13 @@ export async function executePublicIngestTarget( ...(target.queryHistory.windowDays !== undefined ? { windowDays: target.queryHistory.windowDays } : {}), }, }; - const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo(); + // Query history runs after the schema scan has already written its report + // into the shared target io, so it needs a phase-local capture. Reusing + // `io` here would let leftover scan text (e.g. "Mode: enriched") surface as + // the query-history failure detail. Only skip capture when progress is + // active and the caller manages its own buffer (io is not a capture). + const capturedIngestIo = + deps.ingestProgress && !isCapturedPublicIngestIo(io) ? null : createCapturedPublicIngestIo(); const ingestIo = capturedIngestIo ?? io; const ingestDeps = { ...(deps.ingestProgress ? { progress: deps.ingestProgress } : {}), @@ -893,7 +993,11 @@ export async function executePublicIngestTarget( allowImplicitAdapter: true, }; const runIngest = deps.runIngest ?? runKtxIngest; - const capturedIngestIo = deps.ingestProgress ? null : createCapturedPublicIngestIo(); + const capturedIngestIo = deps.ingestProgress + ? isCapturedPublicIngestIo(io) + ? io + : null + : createCapturedPublicIngestIo(); const ingestIo = capturedIngestIo ?? io; const ingestDeps = { ...(deps.ingestProgress ? { progress: deps.ingestProgress } : {}), @@ -976,9 +1080,30 @@ export async function runKtxPublicIngest( } } - for (const target of plan.targets) { + for (const [index, target] of plan.targets.entries()) { const startedAt = performance.now(); - const result = await executePublicIngestTarget(target, args, io, deps); + if (args.json) { + const result = await executePublicIngestTarget(target, args, io, deps); + results.push(result); + await emitIngestCompleted({ args, project, target, result, startedAt, io }); + continue; + } + + const capture = createCapturedPublicIngestIo(); + const progress = createPlainPublicIngestProgress(io, { + target, + index, + total: plan.targets.length, + }); + const targetDeps: KtxPublicIngestDeps = { + ...deps, + scanProgress: progress.scanProgress, + ingestProgress: progress.ingestProgress, + onPhaseStart: progress.onPhaseStart, + onPhaseEnd: progress.onPhaseEnd, + runtimeIo: deps.runtimeIo ?? io, + }; + const result = await executePublicIngestTarget(target, args, capture, targetDeps); results.push(result); await emitIngestCompleted({ args, project, target, result, startedAt, io }); } diff --git a/packages/cli/test/progress-port-adapter.test.ts b/packages/cli/test/progress-port-adapter.test.ts new file mode 100644 index 00000000..336883aa --- /dev/null +++ b/packages/cli/test/progress-port-adapter.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it } from 'vitest'; +import { createAggregateProgressPort } from '../src/progress-port-adapter.js'; + +describe('createAggregateProgressPort', () => { + it('flattens nested weighted progress into absolute percent updates', async () => { + const updates: Array<{ percent: number; message: string; transient?: boolean }> = []; + const progress = createAggregateProgressPort((update) => updates.push(update)); + + await progress.update(0.1, 'Preparing scan'); + const nested = progress.startPhase(0.5); + await nested.update(0.5, 'Generating descriptions 2/4 tables', { transient: true }); + await progress.update(0.95, 'Writing schema artifacts'); + + expect(updates).toEqual([ + { percent: 10, message: 'Preparing scan' }, + { percent: 35, message: 'Generating descriptions 2/4 tables', transient: true }, + { percent: 95, message: 'Writing schema artifacts' }, + ]); + }); + + it('clamps updates and never moves the shared progress state backward', async () => { + const updates: Array<{ percent: number; message: string }> = []; + const progress = createAggregateProgressPort((update) => updates.push(update)); + + await progress.update(0.8, 'Building enriched schema context'); + await progress.update(0.2, 'Older scan callback'); + await progress.update(1.4, 'Scan completed'); + + expect(updates).toEqual([ + { percent: 80, message: 'Building enriched schema context' }, + { percent: 80, message: 'Older scan callback' }, + { percent: 100, message: 'Scan completed' }, + ]); + }); +}); diff --git a/packages/cli/test/public-ingest.test.ts b/packages/cli/test/public-ingest.test.ts index 41289208..2ffbefaf 100644 --- a/packages/cli/test/public-ingest.test.ts +++ b/packages/cli/test/public-ingest.test.ts @@ -8,6 +8,7 @@ import { buildPublicIngestPlan, type KtxPublicIngestDeps, type KtxPublicIngestProject, + publicProgressMessage, runKtxPublicIngest, } from '../src/public-ingest.js'; import type { ManagedPythonCommandRuntime } from '../src/managed-python-command.js'; @@ -346,6 +347,29 @@ describe('buildPublicIngestPlan', () => { }); }); +describe('publicProgressMessage', () => { + it('rewrites internal scan and historic-sql phrasing for public ingest progress', () => { + const databaseProject = deepReadyProject({ + warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, dialect: 'postgres' } } }, + }); + const databaseTarget = buildPublicIngestPlan(databaseProject, { + projectDir: '/tmp/project', + all: false, + targetConnectionId: 'warehouse', + queryHistory: 'default', + }).targets[0]; + + expect(databaseTarget).toBeDefined(); + expect(publicProgressMessage('Inspecting database schema', databaseTarget)).toBe('Reading database schema'); + expect(publicProgressMessage('Enriching schema metadata', databaseTarget)).toBe( + 'Building enriched schema context', + ); + expect(publicProgressMessage('Fetching source files for warehouse/historic-sql', databaseTarget)).toBe( + 'Fetching query history for warehouse', + ); + }); +}); + describe('runKtxPublicIngest', () => { afterEach(() => { vi.unstubAllEnvs(); @@ -371,11 +395,13 @@ describe('runKtxPublicIngest', () => { 1, expect.objectContaining({ connectionId: 'first', mode: 'enriched', detectRelationships: true }), expect.anything(), + expect.objectContaining({ progress: expect.any(Object) }), ); expect(runScan).toHaveBeenNthCalledWith( 2, expect.objectContaining({ connectionId: 'second', mode: 'enriched', detectRelationships: true }), expect.anything(), + expect.objectContaining({ progress: expect.any(Object) }), ); }); @@ -655,7 +681,10 @@ describe('runKtxPublicIngest', () => { expect(io.stdout()).not.toContain('Report: report-docs-1'); expect(io.stdout()).not.toContain('Adapter:'); expect(io.stdout()).not.toContain('notion\n'); - expect(io.stderr()).toBe(''); + expect(io.stderr()).toContain('docs · source ingest\n'); + expect(io.stderr()).toContain(' done\n'); + expect(io.stderr()).not.toContain('Report: report-docs-1'); + expect(io.stderr()).not.toContain('Adapter:'); }); it('suppresses historic-sql report output during direct public query-history ingest', async () => { @@ -694,9 +723,168 @@ describe('runKtxPublicIngest', () => { expect(io.stdout()).not.toContain('Report: report-query-history-1'); expect(io.stdout()).not.toContain('Adapter:'); expect(io.stdout()).not.toContain('historic-sql'); + expect(io.stderr()).toContain('warehouse · database schema\n'); + expect(io.stderr()).toContain('warehouse · query history\n'); + expect(io.stderr()).toContain(' done\n'); + expect(io.stderr()).not.toContain('Report: report-query-history-1'); + expect(io.stderr()).not.toContain('Adapter:'); + expect(io.stderr()).not.toContain('historic-sql'); + }); + + it('streams plain non-json progress to stderr while keeping final results on stdout', async () => { + const io = makeIo(); + const project = deepReadyProject({ + warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, dialect: 'postgres' } } }, + docs: { driver: 'notion' }, + }); + const runScan = vi.fn>(async (_args, scanIo, deps) => { + scanIo.stdout.write('KTX scan completed\n'); + scanIo.stdout.write('Report: raw-sources/warehouse/live-database/sync-1/scan-report.json\n'); + await deps?.progress?.update(0.12, 'Inspecting database schema'); + const enrichmentProgress = deps?.progress?.startPhase(0.5); + await enrichmentProgress?.update(0.75, 'Enriching schema metadata', { transient: true }); + await deps?.progress?.update(1, 'Writing schema artifacts'); + return 0; + }); + const runIngest = vi.fn>(async (ingestArgs, ingestIo, deps) => { + if (ingestArgs.command !== 'run') { + throw new Error(`Unexpected ingest command: ${ingestArgs.command}`); + } + ingestIo.stdout.write(`Adapter: ${ingestArgs.adapter}\n`); + ingestIo.stdout.write('Report: report-progress-1\n'); + if (ingestArgs.adapter === 'historic-sql') { + deps?.progress?.({ percent: 15, message: 'Fetching source files for warehouse/historic-sql' }); + deps?.progress?.({ percent: 90, message: 'Saved memory: 1 wiki, 1 SL' }); + return 0; + } + deps?.progress?.({ percent: 55, message: 'Processing 3/8 tasks' }); + deps?.progress?.({ percent: 90, message: 'Saved memory: 6 wiki, 2 SL' }); + return 0; + }); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + all: true, + json: false, + inputMode: 'disabled', + queryHistory: 'default', + }, + io.io, + { loadProject: vi.fn(async () => project), runScan, runIngest }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('Ingest finished'); + expect(io.stdout()).toContain('warehouse'); + expect(io.stdout()).toContain('docs'); + expect(io.stdout()).not.toContain('KTX scan completed'); + expect(io.stdout()).not.toContain('Report:'); + expect(io.stdout()).not.toContain('Adapter:'); + expect(io.stderr()).toContain('[1/2] warehouse · database schema\n'); + expect(io.stderr()).toContain(' [12%] Reading database schema\n'); + expect(io.stderr()).toContain(' [50%] Building enriched schema context\n'); + expect(io.stderr()).toContain('[1/2] warehouse · query history\n'); + expect(io.stderr()).toContain(' [15%] Fetching query history for warehouse\n'); + expect(io.stderr()).toContain('[2/2] docs · source ingest\n'); + expect(io.stderr()).toContain(' [55%] Processing 3/8 tasks\n'); + expect(io.stderr()).not.toContain('\r'); + }); + + it('does not emit plain progress for json public ingest output', async () => { + const io = makeIo(); + const project = deepReadyProject({ + warehouse: { driver: 'postgres' }, + }); + const runScan = vi.fn>(async (_args, _scanIo, deps) => { + expect(deps?.progress).toBeUndefined(); + return 0; + }); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + json: true, + inputMode: 'disabled', + }, + io.io, + { loadProject: vi.fn(async () => project), runScan }, + ), + ).resolves.toBe(0); + + expect(JSON.parse(io.stdout())).toMatchObject({ + plan: { projectDir: '/tmp/project' }, + results: [{ connectionId: 'warehouse', driver: 'postgres' }], + }); expect(io.stderr()).toBe(''); }); + it('keeps captured failure details when plain progress ports are active', async () => { + const io = makeIo(); + const project = deepReadyProject({ warehouse: { driver: 'postgres' } }); + const runScan = vi.fn>(async (_args, scanIo, deps) => { + await deps?.progress?.update(0.42, 'Enriching schema metadata'); + scanIo.stdout.write('KTX scan enrichment failed after structural scan completed: embedding service timed out\n'); + return 1; + }); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + json: false, + inputMode: 'disabled', + }, + io.io, + { loadProject: vi.fn(async () => project), runScan }, + ), + ).resolves.toBe(1); + + expect(io.stderr()).toContain('warehouse · database schema\n'); + expect(io.stderr()).toContain(' [42%] Building enriched schema context\n'); + expect(io.stderr()).toContain(' failed\n'); + expect(io.stdout()).toContain( + 'warehouse failed: Database enrichment failed after schema context completed: embedding service timed out.', + ); + expect(io.stdout()).not.toContain('KTX scan enrichment failed'); + expect(io.stdout()).not.toContain('structural scan'); + }); + + it('prints a failed plain phase when preflight fails before phase start', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres' }, + }); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + json: false, + inputMode: 'disabled', + }, + io.io, + { loadProject: vi.fn(async () => project) }, + ), + ).resolves.toBe(1); + + expect(io.stderr()).toContain('warehouse · database schema\n'); + expect(io.stderr()).toContain(' failed · warehouse cannot be ingested: enrichment is not configured'); + expect(io.stdout()).toContain('warehouse failed: warehouse cannot be ingested: enrichment is not configured'); + }); + it('delegates interactive TTY public ingest to the foreground context-build view', async () => { const io = makeIo({ isTTY: true, interactive: true }); const project = projectWithConnections({ warehouse: { driver: 'postgres' } }); @@ -872,6 +1060,7 @@ describe('runKtxPublicIngest', () => { inputMode: 'disabled', }), expect.anything(), + expect.objectContaining({ progress: expect.any(Function) }), ); expect(runScan).toHaveBeenCalledWith( { @@ -883,6 +1072,7 @@ describe('runKtxPublicIngest', () => { dryRun: false, }, expect.anything(), + expect.objectContaining({ progress: expect.any(Object) }), ); expect(io.stdout()).toContain('Ingest finished with partial failures'); expect(io.stdout()).toContain('warehouse failed at database-schema.'); @@ -930,6 +1120,45 @@ describe('runKtxPublicIngest', () => { expect(io.stdout()).not.toContain('historic-sql'); }); + it('reports the query-history failure without leaking earlier scan report output', async () => { + const io = makeIo(); + const project = deepReadyProject({ + warehouse: { driver: 'postgres' }, + }); + const runScan = vi.fn(async (_args, scanIo) => { + scanIo.stdout.write('Run: scan-run-1\n'); + scanIo.stdout.write('Mode: enriched\n'); + scanIo.stdout.write('Dry run: no\n'); + scanIo.stdout.write('KTX scan completed\n'); + return 0; + }); + const runIngest = vi.fn(async (_args, ingestIo) => { + ingestIo.stderr.write('Stopped query history before persisting any results\n'); + return 1; + }); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + json: false, + inputMode: 'disabled', + queryHistory: 'enabled', + }, + io.io, + { loadProject: vi.fn(async () => project), runScan, runIngest }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('Skipped query history:'); + expect(io.stdout()).toContain('Stopped query history before persisting any results'); + expect(io.stdout()).not.toContain('Dry run: no'); + expect(io.stdout()).not.toContain('Mode: enriched'); + }); + it('prints the runtime artifact build hint for missing query-history runtime assets', async () => { const io = makeIo(); const project = deepReadyProject({ @@ -989,6 +1218,7 @@ describe('runKtxPublicIngest', () => { expect(runIngest).toHaveBeenCalledWith( expect.objectContaining({ command: 'run', connectionId: 'docs', adapter: 'notion' }), expect.anything(), + expect.objectContaining({ progress: expect.any(Function) }), ); expect(io.stdout()).toContain('warehouse cannot be ingested: enrichment is not configured'); }); @@ -1027,6 +1257,7 @@ describe('runKtxPublicIngest', () => { dryRun: false, }, expect.objectContaining({ capturedOutput: expect.any(Function) }), + expect.objectContaining({ progress: expect.any(Object) }), ); }); @@ -1099,6 +1330,7 @@ describe('runKtxPublicIngest', () => { sourceDir: '/repo/dbt', }), expect.objectContaining({ capturedOutput: expect.any(Function) }), + expect.objectContaining({ progress: expect.any(Function) }), ); }); @@ -1135,6 +1367,7 @@ describe('runKtxPublicIngest', () => { allowImplicitAdapter: true, }), expect.objectContaining({ capturedOutput: expect.any(Function) }), + expect.objectContaining({ progress: expect.any(Function) }), ); }); diff --git a/scripts/package-artifacts.mjs b/scripts/package-artifacts.mjs index d66d7f1a..e9ab5e9a 100644 --- a/scripts/package-artifacts.mjs +++ b/scripts/package-artifacts.mjs @@ -518,7 +518,10 @@ function requireExitCodeWithProjectStderr(label, result, projectDir, expectedCod expectedCode, label + ' failed with code ' + result.code + '\\nstdout:\\n' + result.stdout + '\\nstderr:\\n' + result.stderr, ); - assert.equal(result.stderr, 'Project: ' + projectDir + '\\n', label + ' wrote unexpected stderr'); + assert.ok( + result.stderr.startsWith('Project: ' + projectDir + '\\n'), + label + ' did not lead stderr with the project notice\\nstderr:\\n' + result.stderr, + ); } function requireSuccessWithStderr(label, result, stderrPattern) { @@ -534,6 +537,10 @@ function requireOutput(label, result, text) { assert.match(result.stdout, text, label + ' output did not match ' + text); } +function requireStderr(label, result, stderrPattern) { + assert.match(result.stderr, stderrPattern, label + ' stderr did not match ' + stderrPattern); +} + function escapeRegExp(value) { return value.replace(/[|\\\\{}()[\\]^$+*?.]/g, '\\\\$&'); } @@ -857,6 +864,7 @@ try { ), ); requireExitCodeWithProjectStderr('ktx ingest enrichment guard', databaseIngest, projectDir, 1); + requireStderr('ktx ingest enrichment guard', databaseIngest, /^ {2}failed /m); requireOutput('ktx ingest enrichment guard', databaseIngest, /Ingest finished with partial failures/); requireOutput('ktx ingest enrichment guard', databaseIngest, /enrichment is not configured/); process.stdout.write('ktx ingest enrichment guard verified\\n'); diff --git a/scripts/package-artifacts.test.mjs b/scripts/package-artifacts.test.mjs index ffc59ce6..29e7fb1e 100644 --- a/scripts/package-artifacts.test.mjs +++ b/scripts/package-artifacts.test.mjs @@ -535,6 +535,7 @@ describe('verification snippets', () => { assert.doesNotMatch(source, /'--enrich'/); assert.match(source, /ktx ingest enrichment guard verified/); assert.match(source, /enrichment is not configured/); + assert.match(source, /requireStderr\('ktx ingest enrichment guard'/); assert.match(source, /enrichment:/); assert.match(source, /mode: deterministic/); assert.doesNotMatch(source, /run\('pnpm', \['exec', 'ktx', 'ingest', 'run'/);