fix(ingest): persist postmortem failure traces

This commit is contained in:
Andrey Avtomonov 2026-05-17 21:47:43 +02:00
parent 51fe8306c3
commit 86837dd3ed
8 changed files with 483 additions and 45 deletions

View file

@ -123,7 +123,10 @@ ktx ingest status <runId>
The trace file lives under the project directory at
`.ktx/ingest-traces/<jobId>/trace.jsonl`. Each line is a JSON event with the
job id, run id, sync id, connection id, source key, phase, event name, timing,
context fields, and error details when a step fails.
state snapshot, decision context, and error details. Failed runs also write a
stored ingest report with `status: "failed"`, `failure.phase`,
`failure.message`, and the same trace path, so `ktx ingest status <runId>` can
point you to the postmortem trace.
Use `jq` or line-oriented tools to inspect a trace:

View file

@ -985,6 +985,59 @@ describe('runKtxIngest', () => {
expect(io.stdout()).toContain('Status: error\n');
});
it('prints trace path and error status for stored failed ingest reports', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);
const io = makeIo();
const report = {
id: 'report-failed',
runId: 'run-failed',
jobId: 'job-failed',
connectionId: 'warehouse',
sourceKey: 'metabase',
createdAt: '2026-05-17T12:00:00.000Z',
body: {
status: 'failed',
syncId: 'sync-failed',
diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
commitSha: null,
tracePath: '/project/.ktx/ingest-traces/job-failed/trace.jsonl',
failure: { phase: 'final_gates', message: 'final artifact gates failed' },
workUnits: [],
failedWorkUnits: [],
reconciliationSkipped: true,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
evictionInputs: [],
unresolvedCards: [],
supersededBy: null,
overrideOf: null,
provenanceRows: [],
toolTranscripts: [],
},
};
await runKtxIngest(
{
command: 'status',
projectDir,
reportFile: '/project/report-failed.json',
runId: 'run-failed',
outputMode: 'plain',
inputMode: 'disabled',
},
io.io,
{
readReportFile: vi.fn().mockResolvedValue(report),
},
);
expect(io.stdout()).toContain('Trace: /project/.ktx/ingest-traces/job-failed/trace.jsonl');
expect(io.stdout()).toContain('Status: error');
expect(io.stdout()).toContain('Error: final artifact gates failed');
});
it('prints a clear first failure reason when query-history work units fail', async () => {
const projectDir = join(tempDir, 'project');
await writeWarehouseConfig(projectDir);

View file

@ -102,7 +102,7 @@ export interface KtxIngestDeps {
}
function reportStatus(report: IngestReportSnapshot): 'done' | 'error' {
return report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done';
}
const REPORT_SOURCE_LABELS = new Map<string, string>([
@ -174,6 +174,9 @@ function formatFailureReason(sourceKey: string, reason: string): string {
}
function failedReportMessage(report: IngestReportSnapshot): string | null {
if (report.body.status === 'failed' && report.body.failure?.message) {
return sanitizeMemoryFlowError(report.body.failure.message);
}
const failedCount = report.body.failedWorkUnits.length;
if (failedCount === 0) {
return null;

View file

@ -495,6 +495,120 @@ describe('IngestBundleRunner isolated diff path', () => {
}
});
it('stores a failure report and postmortem trace for final gate failures', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime);
const createdReports: any[] = [];
deps.reports.create = vi.fn(async (args: any) => {
createdReports.push(args);
return { id: `report-${createdReports.length}` };
});
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] },
],
});
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
if (params.telemetryTags.unitKey === 'card-wiki') {
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/account-segments.md'),
'---\nsummary: Account segments\nusage_mode: auto\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'account-segments',
detail: 'Account segments',
rawPaths: ['cards/wiki.json'],
});
await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local');
}
if (params.telemetryTags.unitKey === 'card-source') {
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
await writeFile(
join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'),
'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n',
);
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'mart_account_segments',
detail: 'Dollar measure',
targetConnectionId: 'warehouse',
rawPaths: ['cards/source.json'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/mart_account_segments.yaml'],
'wu source',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(runner, runtime, [
['cards/wiki.json', 'h1'],
['cards/source.json', 'h2'],
]);
await expect(
runner.run({
jobId: 'job-trace-failure',
connectionId: 'warehouse',
sourceKey: 'metabase',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).rejects.toThrow(/total_contract_arr_cents/);
const failureReport = createdReports.find((report) => report.body.status === 'failed');
expect(failureReport.body.tracePath).toContain('job-trace-failure/trace.jsonl');
expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates' });
const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-trace-failure/trace.jsonl'), 'utf-8'))
.trim()
.split('\n')
.map((line) => JSON.parse(line));
expect(events.map((event) => event.event)).toEqual(
expect.arrayContaining([
'ingest_started',
'input_snapshot',
'work_units_planned',
'isolated_diff_enabled',
'work_unit_child_created',
'work_unit_patch_collected',
'patch_apply_started',
'patch_accepted',
'reconciliation_finished',
'final_artifact_gates_failed',
'ingest_failed',
'failure_report_created',
]),
);
const failed = events.find((event) => event.event === 'ingest_failed');
expect(failed).toMatchObject({
runId: 'run-1',
syncId: expect.any(String),
data: { phase: 'final_gates', tracePath: expect.stringContaining('trace.jsonl') },
error: { message: expect.stringContaining('total_contract_arr_cents') },
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('rejects slDisallowed patches that touch semantic-layer files', async () => {
const runtime = await makeRealGitRuntime();
try {

View file

@ -23,6 +23,7 @@ import type {
ContextEvidenceIndexSummary,
IngestBundleRunnerDeps,
IngestProvenanceRow,
IngestRunsPort,
IngestSessionWorktree,
PageTriageRunResult,
} from './ports.js';
@ -59,6 +60,7 @@ import {
type MutableToolTranscriptSummary,
} from './tools/tool-transcript-summary.js';
import type {
IngestDiffSummary,
EvictionUnit,
IngestBundleJob,
IngestBundleResult,
@ -420,6 +422,10 @@ export class IngestBundleRunner {
});
}
private errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
private wikiPageKeysFromPaths(paths: string[]): string[] {
return [
...new Set(
@ -659,6 +665,25 @@ export class IngestBundleRunner {
protected async runInner(job: IngestBundleJob, ctx?: IngestJobContext): Promise<Omit<IngestBundleResult, 'jobId'>> {
const syncId = buildSyncId(new Date(), job.jobId);
const trace = this.createTrace(job);
const transcriptSummaries = new Map<string, MutableToolTranscriptSummary>();
let activeTrace: IngestTraceWriter = trace;
let activePhase = 'run';
let runRow: Awaited<ReturnType<IngestRunsPort['create']>> | null = null;
let latestDiffSummary: IngestDiffSummary = { added: 0, modified: 0, deleted: 0, unchanged: 0 };
let latestWorkUnits: WorkUnitOutcome[] = [];
let latestFailedWorkUnits: string[] = [];
let latestReconciliationSkipped = true;
let latestIsolatedDiffSummary:
| {
enabled: boolean;
integrationWorktreePath?: string;
ingestionBaseSha?: string;
projectionSha?: string | null;
acceptedPatches: number;
textualConflicts: number;
semanticConflicts: number;
}
| undefined;
await trace.event('info', 'run', 'ingest_started', {
trigger: job.trigger,
bundleRefKind: job.bundleRef.kind,
@ -670,7 +695,6 @@ export class IngestBundleRunner {
throw new Error('ingest-bundle: config repo has no HEAD');
}
const transcriptDir = this.deps.storage.resolveTranscriptDir(job.jobId);
const transcriptSummaries = new Map<string, MutableToolTranscriptSummary>();
const recordTranscriptEntry =
(path: string) =>
(entry: ToolCallLogEntry): void => {
@ -685,17 +709,28 @@ export class IngestBundleRunner {
await stage1?.updateProgress(0.0, 'Fetching source files');
const adapter = this.deps.registry.get(job.sourceKey);
const stagedDir = overrideReport
? await this.materializeOverrideSnapshot(overrideReport, {
connectionId: job.connectionId,
sourceKey: job.sourceKey,
jobId: job.jobId,
})
: await this.resolveStagedDir(job.bundleRef, {
connectionId: job.connectionId,
sourceKey: job.sourceKey,
jobId: job.jobId,
});
activePhase = 'fetch';
const stagedDir = await traceTimed(
trace,
'fetch',
'resolve_staged_dir',
{
bundleRefKind: job.bundleRef.kind,
sourceKey: job.sourceKey,
},
() =>
overrideReport
? this.materializeOverrideSnapshot(overrideReport, {
connectionId: job.connectionId,
sourceKey: job.sourceKey,
jobId: job.jobId,
})
: this.resolveStagedDir(job.bundleRef, {
connectionId: job.connectionId,
sourceKey: job.sourceKey,
jobId: job.jobId,
}),
);
const fetchReport = adapter.readFetchReport ? await adapter.readFetchReport(stagedDir) : null;
const scopeDescriptor = adapter.describeScope ? await adapter.describeScope(stagedDir) : null;
@ -706,13 +741,27 @@ export class IngestBundleRunner {
let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash';
try {
const { currentHashes, rawDirInWorktree } = await this.stageRawFilesStage1({
stagedDir,
worktreeRoot: sessionWorktree.workdir,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
});
activePhase = 'stage_raw_files';
const { currentHashes, rawDirInWorktree } = await traceTimed(
trace,
'stage_raw_files',
'stage_raw_files',
{
stagedDir,
worktreePath: sessionWorktree.workdir,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
},
() =>
this.stageRawFilesStage1({
stagedDir,
worktreeRoot: sessionWorktree.workdir,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
}),
);
memoryFlow?.update({
connectionId: job.connectionId,
adapter: job.sourceKey,
@ -737,11 +786,24 @@ export class IngestBundleRunner {
await stage1?.updateProgress(0.5, 'Checking what changed');
const diffSet = await this.deps.diffSetService.compute(
job.connectionId,
job.sourceKey,
currentHashes,
scopeDescriptor ? scopeDescriptor.isPathInScope.bind(scopeDescriptor) : undefined,
activePhase = 'diff';
const diffSet = await traceTimed(
trace,
'diff',
'compute_diff_set',
{
connectionId: job.connectionId,
sourceKey: job.sourceKey,
currentHashCount: currentHashes.size,
scopeFingerprint: scopeDescriptor?.fingerprint ?? null,
},
() =>
this.deps.diffSetService.compute(
job.connectionId,
job.sourceKey,
currentHashes,
scopeDescriptor ? scopeDescriptor.isPathInScope.bind(scopeDescriptor) : undefined,
),
);
const diffSummary = {
added: diffSet.added.length,
@ -749,9 +811,10 @@ export class IngestBundleRunner {
deleted: diffSet.deleted.length,
unchanged: diffSet.unchanged.length,
};
latestDiffSummary = diffSummary;
memoryFlow?.emit({ type: 'diff_computed', ...diffSummary });
const runRow = await this.deps.runs.create({
runRow = await this.deps.runs.create({
jobId: job.jobId,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
@ -767,6 +830,8 @@ export class IngestBundleRunner {
sourceKey: job.sourceKey,
};
const runTrace = trace.withContext({ runId: runRow.id, syncId });
activeTrace = runTrace;
const createdRunRow = runRow;
await runTrace.event('debug', 'snapshot', 'input_snapshot', {
baseSha,
stagedDir,
@ -781,7 +846,11 @@ export class IngestBundleRunner {
`${diffSet.added.length} new, ${diffSet.modified.length} changed, ${diffSet.deleted.length} removed`,
);
const detected = await adapter.detect(stagedDir);
activePhase = 'detect';
const detected = await traceTimed(runTrace, 'detect', 'adapter_detect', { stagedDir, sourceKey: job.sourceKey }, () =>
adapter.detect(stagedDir),
);
await runTrace.event('debug', 'detect', 'adapter_detected', { detected });
if (!detected) {
await this.deps.runs.markFailed(runRow.id);
throw new Error(`source adapter '${job.sourceKey}' did not recognize staged dir`);
@ -802,6 +871,7 @@ export class IngestBundleRunner {
const stage2 = ctx?.startPhase(0.04);
await stage2?.updateProgress(0.0, 'Planning updates');
activePhase = 'planning';
let workUnits: WorkUnit[] = [];
let eviction: EvictionUnit | undefined;
let unresolvedCards: UnresolvedCardInfo[] | undefined;
@ -819,7 +889,18 @@ export class IngestBundleRunner {
unresolvedCards = overrideReport.body.unresolvedCards;
await stage2?.updateProgress(1.0, `Loaded prior report ${overrideReport.jobId} for override reconciliation`);
} else {
const chunk = await adapter.chunk(stagedDir, diffSet);
const chunk = await traceTimed(
runTrace,
'planning',
'chunk_work_units',
{
stagedDir,
added: diffSet.added.length,
modified: diffSet.modified.length,
deleted: diffSet.deleted.length,
},
() => adapter.chunk(stagedDir, diffSet),
);
workUnits = chunk.workUnits;
eviction = chunk.eviction;
unresolvedCards = chunk.unresolvedCards;
@ -849,6 +930,12 @@ export class IngestBundleRunner {
}
await stage2?.updateProgress(1.0, `Planned ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`);
}
await runTrace.event('debug', 'planning', 'work_units_planned', {
workUnitCount: workUnits.length,
evictionCount: eviction?.deletedRawPaths.length ?? 0,
unresolvedCardCount: unresolvedCards?.length ?? 0,
triageEnabled: triageResult?.enabled ?? false,
});
const targetConnectionIds = new Set<string>([job.connectionId]);
if (!overrideReport && adapter.listTargetConnectionIds) {
@ -869,6 +956,9 @@ export class IngestBundleRunner {
}
}
const slConnectionIds = [...targetConnectionIds].sort();
await runTrace.event('debug', 'planning', 'target_connections_resolved', {
connectionIds: slConnectionIds,
});
// Build shared per-job context.
const [wikiIndex, slIndex] = await Promise.all([this.buildWikiIndex(), this.buildSlIndex(slConnectionIds)]);
@ -914,9 +1004,11 @@ export class IngestBundleRunner {
textualConflicts: 0,
semanticConflicts: 0,
};
latestIsolatedDiffSummary = isolatedDiffSummary;
const stage3 = ctx?.startPhase(0.6);
await stage3?.updateProgress(0.0, `Processing ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`);
activePhase = 'work_units';
this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`);
let projectionTouchedSources: TouchedSlSource[] = [];
let projectionChangedWikiPageKeys: string[] = [];
@ -940,7 +1032,7 @@ export class IngestBundleRunner {
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
runId: runRow.id,
runId: createdRunRow.id,
stagedDir,
workdir: sessionWorktree.workdir,
parseArtifacts,
@ -1080,6 +1172,8 @@ export class IngestBundleRunner {
failedWorkUnits.push(
...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey),
);
latestWorkUnits = workUnitOutcomes;
latestFailedWorkUnits = failedWorkUnits;
stageIndex.workUnits = workUnitOutcomes.map((o) => ({
unitKey: o.unitKey,
rawFiles: workUnits.find((w) => w.unitKey === o.unitKey)?.rawFiles ?? [],
@ -1091,6 +1185,7 @@ export class IngestBundleRunner {
slDisallowedReason: o.slDisallowedReason,
}));
activePhase = 'integration';
for (const [index, outcome] of workUnitOutcomesByIndex.entries()) {
if (!outcome || outcome.status !== 'success' || !outcome.patchPath) {
continue;
@ -1385,6 +1480,8 @@ export class IngestBundleRunner {
failedWorkUnits.push(
...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey),
);
latestWorkUnits = workUnitOutcomes;
latestFailedWorkUnits = failedWorkUnits;
// Complete the typed Stage Index from the outcomes once, and use it for
// Stage 4, provenance writes (Phase G), and the report body (Phase F3).
@ -1410,6 +1507,7 @@ export class IngestBundleRunner {
const dedupResult =
contextReport && this.deps.candidateDedup ? await this.deps.candidateDedup.deduplicateRun(runRow.id) : null;
const preReconciliationSha = await sessionWorktree.git.revParseHead();
activePhase = 'reconciliation';
// Stage 4 — reconciliation. Shares scoped wiki/SL with a fresh CaptureSession
// so reconciliation writes land in the same worktree Stage 3 used.
@ -1656,6 +1754,7 @@ export class IngestBundleRunner {
: undefined,
});
}
latestReconciliationSkipped = reconcileOutcome.skipped;
const danglingReconcileWikiRefs = await findDanglingWikiRefsForActions({
wikiService: rcScopedWiki,
@ -1689,19 +1788,27 @@ export class IngestBundleRunner {
await stage4?.updateProgress(1.0, reconcileOutcome.skipped ? 'No reconciliation needed' : 'Reconciled');
const postProcessor = this.deps.postProcessors?.[job.sourceKey];
activePhase = 'post_processor';
if (postProcessor) {
const stagePostProcessor = ctx?.startPhase(0.04);
await stagePostProcessor?.updateProgress(0.0, 'Running deterministic imports');
try {
const result = await postProcessor.run({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
runId: runRow.id,
workdir: sessionWorktree.workdir,
parseArtifacts,
});
const result = await traceTimed(
runTrace,
'post_processor',
'post_processor',
{ sourceKey: job.sourceKey },
() =>
postProcessor.run({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
runId: createdRunRow.id,
workdir: sessionWorktree.workdir,
parseArtifacts,
}),
);
postProcessorOutcome = {
sourceKey: job.sourceKey,
status: result.errors.length > 0 && result.touchedSources.length === 0 ? 'failed' : 'success',
@ -1723,6 +1830,12 @@ export class IngestBundleRunner {
throw error;
}
}
await runTrace.event('debug', 'post_processor', 'post_processor_finished', {
sourceKey: job.sourceKey,
status: postProcessorOutcome?.status ?? 'skipped',
touchedSources: postProcessorOutcome?.touchedSources ?? [],
warnings: postProcessorOutcome?.warnings ?? [],
});
const repairConnectionIds = [
...new Set([
@ -1730,11 +1843,24 @@ export class IngestBundleRunner {
...(postProcessorOutcome?.touchedSources ?? []).map((source) => source.connectionId),
]),
].sort();
wikiSlRefRepairResult = await repairWikiSlRefs({
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
configService: sessionWorktree.config,
connectionIds: repairConnectionIds,
activePhase = 'wiki_sl_ref_repair';
wikiSlRefRepairResult = await traceTimed(
runTrace,
'wiki_sl_ref_repair',
'wiki_sl_refs_repair',
{ connectionIds: repairConnectionIds },
() =>
repairWikiSlRefs({
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
configService: sessionWorktree.config,
connectionIds: repairConnectionIds,
}),
);
await runTrace.event('debug', 'wiki_sl_ref_repair', 'wiki_sl_refs_repaired', {
repairCount: wikiSlRefRepairResult.repairs.length,
repairs: wikiSlRefRepairResult.repairs,
warnings: wikiSlRefRepairResult.warnings,
});
const postReconciliationSha = await sessionWorktree.git.revParseHead();
const postReconciliationPaths =
@ -1758,6 +1884,7 @@ export class IngestBundleRunner {
...(postProcessorOutcome?.touchedSources ?? []),
]);
activePhase = 'final_gates';
await traceTimed(
runTrace,
'final_gates',
@ -1802,6 +1929,7 @@ export class IngestBundleRunner {
);
// Stage 6 — squash commit
activePhase = 'squash';
const stage6 = ctx?.startPhase(0.04);
await stage6?.updateProgress(0.0, 'Saving changes');
try {
@ -1827,6 +1955,10 @@ export class IngestBundleRunner {
throw new Error(`squash merge conflict: ${mergeResult.conflictPaths.join(', ')}`);
}
const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha;
await runTrace.event('debug', 'squash', 'squash_finished', {
commitSha,
touchedPaths: mergeResult.touchedPaths,
});
const memoryFlowSavedActions = stageIndex.workUnits.flatMap((wu) => wu.actions).concat(reconcileActions);
const postProcessorMemoryCounts = postProcessorSavedMemoryCounts(postProcessorOutcome);
memoryFlow?.emit({
@ -1869,6 +2001,7 @@ export class IngestBundleRunner {
const stage5 = ctx?.startPhase(0.04);
await stage5?.updateProgress(0.0, 'Recording history');
activePhase = 'provenance';
// Provenance rows: per-artifact when the WU emitted actions, plus a `skipped`
// fallback for raw files that produced nothing so the next DiffSet still sees
@ -1949,6 +2082,9 @@ export class IngestBundleRunner {
currentRawPaths: new Set(currentHashes.keys()),
deletedRawPaths: new Set(eviction?.deletedRawPaths ?? []),
});
await runTrace.event('debug', 'provenance', 'provenance_rows_validated', {
rowCount: provenanceRows.length,
});
await this.deps.provenance.insertMany(provenanceRows);
memoryFlow?.emit({ type: 'provenance_recorded', rowCount: provenanceRows.length });
await stage5?.updateProgress(
@ -1958,6 +2094,7 @@ export class IngestBundleRunner {
const stage7 = ctx?.startPhase(0.04);
await stage7?.updateProgress(0.0, 'Wrapping up');
activePhase = 'report';
const reportProvenanceRows = provenanceRows.map(
({ rawPath, artifactKind, artifactKey, actionType, targetConnectionId }) => ({
@ -1993,6 +2130,7 @@ export class IngestBundleRunner {
: undefined;
const reportBody = {
status: 'completed' as const,
syncId,
diffSummary,
fetch: fetchReport ?? undefined,
@ -2077,6 +2215,11 @@ export class IngestBundleRunner {
body: reportBody,
});
const reportId = reportIdFromCreateResult(createdReport);
await runTrace.event('debug', 'report', 'success_report_created', {
reportId,
runId: runRow.id,
tracePath: runTrace.tracePath,
});
memoryFlow?.update({
...(reportId ? { reportId, reportPath: reportId } : {}),
});
@ -2135,7 +2278,74 @@ export class IngestBundleRunner {
await this.deps.sessionWorktreeService.cleanup(sessionWorktree, cleanupOutcome);
}
} catch (error) {
await trace.event('error', 'run', 'ingest_failed', { tracePath: trace.tracePath }, error);
await activeTrace.event(
'error',
'run',
'ingest_failed',
{
tracePath: activeTrace.tracePath,
phase: activePhase,
runId: runRow?.id ?? null,
syncId,
},
error,
);
if (runRow) {
await this.deps.runs.markFailed(runRow.id);
await this.deps.reports.create({
runId: runRow.id,
jobId: job.jobId,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
body: {
status: 'failed' as const,
syncId,
diffSummary: latestDiffSummary,
commitSha: null,
tracePath: activeTrace.tracePath,
isolatedDiff: latestIsolatedDiffSummary,
failure: {
phase: activePhase,
message: this.errorMessage(error),
},
workUnits: latestWorkUnits.map((wu) => ({
unitKey: wu.unitKey,
rawFiles: [],
status: wu.status,
reason: wu.reason,
actions: wu.actions,
touchedSlSources: wu.touchedSlSources,
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
})),
failedWorkUnits: latestFailedWorkUnits,
reconciliationSkipped: latestReconciliationSkipped,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
artifactResolutions: [],
evictionInputs: [],
reconciliationActions: [],
evictionDecisions: [],
unresolvedCards: [],
supersededBy: null,
overrideOf: null,
provenanceRows: [],
toolTranscripts: Array.from(transcriptSummaries.values()).map((summary) => ({
unitKey: summary.unitKey,
path: summary.path,
toolCallCount: summary.toolCallCount,
errorCount: summary.errorCount,
toolNames: Array.from(summary.toolNames).sort(),
})),
},
});
await activeTrace.event('info', 'report', 'failure_report_created', {
runId: runRow.id,
jobId: job.jobId,
tracePath: activeTrace.tracePath,
});
}
throw error;
}
}

View file

@ -206,6 +206,47 @@ describe('parseIngestReportSnapshot', () => {
expect(snapshot.body.toolTranscripts).toEqual([]);
});
it('parses failed ingest reports with trace and failure details', () => {
const snapshot = parseIngestReportSnapshot({
id: 'report-failed',
runId: 'run-failed',
jobId: 'job-failed',
connectionId: 'warehouse',
sourceKey: 'metabase',
createdAt: '2026-05-17T12:00:00.000Z',
body: {
status: 'failed',
syncId: 'sync-failed',
diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
commitSha: null,
tracePath: '/project/.ktx/ingest-traces/job-failed/trace.jsonl',
failure: {
phase: 'final_gates',
message: 'final artifact gates failed',
},
workUnits: [],
failedWorkUnits: [],
reconciliationSkipped: true,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
evictionInputs: [],
unresolvedCards: [],
supersededBy: null,
overrideOf: null,
provenanceRows: [],
toolTranscripts: [],
},
});
expect(snapshot.body.status).toBe('failed');
expect(snapshot.body.failure).toEqual({
phase: 'final_gates',
message: 'final artifact gates failed',
});
expect(snapshot.body.tracePath).toContain('trace.jsonl');
});
it('rejects malformed report snapshots with a concise message', () => {
const report = validReportSnapshot();
report.body.workUnits[0] = {

View file

@ -123,6 +123,11 @@ const sourceFetchReportSchema = z.object({
warnings: z.array(sourceFetchIssueSchema).default([]),
});
const ingestReportFailureSchema = z.object({
phase: z.string().min(1),
message: z.string().min(1),
});
export const ingestReportSnapshotSchema = z
.object({
id: z.string().min(1),
@ -133,11 +138,13 @@ export const ingestReportSnapshotSchema = z
createdAt: z.string().min(1),
body: z
.object({
status: z.enum(['completed', 'failed']).optional(),
syncId: z.string().min(1),
diffSummary: ingestDiffSummarySchema,
fetch: sourceFetchReportSchema.optional(),
commitSha: z.string().nullable(),
tracePath: z.string().optional(),
failure: ingestReportFailureSchema.optional(),
isolatedDiff: z
.object({
enabled: z.boolean(),

View file

@ -48,12 +48,19 @@ export interface IngestReportPostProcessorOutcome {
touchedSources: TouchedSlSource[];
}
export interface IngestReportFailure {
phase: string;
message: string;
}
export interface IngestReportBody {
status?: 'completed' | 'failed';
syncId: string;
diffSummary: IngestDiffSummary;
fetch?: SourceFetchReport;
commitSha: string | null;
tracePath?: string;
failure?: IngestReportFailure;
isolatedDiff?: {
enabled: boolean;
integrationWorktreePath?: string;