From 51fe8306c3b31bab19d069742949dacfc96944b0 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Sun, 17 May 2026 21:42:45 +0200 Subject: [PATCH] fix(ingest): gate isolated final integration tree --- ...ingest-bundle.runner.isolated-diff.test.ts | 76 ++++++++ .../src/ingest/ingest-bundle.runner.ts | 162 ++++++++++++------ 2 files changed, 185 insertions(+), 53 deletions(-) diff --git a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts index 520879ab..044dda31 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts @@ -419,6 +419,82 @@ describe('IngestBundleRunner isolated diff path', () => { } }); + it('runs final artifact gates after reconciliation mutates the integration tree', async () => { + const runtime = await makeRealGitRuntime(); + try { + const { deps, adapter } = makeDeps(runtime); + adapter.chunk.mockResolvedValue({ + workUnits: [{ unitKey: 'card-source', rawFiles: ['cards/source.json'], peerFileIndex: [], dependencyPaths: [] }], + }); + let currentSession: any = null; + deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => { + currentSession = toolSession; + return { toRuntimeTools: vi.fn(() => ({})) }; + }); + deps.agentRunner.runLoop = vi.fn(async (params: any) => { + const root = rootOfConfig(currentSession.configService, runtime.configDir); + if (params.telemetryTags.operationName === 'ingest-bundle-wu') { + await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true }); + await writeFile( + join(root, 'semantic-layer/warehouse/mart_account_segments.yaml'), + 'name: mart_account_segments\ngrain: [account_id]\ncolumns: [{name: account_id, type: string}]\njoins: []\nmeasures:\n - name: total_contract_arr\n expr: sum(contract_arr)\n', + ); + addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 
'mart_account_segments'); + currentSession.actions.push({ + target: 'sl', + type: 'created', + key: 'mart_account_segments', + detail: 'Source with renamed ARR measure', + targetConnectionId: 'warehouse', + rawPaths: ['cards/source.json'], + }); + await currentSession.gitService.commitFiles( + ['semantic-layer/warehouse/mart_account_segments.yaml'], + 'wu source', + 'KTX Test', + 'system@ktx.local', + ); + } else { + await mkdir(join(root, 'wiki/global'), { recursive: true }); + await writeFile( + join(root, 'wiki/global/account-segments.md'), + '---\nsummary: Account segments\nusage_mode: auto\nsl_refs:\n - mart_account_segments\n---\n\nReconcile wrote stale ARR `mart_account_segments.total_contract_arr_cents`.\n', + ); + currentSession.actions.push({ + target: 'wiki', + type: 'created', + key: 'account-segments', + detail: 'Stale reconcile wiki page', + rawPaths: ['cards/source.json'], + }); + await currentSession.gitService.commitFiles(['wiki/global/account-segments.md'], 'reconcile wiki', 'KTX Test', 'system@ktx.local'); + } + return { stopReason: 'natural' }; + }) as never; + + const runner = new IngestBundleRunner(deps); + await mockStageRawFiles(runner, runtime, [['cards/source.json', 'h1']]); + + await expect( + runner.run({ + jobId: 'job-reconcile-stale', + connectionId: 'warehouse', + sourceKey: 'metabase', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload' }, + }), + ).rejects.toThrow(/total_contract_arr_cents/); + + const trace = await readFile(join(runtime.configDir, '.ktx/ingest-traces/job-reconcile-stale/trace.jsonl'), 'utf-8'); + expect(trace).toContain('reconciliation_finished'); + expect(trace).toContain('final_artifact_gates_failed'); + expect(trace).toContain('ingest_failed'); + await expect(readFile(join(runtime.configDir, 'wiki/global/account-segments.md'), 'utf-8')).rejects.toMatchObject({ code: 'ENOENT' }); + } finally { + await rm(runtime.homeDir, { recursive: true, force: true }); + } + }); + it('rejects slDisallowed patches that touch semantic-layer files', async () => { const 
runtime = await makeRealGitRuntime(); try { diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index ef804405..ddea1cf9 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -440,6 +440,40 @@ export class IngestBundleRunner { .filter((source) => source.connectionId.length > 0 && source.sourceName.length > 0); } + private touchedSlSourcesFromActions(actions: MemoryAction[], fallbackConnectionId: string): TouchedSlSource[] { + return actions + .filter((action) => action.target === 'sl') + .map((action) => ({ + connectionId: actionTargetConnectionId(action, fallbackConnectionId), + sourceName: action.key, + })); + } + + private wikiPageKeysFromActions(actions: MemoryAction[]): string[] { + return actions.filter((action) => action.target === 'wiki').map((action) => action.key); + } + + private uniqueWikiPageKeys(keys: string[]): string[] { + return [...new Set(keys.filter((key): key is string => typeof key === 'string' && key.length > 0))].sort(); + } + + private uniqueTouchedSlSources(sources: TouchedSlSource[]): TouchedSlSource[] { + const seen = new Set<string>(); + const unique: TouchedSlSource[] = []; + for (const source of sources) { + const key = JSON.stringify([source.connectionId, source.sourceName]); + if (seen.has(key)) { + continue; + } + seen.add(key); + unique.push(source); + } + return unique.sort((left, right) => { + const byConnection = left.connectionId.localeCompare(right.connectionId); + return byConnection === 0 ? left.sourceName.localeCompare(right.sourceName) : byConnection; + }); + } + private async runWorkUnitInWorktree(input: { job: IngestBundleJob; syncId: string; @@ -884,6 +918,8 @@ export class IngestBundleRunner { const stage3 = ctx?.startPhase(0.6); await stage3?.updateProgress(0.0, `Processing ${workUnits.length} update${workUnits.length === 1 ? 
'' : 's'}`); this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`); + let projectionTouchedSources: TouchedSlSource[] = []; + let projectionChangedWikiPageKeys: string[] = []; if (!overrideReport && isolatedDiffEnabled) { await runTrace.event('info', 'routing', 'isolated_diff_enabled', { @@ -892,8 +928,6 @@ export class IngestBundleRunner { integrationWorktreePath: sessionWorktree.workdir, }); - let projectionTouchedSources: TouchedSlSource[] = []; - let projectionChangedWikiPageKeys: string[] = []; if (adapter.project) { const projection = await traceTimed( runTrace, @@ -1116,57 +1150,6 @@ export class IngestBundleRunner { isolatedDiffSummary.acceptedPatches += 1; } - await traceTimed( - runTrace, - 'final_gates', - 'final_artifact_gates', - { - changedWikiPageKeys: [ - ...new Set([ - ...projectionChangedWikiPageKeys, - ...workUnitOutcomes - .flatMap((outcome) => outcome.patchTouchedPaths ?? []) - .flatMap((path) => this.wikiPageKeysFromPaths([path])), - ]), - ], - touchedSlSources: [...projectionTouchedSources, ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources)], - }, - async () => { - await validateFinalIngestArtifacts({ - connectionIds: slConnectionIds, - changedWikiPageKeys: [ - ...new Set([ - ...projectionChangedWikiPageKeys, - ...workUnitOutcomes - .flatMap((outcome) => outcome.patchTouchedPaths ?? 
[]) - .flatMap((path) => this.wikiPageKeysFromPaths([path])), - ]), - ], - touchedSlSources: [...projectionTouchedSources, ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources)], - wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - validateTouchedSources: (touched) => - validateWuTouchedSources( - { - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - connections: this.deps.connections, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - slSourcesRepository: this.deps.slSourcesRepository, - probeRowCount: this.deps.settings.probeRowCount, - slValidator: this.deps.slValidator, - }, - touched, - ), - tableExists: (connectionId, tableRef) => - this.tableRefExistsInSemanticLayer( - this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - [connectionId], - tableRef, - ), - }); - }, - ); } else if (!overrideReport) { await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', { sourceKey: job.sourceKey }); const workUnitSettings = { @@ -1426,6 +1409,7 @@ export class IngestBundleRunner { : null; const dedupResult = contextReport && this.deps.candidateDedup ? await this.deps.candidateDedup.deduplicateRun(runRow.id) : null; + const preReconciliationSha = await sessionWorktree.git.revParseHead(); // Stage 4 — reconciliation. Shares scoped wiki/SL with a fresh CaptureSession // so reconciliation writes land in the same worktree Stage 3 used. @@ -1693,6 +1677,14 @@ export class IngestBundleRunner { conflictCount: stageIndex.conflictsResolved.length, fallbackCount: stageIndex.unmappedFallbacks.length, }); + await runTrace.event('debug', 'reconciliation', 'reconciliation_finished', { + skipped: reconcileOutcome.skipped, + stopReason: reconcileOutcome.stopReason ?? 
null, + actionCount: reconcileActions.length, + conflictCount: stageIndex.conflictsResolved.length, + fallbackCount: stageIndex.unmappedFallbacks.length, + artifactResolutionCount: stageIndex.artifactResolutions?.length ?? 0, + }); await stage4?.updateProgress(1.0, reconcileOutcome.skipped ? 'No reconciliation needed' : 'Reconciled'); @@ -1744,6 +1736,70 @@ export class IngestBundleRunner { configService: sessionWorktree.config, connectionIds: repairConnectionIds, }); + const postReconciliationSha = await sessionWorktree.git.revParseHead(); + const postReconciliationPaths = + preReconciliationSha && postReconciliationSha && preReconciliationSha !== postReconciliationSha + ? (await sessionWorktree.git.diffNameStatus(preReconciliationSha, postReconciliationSha)).map((entry) => entry.path) + : []; + const finalChangedWikiPageKeys = this.uniqueWikiPageKeys([ + ...(isolatedDiffEnabled ? projectionChangedWikiPageKeys : []), + ...workUnitOutcomes + .flatMap((outcome) => outcome.patchTouchedPaths ?? []) + .flatMap((path) => this.wikiPageKeysFromPaths([path])), + ...this.wikiPageKeysFromActions(reconcileActions), + ...postReconciliationPaths.flatMap((path) => this.wikiPageKeysFromPaths([path])), + ...wikiSlRefRepairResult.repairs.filter((repair) => repair.scope === 'GLOBAL').map((repair) => repair.pageKey), + ]); + const finalTouchedSlSources = this.uniqueTouchedSlSources([ + ...(isolatedDiffEnabled ? projectionTouchedSources : []), + ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources), + ...this.touchedSlSourcesFromActions(reconcileActions, job.connectionId), + ...this.touchedSlSourcesFromPaths(postReconciliationPaths), + ...(postProcessorOutcome?.touchedSources ?? 
[]), + ]); + + await traceTimed( + runTrace, + 'final_gates', + 'final_artifact_gates', + { + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + preReconciliationSha, + postReconciliationSha, + postReconciliationPaths, + reconciliationActionCount: reconcileActions.length, + wikiSlRefRepairCount: wikiSlRefRepairResult.repairs.length, + }, + async () => { + await validateFinalIngestArtifacts({ + connectionIds: repairConnectionIds, + changedWikiPageKeys: finalChangedWikiPageKeys, + touchedSlSources: finalTouchedSlSources, + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }, + ); // Stage 6 — squash commit const stage6 = ctx?.startPhase(0.04);