From 10641ac411872188dc0604eeafdce3c74398d7f3 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 18 May 2026 00:38:07 +0200 Subject: [PATCH] feat(ingest): wire isolated diff gate repair --- .../src/ingest/ingest-bundle.runner.ts | 175 ++++++++++++++---- .../src/ingest/report-snapshot.test.ts | 44 +++++ .../context/src/ingest/report-snapshot.ts | 3 + packages/context/src/ingest/reports.ts | 3 + 4 files changed, 191 insertions(+), 34 deletions(-) diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index 3788efb3..11dc3513 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -13,6 +13,7 @@ import { actionTargetConnectionId } from './action-identity.js'; import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js'; import { validateFinalIngestArtifacts, validateProvenanceRawPaths } from './artifact-gates.js'; import { selectRelevantCanonicalPins } from './canonical-pins.js'; +import { finalGateRepairPaths, repairFinalGateFailure } from './final-gate-repair.js'; import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js'; import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js'; import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js'; @@ -1293,6 +1294,9 @@ export class IngestBundleRunner { resolverAttempts: 0, resolverRepairs: 0, resolverFailures: 0, + gateRepairAttempts: 0, + gateRepairs: 0, + gateRepairFailures: 0, }; latestIsolatedDiffSummary = isolatedDiffSummary; @@ -1540,6 +1544,17 @@ export class IngestBundleRunner { maxAttempts: 1, stepBudget: 12, }), + repairGateFailure: (context) => + repairFinalGateFailure({ + agentRunner: this.deps.agentRunner, + workdir: sessionWorktree.workdir, + gateError: context.reason, + allowedPaths: context.touchedPaths, + trace: runTrace, + repairKind: 'patch_semantic_gate', + maxAttempts: 1, + stepBudget: 16, + }), }); if (integration.textualResolution) { isolatedDiffSummary.resolverAttempts += integration.textualResolution.attempts; @@ -1550,6 +1565,15 @@ export class IngestBundleRunner { isolatedDiffSummary.resolverFailures += 1; } } + if (integration.gateRepair) { + isolatedDiffSummary.gateRepairAttempts += integration.gateRepair.attempts; + if (integration.gateRepair.status === 'repaired') { + isolatedDiffSummary.semanticConflicts += 1; + isolatedDiffSummary.gateRepairs += 1; + } else { + isolatedDiffSummary.gateRepairFailures += 1; + } + } if (integration.status === 'textual_conflict') { isolatedDiffSummary.textualConflicts += 1; await this.deps.runs.markFailed(runRow.id); @@ -2264,40 +2288,123 @@ export class IngestBundleRunner { }; activePhase = 'final_gates'; activeFailureDetails = finalArtifactGateTraceData; - await traceTimed( - runTrace, - 'final_gates', - 'final_artifact_gates', - finalArtifactGateTraceData, - async () => { - await validateFinalIngestArtifacts({ - connectionIds: repairConnectionIds, - changedWikiPageKeys: finalChangedWikiPageKeys, - touchedSlSources: finalTouchedSlSources, - wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - validateTouchedSources: (touched) => - validateWuTouchedSources( - { - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - connections: this.deps.connections, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - slSourcesRepository: this.deps.slSourcesRepository, - probeRowCount: this.deps.settings.probeRowCount, - slValidator: this.deps.slValidator, - }, - touched, - ), - tableExists: (connectionId, tableRef) => - this.tableRefExistsInSemanticLayer( - this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - [connectionId], - tableRef, - ), - }); - }, - ); + try { + await traceTimed( + runTrace, + 'final_gates', + 'final_artifact_gates', + finalArtifactGateTraceData, + async () => { + await validateFinalIngestArtifacts({ + connectionIds: repairConnectionIds, + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }, + ); + } catch (error) { + const gateError = this.errorMessage(error); + const repairPaths = finalGateRepairPaths({ + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + }); + const gateRepair = await repairFinalGateFailure({ + agentRunner: this.deps.agentRunner, + workdir: sessionWorktree.workdir, + gateError, + allowedPaths: repairPaths, + trace: runTrace, + repairKind: 'final_artifact_gate', + maxAttempts: 1, + stepBudget: 16, + }); + + isolatedDiffSummary.gateRepairAttempts += gateRepair.attempts; + if (gateRepair.status === 'failed') { + isolatedDiffSummary.gateRepairFailures += 1; + activeFailureDetails = { + ...finalArtifactGateTraceData, + gateRepair, + gateError, + }; + throw new Error(`${gateError}\ngate repair failed: ${gateRepair.reason}`); + } + + isolatedDiffSummary.gateRepairs += 1; + await traceTimed( + runTrace, + 'final_gates', + 'final_artifact_gates_after_gate_repair', + { + ...finalArtifactGateTraceData, + repairedPaths: gateRepair.changedPaths, + }, + async () => { + await validateFinalIngestArtifacts({ + connectionIds: repairConnectionIds, + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }, + ); + + const repairCommit = await sessionWorktree.git.commitFiles( + gateRepair.changedPaths, + `ingest(${job.sourceKey}): repair final gates syncId=${syncId}`, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + ); + if (!repairCommit.created) { + isolatedDiffSummary.gateRepairFailures += 1; + throw new Error('final gate repair produced no committable changes'); + } + await runTrace.event('debug', 'final_gates', 'final_gate_repair_committed', { + commitSha: repairCommit.commitHash, + repairedPaths: gateRepair.changedPaths, + }); + } activeFailureDetails = undefined; activePhase = 'provenance_validation'; diff --git a/packages/context/src/ingest/report-snapshot.test.ts b/packages/context/src/ingest/report-snapshot.test.ts index 01b77b87..028c222c 100644 --- a/packages/context/src/ingest/report-snapshot.test.ts +++ b/packages/context/src/ingest/report-snapshot.test.ts @@ -301,4 +301,48 @@ describe('parseIngestReportSnapshot', () => { resolverFailures: 0, }); }); + + it('parses isolated-diff gate repair counters', () => { + const snapshot = parseIngestReportSnapshot({ + id: 'report-1', + runId: 'run-1', + jobId: 'job-1', + connectionId: 'warehouse', + sourceKey: 'metabase', + createdAt: '2026-05-18T00:00:00.000Z', + body: { + status: 'completed', + syncId: 'sync-1', + diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 }, + commitSha: 'abc123', + isolatedDiff: { + enabled: true, + acceptedPatches: 1, + textualConflicts: 0, + semanticConflicts: 1, + gateRepairAttempts: 1, + gateRepairs: 1, + gateRepairFailures: 0, + }, + workUnits: [], + failedWorkUnits: [], + reconciliationSkipped: true, + conflictsResolved: [], + evictionsApplied: [], + unmappedFallbacks: [], + evictionInputs: [], + unresolvedCards: [], + supersededBy: null, + overrideOf: null, + provenanceRows: [], + toolTranscripts: [], + }, + }); + + expect(snapshot.body.isolatedDiff).toMatchObject({ + gateRepairAttempts: 1, + gateRepairs: 1, + gateRepairFailures: 0, + }); + }); }); diff --git a/packages/context/src/ingest/report-snapshot.ts b/packages/context/src/ingest/report-snapshot.ts index 4d48a9c9..eef64b48 100644 --- a/packages/context/src/ingest/report-snapshot.ts +++ b/packages/context/src/ingest/report-snapshot.ts @@ -158,6 +158,9 @@ export const ingestReportSnapshotSchema = z resolverAttempts: z.number().int().min(0).default(0), resolverRepairs: z.number().int().min(0).default(0), resolverFailures: z.number().int().min(0).default(0), + gateRepairAttempts: z.number().int().min(0).default(0), + gateRepairs: z.number().int().min(0).default(0), + gateRepairFailures: z.number().int().min(0).default(0), }) .optional(), workUnits: z.array( diff --git a/packages/context/src/ingest/reports.ts b/packages/context/src/ingest/reports.ts index 32c5983a..431e4063 100644 --- a/packages/context/src/ingest/reports.ts +++ b/packages/context/src/ingest/reports.ts @@ -73,6 +73,9 @@ export interface IngestReportBody { resolverAttempts?: number; resolverRepairs?: number; resolverFailures?: number; + gateRepairAttempts?: number; + gateRepairs?: number; + gateRepairFailures?: number; }; workUnits: IngestReportWorkUnit[]; failedWorkUnits: string[];