diff --git a/packages/cli/src/context/ingest/local-ingest.ts b/packages/cli/src/context/ingest/local-ingest.ts index 2351d420..ec8a72f4 100644 --- a/packages/cli/src/context/ingest/local-ingest.ts +++ b/packages/cli/src/context/ingest/local-ingest.ts @@ -13,6 +13,7 @@ import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } fro import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js'; import type { MemoryFlowEventSink } from './memory-flow/types.js'; import { buildSyncId } from './raw-sources-paths.js'; +import { ingestReportOutcome } from './reports.js'; import type { IngestReportBody, IngestReportSnapshot } from './reports.js'; import { SqliteBundleIngestStore } from './sqlite-bundle-ingest-store.js'; import type { IngestBundleResult, IngestJobContext, IngestJobPhase, IngestTrigger, SourceAdapter } from './types.js'; @@ -79,7 +80,7 @@ export interface LocalMetabaseFanoutProgress { metabaseDatabaseId: number; targetConnectionId: string; jobId: string; - status: 'done' | 'failed'; + status: 'done' | 'partial' | 'failed'; }): void; } @@ -232,11 +233,11 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise child.report.body.failedWorkUnits.length === 0).length; - if (succeeded === children.length) { + const outcomes = children.map((child) => ingestReportOutcome(child.report)); + if (outcomes.every((outcome) => outcome === 'done')) { return 'all_succeeded'; } - if (succeeded === 0) { + if (outcomes.every((outcome) => outcome === 'error')) { return 'all_failed'; } return 'partial_failure'; @@ -401,12 +402,13 @@ export async function runLocalMetabaseIngest( error, }); } + const childOutcome = ingestReportOutcome(child.report); options.progress?.onMetabaseChildCompleted?.({ metabaseConnectionId, metabaseDatabaseId: childPlan.metabaseDatabaseId, targetConnectionId, jobId: child.report.jobId, - status: child.report.body.failedWorkUnits.length > 0 ? 'failed' : 'done', + status: childOutcome === 'error' ? 'failed' : childOutcome, }); children.push({ jobId: child.report.jobId, diff --git a/packages/cli/src/context/ingest/memory-flow/events.ts b/packages/cli/src/context/ingest/memory-flow/events.ts index 020ce5ae..92cebe0f 100644 --- a/packages/cli/src/context/ingest/memory-flow/events.ts +++ b/packages/cli/src/context/ingest/memory-flow/events.ts @@ -1,5 +1,6 @@ import type { MemoryAction } from '../../../context/memory/types.js'; import type { LocalIngestRunRecord } from '../local-stage-ingest.js'; +import { ingestReportOutcome } from '../reports.js'; import type { IngestReportSnapshot } from '../reports.js'; import type { MemoryFlowActionDetail, @@ -72,7 +73,7 @@ function fullModeMetadata(input: { } function reportStatus(report: IngestReportSnapshot): MemoryFlowReplayInput['status'] { - return report.body.failedWorkUnits.length > 0 ? 'error' : 'done'; + return ingestReportOutcome(report) === 'error' ? 'error' : 'done'; } function reportCreatedEvent(report: IngestReportSnapshot): MemoryFlowEvent { diff --git a/packages/cli/src/context/ingest/reports.ts b/packages/cli/src/context/ingest/reports.ts index ea02a31a..09f92170 100644 --- a/packages/cli/src/context/ingest/reports.ts +++ b/packages/cli/src/context/ingest/reports.ts @@ -146,6 +146,20 @@ export function savedMemoryCountsForReport(report: IngestReportSnapshot): Ingest }; } +/** @internal */ +export type IngestReportOutcome = 'done' | 'partial' | 'error'; + +export function ingestReportOutcome(report: IngestReportSnapshot): IngestReportOutcome { + if (report.body.status === 'failed') { + return 'error'; + } + if (report.body.failedWorkUnits.length === 0) { + return 'done'; + } + const { wikiCount, slCount } = savedMemoryCountsForReport(report); + return wikiCount + slCount > 0 ? 'partial' : 'error'; +} + export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex { return { jobId, diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index fb8c9a29..ad5ba270 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -2,7 +2,7 @@ import { buildMemoryFlowViewModel } from './context/ingest/memory-flow/view-mode import { createMemoryFlowLiveBuffer, sanitizeMemoryFlowError } from './context/ingest/memory-flow/live-buffer.js'; import { formatMemoryFlowFinalSummary } from './context/ingest/memory-flow/summary.js'; import { getLatestLocalIngestStatus, getLocalIngestStatus, type LocalMetabaseFanoutResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, runLocalIngest, runLocalMetabaseIngest } from './context/ingest/local-ingest.js'; -import { type IngestReportSnapshot, savedMemoryCountsForReport } from './context/ingest/reports.js'; +import { type IngestReportSnapshot, ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js'; import { ingestReportToMemoryFlowReplay } from './context/ingest/memory-flow/events.js'; import type { MemoryFlowEvent, MemoryFlowReplayInput } from './context/ingest/memory-flow/types.js'; import { renderMemoryFlowReplay } from './context/ingest/memory-flow/render.js'; @@ -93,10 +93,6 @@ export interface KtxIngestDeps { runtimeIo?: KtxIngestIo; } -function reportStatus(report: IngestReportSnapshot): 'done' | 'error' { - return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done'; -} - const REPORT_SOURCE_LABELS = new Map([ ['live-database', 'Database schema'], ['historic-sql', 'Query history'], @@ -193,7 +189,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void if (report.body.tracePath) { io.stdout.write(`Trace: ${report.body.tracePath}\n`); } - io.stdout.write(`Status: ${reportStatus(report)}\n`); + io.stdout.write(`Status: ${ingestReportOutcome(report)}\n`); io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`); io.stdout.write(`Connection: ${report.connectionId}\n`); io.stdout.write(`Sync: ${report.body.syncId}\n`); @@ -231,7 +227,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng } io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`); for (const child of result.children) { - const status = reportStatus(child.report); + const status = ingestReportOutcome(child.report); io.stdout.write( `- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`, ); @@ -595,7 +591,7 @@ function initialRunMemoryFlowInput( } function finalRunMemoryFlowInput(snapshot: MemoryFlowReplayInput, report: IngestReportSnapshot): MemoryFlowReplayInput { - const status = reportStatus(report); + const status = ingestReportOutcome(report) === 'error' ? 'error' : 'done'; return { ...snapshot, runId: report.runId, @@ -777,7 +773,7 @@ export async function runKtxIngest( } finally { plainProgress?.flush(); } - return result.status === 'all_succeeded' ? 0 : 1; + return result.status === 'all_failed' ? 1 : 0; } const jobId = deps.jobIdFactory?.(); @@ -846,7 +842,7 @@ export async function runKtxIngest( liveTui?.close(); liveTui = null; io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot)); - return reportStatus(result.report) === 'done' ? 0 : 1; + return ingestReportOutcome(result.report) === 'error' ? 1 : 0; } plainProgress?.flush(); await writeReportRecord(result.report, runOutputMode, io, { @@ -854,7 +850,7 @@ export async function runKtxIngest( renderStoredMemoryFlow: deps.renderStoredMemoryFlow, env, }); - return reportStatus(result.report) === 'done' ? 0 : 1; + return ingestReportOutcome(result.report) === 'error' ? 1 : 0; } finally { plainProgress?.flush(); liveTui?.close(); diff --git a/packages/cli/src/setup.ts b/packages/cli/src/setup.ts index 74056542..ebc04c87 100644 --- a/packages/cli/src/setup.ts +++ b/packages/cli/src/setup.ts @@ -1,7 +1,7 @@ import { existsSync } from 'node:fs'; import { basename, join, resolve } from 'node:path'; import { getLatestLocalIngestStatus } from './context/ingest/local-ingest.js'; -import { savedMemoryCountsForReport } from './context/ingest/reports.js'; +import { ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js'; import { ktxLocalStateDbPath } from './context/project/local-state-db.js'; import { loadKtxProject, type KtxLocalProject } from './context/project/project.js'; import { readKtxSetupState } from './context/project/setup-config.js'; @@ -306,7 +306,7 @@ function sourceConnections(config: Awaited>['c type LocalIngestStatusReport = NonNullable>>; function reportHasSavedContext(report: LocalIngestStatusReport): boolean { - if (report.body.failedWorkUnits.length > 0) { + if (ingestReportOutcome(report) === 'error') { return false; } const counts = savedMemoryCountsForReport(report); diff --git a/packages/cli/test/context/ingest/local-metabase-ingest.test.ts b/packages/cli/test/context/ingest/local-metabase-ingest.test.ts index 06822aa2..8fb89bd0 100644 --- a/packages/cli/test/context/ingest/local-metabase-ingest.test.ts +++ b/packages/cli/test/context/ingest/local-metabase-ingest.test.ts @@ -6,6 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { initKtxProject, type KtxLocalProject } from '../../../src/context/project/project.js'; import { LocalMetabaseDiscoveryCache } from '../../../src/context/ingest/adapters/metabase/local-source-state-store.js'; import { getLocalIngestStatus, runLocalMetabaseIngest } from '../../../src/context/ingest/local-ingest.js'; +import { ingestReportOutcome } from '../../../src/context/ingest/reports.js'; import type { ChunkResult, FetchContext, SourceAdapter } from '../../../src/context/ingest/types.js'; class TestAgentRunner implements AgentRunnerPort { @@ -202,6 +203,24 @@ describe('runLocalMetabaseIngest', () => { expect(result.children[1]?.report.body.failedWorkUnits).toEqual(['metabase-db-2']); }); + it('keeps a child that saved memory out of all_failed when another child fails', async () => { + await seedMetabaseState(); + const agentRunner = new TestAgentRunner(); + const ids = ['metabase-child-1', 'metabase-child-2']; + + const result = await runLocalMetabaseIngest({ + project, + adapters: [new FakeMetabaseSourceAdapter()], + metabaseConnectionId: 'prod-metabase', + agentRunner, + jobIdFactory: () => ids.shift() ?? 'metabase-child-extra', + }); + + expect(result.status).toBe('partial_failure'); + expect(ingestReportOutcome(result.children[0].report)).toBe('done'); + expect(ingestReportOutcome(result.children[1].report)).toBe('error'); + }); + it('captures fetch-time child failures and continues later mappings', async () => { await seedMetabaseState(); project.config.connections.warehouse_c = { driver: 'postgres', url: 'postgres://localhost/c' }; diff --git a/packages/cli/test/context/ingest/memory-flow/events.test.ts b/packages/cli/test/context/ingest/memory-flow/events.test.ts index e29405a4..cb0e72c8 100644 --- a/packages/cli/test/context/ingest/memory-flow/events.test.ts +++ b/packages/cli/test/context/ingest/memory-flow/events.test.ts @@ -166,7 +166,7 @@ describe('memory-flow event mapping', () => { runId: 'run-1', connectionId: 'warehouse', adapter: 'lookml', - status: 'error', + status: 'done', sourceDir: null, syncId: 'sync-2', reportId: 'report-1', @@ -308,7 +308,7 @@ describe('memory-flow event mapping', () => { sourceReportPath: 'report-1', fallbackReason: null, }); - expect(replay.status).toBe('error'); + expect(replay.status).toBe('done'); expect(replay.reportId).toBe('report-1'); expect(replay.reportPath).toBe('report-1'); expect(replay.events[0]).toMatchObject({ type: 'source_acquired', emittedAt: '2026-05-01T10:00:00.000Z' }); diff --git a/packages/cli/test/context/ingest/reports.test.ts b/packages/cli/test/context/ingest/reports.test.ts new file mode 100644 index 00000000..5fc24f6d --- /dev/null +++ b/packages/cli/test/context/ingest/reports.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, it } from 'vitest'; +import { ingestReportOutcome } from '../../../src/context/ingest/reports.js'; +import type { IngestReportSnapshot } from '../../../src/context/ingest/reports.js'; + +function report(body: Partial): IngestReportSnapshot { + return { + id: 'r', + runId: 'run', + jobId: 'job', + connectionId: 'warehouse', + sourceKey: 'metabase', + createdAt: '2026-05-29T00:00:00.000Z', + body: { + syncId: 'sync', + diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 }, + commitSha: null, + workUnits: [], + failedWorkUnits: [], + reconciliationSkipped: false, + conflictsResolved: [], + evictionsApplied: [], + unmappedFallbacks: [], + evictionInputs: [], + unresolvedCards: [], + supersededBy: null, + overrideOf: null, + provenanceRows: [], + toolTranscripts: [], + ...body, + }, + }; +} + +const savingWorkUnit = { + unitKey: 'ok', + rawFiles: ['cards/1.json'], + status: 'success' as const, + actions: [{ target: 'sl' as const, type: 'updated' as const, key: 'warehouse.orders', detail: 'measure' }], + touchedSlSources: [], +}; + +const failedWorkUnit = { + unitKey: 'bad', + rawFiles: ['cards/2.json'], + status: 'failed' as const, + reason: 'tool write failed', + actions: [], + touchedSlSources: [], +}; + +describe('ingestReportOutcome', () => { + it('returns done when there are no failed work units', () => { + expect(ingestReportOutcome(report({ workUnits: [savingWorkUnit] }))).toBe('done'); + }); + + it('returns partial when failed work units coexist with saved memory', () => { + expect( + ingestReportOutcome(report({ workUnits: [savingWorkUnit, failedWorkUnit], failedWorkUnits: ['bad'] })), + ).toBe('partial'); + }); + + it('returns error when failed work units produced no saved memory', () => { + expect(ingestReportOutcome(report({ workUnits: [failedWorkUnit], failedWorkUnits: ['bad'] }))).toBe('error'); + }); + + it('returns error for a stage-level failure even if artifacts were recorded', () => { + expect(ingestReportOutcome(report({ status: 'failed', workUnits: [savingWorkUnit], failedWorkUnits: [] }))).toBe( + 'error', + ); + }); +}); diff --git a/packages/cli/test/ingest.test.ts b/packages/cli/test/ingest.test.ts index eef751ba..f5cd1ac5 100644 --- a/packages/cli/test/ingest.test.ts +++ b/packages/cli/test/ingest.test.ts @@ -403,7 +403,7 @@ describe('runKtxIngest', () => { expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); - it('returns a non-zero code when Metabase fanout has failed children', async () => { + it('returns a non-zero code when a Metabase fanout child fully fails', async () => { const projectDir = join(tempDir, 'project'); await writeMetabaseConfig(projectDir); const io = makeIo(); @@ -441,7 +441,7 @@ describe('runKtxIngest', () => { { runLocalMetabaseIngest: async () => ({ metabaseConnectionId: 'prod-metabase', - status: 'partial_failure', + status: 'all_failed', totals: { workUnits: 1, failedWorkUnits: 1 }, children: [ { @@ -467,9 +467,83 @@ describe('runKtxIngest', () => { ), ).resolves.toBe(1); - expect(io.stdout()).toContain('Metabase fanout: partial_failure'); - expect(io.stdout()).toContain('Failed tasks: 1'); + expect(io.stdout()).toContain('Metabase fanout: all_failed'); expect(io.stdout()).toContain('status=error'); + }); + + it('exits 0 and reports status=partial when a Metabase child saved memory despite a failure', async () => { + const projectDir = join(tempDir, 'project'); + await writeMetabaseConfig(projectDir); + const io = makeIo(); + const report = localFakeBundleReport('metabase-child-1', { + id: 'report-metabase-child-1', + runId: 'run-a', + jobId: 'metabase-child-1', + connectionId: 'warehouse_a', + sourceKey: 'metabase', + body: { + failedWorkUnits: ['metabase-db-2'], + workUnits: [ + { + unitKey: 'metabase-db-1', + rawFiles: ['cards/1.json'], + status: 'success', + actions: [{ target: 'sl', type: 'updated', key: 'warehouse.orders', detail: 'measure' }], + touchedSlSources: [], + }, + { + unitKey: 'metabase-db-2', + rawFiles: ['cards/2.json'], + status: 'failed', + reason: 'bad SQL', + actions: [], + touchedSlSources: [], + }, + ], + }, + }); + + await expect( + runKtxIngest( + { + command: 'run', + projectDir, + connectionId: 'prod-metabase', + adapter: 'metabase', + outputMode: 'plain', + }, + io.io, + { + runLocalMetabaseIngest: async () => ({ + metabaseConnectionId: 'prod-metabase', + status: 'partial_failure', + totals: { workUnits: 2, failedWorkUnits: 1 }, + children: [ + { + jobId: 'metabase-child-1', + metabaseConnectionId: 'prod-metabase', + metabaseDatabaseId: 1, + targetConnectionId: 'warehouse_a', + result: { + jobId: 'metabase-child-1', + runId: 'run-a', + syncId: 'sync-a', + diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, + workUnitCount: 2, + failedWorkUnits: ['metabase-db-2'], + artifactsWritten: 1, + commitSha: 'abc', + }, + report, + }, + ], + }), + }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('Metabase fanout: partial_failure'); + expect(io.stdout()).toContain('status=partial'); expect(io.stderr()).toContain('Metabase ingest: prod-metabase'); }); @@ -1140,6 +1214,63 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Status: error\n'); }); + it('exits 0 and reports Status: partial when a single-source ingest saved memory despite a failure', async () => { + const projectDir = join(tempDir, 'project'); + await writeWarehouseConfig(projectDir); + const sourceDir = join(tempDir, 'source'); + await mkdir(join(sourceDir, 'orders'), { recursive: true }); + await writeFile(join(sourceDir, 'orders', 'orders.json'), '{"name":"orders"}\n', 'utf-8'); + + const partialReport = localFakeBundleReport('local-job-partial', { + connectionId: 'warehouse', + sourceKey: 'fake', + body: { + failedWorkUnits: ['orders-bad'], + workUnits: [ + { + unitKey: 'orders-ok', + rawFiles: ['orders/orders.json'], + status: 'success', + actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }], + touchedSlSources: [], + }, + { + unitKey: 'orders-bad', + rawFiles: ['orders/bad.json'], + status: 'failed', + reason: 'writer tool failed', + actions: [], + touchedSlSources: [], + }, + ], + }, + }); + const runLocal = vi.fn(async (_input: RunLocalIngestOptions) => ({ + result: { + jobId: 'local-job-partial', + runId: partialReport.runId, + syncId: partialReport.body.syncId, + diffSummary: partialReport.body.diffSummary, + workUnitCount: partialReport.body.workUnits.length, + failedWorkUnits: partialReport.body.failedWorkUnits, + artifactsWritten: 1, + commitSha: partialReport.body.commitSha, + }, + report: partialReport, + })); + + const io = makeIo(); + await expect( + runKtxIngest( + { command: 'run', projectDir, connectionId: 'warehouse', adapter: 'fake', sourceDir, outputMode: 'plain' }, + io.io, + { runLocalIngest: runLocal, jobIdFactory: () => 'local-job-partial' }, + ), + ).resolves.toBe(0); + + expect(io.stdout()).toContain('Status: partial\n'); + }); + it('prints trace path and error status for stored failed ingest reports', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/test/setup.test.ts b/packages/cli/test/setup.test.ts index 0bc00919..da51e9af 100644 --- a/packages/cli/test/setup.test.ts +++ b/packages/cli/test/setup.test.ts @@ -398,6 +398,59 @@ describe('setup status', () => { expect(rendered).toContain('KTX context built: yes'); }); + it('reports context ready after a partial ingest report saved memory', async () => { + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'setup:', + ' database_connection_ids:', + ' - warehouse', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:DATABASE_URL', + 'ingest:', + ' embeddings:', + ' backend: none', + ' dimensions: 8', + '', + ].join('\n'), + 'utf-8', + ); + await writeKtxSetupState(tempDir, { completed_steps: ['project', 'databases'] }); + await persistLocalBundleReport( + tempDir, + localFakeBundleReport('warehouse-job-partial', { + connectionId: 'warehouse', + sourceKey: 'fake', + body: { + failedWorkUnits: ['orders-bad'], + workUnits: [ + { + unitKey: 'orders-ok', + rawFiles: ['orders/orders.json'], + status: 'success', + actions: [{ target: 'wiki', type: 'created', key: 'wiki/orders.md', detail: 'orders' }], + touchedSlSources: [], + }, + { + unitKey: 'orders-bad', + rawFiles: ['orders/bad.json'], + status: 'failed', + reason: 'writer tool failed', + actions: [], + touchedSlSources: [], + }, + ], + }, + }), + ); + + const status = await readKtxSetupStatus(tempDir); + + expect(status.context).toMatchObject({ ready: true, status: 'completed' }); + }); + it('formats plain and JSON setup status payloads', async () => { const status = await readKtxSetupStatus(tempDir); const rendered = formatKtxSetupStatus(status);