feat(ingest): wire isolated diff gate repair

This commit is contained in:
Andrey Avtomonov 2026-05-18 00:38:07 +02:00
parent 51a5530995
commit 10641ac411
4 changed files with 191 additions and 34 deletions

View file

@ -13,6 +13,7 @@ import { actionTargetConnectionId } from './action-identity.js';
import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js';
import { validateFinalIngestArtifacts, validateProvenanceRawPaths } from './artifact-gates.js';
import { selectRelevantCanonicalPins } from './canonical-pins.js';
import { finalGateRepairPaths, repairFinalGateFailure } from './final-gate-repair.js';
import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js';
import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js';
import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js';
@ -1293,6 +1294,9 @@ export class IngestBundleRunner {
resolverAttempts: 0,
resolverRepairs: 0,
resolverFailures: 0,
gateRepairAttempts: 0,
gateRepairs: 0,
gateRepairFailures: 0,
};
latestIsolatedDiffSummary = isolatedDiffSummary;
@ -1540,6 +1544,17 @@ export class IngestBundleRunner {
maxAttempts: 1,
stepBudget: 12,
}),
repairGateFailure: (context) =>
repairFinalGateFailure({
agentRunner: this.deps.agentRunner,
workdir: sessionWorktree.workdir,
gateError: context.reason,
allowedPaths: context.touchedPaths,
trace: runTrace,
repairKind: 'patch_semantic_gate',
maxAttempts: 1,
stepBudget: 16,
}),
});
if (integration.textualResolution) {
isolatedDiffSummary.resolverAttempts += integration.textualResolution.attempts;
@ -1550,6 +1565,15 @@ export class IngestBundleRunner {
isolatedDiffSummary.resolverFailures += 1;
}
}
if (integration.gateRepair) {
isolatedDiffSummary.gateRepairAttempts += integration.gateRepair.attempts;
if (integration.gateRepair.status === 'repaired') {
isolatedDiffSummary.semanticConflicts += 1;
isolatedDiffSummary.gateRepairs += 1;
} else {
isolatedDiffSummary.gateRepairFailures += 1;
}
}
if (integration.status === 'textual_conflict') {
isolatedDiffSummary.textualConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
@ -2264,40 +2288,123 @@ export class IngestBundleRunner {
};
activePhase = 'final_gates';
activeFailureDetails = finalArtifactGateTraceData;
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates',
finalArtifactGateTraceData,
async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
);
try {
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates',
finalArtifactGateTraceData,
async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
);
} catch (error) {
const gateError = this.errorMessage(error);
const repairPaths = finalGateRepairPaths({
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
});
const gateRepair = await repairFinalGateFailure({
agentRunner: this.deps.agentRunner,
workdir: sessionWorktree.workdir,
gateError,
allowedPaths: repairPaths,
trace: runTrace,
repairKind: 'final_artifact_gate',
maxAttempts: 1,
stepBudget: 16,
});
isolatedDiffSummary.gateRepairAttempts += gateRepair.attempts;
if (gateRepair.status === 'failed') {
isolatedDiffSummary.gateRepairFailures += 1;
activeFailureDetails = {
...finalArtifactGateTraceData,
gateRepair,
gateError,
};
throw new Error(`${gateError}\ngate repair failed: ${gateRepair.reason}`);
}
isolatedDiffSummary.gateRepairs += 1;
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates_after_gate_repair',
{
...finalArtifactGateTraceData,
repairedPaths: gateRepair.changedPaths,
},
async () => {
await validateFinalIngestArtifacts({
connectionIds: repairConnectionIds,
changedWikiPageKeys: finalChangedWikiPageKeys,
touchedSlSources: finalTouchedSlSources,
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
);
const repairCommit = await sessionWorktree.git.commitFiles(
gateRepair.changedPaths,
`ingest(${job.sourceKey}): repair final gates syncId=${syncId}`,
this.deps.storage.systemGitAuthor.name,
this.deps.storage.systemGitAuthor.email,
);
if (!repairCommit.created) {
isolatedDiffSummary.gateRepairFailures += 1;
throw new Error('final gate repair produced no committable changes');
}
await runTrace.event('debug', 'final_gates', 'final_gate_repair_committed', {
commitSha: repairCommit.commitHash,
repairedPaths: gateRepair.changedPaths,
});
}
activeFailureDetails = undefined;
activePhase = 'provenance_validation';

View file

@ -301,4 +301,48 @@ describe('parseIngestReportSnapshot', () => {
resolverFailures: 0,
});
});
it('parses isolated-diff gate repair counters', () => {
const snapshot = parseIngestReportSnapshot({
id: 'report-1',
runId: 'run-1',
jobId: 'job-1',
connectionId: 'warehouse',
sourceKey: 'metabase',
createdAt: '2026-05-18T00:00:00.000Z',
body: {
status: 'completed',
syncId: 'sync-1',
diffSummary: { added: 1, modified: 0, deleted: 0, unchanged: 0 },
commitSha: 'abc123',
isolatedDiff: {
enabled: true,
acceptedPatches: 1,
textualConflicts: 0,
semanticConflicts: 1,
gateRepairAttempts: 1,
gateRepairs: 1,
gateRepairFailures: 0,
},
workUnits: [],
failedWorkUnits: [],
reconciliationSkipped: true,
conflictsResolved: [],
evictionsApplied: [],
unmappedFallbacks: [],
evictionInputs: [],
unresolvedCards: [],
supersededBy: null,
overrideOf: null,
provenanceRows: [],
toolTranscripts: [],
},
});
expect(snapshot.body.isolatedDiff).toMatchObject({
gateRepairAttempts: 1,
gateRepairs: 1,
gateRepairFailures: 0,
});
});
});

View file

@ -158,6 +158,9 @@ export const ingestReportSnapshotSchema = z
resolverAttempts: z.number().int().min(0).default(0),
resolverRepairs: z.number().int().min(0).default(0),
resolverFailures: z.number().int().min(0).default(0),
gateRepairAttempts: z.number().int().min(0).default(0),
gateRepairs: z.number().int().min(0).default(0),
gateRepairFailures: z.number().int().min(0).default(0),
})
.optional(),
workUnits: z.array(

View file

@ -73,6 +73,9 @@ export interface IngestReportBody {
resolverAttempts?: number;
resolverRepairs?: number;
resolverFailures?: number;
gateRepairAttempts?: number;
gateRepairs?: number;
gateRepairFailures?: number;
};
workUnits: IngestReportWorkUnit[];
failedWorkUnits: string[];