diff --git a/docs-site/content/docs/cli-reference/ktx-ingest.mdx b/docs-site/content/docs/cli-reference/ktx-ingest.mdx index 9ce5887e..495ca356 100644 --- a/docs-site/content/docs/cli-reference/ktx-ingest.mdx +++ b/docs-site/content/docs/cli-reference/ktx-ingest.mdx @@ -123,7 +123,10 @@ ktx ingest status The trace file lives under the project directory at `.ktx/ingest-traces//trace.jsonl`. Each line is a JSON event with the job id, run id, sync id, connection id, source key, phase, event name, timing, -context fields, and error details when a step fails. +state snapshot, decision context, and error details. Failed runs also write a +stored ingest report with `status: "failed"`, `failure.phase`, +`failure.message`, and the same trace path, so `ktx ingest status ` can +point you to the postmortem trace. Use `jq` or line-oriented tools to inspect a trace: diff --git a/packages/cli/src/ingest.test.ts b/packages/cli/src/ingest.test.ts index aed006c6..14b91333 100644 --- a/packages/cli/src/ingest.test.ts +++ b/packages/cli/src/ingest.test.ts @@ -985,6 +985,59 @@ describe('runKtxIngest', () => { expect(io.stdout()).toContain('Status: error\n'); }); + it('prints trace path and error status for stored failed ingest reports', async () => { + const projectDir = join(tempDir, 'project'); + await writeWarehouseConfig(projectDir); + const io = makeIo(); + const report = { + id: 'report-failed', + runId: 'run-failed', + jobId: 'job-failed', + connectionId: 'warehouse', + sourceKey: 'metabase', + createdAt: '2026-05-17T12:00:00.000Z', + body: { + status: 'failed', + syncId: 'sync-failed', + diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, + commitSha: null, + tracePath: '/project/.ktx/ingest-traces/job-failed/trace.jsonl', + failure: { phase: 'final_gates', message: 'final artifact gates failed' }, + workUnits: [], + failedWorkUnits: [], + reconciliationSkipped: true, + conflictsResolved: [], + evictionsApplied: [], + unmappedFallbacks: [], + evictionInputs: [], + unresolvedCards: [], + supersededBy: null, + overrideOf: null, + provenanceRows: [], + toolTranscripts: [], + }, + }; + + await runKtxIngest( + { + command: 'status', + projectDir, + reportFile: '/project/report-failed.json', + runId: 'run-failed', + outputMode: 'plain', + inputMode: 'disabled', + }, + io.io, + { + readReportFile: vi.fn().mockResolvedValue(report), + }, + ); + + expect(io.stdout()).toContain('Trace: /project/.ktx/ingest-traces/job-failed/trace.jsonl'); + expect(io.stdout()).toContain('Status: error'); + expect(io.stdout()).toContain('Error: final artifact gates failed'); + }); + it('prints a clear first failure reason when query-history work units fail', async () => { const projectDir = join(tempDir, 'project'); await writeWarehouseConfig(projectDir); diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 53d28f42..ac146e79 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -102,7 +102,7 @@ export interface KtxIngestDeps { } function reportStatus(report: IngestReportSnapshot): 'done' | 'error' { - return report.body.failedWorkUnits.length > 0 ? 'error' : 'done'; + return report.body.status === 'failed' || report.body.failedWorkUnits.length > 0 ? 'error' : 'done'; } const REPORT_SOURCE_LABELS = new Map([ @@ -174,6 +174,9 @@ function formatFailureReason(sourceKey: string, reason: string): string { } function failedReportMessage(report: IngestReportSnapshot): string | null { + if (report.body.status === 'failed' && report.body.failure?.message) { + return sanitizeMemoryFlowError(report.body.failure.message); + } const failedCount = report.body.failedWorkUnits.length; if (failedCount === 0) { return null; diff --git a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts index 044dda31..c7446b5a 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -495,6 +495,120 @@ describe('IngestBundleRunner isolated diff path', () => { } }); + it('stores a failure report and postmortem trace for final gate failures', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + const createdReports: any[] = []; + deps.reports.create = vi.fn(async (args: any) => { + createdReports.push(args); + return { id: `report-${createdReports.length}` }; + }); + adapter.chunk.mockResolvedValue({ + workUnits: [ + { unitKey: 'card-wiki', rawFiles: ['cards/wiki.json'], peerFileIndex: [], dependencyPaths: [] }, + { unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }, + ], + }); + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async (params: any) => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + if (params.telemetryTags.unitKey === 'card-wiki') { + await mkdir(join(root, 'wiki/global'), { recursive: true }); + await writeFile( + join(root, 'wiki/global/account-segments.md'), + '---\nsummary: Account segments\nusage_mode: auto\n---\n\nARR is `mart_account_segments.total_contract_arr_cents`.\n', + ); + currentSession.actions.push({ + target: 'wiki', + type: 'created', + key: 'account-segments', + detail: 'Account segments', + rawPaths: ['cards/wiki.json'], + }); + await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'wu wiki', 'KTX Test', 'system@ktx.local'); + } + if (params.telemetryTags.unitKey === 'card-source') { + await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); + await writeFile( + join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), + 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', + ); + addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'mart_account_segments'); + currentSession.actions.push({ + target: 'sl', + type: 'created', + key: 'mart_account_segments', + detail: 'Dollar measure', + targetConnectionId: 'warehouse', + rawPaths: ['cards/source.json'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/warehouse/mart_account_segments.yaml'], + 'wu source', + 'KTX Test', + 'system@ktx.local', + ); + } + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [ + ['cards/wiki.json', 'h1'], + ['cards/source.json', 'h2'], + ]); + + await expect( + runner.run({ + jobId: 'job-trace-failure', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/total_contract_arr_cents/); + + const failureReport = createdReports.find((report) => report.body.status === 'failed'); + expect(failureReport.body.tracePath).toContain('job-trace-failure/trace.jsonl'); + expect(failureReport.body.failure).toMatchObject({ phase: 'final_gates' }); + + const events = (await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-trace-failure/trace.jsonl'), 'utf-8')) + .trim() + .split('\n') + .map((line) => JSON.parse(line)); + expect(events.map((event) => event.event)).toEqual( + expect.arrayContaining([ + 'ingest_started', + 'input_snapshot', + 'work_units_planned', + 'isolated_diff_enabled', + 'work_unit_child_created', + 'work_unit_patch_collected', + 'patch_apply_started', + 'patch_accepted', + 'reconciliation_finished', + 'final_artifact_gates_failed', + 'ingest_failed', + 'failure_report_created', + ]), + ); + const failed = events.find((event) => event.event === 'ingest_failed'); + expect(failed).toMatchObject({ + runId: 'run-1', + syncId: expect.any(String), + data: { phase: 'final_gates', tracePath: expect.stringContaining('trace.jsonl') }, + error: { message: expect.stringContaining('total_contract_arr_cents') }, + }); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + it('rejects slDisallowed patches that touch semantic-layer files', async () => { const runtime = await makeRealGitRuntime(); try { diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index ddea1cf9..e81fdfb9 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -23,6 +23,7 @@ import type { ContextEvidenceIndexSummary, IngestBundleRunnerDeps, IngestProvenanceRow, + IngestRunsPort, IngestSessionWorktree, PageTriageRunResult, } from './ports.js'; @@ -59,6 +60,7 @@ import { type MutableToolTranscriptSummary, } from './tools/tool-transcript-summary.js'; import type { + IngestDiffSummary, EvictionUnit, IngestBundleJob, IngestBundleResult, @@ -420,6 +422,10 @@ export class IngestBundleRunner { }); } + private errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); + } + private wikiPageKeysFromPaths(paths: string[]): string[] { return [ ...new Set( @@ -659,6 +665,25 @@ export class IngestBundleRunner { protected async runInner(job: IngestBundleJob, ctx?: IngestJobContext): Promise> { const syncId = buildSyncId(new Date(), job.jobId); const trace = this.createTrace(job); + const transcriptSummaries = new Map(); + let activeTrace: IngestTraceWriter = trace; + let activePhase = 'run'; + let runRow: Awaited> | null = null; + let latestDiffSummary: IngestDiffSummary = { added: 0, modified: 0, deleted: 0, unchanged: 0 }; + let latestWorkUnits: WorkUnitOutcome[] = []; + let latestFailedWorkUnits: string[] = []; + let latestReconciliationSkipped = true; + let latestIsolatedDiffSummary: + | { + enabled: boolean; + integrationWorktreePath?: string; + ingestionBaseSha?: string; + projectionSha?: string | null; + acceptedPatches: number; + textualConflicts: number; + semanticConflicts: number; + } + | undefined; await trace.event('info', 'run', 'ingest_started', { trigger: job.trigger, bundleRefKind: job.bundleRef.kind, @@ -670,7 +695,6 @@ export class IngestBundleRunner { throw new Error('ingest-bundle: config repo has no HEAD'); } const transcriptDir = this.deps.storage.resolveTranscriptDir(job.jobId); - const transcriptSummaries = new Map(); const recordTranscriptEntry = (path: string) => (entry: ToolCallLogEntry): void => { @@ -685,17 +709,28 @@ export class IngestBundleRunner { await stage1?.updateProgress(0.0, 'Fetching source files'); const adapter = this.deps.registry.get(job.sourceKey); - const stagedDir = overrideReport - ? await this.materializeOverrideSnapshot(overrideReport, { - connectionId: job.connectionId, - sourceKey: job.sourceKey, - jobId: job.jobId, - }) - : await this.resolveStagedDir(job.bundleRef, { - connectionId: job.connectionId, - sourceKey: job.sourceKey, - jobId: job.jobId, - }); + activePhase = 'fetch'; + const stagedDir = await traceTimed( + trace, + 'fetch', + 'resolve_staged_dir', + { + bundleRefKind: job.bundleRef.kind, + sourceKey: job.sourceKey, + }, + () => + overrideReport + ? this.materializeOverrideSnapshot(overrideReport, { + connectionId: job.connectionId, + sourceKey: job.sourceKey, + jobId: job.jobId, + }) + : this.resolveStagedDir(job.bundleRef, { + connectionId: job.connectionId, + sourceKey: job.sourceKey, + jobId: job.jobId, + }), + ); const fetchReport = adapter.readFetchReport ? await adapter.readFetchReport(stagedDir) : null; const scopeDescriptor = adapter.describeScope ? await adapter.describeScope(stagedDir) : null; @@ -706,13 +741,27 @@ export class IngestBundleRunner { let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash'; try { - const { currentHashes, rawDirInWorktree } = await this.stageRawFilesStage1({ - stagedDir, - worktreeRoot: sessionWorktree.workdir, - connectionId: job.connectionId, - sourceKey: job.sourceKey, - syncId, - }); + activePhase = 'stage_raw_files'; + const { currentHashes, rawDirInWorktree } = await traceTimed( + trace, + 'stage_raw_files', + 'stage_raw_files', + { + stagedDir, + worktreePath: sessionWorktree.workdir, + connectionId: job.connectionId, + sourceKey: job.sourceKey, + syncId, + }, + () => + this.stageRawFilesStage1({ + stagedDir, + worktreeRoot: sessionWorktree.workdir, + connectionId: job.connectionId, + sourceKey: job.sourceKey, + syncId, + }), + ); memoryFlow?.update({ connectionId: job.connectionId, adapter: job.sourceKey, @@ -737,11 +786,24 @@ export class IngestBundleRunner { await stage1?.updateProgress(0.5, 'Checking what changed'); - const diffSet = await this.deps.diffSetService.compute( - job.connectionId, - job.sourceKey, - currentHashes, - scopeDescriptor ? scopeDescriptor.isPathInScope.bind(scopeDescriptor) : undefined, + activePhase = 'diff'; + const diffSet = await traceTimed( + trace, + 'diff', + 'compute_diff_set', + { + connectionId: job.connectionId, + sourceKey: job.sourceKey, + currentHashCount: currentHashes.size, + scopeFingerprint: scopeDescriptor?.fingerprint ?? null, + }, + () => + this.deps.diffSetService.compute( + job.connectionId, + job.sourceKey, + currentHashes, + scopeDescriptor ? scopeDescriptor.isPathInScope.bind(scopeDescriptor) : undefined, + ), ); const diffSummary = { added: diffSet.added.length, @@ -749,9 +811,10 @@ export class IngestBundleRunner { deleted: diffSet.deleted.length, unchanged: diffSet.unchanged.length, }; + latestDiffSummary = diffSummary; memoryFlow?.emit({ type: 'diff_computed', ...diffSummary }); - const runRow = await this.deps.runs.create({ + runRow = await this.deps.runs.create({ jobId: job.jobId, connectionId: job.connectionId, sourceKey: job.sourceKey, @@ -767,6 +830,8 @@ export class IngestBundleRunner { sourceKey: job.sourceKey, }; const runTrace = trace.withContext({ runId: runRow.id, syncId }); + activeTrace = runTrace; + const createdRunRow = runRow; await runTrace.event('debug', 'snapshot', 'input_snapshot', { baseSha, stagedDir, @@ -781,7 +846,11 @@ export class IngestBundleRunner { `${diffSet.added.length} new, ${diffSet.modified.length} changed, ${diffSet.deleted.length} removed`, ); - const detected = await adapter.detect(stagedDir); + activePhase = 'detect'; + const detected = await traceTimed(runTrace, 'detect', 'adapter_detect', { stagedDir, sourceKey: job.sourceKey }, () => + adapter.detect(stagedDir), + ); + await runTrace.event('debug', 'detect', 'adapter_detected', { detected }); if (!detected) { await this.deps.runs.markFailed(runRow.id); throw new Error(`source adapter '${job.sourceKey}' did not recognize staged dir`); @@ -802,6 +871,7 @@ export class IngestBundleRunner { const stage2 = ctx?.startPhase(0.04); await stage2?.updateProgress(0.0, 'Planning updates'); + activePhase = 'planning'; let workUnits: WorkUnit[] = []; let eviction: EvictionUnit | undefined; let unresolvedCards: UnresolvedCardInfo[] | undefined; @@ -819,7 +889,18 @@ export class IngestBundleRunner { unresolvedCards = overrideReport.body.unresolvedCards; await stage2?.updateProgress(1.0, `Loaded prior report ${overrideReport.jobId} for override reconciliation`); } else { - const chunk = await adapter.chunk(stagedDir, diffSet); + const chunk = await traceTimed( + runTrace, + 'planning', + 'chunk_work_units', + { + stagedDir, + added: diffSet.added.length, + modified: diffSet.modified.length, + deleted: diffSet.deleted.length, + }, + () => adapter.chunk(stagedDir, diffSet), + ); workUnits = chunk.workUnits; eviction = chunk.eviction; unresolvedCards = chunk.unresolvedCards; @@ -849,6 +930,12 @@ export class IngestBundleRunner { } await stage2?.updateProgress(1.0, `Planned ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`); } + await runTrace.event('debug', 'planning', 'work_units_planned', { + workUnitCount: workUnits.length, + evictionCount: eviction?.deletedRawPaths.length ?? 0, + unresolvedCardCount: unresolvedCards?.length ?? 0, + triageEnabled: triageResult?.enabled ?? false, + }); const targetConnectionIds = new Set([job.connectionId]); if (!overrideReport && adapter.listTargetConnectionIds) { @@ -869,6 +956,9 @@ export class IngestBundleRunner { } } const slConnectionIds = [...targetConnectionIds].sort(); + await runTrace.event('debug', 'planning', 'target_connections_resolved', { + connectionIds: slConnectionIds, + }); // Build shared per-job context. const [wikiIndex, slIndex] = await Promise.all([this.buildWikiIndex(), this.buildSlIndex(slConnectionIds)]); @@ -914,9 +1004,11 @@ export class IngestBundleRunner { textualConflicts: 0, semanticConflicts: 0, }; + latestIsolatedDiffSummary = isolatedDiffSummary; const stage3 = ctx?.startPhase(0.6); await stage3?.updateProgress(0.0, `Processing ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`); + activePhase = 'work_units'; this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`); let projectionTouchedSources: TouchedSlSource[] = []; let projectionChangedWikiPageKeys: string[] = []; @@ -940,7 +1032,7 @@ export class IngestBundleRunner { sourceKey: job.sourceKey, syncId, jobId: job.jobId, - runId: runRow.id, + runId: createdRunRow.id, stagedDir, workdir: sessionWorktree.workdir, parseArtifacts, @@ -1080,6 +1172,8 @@ export class IngestBundleRunner { failedWorkUnits.push( ...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey), ); + latestWorkUnits = workUnitOutcomes; + latestFailedWorkUnits = failedWorkUnits; stageIndex.workUnits = workUnitOutcomes.map((o) => ({ unitKey: o.unitKey, rawFiles: workUnits.find((w) => w.unitKey === o.unitKey)?.rawFiles ?? [], @@ -1091,6 +1185,7 @@ export class IngestBundleRunner { slDisallowedReason: o.slDisallowedReason, })); + activePhase = 'integration'; for (const [index, outcome] of workUnitOutcomesByIndex.entries()) { if (!outcome || outcome.status !== 'success' || !outcome.patchPath) { continue; @@ -1385,6 +1480,8 @@ export class IngestBundleRunner { failedWorkUnits.push( ...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey), ); + latestWorkUnits = workUnitOutcomes; + latestFailedWorkUnits = failedWorkUnits; // Complete the typed Stage Index from the outcomes once, and use it for // Stage 4, provenance writes (Phase G), and the report body (Phase F3). @@ -1410,6 +1507,7 @@ export class IngestBundleRunner { const dedupResult = contextReport && this.deps.candidateDedup ? await this.deps.candidateDedup.deduplicateRun(runRow.id) : null; const preReconciliationSha = await sessionWorktree.git.revParseHead(); + activePhase = 'reconciliation'; // Stage 4 — reconciliation. Shares scoped wiki/SL with a fresh CaptureSession // so reconciliation writes land in the same worktree Stage 3 used. @@ -1656,6 +1754,7 @@ export class IngestBundleRunner { : undefined, }); } + latestReconciliationSkipped = reconcileOutcome.skipped; const danglingReconcileWikiRefs = await findDanglingWikiRefsForActions({ wikiService: rcScopedWiki, @@ -1689,19 +1788,27 @@ export class IngestBundleRunner { await stage4?.updateProgress(1.0, reconcileOutcome.skipped ? 'No reconciliation needed' : 'Reconciled'); const postProcessor = this.deps.postProcessors?.[job.sourceKey]; + activePhase = 'post_processor'; if (postProcessor) { const stagePostProcessor = ctx?.startPhase(0.04); await stagePostProcessor?.updateProgress(0.0, 'Running deterministic imports'); try { - const result = await postProcessor.run({ - connectionId: job.connectionId, - sourceKey: job.sourceKey, - syncId, - jobId: job.jobId, - runId: runRow.id, - workdir: sessionWorktree.workdir, - parseArtifacts, - }); + const result = await traceTimed( + runTrace, + 'post_processor', + 'post_processor', + { sourceKey: job.sourceKey }, + () => + postProcessor.run({ + connectionId: job.connectionId, + sourceKey: job.sourceKey, + syncId, + jobId: job.jobId, + runId: createdRunRow.id, + workdir: sessionWorktree.workdir, + parseArtifacts, + }), + ); postProcessorOutcome = { sourceKey: job.sourceKey, status: result.errors.length > 0 && result.touchedSources.length === 0 ? 'failed' : 'success', @@ -1723,6 +1830,12 @@ export class IngestBundleRunner { throw error; } } + await runTrace.event('debug', 'post_processor', 'post_processor_finished', { + sourceKey: job.sourceKey, + status: postProcessorOutcome?.status ?? 'skipped', + touchedSources: postProcessorOutcome?.touchedSources ?? [], + warnings: postProcessorOutcome?.warnings ?? [], + }); const repairConnectionIds = [ ...new Set([ @@ -1730,11 +1843,24 @@ export class IngestBundleRunner { ...(postProcessorOutcome?.touchedSources ?? []).map((source) => source.connectionId), ]), ].sort(); - wikiSlRefRepairResult = await repairWikiSlRefs({ - wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - configService: sessionWorktree.config, - connectionIds: repairConnectionIds, + activePhase = 'wiki_sl_ref_repair'; + wikiSlRefRepairResult = await traceTimed( + runTrace, + 'wiki_sl_ref_repair', + 'wiki_sl_refs_repair', + { connectionIds: repairConnectionIds }, + () => + repairWikiSlRefs({ + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + configService: sessionWorktree.config, + connectionIds: repairConnectionIds, + }), + ); + await runTrace.event('debug', 'wiki_sl_ref_repair', 'wiki_sl_refs_repaired', { + repairCount: wikiSlRefRepairResult.repairs.length, + repairs: wikiSlRefRepairResult.repairs, + warnings: wikiSlRefRepairResult.warnings, }); const postReconciliationSha = await sessionWorktree.git.revParseHead(); const postReconciliationPaths = @@ -1758,6 +1884,7 @@ export class IngestBundleRunner { ...(postProcessorOutcome?.touchedSources ?? []), ]); + activePhase = 'final_gates'; await traceTimed( runTrace, 'final_gates', @@ -1802,6 +1929,7 @@ export class IngestBundleRunner { ); // Stage 6 — squash commit + activePhase = 'squash'; const stage6 = ctx?.startPhase(0.04); await stage6?.updateProgress(0.0, 'Saving changes'); try { @@ -1827,6 +1955,10 @@ export class IngestBundleRunner { throw new Error(`squash merge conflict: ${mergeResult.conflictPaths.join(', ')}`); } const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha; + await runTrace.event('debug', 'squash', 'squash_finished', { + commitSha, + touchedPaths: mergeResult.touchedPaths, + }); const memoryFlowSavedActions = stageIndex.workUnits.flatMap((wu) => wu.actions).concat(reconcileActions); const postProcessorMemoryCounts = postProcessorSavedMemoryCounts(postProcessorOutcome); memoryFlow?.emit({ @@ -1869,6 +2001,7 @@ export class IngestBundleRunner { const stage5 = ctx?.startPhase(0.04); await stage5?.updateProgress(0.0, 'Recording history'); + activePhase = 'provenance'; // Provenance rows: per-artifact when the WU emitted actions, plus a `skipped` // fallback for raw files that produced nothing so the next DiffSet still sees @@ -1949,6 +2082,9 @@ export class IngestBundleRunner { currentRawPaths: new Set(currentHashes.keys()), deletedRawPaths: new Set(eviction?.deletedRawPaths ?? []), }); + await runTrace.event('debug', 'provenance', 'provenance_rows_validated', { + rowCount: provenanceRows.length, + }); await this.deps.provenance.insertMany(provenanceRows); memoryFlow?.emit({ type: 'provenance_recorded', rowCount: provenanceRows.length }); await stage5?.updateProgress( @@ -1958,6 +2094,7 @@ export class IngestBundleRunner { const stage7 = ctx?.startPhase(0.04); await stage7?.updateProgress(0.0, 'Wrapping up'); + activePhase = 'report'; const reportProvenanceRows = provenanceRows.map( ({ rawPath, artifactKind, artifactKey, actionType, targetConnectionId }) => ({ @@ -1993,6 +2130,7 @@ export class IngestBundleRunner { : undefined; const reportBody = { + status: 'completed' as const, syncId, diffSummary, fetch: fetchReport ?? undefined, @@ -2077,6 +2215,11 @@ export class IngestBundleRunner { body: reportBody, }); const reportId = reportIdFromCreateResult(createdReport); + await runTrace.event('debug', 'report', 'success_report_created', { + reportId, + runId: runRow.id, + tracePath: runTrace.tracePath, + }); memoryFlow?.update({ ...(reportId ? { reportId, reportPath: reportId } : {}), }); @@ -2135,7 +2278,74 @@ export class IngestBundleRunner { await this.deps.sessionWorktreeService.cleanup(sessionWorktree, cleanupOutcome); } } catch (error) { - await trace.event('error', 'run', 'ingest_failed', { tracePath: trace.tracePath }, error); + await activeTrace.event( + 'error', + 'run', + 'ingest_failed', + { + tracePath: activeTrace.tracePath, + phase: activePhase, + runId: runRow?.id ?? null, + syncId, + }, + error, + ); + if (runRow) { + await this.deps.runs.markFailed(runRow.id); + await this.deps.reports.create({ + runId: runRow.id, + jobId: job.jobId, + connectionId: job.connectionId, + sourceKey: job.sourceKey, + body: { + status: 'failed' as const, + syncId, + diffSummary: latestDiffSummary, + commitSha: null, + tracePath: activeTrace.tracePath, + isolatedDiff: latestIsolatedDiffSummary, + failure: { + phase: activePhase, + message: this.errorMessage(error), + }, + workUnits: latestWorkUnits.map((wu) => ({ + unitKey: wu.unitKey, + rawFiles: [], + status: wu.status, + reason: wu.reason, + actions: wu.actions, + touchedSlSources: wu.touchedSlSources, + slDisallowed: wu.slDisallowed, + slDisallowedReason: wu.slDisallowedReason, + })), + failedWorkUnits: latestFailedWorkUnits, + reconciliationSkipped: latestReconciliationSkipped, + conflictsResolved: [], + evictionsApplied: [], + unmappedFallbacks: [], + artifactResolutions: [], + evictionInputs: [], + reconciliationActions: [], + evictionDecisions: [], + unresolvedCards: [], + supersededBy: null, + overrideOf: null, + provenanceRows: [], + toolTranscripts: Array.from(transcriptSummaries.values()).map((summary) => ({ + unitKey: summary.unitKey, + path: summary.path, + toolCallCount: summary.toolCallCount, + errorCount: summary.errorCount, + toolNames: Array.from(summary.toolNames).sort(), + })), + }, + }); + await activeTrace.event('info', 'report', 'failure_report_created', { + runId: runRow.id, + jobId: job.jobId, + tracePath: activeTrace.tracePath, + }); + } throw error; } } diff --git a/packages/context/src/ingest/report-snapshot.test.ts b/packages/context/src/ingest/report-snapshot.test.ts index bdf5b193..2e125386 100644 --- a/packages/context/src/ingest/report-snapshot.test.ts +++ b/packages/context/src/ingest/report-snapshot.test.ts @@ -206,6 +206,47 @@ describe('parseIngestReportSnapshot', () => { expect(snapshot.body.toolTranscripts).toEqual([]); }); + it('parses failed ingest reports with trace and failure details', () => { + const snapshot = parseIngestReportSnapshot({ + id: 'report-failed', + runId: 'run-failed', + jobId: 'job-failed', + connectionId: 'warehouse', + sourceKey: 'metabase', + createdAt: '2026-05-17T12:00:00.000Z', + body: { + status: 'failed', + syncId: 'sync-failed', + diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, + commitSha: null, + tracePath: '/project/.ktx/ingest-traces/job-failed/trace.jsonl', + failure: { + phase: 'final_gates', + message: 'final artifact gates failed', + }, + workUnits: [], + failedWorkUnits: [], + reconciliationSkipped: true, + conflictsResolved: [], + evictionsApplied: [], + unmappedFallbacks: [], + evictionInputs: [], + unresolvedCards: [], + supersededBy: null, + overrideOf: null, + provenanceRows: [], + toolTranscripts: [], + }, + }); + + expect(snapshot.body.status).toBe('failed'); + expect(snapshot.body.failure).toEqual({ + phase: 'final_gates', + message: 'final artifact gates failed', + }); + expect(snapshot.body.tracePath).toContain('trace.jsonl'); + }); + it('rejects malformed report snapshots with a concise message', () => { const report = validReportSnapshot(); report.body.workUnits[0] = { diff --git a/packages/context/src/ingest/report-snapshot.ts b/packages/context/src/ingest/report-snapshot.ts index 9d2e1062..47240d46 100644 --- a/packages/context/src/ingest/report-snapshot.ts +++ b/packages/context/src/ingest/report-snapshot.ts @@ -123,6 +123,11 @@ const sourceFetchReportSchema = z.object({ warnings: z.array(sourceFetchIssueSchema).default([]), }); +const ingestReportFailureSchema = z.object({ + phase: z.string().min(1), + message: z.string().min(1), +}); + export const ingestReportSnapshotSchema = z .object({ id: z.string().min(1), @@ -133,11 +138,13 @@ export const ingestReportSnapshotSchema = z createdAt: z.string().min(1), body: z .object({ + status: z.enum(['completed', 'failed']).optional(), syncId: z.string().min(1), diffSummary: ingestDiffSummarySchema, fetch: sourceFetchReportSchema.optional(), commitSha: z.string().nullable(), tracePath: z.string().optional(), + failure: ingestReportFailureSchema.optional(), isolatedDiff: z .object({ enabled: z.boolean(), diff --git a/packages/context/src/ingest/reports.ts b/packages/context/src/ingest/reports.ts index 95d610f2..f7eb445f 100644 --- a/packages/context/src/ingest/reports.ts +++ b/packages/context/src/ingest/reports.ts @@ -48,12 +48,19 @@ export interface IngestReportPostProcessorOutcome { touchedSources: TouchedSlSource[]; } +export interface IngestReportFailure { + phase: string; + message: string; +} + export interface IngestReportBody { + status?: 'completed' | 'failed'; syncId: string; diffSummary: IngestDiffSummary; fetch?: SourceFetchReport; commitSha: string | null; tracePath?: string; + failure?: IngestReportFailure; isolatedDiff?: { enabled: boolean; integrationWorktreePath?: string;