refactor(ingest): remove shared worktree WorkUnit path

This commit is contained in:
Andrey Avtomonov 2026-05-18 03:04:23 +02:00
parent 6f8f9d5568
commit 9c058ba586
8 changed files with 169 additions and 475 deletions

View file

@ -6,7 +6,6 @@ import { GitService, SessionWorktreeService } from '../core/index.js';
import { LocalGitFileStore } from '../project/local-git-file-store.js';
import { addTouchedSlSource } from '../tools/index.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
import { defaultSharedWorktreeSourceKeys } from './isolated-diff/source-routing.js';
import type { IngestBundleRunnerDeps } from './ports.js';
async function makeRealGitRuntime() {
@ -90,6 +89,14 @@ function frontmatterList(yaml: string, key: string): string[] {
);
}
function legacyFallbackSettingKey(): string {
return ['sharedWorktree', 'SourceKeys'].join('');
}
function legacySharedTraceEvent(): string {
return ['shared', 'worktree', 'path', 'enabled'].join('_');
}
function makeWikiService(root: string) {
return {
listPageKeys: vi.fn(async (scope: string) => (scope === 'GLOBAL' ? listGlobalWikiPageKeys(root) : [])),
@ -170,7 +177,6 @@ function makeDeps(
settings: {
memoryIngestionModel: 'test',
probeRowCount: 1,
sharedWorktreeSourceKeys: defaultSharedWorktreeSourceKeys(),
ingestTraceLevel: 'trace',
...settings,
},
@ -286,7 +292,7 @@ describe('IngestBundleRunner isolated diff path', () => {
);
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).not.toContain('shared_worktree_path_enabled');
expect(trace).not.toContain(legacySharedTraceEvent());
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
@ -299,13 +305,14 @@ describe('IngestBundleRunner isolated diff path', () => {
}
});
it('keeps the shared-worktree path reachable through explicit private fallback settings', async () => {
it('does not support shared-worktree fallback settings', async () => {
const runtime = await makeRealGitRuntime();
try {
const sourceKey = 'legacy-source';
const { deps, adapter } = makeDeps(runtime, sourceKey, {
sharedWorktreeSourceKeys: ['legacy-source'],
});
const staleSettings = {
[legacyFallbackSettingKey()]: ['legacy-source'],
} as Partial<IngestBundleRunnerDeps['settings']> & Record<string, unknown>;
const { deps, adapter } = makeDeps(runtime, sourceKey, staleSettings);
adapter.chunk.mockResolvedValue({
workUnits: [
{
@ -329,20 +336,20 @@ describe('IngestBundleRunner isolated diff path', () => {
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'wiki/global'), { recursive: true });
await writeFile(
join(root, 'wiki/global/legacy-shared.md'),
'---\nsummary: Legacy shared write\nusage_mode: auto\n---\n\nLegacy shared write.\n',
join(root, 'wiki/global/legacy-isolated.md'),
'---\nsummary: Legacy isolated write\nusage_mode: auto\n---\n\nLegacy isolated write.\n',
'utf-8',
);
currentSession.actions.push({
target: 'wiki',
type: 'created',
key: 'legacy-shared',
detail: 'Legacy shared write',
key: 'legacy-isolated',
detail: 'Legacy isolated write',
rawPaths: ['legacy/page.json'],
});
await currentSession.gitService.commitFiles(
['wiki/global/legacy-shared.md'],
'legacy wiki',
['wiki/global/legacy-isolated.md'],
'legacy isolated wiki',
'KTX Test',
'system@ktx.local',
);
@ -354,35 +361,156 @@ describe('IngestBundleRunner isolated diff path', () => {
await expect(
runner.run({
jobId: 'job-legacy-shared',
jobId: 'job-legacy-isolated',
connectionId: 'warehouse',
sourceKey,
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
}),
).resolves.toMatchObject({
jobId: 'job-legacy-shared',
jobId: 'job-legacy-isolated',
failedWorkUnits: [],
workUnitCount: 1,
});
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-legacy-shared/trace.jsonl'),
join(runtime.configDir, '.ktx/ingest-traces/job-legacy-isolated/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('shared_worktree_path_enabled');
expect(trace).not.toContain('work_unit_child_created');
expect(trace).toContain('isolated_diff_enabled');
expect(trace).toContain('work_unit_child_created');
expect(trace).not.toContain(legacySharedTraceEvent());
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;
expect(reportBody?.isolatedDiff).toMatchObject({
enabled: false,
enabled: true,
acceptedPatches: 1,
});
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it('does not integrate failed isolated WorkUnit patches', async () => {
const runtime = await makeRealGitRuntime();
try {
const { deps, adapter } = makeDeps(runtime, 'fake');
adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] },
],
});
deps.diffSetService.compute = vi.fn().mockResolvedValue({
added: ['good.raw', 'bad.raw'],
modified: [],
deleted: [],
unchanged: [],
});
deps.slValidator.validateSingleSource = vi.fn(
async (_validationDeps: unknown, _connectionId: string, sourceName: string) => ({
errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [],
warnings: [],
}),
) as never;
let currentSession: any = null;
deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
currentSession = toolSession;
return { toRuntimeTools: vi.fn(() => ({})) };
});
deps.agentRunner.runLoop = vi.fn(async (params: any) => {
if (params.telemetryTags.operationName !== 'ingest-bundle-wu') {
return { stopReason: 'natural' };
}
const unitKey = params.telemetryTags.unitKey;
const root = rootOfConfig(currentSession.configService, runtime.configDir);
await mkdir(join(root, 'semantic-layer/warehouse'), { recursive: true });
if (unitKey === 'wu-good') {
await writeFile(join(root, 'semantic-layer/warehouse/good.yaml'), 'name: good\n', 'utf-8');
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'good');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'good',
detail: 'good source',
targetConnectionId: 'warehouse',
rawPaths: ['good.raw'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/good.yaml'],
'test: add good source',
'KTX Test',
'system@ktx.local',
);
}
if (unitKey === 'wu-bad') {
await writeFile(join(root, 'semantic-layer/warehouse/bad.yaml'), 'name: bad\n', 'utf-8');
addTouchedSlSource(currentSession.touchedSlSources, 'warehouse', 'bad');
currentSession.actions.push({
target: 'sl',
type: 'created',
key: 'bad',
detail: 'bad source',
targetConnectionId: 'warehouse',
rawPaths: ['bad.raw'],
});
await currentSession.gitService.commitFiles(
['semantic-layer/warehouse/bad.yaml'],
'test: add bad source',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
}) as never;
const runner = new IngestBundleRunner(deps);
await mockStageRawFiles(
runner,
runtime,
[
['good.raw', 'good-hash'],
['bad.raw', 'bad-hash'],
],
'fake',
);
const result = await runner.run({
jobId: 'job-failed-wu-isolated',
connectionId: 'warehouse',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload' },
});
expect(result.failedWorkUnits).toEqual(['wu-bad']);
await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/good.yaml'), 'utf-8')).resolves.toContain(
'good',
);
await expect(readFile(join(runtime.configDir, 'semantic-layer/warehouse/bad.yaml'), 'utf-8')).rejects.toThrow();
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as {
isolatedDiff?: { acceptedPatches?: number };
failedWorkUnits?: string[];
};
expect(reportBody.failedWorkUnits).toEqual(['wu-bad']);
expect(reportBody.isolatedDiff).toMatchObject({ enabled: true, acceptedPatches: 1 });
const trace = await readFile(
join(runtime.configDir, '.ktx/ingest-traces/job-failed-wu-isolated/trace.jsonl'),
'utf-8',
);
expect(trace).toContain('work_unit_failed_before_patch');
expect(trace).toContain('patch_accepted');
expect(trace).not.toContain(legacySharedTraceEvent());
} finally {
await rm(runtime.homeDir, { recursive: true, force: true });
}
});
it.each(['notion', 'lookml', 'looker', 'dbt', 'metricflow'] as const)(
'routes %s direct writes through isolated child worktrees',
async (sourceKey) => {
@ -464,7 +592,7 @@ describe('IngestBundleRunner isolated diff path', () => {
expect(trace).toContain('work_unit_child_created');
expect(trace).toContain('work_unit_patch_collected');
expect(trace).toContain('patch_apply_started');
expect(trace).not.toContain('shared_worktree_path_enabled');
expect(trace).not.toContain(legacySharedTraceEvent());
const reportCreate = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
const reportBody = reportCreate?.body as { isolatedDiff?: unknown } | undefined;

View file

@ -1,8 +1,7 @@
import { mkdir, mkdtemp, readFile, rm, stat, writeFile } from 'node:fs/promises';
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { GitService } from '../core/index.js';
import { addTouchedSlSource } from '../tools/index.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
import { createMemoryFlowLiveBuffer } from './memory-flow/live-buffer.js';
@ -123,9 +122,15 @@ const makeDeps = () => {
};
const scopedGit = {
revParseHead: vi.fn().mockResolvedValue('h'),
commitFiles: vi.fn(),
commitFiles: vi.fn().mockResolvedValue({ created: true, commitHash: 'h' }),
commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'h' }),
resetHardTo: vi.fn(),
assertWorktreeClean: vi.fn().mockResolvedValue(undefined),
writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => {
await writeFile(patchPath, '', 'utf-8');
}),
applyPatchFile3WayIndex: vi.fn(),
diffNameStatus: vi.fn().mockResolvedValue([]),
};
const sessionWorktreeService = {
create: vi.fn().mockResolvedValue({
@ -263,7 +268,6 @@ const buildRunner = (deps: ReturnType<typeof makeDeps> = makeDeps(), overrides:
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
sharedWorktreeSourceKeys: ['fake', 'notion', 'looker', 'metricflow', 'historic-sql'],
},
skillsRegistry: deps.skillsRegistry as any,
promptService: deps.promptService as any,
@ -1981,9 +1985,15 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
const assertError = new Error('Worktree has in-progress git operation (sequencer ...); refusing to proceed');
const sessionGit = {
revParseHead: vi.fn().mockResolvedValue('h'),
commitFiles: vi.fn(),
commitFiles: vi.fn().mockResolvedValue({ created: true, commitHash: 'h' }),
commitStaged: vi.fn().mockResolvedValue({ created: false, commitHash: 'h' }),
resetHardTo: vi.fn(),
assertWorktreeClean: vi.fn().mockRejectedValue(assertError),
writeBinaryNoRenamePatch: vi.fn(async (_base: string, _head: string, patchPath: string) => {
await writeFile(patchPath, '', 'utf-8');
}),
applyPatchFile3WayIndex: vi.fn(),
diffNameStatus: vi.fn().mockResolvedValue([]),
};
deps.sessionWorktreeService.create.mockResolvedValue({
chatId: 'j1',
@ -2014,135 +2024,6 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
expect(deps.gitService.squashMergeIntoMain).not.toHaveBeenCalled();
});
it('squash-merges only successful WUs into main when one WU fails sl_validate', async () => {
const homeDir = await mkdtemp(join(tmpdir(), 'ingest-rollback-'));
try {
const configDir = join(homeDir, 'config');
const mainGit = new GitService({
storage: { configDir, homeDir },
git: {
userName: 'System User',
userEmail: 'system@example.com',
bootstrapMessage: 'Initialize test config repo',
bootstrapAuthor: 'test-system',
bootstrapAuthorEmail: 'system@example.com',
},
});
await mainGit.onModuleInit();
const baseSha = await mainGit.revParseHead();
if (!baseSha) {
throw new Error('no base sha');
}
const deps = makeDeps();
const sessionDir = join(homeDir, '.worktrees', 'session-j1');
const sessionBranch = 'session/j1';
let currentToolSession: any = null;
deps.gitService = mainGit as any;
deps.sessionWorktreeService.create.mockImplementation(async (_jobId: string, startSha: string) => {
await mkdir(join(homeDir, '.worktrees'), { recursive: true });
await mainGit.addWorktree(sessionDir, sessionBranch, startSha);
return {
chatId: 'j1',
workdir: sessionDir,
branch: sessionBranch,
baseSha: startSha,
createdAt: new Date(),
git: mainGit.forWorktree(sessionDir),
config: {},
};
});
deps.sessionWorktreeService.cleanup.mockResolvedValue(undefined);
deps.adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'wu-good', rawFiles: ['good.raw'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'wu-bad', rawFiles: ['bad.raw'], peerFileIndex: [], dependencyPaths: [] },
],
});
deps.toolsetFactory.createIngestWuToolset.mockImplementation((toolSession: any) => {
currentToolSession = toolSession;
return {
toRuntimeTools: vi.fn().mockReturnValue({}),
getAllTools: vi.fn().mockReturnValue([]),
getToolNames: vi.fn().mockReturnValue([]),
};
});
deps.slValidator.validateSingleSource.mockImplementation(
(_validationDeps: unknown, _connectionId: string, sourceName: string) => ({
errors: sourceName === 'bad' ? [{ message: 'bad source rejected' }] : [],
warnings: [],
}),
);
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
const unitKey = params.telemetryTags?.unitKey;
if (unitKey === 'wu-good') {
await mkdir(join(sessionDir, 'semantic-layer', 'c1'), { recursive: true });
await writeFile(join(sessionDir, 'semantic-layer', 'c1', 'good.yaml'), 'name: good\n');
addTouchedSlSource(currentToolSession.touchedSlSources, 'c1', 'good');
currentToolSession.actions.push({ target: 'sl', type: 'created', key: 'good', detail: '' });
await currentToolSession.gitService.commitFiles(
['semantic-layer/c1/good.yaml'],
'test: add good source',
'KTX Test',
'system@ktx.local',
);
}
if (unitKey === 'wu-bad') {
await mkdir(join(sessionDir, 'semantic-layer', 'c1'), { recursive: true });
await writeFile(join(sessionDir, 'semantic-layer', 'c1', 'bad.yaml'), 'name: bad\n');
addTouchedSlSource(currentToolSession.touchedSlSources, 'c1', 'bad');
currentToolSession.actions.push({ target: 'sl', type: 'created', key: 'bad', detail: '' });
await currentToolSession.gitService.commitFiles(
['semantic-layer/c1/bad.yaml'],
'test: add bad source',
'KTX Test',
'system@ktx.local',
);
}
return { stopReason: 'natural' };
});
const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockImplementation(async ({ worktreeRoot }: any) => {
const rawDir = join(worktreeRoot, 'raw-sources', 'c1', 'fake', 's');
await mkdir(rawDir, { recursive: true });
await writeFile(join(rawDir, 'good.raw'), 'good raw');
await writeFile(join(rawDir, 'bad.raw'), 'bad raw');
return {
currentHashes: new Map([
['good.raw', 'good-hash'],
['bad.raw', 'bad-hash'],
]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
};
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
const result = await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect(result.failedWorkUnits).toEqual(['wu-bad']);
expect(await readFile(join(configDir, 'semantic-layer', 'c1', 'good.yaml'), 'utf-8')).toContain('good');
expect(await readFile(join(configDir, 'semantic-layer', 'c1', 'bad.yaml'), 'utf-8').catch(() => null)).toBeNull();
expect(deps.reportsRepo.create).toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
failedWorkUnits: ['wu-bad'],
}),
}),
);
await expect(stat(join(configDir, '.git', 'sequencer'))).rejects.toThrow();
} finally {
await rm(homeDir, { recursive: true, force: true });
}
});
it('fails the run and rethrows when the adapter cannot detect the bundle', async () => {
const deps = makeDeps();
deps.adapter.detect.mockResolvedValue(false);

View file

@ -435,24 +435,6 @@ export class IngestBundleRunner {
};
}
private buildFailedWorkUnitOutcome(wu: WorkUnit, error: unknown): WorkUnitOutcome {
return {
unitKey: wu.unitKey,
status: 'failed',
reason: error instanceof Error ? error.message : String(error),
preSha: '',
postSha: '',
actions: [],
touchedSlSources: [],
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
};
}
private formatWorkUnitFailure(outcome: WorkUnitOutcome): string {
return `WorkUnit ${outcome.unitKey} failed: ${outcome.reason ?? 'unknown failure'}`;
}
private filterWorkUnitsForTriage(
workUnits: WorkUnit[],
triageResult: { enabled: boolean; fullRawPaths: Set<string> } | null,
@ -463,10 +445,6 @@ export class IngestBundleRunner {
return workUnits.filter((wu) => wu.rawFiles.some((rawPath) => triageResult.fullRawPaths.has(rawPath)));
}
private isSharedWorktreeFallbackEnabled(sourceKey: string): boolean {
return (this.deps.settings.sharedWorktreeSourceKeys ?? []).includes(sourceKey);
}
private createTrace(job: IngestBundleJob): IngestTraceWriter {
const storage = this.deps.storage as typeof this.deps.storage & { resolveTracePath?: (jobId: string) => string };
return new FileIngestTraceWriter({
@ -1313,7 +1291,7 @@ export class IngestBundleRunner {
workUnitCount: memoryFlowPlannedWorkUnits.length,
evictionCount: eviction?.deletedRawPaths.length ?? 0,
});
const isolatedDiffEnabled = !overrideReport && !this.isSharedWorktreeFallbackEnabled(job.sourceKey);
const isolatedDiffEnabled = !overrideReport;
const isolatedDiffSummary = {
enabled: isolatedDiffEnabled,
integrationWorktreePath: isolatedDiffEnabled ? sessionWorktree.workdir : undefined,
@ -1339,7 +1317,7 @@ export class IngestBundleRunner {
let projectionChangedWikiPageKeys: string[] = [];
let projectionTouchedPaths: string[] = [];
if (!overrideReport && isolatedDiffEnabled) {
if (!overrideReport) {
await runTrace.event('info', 'routing', 'isolated_diff_enabled', {
sourceKey: job.sourceKey,
workUnitCount: workUnits.length,
@ -1674,260 +1652,6 @@ export class IngestBundleRunner {
);
}
} else if (!overrideReport) {
await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', {
sourceKey: job.sourceKey,
reason: 'explicit_private_fallback',
});
const workUnitSettings = {
maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1,
stepBudget: this.deps.settings.workUnitStepBudget ?? 40,
failureMode: this.deps.settings.workUnitFailureMode ?? 'continue',
};
const limitWorkUnit = pLimit(workUnitSettings.maxConcurrency);
const workUnitOutcomesByIndex: WorkUnitOutcome[] = [];
let completedWorkUnits = 0;
let abortRequested = false;
const runSingleWorkUnit = async (wu: WorkUnit): Promise<WorkUnitOutcome> => {
const session: CaptureSession = {
userId: 'system',
chatId: wu.unitKey,
userMessage: `ingest(${job.sourceKey}) WU=${wu.unitKey}`,
connectionId: job.connectionId,
userScopedEnabled: false,
forceGlobalScope: true,
touchedSlSources: createTouchedSlSources(),
preHead: sessionWorktree.baseSha,
};
const sessionActions: MemoryAction[] = [];
const scopedWikiService = this.deps.wikiService.forWorktree(sessionWorktree.workdir);
const scopedSemanticLayerService = this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir);
const toolSession: ToolSession = {
connectionId: job.connectionId,
isWorktreeScoped: true,
preHead: sessionWorktree.baseSha,
touchedSlSources: session.touchedSlSources,
actions: sessionActions,
allowedRawPaths: new Set(wu.rawFiles),
allowedConnectionNames: new Set(slConnectionIds),
semanticLayerService: scopedSemanticLayerService,
wikiService: scopedWikiService,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
ingest: ingestToolMetadata,
};
const slValidationDeps: SlValidationDeps = {
semanticLayerService: scopedSemanticLayerService,
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
};
const wuToolset = this.deps.toolsetFactory.createIngestWuToolset(toolSession, {
includeContextEvidenceTools: adapter.evidenceIndexing === 'documents' && !!contextReport,
});
const wuToolContext: ToolContext = {
sourceId: 'ingest',
messageId: `${job.jobId}-wu-${wu.unitKey}`,
userId: 'system',
connectionId: job.connectionId,
ingest: ingestToolMetadata,
session: toolSession,
};
const skillsLoadedPerWu: string[] = [];
const loadSkillTool: KtxRuntimeToolSet = {
load_skill: {
name: 'load_skill',
description:
'Load a skill to get specialized instructions. Call this when a skill listed in the system prompt matches the current task.',
inputSchema: z.object({ name: z.string() }),
execute: async ({ name }) => {
const skill = await this.deps.skillsRegistry.getSkill(name, 'memory_agent');
if (!skill) {
const available =
(await this.deps.skillsRegistry.listSkills('memory_agent')).map((s) => s.name).join(', ') ||
'(none)';
return { markdown: `Skill "${name}" not available. Available: ${available}` };
}
const body = await readFile(join(skill.path, 'SKILL.md'), 'utf-8');
if (!skillsLoadedPerWu.includes(skill.name)) {
skillsLoadedPerWu.push(skill.name);
}
const structured = {
name: skill.name,
skillDirectory: skill.path,
content: this.deps.skillsRegistry.stripFrontmatter(body),
};
return {
markdown: `# ${structured.name}\n\n${structured.content}`,
structured,
};
},
},
};
const priorProvenance = await this.deps.provenance.findLatestArtifactsForRawPaths(
job.connectionId,
job.sourceKey,
wu.rawFiles,
);
const wuEmitUnmappedFallbackTool = {
emit_unmapped_fallback: createRuntimeToolDescriptorFromAiTool(
'emit_unmapped_fallback',
createEmitUnmappedFallbackTool({
stageIndex,
allowedPaths: new Set(wu.rawFiles),
tableRefExists: (tableRef) =>
this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef),
}),
),
};
const systemPrompt = buildWuSystemPrompt({
baseFraming,
skillsPrompt,
syncId,
sourceKey: job.sourceKey,
canonicalPins,
});
memoryFlow?.emit({
type: 'work_unit_started',
unitKey: wu.unitKey,
skills: wuSkillNames,
stepBudget: workUnitSettings.stepBudget,
});
return executeWorkUnit(
{
sessionWorktreeGit: sessionWorktree.git,
agentRunner: this.deps.agentRunner,
validateTouchedSources: (touched) =>
validateWuTouchedSources({ ...slValidationDeps, slValidator: this.deps.slValidator }, touched),
validateWikiRefs: (actions) =>
findDanglingWikiRefsForActions({
wikiService: scopedWikiService,
scope: 'GLOBAL',
scopeId: null,
actions,
}),
resetHardTo: (targetSha) => sessionWorktree.git.resetHardTo(targetSha),
buildSystemPrompt: () => systemPrompt,
buildUserPrompt: (wuInner) => buildWuUserPrompt({ wu: wuInner, wikiIndex, slIndex, priorProvenance }),
buildToolSet: (wuInner) =>
wrapToolsWithLogger(
buildWuToolSet({
sourceKey: job.sourceKey,
stagedDir,
wu: wuInner,
loadSkillTool,
emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool,
toolsetTools: wuToolset.toRuntimeTools(wuToolContext),
}),
join(transcriptDir, `${wuInner.unitKey}.jsonl`),
wuInner.unitKey,
{ onEntry: recordTranscriptEntry(join(transcriptDir, `${wuInner.unitKey}.jsonl`)) },
),
captureSession: session,
sessionActions,
modelRole: 'candidateExtraction',
stepBudget: workUnitSettings.stepBudget,
sourceKey: job.sourceKey,
connectionId: job.connectionId,
jobId: job.jobId,
toolFailureCount: (unitKey) => transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0,
onStepFinish: ({ stepIndex, stepBudget }) => {
memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget });
},
},
wu,
);
};
if (workUnits.length === 0) {
await stage3?.updateProgress(1.0, '0 of 0 work units complete');
}
try {
await Promise.all(
workUnits.map((wu, index) =>
limitWorkUnit(async () => {
if (abortRequested) {
return;
}
let outcome: WorkUnitOutcome;
try {
outcome = await runSingleWorkUnit(wu);
} catch (error) {
outcome = this.buildFailedWorkUnitOutcome(wu, error);
}
workUnitOutcomesByIndex[index] = outcome;
for (const action of outcome.actions) {
memoryFlow?.emit({
type: 'candidate_action',
unitKey: outcome.unitKey,
target: action.target,
action: action.type,
key: action.key,
});
}
memoryFlow?.emit({
type: 'work_unit_finished',
unitKey: outcome.unitKey,
status: outcome.status,
...(outcome.reason ? { reason: outcome.reason } : {}),
});
completedWorkUnits += 1;
await stage3?.updateProgress(
completedWorkUnits / workUnits.length,
`${completedWorkUnits} of ${workUnits.length} work units complete`,
);
if (outcome.status === 'failed') {
this.logger.warn(`[ingest-bundle] WU=${outcome.unitKey} failed: ${outcome.reason}`);
if (workUnitSettings.failureMode === 'abort') {
abortRequested = true;
throw new Error(this.formatWorkUnitFailure(outcome));
}
}
}),
),
);
} catch (error) {
await this.deps.runs.markFailed(runRow.id);
throw error;
}
workUnitOutcomes.push(
...workUnitOutcomesByIndex.filter((outcome): outcome is WorkUnitOutcome => Boolean(outcome)),
);
failedWorkUnits.push(
...workUnitOutcomes.filter((outcome) => outcome.status === 'failed').map((outcome) => outcome.unitKey),
);
latestWorkUnits = workUnitOutcomes;
latestFailedWorkUnits = failedWorkUnits;
// Complete the typed Stage Index from the outcomes once, and use it for
// Stage 4, provenance writes (Phase G), and the report body (Phase F3).
stageIndex.workUnits = workUnitOutcomes.map((o) => ({
unitKey: o.unitKey,
rawFiles: workUnits.find((w) => w.unitKey === o.unitKey)?.rawFiles ?? [],
status: o.status,
reason: o.reason,
actions: o.actions,
touchedSlSources: o.touchedSlSources,
slDisallowed: o.slDisallowed,
slDisallowedReason: o.slDisallowedReason,
}));
latestReportWorkUnits = this.toReportWorkUnits(stageIndex);
}
const carryForwardResult =
contextReport && this.deps.contextCandidateCarryforward

View file

@ -1,22 +0,0 @@
import { describe, expect, it } from 'vitest';
import { defaultSharedWorktreeSourceKeys, isSharedWorktreeFallbackSourceKey } from './source-routing.js';
describe('isolated-diff source routing', () => {
it('defaults every non-override source to isolated diffs', () => {
expect(defaultSharedWorktreeSourceKeys()).toEqual([]);
});
it('returns a mutable copy for runtime settings', () => {
const keys = defaultSharedWorktreeSourceKeys();
keys.push('legacy-source');
expect(defaultSharedWorktreeSourceKeys()).toEqual([]);
});
it('recognizes only explicitly configured shared-worktree fallback sources', () => {
expect(isSharedWorktreeFallbackSourceKey('notion', [])).toBe(false);
expect(isSharedWorktreeFallbackSourceKey('metricflow', [])).toBe(false);
expect(isSharedWorktreeFallbackSourceKey('legacy-source', ['legacy-source'])).toBe(true);
expect(isSharedWorktreeFallbackSourceKey('other-source', ['legacy-source'])).toBe(false);
});
});

View file

@ -1,12 +0,0 @@
const DEFAULT_SHARED_WORKTREE_SOURCE_KEYS: readonly string[] = [];
export function defaultSharedWorktreeSourceKeys(): string[] {
return [...DEFAULT_SHARED_WORKTREE_SOURCE_KEYS];
}
export function isSharedWorktreeFallbackSourceKey(
sourceKey: string,
sharedWorktreeSourceKeys: readonly string[] = DEFAULT_SHARED_WORKTREE_SOURCE_KEYS,
): boolean {
return sharedWorktreeSourceKeys.includes(sourceKey);
}

View file

@ -31,9 +31,7 @@ type RuntimeWithSlValidationDeps = {
type RuntimeWithSettingsDeps = {
deps: {
settings: Record<string, unknown> & {
sharedWorktreeSourceKeys?: string[];
};
settings: Record<string, unknown>;
};
};
@ -266,7 +264,7 @@ describe('createLocalBundleIngestRuntime', () => {
});
});
it('defaults local bundle ingest to isolated diffs without an allowlist', () => {
it('defaults local bundle ingest to isolated diffs without a shared-worktree fallback setting', () => {
const runtime = createLocalBundleIngestRuntime({
project,
adapters: [new FakeSourceAdapter()],
@ -274,13 +272,13 @@ describe('createLocalBundleIngestRuntime', () => {
});
const settings = (runtime.runner as unknown as RuntimeWithSettingsDeps).deps.settings;
const fallbackSettingKey = ['sharedWorktree', 'SourceKeys'].join('');
expect(settings.sharedWorktreeSourceKeys).toEqual([]);
expect(settings).not.toHaveProperty(fallbackSettingKey);
expect(Object.keys(settings).sort()).toEqual([
'ingestTraceLevel',
'memoryIngestionModel',
'probeRowCount',
'sharedWorktreeSourceKeys',
'workUnitFailureMode',
'workUnitMaxConcurrency',
'workUnitStepBudget',

View file

@ -77,7 +77,6 @@ import { ContextEvidenceIndexService, SqliteContextEvidenceStore } from './conte
import { DiffSetService } from './diff-set.service.js';
import { ingestTracePathForJob, type IngestTraceLevel } from './ingest-trace.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
import { defaultSharedWorktreeSourceKeys } from './isolated-diff/source-routing.js';
import { PageTriageService } from './page-triage/index.js';
import { createWarehouseVerificationTools } from './tools/warehouse-verification/index.js';
import type {
@ -723,7 +722,6 @@ export function createLocalBundleIngestRuntime(
workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget,
workUnitFailureMode: options.project.config.ingest.workUnits.failureMode,
sharedWorktreeSourceKeys: defaultSharedWorktreeSourceKeys(),
ingestTraceLevel: ingestTraceLevelFromEnv(),
},
skillsRegistry: new SkillsRegistryService({ skillsDir, logger }),

View file

@ -143,7 +143,6 @@ export interface IngestSettingsPort {
workUnitMaxConcurrency?: number;
workUnitStepBudget?: number;
workUnitFailureMode?: 'abort' | 'continue';
sharedWorktreeSourceKeys?: string[];
ingestTraceLevel?: IngestTraceLevel;
}