feat: route selected ingest sources through isolated diffs

This commit is contained in:
Andrey Avtomonov 2026-05-17 21:27:41 +02:00
parent 0be264dde0
commit c481f1cce9
5 changed files with 600 additions and 10 deletions

View file

@ -609,6 +609,11 @@ export {
} from './raw-sources-paths.js';
export { ingestReportSnapshotSchema, parseIngestReportSnapshot } from './report-snapshot.js';
export type { IngestReportBody, IngestReportSnapshot } from './reports.js';
export * from './artifact-gates.js';
export * from './ingest-trace.js';
export * from './isolated-diff/git-patch.js';
export * from './isolated-diff/patch-integrator.js';
export * from './isolated-diff/work-unit-executor.js';
export * from './reports.js';
export { SourceAdapterRegistry } from './source-adapter-registry.js';
export type { SqliteBundleIngestStoreOptions } from './sqlite-bundle-ingest-store.js';
@ -652,4 +657,7 @@ export type {
TriageSignals,
UnresolvedCardInfo,
WorkUnit,
DeterministicProjectionContext,
ProjectionResult,
} from './types.js';
export * from './wiki-body-refs.js';

View file

@ -255,6 +255,7 @@ const buildRunner = (deps: ReturnType<typeof makeDeps> = makeDeps(), overrides:
resolveUploadDir: (uploadId) => `/tmp/ktx-test/ingest-uploads/${uploadId}`,
resolvePullDir: (jobId) => `/tmp/ktx-test/ingest-pulls/${jobId}`,
resolveTranscriptDir: (jobId) => `/tmp/ktx-test/run/wu-transcripts/${jobId}`,
resolveTracePath: (jobId) => `/tmp/ktx-test/ingest-traces/${jobId}/trace.jsonl`,
},
settings: { probeRowCount: 1, memoryIngestionModel: 'test-model' },
skillsRegistry: deps.skillsRegistry as any,
@ -1505,7 +1506,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['explores/b2b/sales_pipeline.json', 'h1']]),
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/looker-run/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');

View file

