From 830c372ca9f3746ced8fe6cf5eb7dcb309f1b34a Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Fri, 29 May 2026 17:49:23 +0200 Subject: [PATCH] fix(cli): treat artifact-producing ingests with failures as partial --- packages/cli/src/ingest.ts | 18 ++-- packages/cli/test/ingest.test.ts | 139 ++++++++++++++++++++++++++++++- 2 files changed, 142 insertions(+), 15 deletions(-) diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index fb8c9a29..ad5ba270 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -2,7 +2,7 @@ import { buildMemoryFlowViewModel } from './context/ingest/memory-flow/view-mode import { createMemoryFlowLiveBuffer, sanitizeMemoryFlowError } from './context/ingest/memory-flow/live-buffer.js'; import { formatMemoryFlowFinalSummary } from './context/ingest/memory-flow/summary.js'; import { getLatestLocalIngestStatus, getLocalIngestStatus, type LocalMetabaseFanoutResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, runLocalIngest, runLocalMetabaseIngest } from './context/ingest/local-ingest.js'; -import { type IngestReportSnapshot, savedMemoryCountsForReport } from './context/ingest/reports.js'; +import { type IngestReportSnapshot, ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js'; import { ingestReportToMemoryFlowReplay } from './context/ingest/memory-flow/events.js'; import type { MemoryFlowEvent, MemoryFlowReplayInput } from './context/ingest/memory-flow/types.js'; import { renderMemoryFlowReplay } from './context/ingest/memory-flow/render.js'; @@ -93,10 +93,6 @@ export interface KtxIngestDeps { runtimeIo?: KtxIngestIo; } -function reportStatus(report: IngestReportSnapshot): 'done' | 'error' { - return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done'; -} - const REPORT_SOURCE_LABELS = new Map([ ['live-database', 'Database schema'], ['historic-sql', 'Query history'], @@ -193,7 +189,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void if (report.body.tracePath) { io.stdout.write(`Trace: ${report.body.tracePath}\n`); } - io.stdout.write(`Status: ${reportStatus(report)}\n`); + io.stdout.write(`Status: ${ingestReportOutcome(report)}\n`); io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`); io.stdout.write(`Connection: ${report.connectionId}\n`); io.stdout.write(`Sync: ${report.body.syncId}\n`); @@ -231,7 +227,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng } io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`); for (const child of result.children) { - const status = reportStatus(child.report); + const status = ingestReportOutcome(child.report); io.stdout.write( `- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`, ); @@ -595,7 +591,7 @@ function initialRunMemoryFlowInput( } function finalRunMemoryFlowInput(snapshot: MemoryFlowReplayInput, report: IngestReportSnapshot): MemoryFlowReplayInput { - const status = reportStatus(report); + const status = ingestReportOutcome(report) === 'error' ? 'error' : 'done'; return { ...snapshot, runId: report.runId, @@ -777,7 +773,7 @@ export async function runKtxIngest( } finally { plainProgress?.flush(); } - return result.status === 'all_succeeded' ? 0 : 1; + return result.status === 'all_failed' ? 1 : 0; } const jobId = deps.jobIdFactory?.(); @@ -846,7 +842,7 @@ export async function runKtxIngest( liveTui?.close(); liveTui = null; io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot)); - return reportStatus(result.report) === 'done' ? 0 : 1; + return ingestReportOutcome(result.report) === 'error' ? 1 : 0; } plainProgress?.flush(); await writeReportRecord(result.report, runOutputMode, io, { @@ -854,7 +850,7 @@ export async function runKtxIngest( renderStoredMemoryFlow: deps.renderStoredMemoryFlow, env, }); - return reportStatus(result.report) === 'done' ? 0 : 1; + return ingestReportOutcome(result.report) === 'error' ? 1 : 0; } finally { plainProgress?.flush(); liveTui?.close(); diff --git a/packages/cli/test/ingest.test.ts b/packages/cli/test/ingest.test.ts index eef751ba..f5cd1ac5 100644 --- a/packages/cli/test/ingest.test.ts +++ b/packages/cli/test/ingest.test.ts @@ -403,7 +403,7 @@ describe('runKtxIngest', () => { expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); - it('returns a non-zero code when Metabase fanout has failed children', async () => { + it('returns a non-zero code when a Metabase fanout child fully fails', async () => { const projectDir = join(tempDir, 'project'); await writeMetabaseConfig(projectDir); const io = makeIo(); @@ -441,7 +441,7 @@ describe('runKtxIngest', () => { { runLocalMetabaseIngest: async () => ({ metabaseConnectionId: 'prod-metabase', - status: 'partial_failure', + status: 'all_failed', totals: { workUnits: 1, failedWorkUnits: 1 }, children: [ { @@ -467,9 +467,83 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(1); - expect(io.stdout()).toContain('Metabase fanout: partial_failure'); - expect(io.stdout()).toContain('Failed tasks: 1'); + expect(io.stdout()).toContain('Metabase fanout: all_failed'); expect(io.stdout()).toContain('status=error'); + }); + + it('exits 0 and reports status=partial when a Metabase child saved memory despite a failure', async () => { + const projectDir = join(tempDir, 'project'); + await writeMetabaseConfig(projectDir); + const io = makeIo(); + const report = localFakeBundleReport('metabase-child-1', { + id: 'report-metabase-child-1', + runId: 'run-a', + jobId: 'metabase-child-1', + connectionId: 'warehouse_a', + sourceKey: 'metabase', + body: { + failedWorkUnits: ['metabase-db-2'], + workUnits: [ + { + unitKey: 'metabase-db-1', + rawFiles: ['cards/1.json'], + status: 'success', + actions: [{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'measure' }], + touchedSlSources: [], + }, + { + unitKey: 'metabase-db-2', + rawFiles: ['cards/2.json'], + status: 'failed', + reason: 'bad SQL', + actions: [], + touchedSlSources: [], + }, + ], + }, + }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'plain', + }, + io.io, + { + runLocalMetabaseIngest: async () => ({ + metabaseConnectionId: 'prod-metabase', + status: 'partial_failure', + totals: { workUnits: 2, failedWorkUnits: 1 }, + children: [ + { + jobId: 'metabase-child-1', + metabaseConnectionId: 'prod-metabase', + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', + result: { + jobId: 'metabase-child-1', + runId: 'run-a', + syncId: 'sync-a', + diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, + workUnitCount: 2, + failedWorkUnits: ['metabase-db-2'], + artifactsWritten: 1, + commitSha: 'abc', + }, + report, + }, + ], + }), + }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('Metabase fanout: partial_failure'); + expect(io.stdout()).toContain('status=partial'); expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); @@ -1140,6 +1214,63 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Status: error\n'); }); + it('exits 0 and reports Status: partial when a single-source ingest saved memory despite a failure', async () => { + const projectDir = join(tempDir, 'project'); + await writeWarehouseConfig(projectDir); + const sourceDir = join(tempDir, 'source'); + await mkdir(join(sourceDir, 'orders'), { recursive: true }); + await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); + + const partialReport = localFakeBundleReport('local-job-partial', { + connectionId: 'warehouse', + sourceKey: 'fake', + body: { + failedWorkUnits: ['orders-bad'], + workUnits: [ + { + unitKey: 'orders-ok', + rawFiles: ['orders/orders.json'], + status: 'success', + actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }], + touchedSlSources: [], + }, + { + unitKey: 'orders-bad', + rawFiles: ['orders/bad.json'], + status: 'failed', + reason: 'writer tool failed', + actions: [], + touchedSlSources: [], + }, + ], + }, + }); + const runLocal = vi.fn(async (_input: RunLocalIngestOptions) => ({ + result: { + jobId: 'local-job-partial', + runId: partialReport.runId, + syncId: partialReport.body.syncId, + diffSummary: partialReport.body.diffSummary, + workUnitCount: partialReport.body.workUnits.length, + failedWorkUnits: partialReport.body.failedWorkUnits, + artifactsWritten: 1, + commitSha: partialReport.body.commitSha, + }, + report: partialReport, + })); + + const io = makeIo(); + await expect( + runKtxIngest( + { command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' }, + io.io, + { runLocalIngest: runLocal, jobIdFactory: () => 'local-job-partial' }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('Status: partial\n'); + }); + it('prints trace path and error status for stored failed ingest reports', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir);