fix(ingest): preserve transient evidence for isolated work units

This commit is contained in:
Andrey Avtomonov 2026-05-18 02:54:10 +02:00
parent 1a429a1585
commit cbe822a33a
3 changed files with 19 additions and 2 deletions

View file

@ -260,7 +260,11 @@ const buildRunner = (deps: ReturnType<typeof makeDeps> = makeDeps(), overrides:
resolveTranscriptDir: (jobId) => `/tmp/ktx-test/run/wu-transcripts/${jobId}`,
resolveTracePath: (jobId) => `/tmp/ktx-test/ingest-traces/${jobId}/trace.jsonl`,
},
settings: { probeRowCount: 1, memoryIngestionModel: 'test-model' },
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
sharedWorktreeSourceKeys: ['fake', 'notion', 'looker', 'metricflow', 'historic-sql'],
},
skillsRegistry: deps.skillsRegistry as any,
promptService: deps.promptService as any,
wikiService: deps.wikiService as any,

View file

@ -1,4 +1,4 @@
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { cp, mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import pLimit from 'p-limit';
import { z } from 'zod';
@ -78,6 +78,16 @@ import { repairWikiSlRefs, type WikiSlRefRepairResult } from './wiki-sl-ref-repa
type MemoryFlowStageProgress = Extract<MemoryFlowEvent, { type: 'stage_progress' }>;
async function copyTransientIngestEvidence(sourceWorkdir: string, targetWorkdir: string): Promise<void> {
const source = join(sourceWorkdir, '.ktx/ingest-evidence');
const target = join(targetWorkdir, '.ktx/ingest-evidence');
await cp(source, target, { recursive: true, force: true }).catch((error: NodeJS.ErrnoException) => {
if (error.code !== 'ENOENT') {
throw error;
}
});
}
function workUnitToMemoryFlowPlannedWorkUnit(workUnit: WorkUnit): MemoryFlowPlannedWorkUnit {
return {
unitKey: workUnit.unitKey,
@ -1419,6 +1429,7 @@ export class IngestBundleRunner {
patchDir,
trace: runTrace,
workUnit: wu,
afterSuccess: (child) => copyTransientIngestEvidence(child.workdir, sessionWorktree.workdir),
run: async (child) => {
const scopedWikiService = this.deps.wikiService.forWorktree(child.workdir);
const scopedSemanticLayerService = this.deps.semanticLayerService.forWorktree(child.workdir);

View file

@ -15,6 +15,7 @@ export interface RunIsolatedWorkUnitInput {
trace: IngestTraceWriter;
workUnit: WorkUnit;
run(child: IngestSessionWorktree): Promise<WorkUnitOutcome>;
afterSuccess?(child: IngestSessionWorktree): Promise<void>;
}
function patchFileName(unitIndex: number, unitKey: string): string {
@ -44,6 +45,7 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
return { ...outcome, childWorktreePath: child.workdir };
}
await input.afterSuccess?.(child);
await mkdir(input.patchDir, { recursive: true });
const patchPath = join(input.patchDir, patchFileName(input.unitIndex, input.workUnit.unitKey));
await child.git.writeBinaryNoRenamePatch(input.ingestionBaseSha, 'HEAD', patchPath);