@ -6,14 +6,26 @@ import { type KtxLogger, noopLogger } from '../core/index.js';
import { createRuntimeToolDescriptorFromAiTool, type KtxRuntimeToolSet } from '../llm/index.js';
import type { CaptureSession, MemoryAction } from '../memory/index.js';
import type { SemanticLayerService, SemanticLayerSource, SlValidationDeps } from '../sl/index.js';
import { createTouchedSlSources, type ToolContext, type ToolSession } from '../tools/index.js';
import { createTouchedSlSources, type ToolContext, type ToolSession, type TouchedSlSource } from '../tools/index.js';
import type { KnowledgeWikiService } from '../wiki/index.js';
import { findDanglingWikiRefsForActions } from '../wiki/wiki-ref-validation.js';
import { actionTargetConnectionId } from './action-identity.js';
import { NOTION_DEFAULT_MAX_KNOWLEDGE_CREATES_PER_RUN } from './adapters/notion/types.js';
import { validateFinalIngestArtifacts, validateProvenanceRawPaths } from './artifact-gates.js';
import { selectRelevantCanonicalPins } from './canonical-pins.js';
import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js';
import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js';
import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js';
import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js';
import type { MemoryFlowPlannedWorkUnit } from './memory-flow/types.js';
import type { ContextEvidenceIndexSummary, IngestBundleRunnerDeps, PageTriageRunResult } from './ports.js';
import type { CanonicalPin } from './canonical-pins.js';
import type { MemoryFlowEventSink, MemoryFlowPlannedWorkUnit } from './memory-flow/types.js';
import type {
ContextEvidenceIndexSummary,
IngestBundleRunnerDeps,
IngestProvenanceRow,
IngestSessionWorktree,
PageTriageRunResult,
} from './ports.js';
import { buildSyncId, rawSourcesDirForSync } from './raw-sources-paths.js';
import {
buildStageIndexFromReportBody,
@ -393,8 +405,231 @@ export class IngestBundleRunner {
return workUnits.filter((wu) => wu.rawFiles.some((rawPath) => triageResult.fullRawPaths.has(rawPath)));
}
private isIsolatedDiffEnabled(sourceKey: string): boolean {
return (this.deps.settings.isolatedDiffSourceKeys ?? []).includes(sourceKey);
}
private createTrace(job: IngestBundleJob): IngestTraceWriter {
const storage = this.deps.storage as typeof this.deps.storage & { resolveTracePath?: (jobId: string) => string };
return new FileIngestTraceWriter({
tracePath: storage.resolveTracePath?.(job.jobId) ?? ingestTracePathForJob(this.deps.storage.homeDir, job.jobId),
jobId: job.jobId,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
level: this.deps.settings.ingestTraceLevel ?? 'debug',
});
}
private wikiPageKeysFromPaths(paths: string[]): string[] {
return [
...new Set(
paths
.filter((path) => path.startsWith('wiki/global/') && path.endsWith('.md'))
.map((path) => path.slice('wiki/global/'.length, -'.md'.length)),
),
].sort();
}
private touchedSlSourcesFromPaths(paths: string[]): TouchedSlSource[] {
return paths
.filter((path) => path.startsWith('semantic-layer/') && path.endsWith('.yaml') && !path.includes('/_schema/'))
.map((path) => {
const [, connectionId, fileName] = path.split('/');
return { connectionId: connectionId ?? '', sourceName: (fileName ?? '').replace(/\.yaml$/, '') };
})
.filter((source) => source.connectionId.length > 0 && source.sourceName.length > 0);
}
private async runWorkUnitInWorktree(input: {
job: IngestBundleJob;
syncId: string;
wu: WorkUnit;
worktree: IngestSessionWorktree;
stagedDir: string;
contextReport: ContextEvidenceIndexSummary | null;
ingestToolMetadata: { runId: string; jobId: string; syncId: string; sourceKey: string };
slConnectionIds: string[];
wikiIndex: string;
slIndex: string;
priorProvenance: Map<string, IngestProvenanceRow[]>;
scopedWikiService: ReturnType<KnowledgeWikiService['forWorktree']>;
scopedSemanticLayerService: ReturnType<SemanticLayerService['forWorktree']>;
baseFraming: string;
skillsPrompt: string;
canonicalPins: CanonicalPin[];
workUnitSettings: { maxConcurrency: number; stepBudget: number; failureMode: 'abort' | 'continue' };
transcriptDir: string;
transcriptSummaries: Map<string, MutableToolTranscriptSummary>;
recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => void;
stageIndex: StageIndex;
includeContextEvidenceTools: boolean;
currentTableExists(tableRef: string): Promise<boolean>;
memoryFlow?: MemoryFlowEventSink;
wuSkillNames: string[];
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
}): Promise<WorkUnitOutcome> {
const session: CaptureSession = {
userId: 'system',
chatId: input.wu.unitKey,
userMessage: `ingest(${input.job.sourceKey}) WU=${input.wu.unitKey}`,
connectionId: input.job.connectionId,
userScopedEnabled: false,
forceGlobalScope: true,
touchedSlSources: createTouchedSlSources(),
preHead: input.worktree.baseSha,
};
const sessionActions: MemoryAction[] = [];
const toolSession: ToolSession = {
connectionId: input.job.connectionId,
isWorktreeScoped: true,
preHead: input.worktree.baseSha,
touchedSlSources: session.touchedSlSources,
actions: sessionActions,
allowedRawPaths: new Set(input.wu.rawFiles),
allowedConnectionNames: new Set(input.slConnectionIds),
semanticLayerService: input.scopedSemanticLayerService,
wikiService: input.scopedWikiService,
configService: input.worktree.config,
gitService: input.worktree.git,
ingest: input.ingestToolMetadata,
};
const slValidationDeps: SlValidationDeps = {
semanticLayerService: input.scopedSemanticLayerService,
connections: this.deps.connections,
configService: input.worktree.config,
gitService: input.worktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
};
const wuToolset = this.deps.toolsetFactory.createIngestWuToolset(toolSession, {
includeContextEvidenceTools: input.includeContextEvidenceTools,
});
const wuToolContext: ToolContext = {
sourceId: 'ingest',
messageId: `${input.job.jobId}-wu-${input.wu.unitKey}`,
userId: 'system',
connectionId: input.job.connectionId,
ingest: input.ingestToolMetadata,
session: toolSession,
};
const skillsLoadedPerWu: string[] = [];
const loadSkillTool: KtxRuntimeToolSet = {
load_skill: {
name: 'load_skill',
description:
'Load a skill to get specialized instructions. Call this when a skill listed in the system prompt matches the current task.',
inputSchema: z.object({ name: z.string() }),
execute: async ({ name }) => {
const skill = await this.deps.skillsRegistry.getSkill(name, 'memory_agent');
if (!skill) {
const available =
(await this.deps.skillsRegistry.listSkills('memory_agent')).map((s) => s.name).join(', ') || '(none)';
return { markdown: `Skill "${name}" not available. Available: ${available}` };
}
const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8');
if (!skillsLoadedPerWu.includes(skill.name)) {
skillsLoadedPerWu.push(skill.name);
}
const structured = {
name: skill.name,
skillDirectory: skill.path,
content: this.deps.skillsRegistry.stripFrontmatter(body),
};
return {
markdown: `# ${structured.name}\n\n${structured.content}`,
structured,
};
},
},
};
const wuEmitUnmappedFallbackTool = {
emit_unmapped_fallback: createRuntimeToolDescriptorFromAiTool(
'emit_unmapped_fallback',
createEmitUnmappedFallbackTool({
stageIndex: input.stageIndex,
allowedPaths: new Set(input.wu.rawFiles),
tableRefExists: input.currentTableExists,
}),
),
};
const systemPrompt = buildWuSystemPrompt({
baseFraming: input.baseFraming,
skillsPrompt: input.skillsPrompt,
syncId: input.syncId,
sourceKey: input.job.sourceKey,
canonicalPins: input.canonicalPins,
});
input.memoryFlow?.emit({
type: 'work_unit_started',
unitKey: input.wu.unitKey,
skills: input.wuSkillNames,
stepBudget: input.workUnitSettings.stepBudget,
});
return executeWorkUnit(
{
sessionWorktreeGit: input.worktree.git,
agentRunner: this.deps.agentRunner,
validateTouchedSources: (touched) =>
validateWuTouchedSources({ ...slValidationDeps, slValidator: this.deps.slValidator }, touched),
validateWikiRefs: (actions) =>
findDanglingWikiRefsForActions({
wikiService: input.scopedWikiService,
scope: 'GLOBAL',
scopeId: null,
actions,
}),
resetHardTo: (targetSha) => input.worktree.git.resetHardTo(targetSha),
buildSystemPrompt: () => systemPrompt,
buildUserPrompt: (wuInner) =>
buildWuUserPrompt({
wu: wuInner,
wikiIndex: input.wikiIndex,
slIndex: input.slIndex,
priorProvenance: input.priorProvenance,
}),
buildToolSet: (wuInner) =>
wrapToolsWithLogger(
buildWuToolSet({
sourceKey: input.job.sourceKey,
stagedDir: input.stagedDir,
wu: wuInner,
loadSkillTool,
emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool,
toolsetTools: wuToolset.toRuntimeTools(wuToolContext),
}),
join(input.transcriptDir, `${wuInner.unitKey}.jsonl`),
wuInner.unitKey,
{ onEntry: input.recordTranscriptEntry(join(input.transcriptDir, `${wuInner.unitKey}.jsonl`)) },
),
captureSession: session,
sessionActions,
modelRole: 'candidateExtraction',
stepBudget: input.workUnitSettings.stepBudget,
sourceKey: input.job.sourceKey,
connectionId: input.job.connectionId,
jobId: input.job.jobId,
toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0,
onStepFinish: input.onStepFinish,
},
input.wu,
);
}
protected async runInner(job: IngestBundleJob, ctx?: IngestJobContext): Promise<Omit<IngestBundleResult, 'jobId'>> {
const syncId = buildSyncId(new Date(), job.jobId);
const trace = this.createTrace(job);
await trace.event('info', 'run', 'ingest_started', {
trigger: job.trigger,
bundleRefKind: job.bundleRef.kind,
});
try {
const memoryFlow = ctx?.memoryFlow;
const baseSha = await this.deps.lockingService.withLock('config:repo', () => this.deps.gitService.revParseHead());
if (!baseSha) {
@ -434,7 +669,7 @@ export class IngestBundleRunner {
const sessionWorktree = await this.deps.lockingService.withLock('config:repo', () =>
this.deps.sessionWorktreeService.create(job.jobId, baseSha),
);
let cleanupOutcome: 'success' | 'crash' = 'crash';
let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash';
try {
const { currentHashes, rawDirInWorktree } = await this.stageRawFilesStage1({
@ -497,6 +732,15 @@ export class IngestBundleRunner {
syncId,
sourceKey: job.sourceKey,
};
const runTrace = trace.withContext({ runId: runRow.id, syncId });
await runTrace.event('debug', 'snapshot', 'input_snapshot', {
baseSha,
stagedDir,
rawFileCount: currentHashes.size,
rawDirInWorktree,
diffSummary,
scopeFingerprint: scopeDescriptor?.fingerprint ?? null,
});
await stage1?.updateProgress(
1.0,
@ -626,12 +870,305 @@ export class IngestBundleRunner {
workUnitCount: memoryFlowPlannedWorkUnits.length,
evictionCount: eviction?.deletedRawPaths.length ?? 0,
});
const isolatedDiffEnabled = !overrideReport && this.isIsolatedDiffEnabled(job.sourceKey);
const isolatedDiffSummary = {
enabled: isolatedDiffEnabled,
integrationWorktreePath: isolatedDiffEnabled ? sessionWorktree.workdir : undefined,
ingestionBaseSha: undefined as string | undefined,
projectionSha: null as string | null,
acceptedPatches: 0,
textualConflicts: 0,
semanticConflicts: 0,
};
const stage3 = ctx?.startPhase(0.6);
await stage3?.updateProgress(0.0, `Processing ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`);
this.logger.log(`[ingest-bundle] job=${job.jobId} tool-call transcripts: ${transcriptDir}/`);
if (!overrideReport) {
if (!overrideReport && isolatedDiffEnabled) {
await runTrace.event('info', 'routing', 'isolated_diff_enabled', {
sourceKey: job.sourceKey,
workUnitCount: workUnits.length,
integrationWorktreePath: sessionWorktree.workdir,
});
let projectionTouchedSources: TouchedSlSource[] = [];
let projectionChangedWikiPageKeys: string[] = [];
if (adapter.project) {
const projection = await traceTimed(
runTrace,
'projection',
'deterministic_projection',
{ sourceKey: job.sourceKey },
() =>
adapter.project!({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
runId: runRow.id,
stagedDir,
workdir: sessionWorktree.workdir,
parseArtifacts,
}),
);
if (projection.errors.length > 0) {
await this.deps.runs.markFailed(runRow.id);
throw new Error(`deterministic projection failed: ${projection.errors.join('; ')}`);
}
projectionTouchedSources = projection.touchedSources;
projectionChangedWikiPageKeys = projection.changedWikiPageKeys;
const projectionPaths = [
...projection.touchedSources.map((source) => `semantic-layer/${source.connectionId}/${source.sourceName}.yaml`),
...projection.changedWikiPageKeys.map((pageKey) => `wiki/global/${pageKey}.md`),
];
const projectionCommit =
projectionPaths.length > 0
? await sessionWorktree.git.commitFiles(
projectionPaths,
`ingest(${job.sourceKey}): deterministic projection syncId=${syncId}`,
this.deps.storage.systemGitAuthor.name,
this.deps.storage.systemGitAuthor.email,
)
: await sessionWorktree.git.commitStaged(
`ingest(${job.sourceKey}): deterministic projection syncId=${syncId}`,
this.deps.storage.systemGitAuthor.name,
this.deps.storage.systemGitAuthor.email,
);
isolatedDiffSummary.projectionSha = projectionCommit.created ? projectionCommit.commitHash : null;
await runTrace.event('debug', 'projection', 'deterministic_projection_committed', {
projectionSha: isolatedDiffSummary.projectionSha,
touchedSources: projectionTouchedSources,
changedWikiPageKeys: projectionChangedWikiPageKeys,
warnings: projection.warnings,
});
}
const ingestionBaseSha = await sessionWorktree.git.revParseHead();
isolatedDiffSummary.ingestionBaseSha = ingestionBaseSha;
const patchDir = join(this.deps.storage.homeDir, 'ingest-patches', job.jobId);
const workUnitSettings = {
maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1,
stepBudget: this.deps.settings.workUnitStepBudget ?? 40,
failureMode: this.deps.settings.workUnitFailureMode ?? 'continue',
};
const limitWorkUnit = pLimit(workUnitSettings.maxConcurrency);
const workUnitOutcomesByIndex: WorkUnitOutcome[] = [];
let completedWorkUnits = 0;
if (workUnits.length === 0) {
await stage3?.updateProgress(1.0, '0 of 0 work units complete');
}
try {
await Promise.all(
workUnits.map((wu, index) =>
limitWorkUnit(async () => {
const outcome = await runIsolatedWorkUnit({
unitIndex: index,
ingestionBaseSha,
sessionWorktreeService: this.deps.sessionWorktreeService,
patchDir,
trace: runTrace,
workUnit: wu,
run: async (child) => {
const scopedWikiService = this.deps.wikiService.forWorktree(child.workdir);
const scopedSemanticLayerService = this.deps.semanticLayerService.forWorktree(child.workdir);
return this.runWorkUnitInWorktree({
job,
syncId,
wu,
worktree: child,
stagedDir,
contextReport,
ingestToolMetadata,
slConnectionIds,
wikiIndex,
slIndex,
priorProvenance: await this.deps.provenance.findLatestArtifactsForRawPaths(
job.connectionId,
job.sourceKey,
wu.rawFiles,
),
scopedWikiService,
scopedSemanticLayerService,
baseFraming,
skillsPrompt,
canonicalPins,
workUnitSettings,
transcriptDir,
transcriptSummaries,
recordTranscriptEntry,
stageIndex,
includeContextEvidenceTools: adapter.evidenceIndexing === 'documents' && !!contextReport,
currentTableExists: (tableRef) =>
this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef),
memoryFlow,
wuSkillNames,
onStepFinish: ({ stepIndex, stepBudget }) => {
memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget });
},
});
},
});
workUnitOutcomesByIndex[index] = outcome;
for (const action of outcome.actions) {
memoryFlow?.emit({
type: 'candidate_action',
unitKey: outcome.unitKey,
target: action.target,
action: action.type,
key: action.key,
});
}
memoryFlow?.emit({
type: 'work_unit_finished',
unitKey: outcome.unitKey,
status: outcome.status,
...(outcome.reason ? { reason: outcome.reason } : {}),
});
completedWorkUnits += 1;
await stage3?.updateProgress(
completedWorkUnits / workUnits.length,
`${completedWorkUnits} of ${workUnits.length} work units complete`,
);
}),
),
);
} catch (error) {
await this.deps.runs.markFailed(runRow.id);
throw error;
}
workUnitOutcomes.push(
...workUnitOutcomesByIndex.filter((outcome): outcome is WorkUnitOutcome => Boolean(outcome)),
);
failedWorkUnits.push(
...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey),
);
stageIndex.workUnits = workUnitOutcomes.map((o) => ({
unitKey: o.unitKey,
rawFiles: workUnits.find((w) => w.unitKey === o.unitKey)?.rawFiles ?? [],
status: o.status,
reason: o.reason,
actions: o.actions,
touchedSlSources: o.touchedSlSources,
slDisallowed: o.slDisallowed,
slDisallowedReason: o.slDisallowedReason,
}));
for (const [index, outcome] of workUnitOutcomesByIndex.entries()) {
if (!outcome || outcome.status !== 'success' || !outcome.patchPath) {
continue;
}
const wu = workUnits[index];
if (!wu) {
continue;
}
const integration = await integrateWorkUnitPatch({
unitKey: outcome.unitKey,
patchPath: outcome.patchPath,
integrationGit: sessionWorktree.git,
trace: runTrace,
author: this.deps.storage.systemGitAuthor,
slDisallowed: wu.slDisallowed === true,
validateAppliedTree: async (touchedPaths) => {
await validateFinalIngestArtifacts({
connectionIds: slConnectionIds,
changedWikiPageKeys: this.wikiPageKeysFromPaths(touchedPaths),
touchedSlSources: this.touchedSlSourcesFromPaths(touchedPaths),
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
});
if (integration.status === 'textual_conflict') {
isolatedDiffSummary.textualConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
throw new Error(`isolated diff textual conflict in ${outcome.unitKey}: ${integration.reason}`);
}
if (integration.status === 'semantic_conflict') {
isolatedDiffSummary.semanticConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
throw new Error(`isolated diff semantic conflict in ${outcome.unitKey}: ${integration.reason}`);
}
isolatedDiffSummary.acceptedPatches += 1;
}
await traceTimed(
runTrace,
'final_gates',
'final_artifact_gates',
{
changedWikiPageKeys: [
...new Set([
...projectionChangedWikiPageKeys,
...workUnitOutcomes
.flatMap((outcome) => outcome.patchTouchedPaths ?? [])
.flatMap((path) => this.wikiPageKeysFromPaths([path])),
]),
],
touchedSlSources: [...projectionTouchedSources, ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources)],
},
async () => {
await validateFinalIngestArtifacts({
connectionIds: slConnectionIds,
changedWikiPageKeys: [
...new Set([
...projectionChangedWikiPageKeys,
...workUnitOutcomes
.flatMap((outcome) => outcome.patchTouchedPaths ?? [])
.flatMap((path) => this.wikiPageKeysFromPaths([path])),
]),
],
touchedSlSources: [...projectionTouchedSources, ...workUnitOutcomes.flatMap((outcome) => outcome.touchedSlSources)],
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
);
} else if (!overrideReport) {
await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', { sourceKey: job.sourceKey });
const workUnitSettings = {
maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1,
stepBudget: this.deps.settings.workUnitStepBudget ?? 40,
@ -1291,7 +1828,7 @@ export class IngestBundleRunner {
};
const producedPaths = new Set<string>();
const pushActionProvenance = (rawPath: string, action: MemoryAction): void => {
const hash = currentHashes.get(rawPath) ?? 'unknown';
const hash = currentHashes.get(rawPath) ?? '';
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
@ -1319,7 +1856,7 @@ export class IngestBundleRunner {
}
}
for (const resolution of stageIndex.artifactResolutions ?? []) {
const hash = currentHashes.get(resolution.rawPath) ?? 'unknown';
const hash = currentHashes.get(resolution.rawPath) ?? '';
provenanceRows.push({
connectionId: job.connectionId,
sourceKey: job.sourceKey,
@ -1351,6 +1888,11 @@ export class IngestBundleRunner {
actionType: 'skipped',
});
}
validateProvenanceRawPaths({
rows: provenanceRows,
currentRawPaths: new Set(currentHashes.keys()),
deletedRawPaths: new Set(eviction?.deletedRawPaths ?? []),
});
await this.deps.provenance.insertMany(provenanceRows);
memoryFlow?.emit({ type: 'provenance_recorded', rowCount: provenanceRows.length });
await stage5?.updateProgress(
@ -1399,6 +1941,8 @@ export class IngestBundleRunner {
diffSummary,
fetch: fetchReport ?? undefined,
commitSha,
tracePath: runTrace.tracePath,
isolatedDiff: isolatedDiffEnabled ? isolatedDiffSummary : undefined,
workUnits: stageIndex.workUnits.map((wu) => ({
unitKey: wu.unitKey,
rawFiles: wu.rawFiles,
@ -1514,6 +2058,12 @@ export class IngestBundleRunner {
});
}
await stage7?.updateProgress(1.0, 'Done');
await runTrace.event('info', 'run', 'ingest_finished', {
status: 'completed',
commitSha,
failedWorkUnits,
tracePath: runTrace.tracePath,
});
cleanupOutcome = 'success';
return {
@ -1528,5 +2078,9 @@ export class IngestBundleRunner {
} finally {
await this.deps.sessionWorktreeService.cleanup(sessionWorktree, cleanupOutcome);
}
} catch (error) {
await trace.event('error', 'run', 'ingest_failed', { tracePath: trace.tracePath }, error);
throw error;
}
}
}

View file

@ -81,8 +81,15 @@ describe('runIsolatedWorkUnit', () => {
expect(sessionWorktreeService.create).toHaveBeenCalledWith('job-1-wu-1', baseSha);
expect(sessionWorktreeService.cleanup).toHaveBeenCalledWith(expect.any(Object), 'success');
expect(result.status).toBe('success');
expect(result.patchPath).toContain('0000-wu-1.patch');
await expect(readFile(result.patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md');
if (result.status !== 'success') {
throw new Error('expected successful work unit');
}
const patchPath = result.patchPath;
if (!patchPath) {
throw new Error('expected patch path');
}
expect(patchPath).toContain('0000-wu-1.patch');
await expect(readFile(patchPath, 'utf-8')).resolves.toContain('wiki/global/a.md');
await expect(readFile(tracePath, 'utf-8')).resolves.toContain('work_unit_child_created');
});
});

View file

@ -96,6 +96,25 @@ export interface ClusterWorkUnitsContext {
embedding: KtxEmbeddingPort;
}
export interface DeterministicProjectionContext {
connectionId: string;
sourceKey: string;
syncId: string;
jobId: string;
runId: string;
stagedDir: string;
workdir: string;
parseArtifacts?: unknown;
}
export interface ProjectionResult {
warnings: string[];
errors: string[];
touchedSources: Array<{ connectionId: string; sourceName: string }>;
changedWikiPageKeys: string[];
result?: unknown;
}
export interface SourceAdapter {
readonly source: string;
readonly skillNames: string[];
@ -109,6 +128,7 @@ export interface SourceAdapter {
listTargetConnectionIds?(stagedDir: string): Promise<string[]>;
chunk(stagedDir: string, diffSet?: DiffSet): Promise<ChunkResult>;
clusterWorkUnits?(ctx: ClusterWorkUnitsContext): Promise<WorkUnit[]>;
project?(ctx: DeterministicProjectionContext): Promise<ProjectionResult>;
describeScope?(stagedDir: string): Promise<ScopeDescriptor>;
onPullSucceeded?(ctx: {
connectionId: string;