fix(cli): treat artifact-producing ingests with failures as partial (#238)

* fix(cli): derive ingest outcomes from saved artifacts

* fix(cli): treat artifact-producing ingests with failures as partial

* fix(cli): route memory-flow run status through shared ingest outcome

* fix(cli): treat partial ingest as saved context in setup status

* test(cli): align memory-flow replay expectations with partial ingests
This commit is contained in:
Andrey Avtomonov 2026-05-30 00:42:59 +02:00 committed by GitHub
parent 3f0d11e07d
commit 53a6f8d111
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 312 additions and 25 deletions

View file

@ -13,6 +13,7 @@ import { localPullConfigForAdapter, type DefaultLocalIngestAdaptersOptions } fro
import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js';
import type { MemoryFlowEventSink } from './memory-flow/types.js';
import { buildSyncId } from './raw-sources-paths.js';
import { ingestReportOutcome } from './reports.js';
import type { IngestReportBody, IngestReportSnapshot } from './reports.js';
import { SqliteBundleIngestStore } from './sqlite-bundle-ingest-store.js';
import type { IngestBundleResult, IngestJobContext, IngestJobPhase, IngestTrigger, SourceAdapter } from './types.js';
@ -79,7 +80,7 @@ export interface LocalMetabaseFanoutProgress {
metabaseDatabaseId: number;
targetConnectionId: string;
jobId: string;
status: 'done' | 'failed';
status: 'done' | 'partial' | 'failed';
}): void;
}
@ -232,11 +233,11 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise<Lo
}
function metabaseFanoutStatus(children: LocalMetabaseFanoutChild[]): LocalMetabaseFanoutResult['status'] {
const succeeded = children.filter((child) => child.report.body.failedWorkUnits.length === 0).length;
if (succeeded === children.length) {
const outcomes = children.map((child) => ingestReportOutcome(child.report));
if (outcomes.every((outcome) => outcome === 'done')) {
return 'all_succeeded';
}
if (succeeded === 0) {
if (outcomes.every((outcome) => outcome === 'error')) {
return 'all_failed';
}
return 'partial_failure';
@ -401,12 +402,13 @@ export async function runLocalMetabaseIngest(
error,
});
}
const childOutcome = ingestReportOutcome(child.report);
options.progress?.onMetabaseChildCompleted?.({
metabaseConnectionId,
metabaseDatabaseId: childPlan.metabaseDatabaseId,
targetConnectionId,
jobId: child.report.jobId,
status: child.report.body.failedWorkUnits.length > 0 ? 'failed' : 'done',
status: childOutcome === 'error' ? 'failed' : childOutcome,
});
children.push({
jobId: child.report.jobId,

View file

@ -1,5 +1,6 @@
import type { MemoryAction } from '../../../context/memory/types.js';
import type { LocalIngestRunRecord } from '../local-stage-ingest.js';
import { ingestReportOutcome } from '../reports.js';
import type { IngestReportSnapshot } from '../reports.js';
import type {
MemoryFlowActionDetail,
@ -72,7 +73,7 @@ function fullModeMetadata(input: {
}
function reportStatus(report: IngestReportSnapshot): MemoryFlowReplayInput['status'] {
return report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
return ingestReportOutcome(report) === 'error' ? 'error' : 'done';
}
function reportCreatedEvent(report: IngestReportSnapshot): MemoryFlowEvent {

View file

@ -146,6 +146,20 @@ export function savedMemoryCountsForReport(report: IngestReportSnapshot): Ingest
};
}
/** @internal */
export type IngestReportOutcome = 'done' | 'partial' | 'error';
export function ingestReportOutcome(report: IngestReportSnapshot): IngestReportOutcome {
if (report.body.status === 'failed') {
return 'error';
}
if (report.body.failedWorkUnits.length === 0) {
return 'done';
}
const { wikiCount, slCount } = savedMemoryCountsForReport(report);
return wikiCount + slCount > 0 ? 'partial' : 'error';
}
export function buildStageIndexFromReportBody(jobId: string, connectionId: string, body: IngestReportBody): StageIndex {
return {
jobId,

View file

@ -2,7 +2,7 @@ import { buildMemoryFlowViewModel } from './context/ingest/memory-flow/view-mode
import { createMemoryFlowLiveBuffer, sanitizeMemoryFlowError } from './context/ingest/memory-flow/live-buffer.js';
import { formatMemoryFlowFinalSummary } from './context/ingest/memory-flow/summary.js';
import { getLatestLocalIngestStatus, getLocalIngestStatus, type LocalMetabaseFanoutResult, type LocalMetabaseFanoutProgress, type RunLocalIngestOptions, runLocalIngest, runLocalMetabaseIngest } from './context/ingest/local-ingest.js';
import { type IngestReportSnapshot, savedMemoryCountsForReport } from './context/ingest/reports.js';
import { type IngestReportSnapshot, ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js';
import { ingestReportToMemoryFlowReplay } from './context/ingest/memory-flow/events.js';
import type { MemoryFlowEvent, MemoryFlowReplayInput } from './context/ingest/memory-flow/types.js';
import { renderMemoryFlowReplay } from './context/ingest/memory-flow/render.js';
@ -93,10 +93,6 @@ export interface KtxIngestDeps {
runtimeIo?: KtxIngestIo;
}
function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
}
const REPORT_SOURCE_LABELS = new Map<string, string>([
['live-database', 'Database schema'],
['historic-sql', 'Query history'],
@ -193,7 +189,7 @@ function writeReportStatus(report: IngestReportSnapshot, io: KtxIngestIo): void
if (report.body.tracePath) {
io.stdout.write(`Trace: ${report.body.tracePath}\n`);
}
io.stdout.write(`Status: ${reportStatus(report)}\n`);
io.stdout.write(`Status: ${ingestReportOutcome(report)}\n`);
io.stdout.write(`Source: ${reportSourceLabel(report.sourceKey)}\n`);
io.stdout.write(`Connection: ${report.connectionId}\n`);
io.stdout.write(`Sync: ${report.body.syncId}\n`);
@ -231,7 +227,7 @@ function writeMetabaseFanoutStatus(result: LocalMetabaseFanoutResult, io: KtxIng
}
io.stdout.write(`Saved memory: ${counts.wikiCount} wiki, ${counts.slCount} SL\n`);
for (const child of result.children) {
const status = reportStatus(child.report);
const status = ingestReportOutcome(child.report);
io.stdout.write(
`- target=${child.targetConnectionId} database=${child.metabaseDatabaseId} status=${status} job=${child.jobId} report=${child.report.id}\n`,
);
@ -595,7 +591,7 @@ function initialRunMemoryFlowInput(
}
function finalRunMemoryFlowInput(snapshot: MemoryFlowReplayInput, report: IngestReportSnapshot): MemoryFlowReplayInput {
const status = reportStatus(report);
const status = ingestReportOutcome(report) === 'error' ? 'error' : 'done';
return {
...snapshot,
runId: report.runId,
@ -777,7 +773,7 @@ export async function runKtxIngest(
} finally {
plainProgress?.flush();
}
return result.status === 'all_succeeded' ? 0 : 1;
return result.status === 'all_failed' ? 1 : 0;
}
const jobId = deps.jobIdFactory?.();
@ -846,7 +842,7 @@ export async function runKtxIngest(
liveTui?.close();
liveTui = null;
io.stdout.write(formatMemoryFlowFinalSummary(latestMemoryFlowSnapshot));
return reportStatus(result.report) === 'done' ? 0 : 1;
return ingestReportOutcome(result.report) === 'error' ? 1 : 0;
}
plainProgress?.flush();
await writeReportRecord(result.report, runOutputMode, io, {
@ -854,7 +850,7 @@ export async function runKtxIngest(
renderStoredMemoryFlow: deps.renderStoredMemoryFlow,
env,
});
return reportStatus(result.report) === 'done' ? 0 : 1;
return ingestReportOutcome(result.report) === 'error' ? 1 : 0;
} finally {
plainProgress?.flush();
liveTui?.close();

View file

@ -1,7 +1,7 @@
import { existsSync } from 'node:fs';
import { basename, join, resolve } from 'node:path';
import { getLatestLocalIngestStatus } from './context/ingest/local-ingest.js';
import { savedMemoryCountsForReport } from './context/ingest/reports.js';
import { ingestReportOutcome, savedMemoryCountsForReport } from './context/ingest/reports.js';
import { ktxLocalStateDbPath } from './context/project/local-state-db.js';
import { loadKtxProject, type KtxLocalProject } from './context/project/project.js';
import { readKtxSetupState } from './context/project/setup-config.js';
@ -306,7 +306,7 @@ function sourceConnections(config: Awaited<ReturnType<typeof loadKtxProject>>['c
type LocalIngestStatusReport = NonNullable<Awaited<ReturnType<typeof getLatestLocalIngestStatus>>>;
function reportHasSavedContext(report: LocalIngestStatusReport): boolean {
if (report.body.failedWorkUnits.length > 0) {
if (ingestReportOutcome(report) === 'error') {
return false;
}
const counts = savedMemoryCountsForReport(report);