feat(ingest): promote isolated diff to default runner path

This commit is contained in:
Andrey Avtomonov 2026-05-18 02:48:37 +02:00
parent 8cf63b2248
commit adb5c4488e
2 changed files with 184 additions and 8 deletions

View file

@ -6,7 +6,7 @@ import { GitService, SessionWorktreeService } from '../core/index.js';
import { LocalGitFileStore } from '../project/local-git-file-store.js';
import { addTouchedSlSource } from '../tools/index.js';
import { IngestBundleRunner } from './ingest-bundle.runner.js';
import { defaultIsolatedDiffSourceKeys } from './isolated-diff/source-routing.js';
import { defaultSharedWorktreeSourceKeys } from './isolated-diff/source-routing.js';
import type { IngestBundleRunnerDeps } from './ports.js';
async function makeRealGitRuntime() {
@ -115,7 +115,11 @@ function makeWikiService(root: string) {
};
}
function makeDeps(runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>, sourceKey = 'metabase') {
function makeDeps(
runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>,
sourceKey = 'metabase',
settings: Partial<IngestBundleRunnerDeps['settings']> = {},
) {
const adapter: any = {
source: sourceKey,
skillNames: [],
@ -166,8 +170,9 @@ function makeDeps(runtime: Awaited<ReturnType<typeof makeRealGitRuntime>>, sourc
settings: {
memoryIngestionModel: 'test',
probeRowCount: 1,
isolatedDiffSourceKeys: defaultIsolatedDiffSourceKeys(),
sharedWorktreeSourceKeys: defaultSharedWorktreeSourceKeys(),
ingestTraceLevel: 'trace',
...settings,
},
skillsRegistry: {
listSkills: vi.fn().mockResolvedValue([]),
@ -210,6 +215,174 @@ async function mockStageRawFiles(
}
describe('IngestBundleRunner isolated diff path', () => {
// Default-routing case: `makeDeps` is called without any settings override, and the
// run is expected to emit the isolated-diff trace events and report one accepted patch.
it('routes an unlisted direct-writing source through isolated diffs by default', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const sourceKey = 'custom-direct-source';
    const jobId = 'job-custom-default';
    const rawPath = 'custom/page.json';
    const wikiPath = 'wiki/global/custom-isolated.md';

    const { deps, adapter } = makeDeps(runtime, sourceKey);
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'custom-wiki', rawFiles: [rawPath], peerFileIndex: [], dependencyPaths: [] }],
    });

    // The toolset factory is the hook through which the stubbed agent loop below
    // gets hold of the session object the runner created for the work unit.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Stubbed agent loop: only the work-unit operation performs a write; every
    // other operation finishes immediately with a natural stop.
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
        const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
        await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
        await writeFile(
          join(worktreeRoot, wikiPath),
          '---\nsummary: Custom isolated write\nusage_mode: auto\n---\n\nCustom isolated write.\n',
          'utf-8',
        );
        activeSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'custom-isolated',
          detail: 'Custom isolated write',
          rawPaths: [rawPath],
        });
        await activeSession.gitService.commitFiles([wikiPath], 'custom wiki', 'KTX Test', 'system@ktx.local');
      }
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [[rawPath, 'h1']], sourceKey);

    const pendingRun = runner.run({
      jobId,
      connectionId: 'warehouse',
      sourceKey,
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).resolves.toMatchObject({
      jobId,
      failedWorkUnits: [],
      workUnitCount: 1,
    });

    // Routing is verified through the trace file written under the config dir.
    const traceText = await readFile(join(runtime.configDir, `.ktx/ingest-traces/${jobId}/trace.jsonl`), 'utf-8');
    expect(traceText).toContain('isolated_diff_enabled');
    expect(traceText).toContain('work_unit_child_created');
    expect(traceText).not.toContain('shared_worktree_path_enabled');

    // The most recent report must carry the isolated-diff summary.
    const lastReport = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
    const lastReportBody = lastReport?.body as { isolatedDiff?: unknown } | undefined;
    expect(lastReportBody?.isolatedDiff).toMatchObject({
      enabled: true,
      acceptedPatches: 1,
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
// Fallback case: the source key is listed in `sharedWorktreeSourceKeys` via the
// settings override, and the run is expected to emit the shared-worktree trace event
// while reporting the isolated-diff summary as disabled.
it('keeps the shared-worktree path reachable through explicit private fallback settings', async () => {
  const runtime = await makeRealGitRuntime();
  try {
    const sourceKey = 'legacy-source';
    const jobId = 'job-legacy-shared';
    const rawPath = 'legacy/page.json';
    const wikiPath = 'wiki/global/legacy-shared.md';

    const { deps, adapter } = makeDeps(runtime, sourceKey, {
      sharedWorktreeSourceKeys: ['legacy-source'],
    });
    adapter.chunk.mockResolvedValue({
      workUnits: [{ unitKey: 'legacy-wiki', rawFiles: [rawPath], peerFileIndex: [], dependencyPaths: [] }],
    });

    // The toolset factory is the hook through which the stubbed agent loop below
    // gets hold of the session object the runner created for the work unit.
    let activeSession: any = null;
    deps.toolsetFactory.createIngestWuToolset = vi.fn((toolSession: any) => {
      activeSession = toolSession;
      return { toRuntimeTools: vi.fn(() => ({})) };
    });

    // Stubbed agent loop: only the work-unit operation performs a write; every
    // other operation finishes immediately with a natural stop.
    deps.agentRunner.runLoop = vi.fn(async (params: any) => {
      if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
        const worktreeRoot = rootOfConfig(activeSession.configService, runtime.configDir);
        await mkdir(join(worktreeRoot, 'wiki/global'), { recursive: true });
        await writeFile(
          join(worktreeRoot, wikiPath),
          '---\nsummary: Legacy shared write\nusage_mode: auto\n---\n\nLegacy shared write.\n',
          'utf-8',
        );
        activeSession.actions.push({
          target: 'wiki',
          type: 'created',
          key: 'legacy-shared',
          detail: 'Legacy shared write',
          rawPaths: [rawPath],
        });
        await activeSession.gitService.commitFiles([wikiPath], 'legacy wiki', 'KTX Test', 'system@ktx.local');
      }
      return { stopReason: 'natural' };
    }) as never;

    const runner = new IngestBundleRunner(deps);
    await mockStageRawFiles(runner, runtime, [[rawPath, 'h1']], sourceKey);

    const pendingRun = runner.run({
      jobId,
      connectionId: 'warehouse',
      sourceKey,
      trigger: 'upload',
      bundleRef: { kind: 'upload', uploadId: 'upload' },
    });
    await expect(pendingRun).resolves.toMatchObject({
      jobId,
      failedWorkUnits: [],
      workUnitCount: 1,
    });

    // Routing is verified through the trace file written under the config dir.
    const traceText = await readFile(join(runtime.configDir, `.ktx/ingest-traces/${jobId}/trace.jsonl`), 'utf-8');
    expect(traceText).toContain('shared_worktree_path_enabled');
    expect(traceText).not.toContain('work_unit_child_created');

    // The most recent report must still carry a summary, flagged as disabled.
    const lastReport = vi.mocked(deps.reports.create).mock.calls.at(-1)?.[0];
    const lastReportBody = lastReport?.body as { isolatedDiff?: unknown } | undefined;
    expect(lastReportBody?.isolatedDiff).toMatchObject({
      enabled: false,
    });
  } finally {
    await rm(runtime.homeDir, { recursive: true, force: true });
  }
});
it.each(['notion', 'lookml', 'looker', 'dbt', 'metricflow'] as const)(
'routes %s direct writes through isolated child worktrees',
async (sourceKey) => {

View file

@ -453,8 +453,8 @@ export class IngestBundleRunner {
return workUnits.filter((wu) => wu.rawFiles.some((rawPath) => triageResult.fullRawPaths.has(rawPath)));
}
private isIsolatedDiffEnabled(sourceKey: string): boolean {
return (this.deps.settings.isolatedDiffSourceKeys ?? []).includes(sourceKey);
/**
 * Reports whether `sourceKey` is listed in `settings.sharedWorktreeSourceKeys`.
 * An unset list is treated as empty, so the result is then `false`.
 *
 * @param sourceKey - Source key of the job being routed.
 * @returns `true` when the key appears in the configured list.
 */
private isSharedWorktreeFallbackEnabled(sourceKey: string): boolean {
  const fallbackSourceKeys = this.deps.settings.sharedWorktreeSourceKeys ?? [];
  return fallbackSourceKeys.includes(sourceKey);
}
private createTrace(job: IngestBundleJob): IngestTraceWriter {
@ -1303,7 +1303,7 @@ export class IngestBundleRunner {
workUnitCount: memoryFlowPlannedWorkUnits.length,
evictionCount: eviction?.deletedRawPaths.length ?? 0,
});
const isolatedDiffEnabled = !overrideReport && this.isIsolatedDiffEnabled(job.sourceKey);
const isolatedDiffEnabled = !overrideReport && !this.isSharedWorktreeFallbackEnabled(job.sourceKey);
const isolatedDiffSummary = {
enabled: isolatedDiffEnabled,
integrationWorktreePath: isolatedDiffEnabled ? sessionWorktree.workdir : undefined,
@ -1664,7 +1664,10 @@ export class IngestBundleRunner {
}
} else if (!overrideReport) {
await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', { sourceKey: job.sourceKey });
await runTrace.event('info', 'routing', 'shared_worktree_path_enabled', {
sourceKey: job.sourceKey,
reason: 'explicit_private_fallback',
});
const workUnitSettings = {
maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1,
stepBudget: this.deps.settings.workUnitStepBudget ?? 40,
@ -2653,7 +2656,7 @@ export class IngestBundleRunner {
fetch: fetchReport ?? undefined,
commitSha,
tracePath: runTrace.tracePath,
isolatedDiff: isolatedDiffEnabled ? isolatedDiffSummary : undefined,
isolatedDiff: !overrideReport ? isolatedDiffSummary : undefined,
workUnits: stageIndex.workUnits.map((wu) => ({
unitKey: wu.unitKey,
rawFiles: wu.rawFiles,