Stabilize parallel ingest concurrency

This commit is contained in:
Andrey Avtomonov 2026-05-18 15:05:56 +02:00
parent e64da5a85d
commit 1db8a6debd
19 changed files with 1370 additions and 40 deletions

View file

@ -53,6 +53,31 @@ describe('GitService', () => {
expect(after).toBe(before);
});
it('serializes concurrent initialization for the same config directory', async () => {
const concurrentDir = await mkdtemp(join(tmpdir(), 'git-service-concurrent-init-'));
const coreConfig: KtxCoreConfig = {
storage: { configDir: concurrentDir, homeDir: concurrentDir },
git: {
userName: 'Test User',
userEmail: 'test@example.com',
bootstrapMessage: 'Initialize test config repo',
bootstrapAuthor: 'test-system',
bootstrapAuthorEmail: 'system@example.com',
},
};
const first = new GitService(coreConfig);
const second = new GitService(coreConfig);
try {
await expect(Promise.all([first.onModuleInit(), second.onModuleInit()])).resolves.toEqual([undefined, undefined]);
const firstHead = await first.revParseHead();
const secondHead = await second.revParseHead();
expect(firstHead).toBe(secondHead);
} finally {
await rm(concurrentDir, { recursive: true, force: true });
}
});
it('keeps git auto-maintenance attached for deterministic cleanup', async () => {
const config = await readFile(join(tempDir, '.git', 'config'), 'utf-8');

View file

@ -89,7 +89,7 @@ export class GitService {
this.git = createSimpleGit(this.configDir);
// Initialize git repository
await this.initialize();
await this.withMutationQueue(() => this.initialize());
}
private async initialize(): Promise<void> {

View file

@ -16,7 +16,7 @@ import { selectRelevantCanonicalPins } from './canonical-pins.js';
import { finalGateRepairPaths, repairFinalGateFailure } from './final-gate-repair.js';
import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js';
import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js';
import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js';
import { resolveTextualConflict, runTextualConflictResolvers } from './isolated-diff/textual-conflict-resolver.js';
import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js';
import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js';
import type { CanonicalPin } from './canonical-pins.js';
@ -1385,6 +1385,8 @@ export class IngestBundleRunner {
const patchDir = join(this.deps.storage.homeDir, 'ingest-patches', job.jobId);
const workUnitSettings = {
maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1,
resolverConcurrency:
this.deps.settings.workUnitResolverConcurrency ?? this.deps.settings.workUnitMaxConcurrency ?? 1,
stepBudget: this.deps.settings.workUnitStepBudget ?? 40,
failureMode: this.deps.settings.workUnitFailureMode ?? 'continue',
};
@ -1501,6 +1503,19 @@ export class IngestBundleRunner {
(outcome) => outcome?.status === 'success' && !!outcome.patchPath,
).length;
let integratedPatchCount = 0;
const deferredTextualConflicts: Array<{
order: number;
unitKey: string;
patchPath: string;
touchedPaths: string[];
reason: string;
integrationFailureDetails: {
unitKey: string;
patchPath: string;
allowedTargetConnectionIds: string[];
};
}> = [];
const shouldDeferTextualConflicts = workUnitSettings.resolverConcurrency > 1;
for (const [index, outcome] of workUnitOutcomesByIndex.entries()) {
if (!outcome || outcome.status !== 'success' || !outcome.patchPath) {
continue;
@ -1515,6 +1530,34 @@ export class IngestBundleRunner {
allowedTargetConnectionIds: slConnectionIds,
};
activeFailureDetails = integrationFailureDetails;
const validateAppliedTree = async (touchedPaths: string[]) => {
await validateFinalIngestArtifacts({
connectionIds: slConnectionIds,
changedWikiPageKeys: this.wikiPageKeysFromPaths(touchedPaths),
touchedSlSources: this.touchedSlSourcesFromPaths(touchedPaths),
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
};
emitStageProgress(
'integration',
80,
@ -1528,34 +1571,7 @@ export class IngestBundleRunner {
author: this.deps.storage.systemGitAuthor,
slDisallowed: wu.slDisallowed === true,
allowedTargetConnectionIds: new Set(slConnectionIds),
validateAppliedTree: async (touchedPaths) => {
await validateFinalIngestArtifacts({
connectionIds: slConnectionIds,
changedWikiPageKeys: this.wikiPageKeysFromPaths(touchedPaths),
touchedSlSources: this.touchedSlSourcesFromPaths(touchedPaths),
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
validateAppliedTree,
resolveTextualConflict: async (context) => {
emitStageProgress('integration', 81, `Resolving text conflict for ${context.unitKey}`);
const result = await resolveTextualConflict({
@ -1578,6 +1594,7 @@ export class IngestBundleRunner {
);
return result;
},
deferTextualConflictResolution: shouldDeferTextualConflicts,
repairGateFailure: async (context) => {
emitStageProgress('integration', 82, `Repairing semantic gate for ${context.unitKey}`);
const result = await repairFinalGateFailure({
@ -1618,6 +1635,20 @@ export class IngestBundleRunner {
isolatedDiffSummary.gateRepairFailures += 1;
}
}
if (integration.status === 'textual_conflict' && integration.deferredTextualResolution) {
deferredTextualConflicts.push({
order: index,
...integration.deferredTextualResolution,
integrationFailureDetails,
});
activeFailureDetails = undefined;
emitStageProgress(
'integration',
82,
`Deferred text conflict ${deferredTextualConflicts.length} for ${outcome.unitKey}`,
);
continue;
}
if (integration.status === 'textual_conflict') {
isolatedDiffSummary.textualConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
@ -1652,6 +1683,166 @@ export class IngestBundleRunner {
);
}
if (deferredTextualConflicts.length > 0) {
const batches: typeof deferredTextualConflicts[] = [];
for (const conflict of deferredTextualConflicts.sort((left, right) => left.order - right.order)) {
const conflictPaths = new Set(conflict.touchedPaths);
let placed = false;
for (const batch of batches) {
const overlaps = batch.some((existing) => existing.touchedPaths.some((path) => conflictPaths.has(path)));
if (!overlaps) {
batch.push(conflict);
placed = true;
break;
}
}
if (!placed) {
batches.push([conflict]);
}
}
for (const batch of batches) {
const resolutions = await runTextualConflictResolvers({
maxConcurrency: workUnitSettings.resolverConcurrency,
conflicts: batch,
resolve: async (conflict) => {
emitStageProgress('integration', 81, `Resolving text conflict for ${conflict.unitKey}`);
const result = await resolveTextualConflict({
agentRunner: this.deps.agentRunner,
workdir: sessionWorktree.workdir,
unitKey: conflict.unitKey,
patchPath: conflict.patchPath,
touchedPaths: conflict.touchedPaths,
trace: runTrace,
reason: conflict.reason,
maxAttempts: 1,
stepBudget: 12,
});
emitStageProgress(
'integration',
82,
result.status === 'repaired'
? `Resolved text conflict for ${conflict.unitKey}`
: `Text conflict resolver failed for ${conflict.unitKey}`,
);
return result;
},
});
for (const [resolutionIndex, resolution] of resolutions.entries()) {
const conflict = batch[resolutionIndex];
if (!conflict) {
continue;
}
isolatedDiffSummary.resolverAttempts += resolution.attempts;
if (resolution.status === 'failed') {
isolatedDiffSummary.textualConflicts += 1;
isolatedDiffSummary.resolverFailures += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
activeFailureDetails = {
...conflict.integrationFailureDetails,
touchedPaths: conflict.touchedPaths,
reason: resolution.reason,
};
throw new Error(`isolated diff textual conflict in ${conflict.unitKey}: ${resolution.reason}`);
}
isolatedDiffSummary.textualConflicts += 1;
isolatedDiffSummary.resolverRepairs += 1;
activeFailureDetails = {
...conflict.integrationFailureDetails,
touchedPaths: resolution.changedPaths,
reason: conflict.reason,
};
try {
await traceTimed(
runTrace,
'integration',
'semantic_gate_after_textual_resolution',
{ unitKey: conflict.unitKey, touchedPaths: resolution.changedPaths },
async () => {
await validateFinalIngestArtifacts({
connectionIds: slConnectionIds,
changedWikiPageKeys: this.wikiPageKeysFromPaths(resolution.changedPaths),
touchedSlSources: this.touchedSlSourcesFromPaths(resolution.changedPaths),
wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir),
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
validateTouchedSources: (touched) =>
validateWuTouchedSources(
{
semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
connections: this.deps.connections,
configService: sessionWorktree.config,
gitService: sessionWorktree.git,
slSourcesRepository: this.deps.slSourcesRepository,
probeRowCount: this.deps.settings.probeRowCount,
slValidator: this.deps.slValidator,
},
touched,
),
tableExists: (connectionId, tableRef) =>
this.tableRefExistsInSemanticLayer(
this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir),
[connectionId],
tableRef,
),
});
},
);
} catch (semanticError) {
isolatedDiffSummary.semanticConflicts += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
activeFailureDetails = {
...conflict.integrationFailureDetails,
touchedPaths: resolution.changedPaths,
reason: semanticError instanceof Error ? semanticError.message : String(semanticError),
};
throw new Error(
`isolated diff semantic conflict in ${conflict.unitKey}: ${
semanticError instanceof Error ? semanticError.message : String(semanticError)
}`,
);
}
const commit = await sessionWorktree.git.commitFiles(
resolution.changedPaths,
`ingest: resolve WorkUnit ${conflict.unitKey} conflict`,
this.deps.storage.systemGitAuthor.name,
this.deps.storage.systemGitAuthor.email,
);
if (!commit.created) {
isolatedDiffSummary.resolverFailures += 1;
await this.deps.runs.markFailed(runRow.id);
cleanupOutcome = 'conflict';
activeFailureDetails = {
...conflict.integrationFailureDetails,
touchedPaths: resolution.changedPaths,
reason: 'textual resolver produced no committable changes',
};
throw new Error(`isolated diff textual conflict in ${conflict.unitKey}: textual resolver produced no committable changes`);
}
await runTrace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', {
unitKey: conflict.unitKey,
commitSha: commit.commitHash,
touchedPaths: resolution.changedPaths,
attempts: resolution.attempts,
});
activeFailureDetails = undefined;
if (resolution.changedPaths.length > 0) {
isolatedDiffSummary.acceptedPatches += 1;
integratedPatchCount += 1;
}
emitStageProgress(
'integration',
83,
`Integrated ${integratedPatchCount}/${integrablePatchCount} patches`,
);
}
}
}
}
const carryForwardResult =
contextReport && this.deps.contextCandidateCarryforward

View file

@ -244,6 +244,66 @@ describe('integrateWorkUnitPatch', () => {
expect(await git.revParseHead()).not.toBe(baseSha);
});
it('can defer textual conflict resolution without invoking the resolver inline', async () => {
const { homeDir, configDir, git } = await makeRepo();
await mkdir(join(configDir, 'wiki/global'), { recursive: true });
await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8');
await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com');
const conflictBase = await git.revParseHead();
await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8');
await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com');
const acceptedHead = await git.revParseHead();
const childDir = join(homeDir, 'child-conflict-deferred');
await git.addWorktree(childDir, 'child-conflict-deferred', conflictBase);
const childGit = git.forWorktree(childDir);
await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8');
await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com');
const patchPath = join(homeDir, 'proposal-deferred.patch');
await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath);
const trace = new FileIngestTraceWriter({
tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-deferred/trace.jsonl'),
jobId: 'job-resolver-deferred',
connectionId: 'warehouse',
sourceKey: 'metabase',
level: 'trace',
});
const resolveTextualConflict = vi.fn(async () => ({
status: 'failed' as const,
attempts: 1,
reason: 'should not run',
}));
const result = await integrateWorkUnitPatch({
unitKey: 'wu-conflict',
patchPath,
integrationGit: git,
trace,
author: { name: 'System User', email: 'system@example.com' },
slDisallowed: false,
allowedTargetConnectionIds: new Set(['warehouse']),
validateAppliedTree: vi.fn(async () => {}),
resolveTextualConflict,
deferTextualConflictResolution: true,
});
expect(result).toMatchObject({
status: 'textual_conflict',
reason: expect.stringContaining('conflicts'),
touchedPaths: ['wiki/global/a.md'],
deferredTextualResolution: {
unitKey: 'wu-conflict',
patchPath,
touchedPaths: ['wiki/global/a.md'],
},
});
expect(resolveTextualConflict).not.toHaveBeenCalled();
expect(await git.revParseHead()).toBe(acceptedHead);
await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\n');
});
it('keeps the pre-apply integration tree when the resolver cannot repair a textual conflict', async () => {
const { homeDir, configDir, git } = await makeRepo();
await mkdir(join(configDir, 'wiki/global'), { recursive: true });

View file

@ -23,6 +23,12 @@ export type PatchIntegrationResult =
reason: string;
touchedPaths: string[];
textualResolution?: PatchIntegrationTextualResolution;
deferredTextualResolution?: {
unitKey: string;
patchPath: string;
touchedPaths: string[];
reason: string;
};
gateRepair?: FinalGateRepairResult;
}
| {
@ -48,6 +54,7 @@ export interface IntegrateWorkUnitPatchInput {
touchedPaths: string[];
reason: string;
}): Promise<TextualConflictResolutionResult>;
deferTextualConflictResolution?: boolean;
repairGateFailure?(input: {
unitKey: string;
patchPath: string;
@ -125,6 +132,20 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput)
};
}
if (input.deferTextualConflictResolution) {
return {
status: 'textual_conflict',
reason,
touchedPaths,
deferredTextualResolution: {
unitKey: input.unitKey,
patchPath: input.patchPath,
touchedPaths,
reason,
},
};
}
const textualResolution = await input.resolveTextualConflict({
unitKey: input.unitKey,
patchPath: input.patchPath,

View file

@ -3,7 +3,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { FileIngestTraceWriter } from '../ingest-trace.js';
import { resolveTextualConflict } from './textual-conflict-resolver.js';
import { resolveTextualConflict, runTextualConflictResolvers } from './textual-conflict-resolver.js';
async function makeHarness() {
const root = await mkdtemp(join(tmpdir(), 'ktx-textual-resolver-'));
@ -37,6 +37,14 @@ async function makeHarness() {
return { root, workdir, patchPath, trace };
}
function deferred<T = void>() {
let resolve!: (value: T | PromiseLike<T>) => void;
const promise = new Promise<T>((promiseResolve) => {
resolve = promiseResolve;
});
return { promise, resolve };
}
describe('resolveTextualConflict', () => {
it('lets the repair agent read the failed patch and write only touched paths', async () => {
const { workdir, patchPath, trace } = await makeHarness();
@ -118,3 +126,55 @@ describe('resolveTextualConflict', () => {
});
});
});
describe('runTextualConflictResolvers', () => {
it('runs disjoint conflicts concurrently and preserves result order', async () => {
const releases = [deferred(), deferred()];
const starts: string[] = [];
const run = runTextualConflictResolvers({
maxConcurrency: 2,
conflicts: [
{ unitKey: 'wu-a', touchedPaths: ['wiki/global/a.md'] },
{ unitKey: 'wu-b', touchedPaths: ['wiki/global/b.md'] },
],
resolve: async (conflict, index) => {
starts.push(conflict.unitKey);
await releases[index].promise;
return `${conflict.unitKey}:resolved`;
},
});
await vi.waitFor(() => expect(starts).toEqual(['wu-a', 'wu-b']));
releases[1].resolve();
releases[0].resolve();
await expect(run).resolves.toEqual(['wu-a:resolved', 'wu-b:resolved']);
});
it('serializes overlapping conflicts even when concurrency allows more work', async () => {
const releaseFirst = deferred();
const starts: string[] = [];
const run = runTextualConflictResolvers({
maxConcurrency: 2,
conflicts: [
{ unitKey: 'wu-a', touchedPaths: ['wiki/global/account.md'] },
{ unitKey: 'wu-b', touchedPaths: ['wiki/global/account.md', 'wiki/global/other.md'] },
],
resolve: async (conflict) => {
starts.push(conflict.unitKey);
if (conflict.unitKey === 'wu-a') {
await releaseFirst.promise;
}
return `${conflict.unitKey}:resolved`;
},
});
await vi.waitFor(() => expect(starts).toEqual(['wu-a']));
releaseFirst.resolve();
await expect(run).resolves.toEqual(['wu-a:resolved', 'wu-b:resolved']);
expect(starts).toEqual(['wu-a', 'wu-b']);
});
});

View file

@ -1,5 +1,6 @@
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import pLimit from 'p-limit';
import { z } from 'zod';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../llm/index.js';
import type { IngestTraceWriter } from '../ingest-trace.js';
@ -21,6 +22,12 @@ export interface ResolveTextualConflictInput {
stepBudget?: number;
}
export interface TextualConflictResolverQueueInput<TConflict extends { touchedPaths: string[] }, TResult> {
conflicts: TConflict[];
maxConcurrency: number;
resolve(conflict: TConflict, index: number): Promise<TResult>;
}
const readIntegrationFileSchema = z.object({
path: z.string().min(1),
});
@ -51,6 +58,69 @@ function assertAllowedPath(path: string, allowedPaths: ReadonlySet<string>): str
return normalized;
}
function hasPathOverlap(left: ReadonlySet<string>, right: ReadonlySet<string>): boolean {
for (const path of left) {
if (right.has(path)) {
return true;
}
}
return false;
}
export async function runTextualConflictResolvers<TConflict extends { touchedPaths: string[] }, TResult>(
input: TextualConflictResolverQueueInput<TConflict, TResult>,
): Promise<TResult[]> {
const maxConcurrency = Math.max(1, input.maxConcurrency);
const limit = pLimit(maxConcurrency);
const results: TResult[] = [];
const pending = input.conflicts.map((conflict, index) => ({
conflict,
index,
touchedPaths: new Set(conflict.touchedPaths.map(normalizeRepoPath)),
}));
const activePaths = new Set<string>();
const active: Promise<void>[] = [];
await new Promise<void>((resolve, reject) => {
const pump = () => {
if (pending.length === 0 && active.length === 0) {
resolve();
return;
}
for (let i = 0; i < pending.length && active.length < maxConcurrency; ) {
const candidate = pending[i];
if (!candidate || hasPathOverlap(candidate.touchedPaths, activePaths)) {
i += 1;
continue;
}
pending.splice(i, 1);
for (const path of candidate.touchedPaths) {
activePaths.add(path);
}
const task = limit(async () => {
results[candidate.index] = await input.resolve(candidate.conflict, candidate.index);
})
.then(() => {
for (const path of candidate.touchedPaths) {
activePaths.delete(path);
}
active.splice(active.indexOf(task), 1);
pump();
})
.catch(reject);
active.push(task);
}
};
pump();
});
return results;
}
async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> {
try {
return { exists: true, content: await readFile(path, 'utf-8') };

View file

@ -5,7 +5,7 @@ import type { AgentRunnerPort } from '../llm/index.js';
import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js';
import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js';
import { createLocalBundleIngestRuntime, InProcessIngestLock } from './local-bundle-runtime.js';
type RuntimeWithConnectionDeps = {
deps: {
@ -39,6 +39,84 @@ function testAgentRunner(): AgentRunnerPort {
return { runLoop: vi.fn().mockResolvedValue({ stopReason: 'natural' as const }) };
}
function deferred<T = void>() {
let resolve!: (value: T | PromiseLike<T>) => void;
const promise = new Promise<T>((promiseResolve) => {
resolve = promiseResolve;
});
return { promise, resolve };
}
describe('InProcessIngestLock', () => {
it('serializes operations for the same key while allowing different keys to overlap', async () => {
const lock = new InProcessIngestLock();
const firstEntered = deferred();
const releaseFirst = deferred();
const secondEntered = deferred();
const otherEntered = deferred();
const events: string[] = [];
const first = lock.withLock('config:repo', async () => {
events.push('first:start');
firstEntered.resolve();
await releaseFirst.promise;
events.push('first:end');
return 'first';
});
await firstEntered.promise;
const second = lock.withLock('config:repo', async () => {
events.push('second:start');
secondEntered.resolve();
return 'second';
});
const other = lock.withLock('other:key', async () => {
events.push('other:start');
otherEntered.resolve();
return 'other';
});
await otherEntered.promise;
expect(events).toEqual(['first:start', 'other:start']);
releaseFirst.resolve();
await secondEntered.promise;
await expect(Promise.all([first, second, other])).resolves.toEqual(['first', 'second', 'other']);
expect(events).toEqual(['first:start', 'other:start', 'first:end', 'second:start']);
});
it('serializes operations for the same key across lock instances', async () => {
const firstLock = new InProcessIngestLock();
const secondLock = new InProcessIngestLock();
const firstEntered = deferred();
const releaseFirst = deferred();
const secondEntered = deferred();
const events: string[] = [];
const first = firstLock.withLock('config:repo', async () => {
events.push('first:start');
firstEntered.resolve();
await releaseFirst.promise;
events.push('first:end');
return 'first';
});
await firstEntered.promise;
const second = secondLock.withLock('config:repo', async () => {
events.push('second:start');
secondEntered.resolve();
return 'second';
});
await Promise.resolve();
expect(events).toEqual(['first:start']);
releaseFirst.resolve();
await secondEntered.promise;
await expect(Promise.all([first, second])).resolves.toEqual(['first', 'second']);
expect(events).toEqual(['first:start', 'first:end', 'second:start']);
});
});
describe('createLocalBundleIngestRuntime', () => {
let tempDir: string;
let project: KtxLocalProject;
@ -281,6 +359,7 @@ describe('createLocalBundleIngestRuntime', () => {
'probeRowCount',
'workUnitFailureMode',
'workUnitMaxConcurrency',
'workUnitResolverConcurrency',
'workUnitStepBudget',
]);
});

View file

@ -163,9 +163,29 @@ class LocalIngestStorage implements IngestStoragePort {
}
}
class LocalIngestLock implements IngestLockPort {
async withLock<T>(_key: string, fn: () => Promise<T>): Promise<T> {
return fn();
export class InProcessIngestLock implements IngestLockPort {
private static readonly queues = new Map<string, Promise<void>>();
async withLock<T>(key: string, fn: () => Promise<T>): Promise<T> {
const previous = InProcessIngestLock.queues.get(key) ?? Promise.resolve();
let release: () => void = () => {};
const current = previous.catch(() => undefined).then(
() =>
new Promise<void>((resolve) => {
release = resolve;
}),
);
InProcessIngestLock.queues.set(key, current);
await previous.catch(() => undefined);
try {
return await fn();
} finally {
release();
if (InProcessIngestLock.queues.get(key) === current) {
InProcessIngestLock.queues.delete(key);
}
}
}
}
@ -714,12 +734,13 @@ export function createLocalBundleIngestRuntime(
}),
agentRunner,
gitService: options.project.git,
lockingService: new LocalIngestLock(),
lockingService: new InProcessIngestLock(),
storage,
settings: {
memoryIngestionModel: options.project.config.llm.models.default ?? 'local-ingest-model',
probeRowCount: 0,
workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
workUnitResolverConcurrency: options.project.config.ingest.workUnits.resolverConcurrency,
workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget,
workUnitFailureMode: options.project.config.ingest.workUnits.failureMode,
ingestTraceLevel: ingestTraceLevelFromEnv(),

View file

@ -141,6 +141,7 @@ export interface IngestSettingsPort {
memoryIngestionModel: string;
probeRowCount: number;
workUnitMaxConcurrency?: number;
workUnitResolverConcurrency?: number;
workUnitStepBudget?: number;
workUnitFailureMode?: 'abort' | 'continue';
ingestTraceLevel?: IngestTraceLevel;

View file

@ -49,8 +49,12 @@ connections:
workUnits: {
stepBudget: 40,
maxConcurrency: 1,
resolverConcurrency: 1,
failureMode: 'continue',
},
sources: {
maxConcurrency: 1,
},
},
agent: {
run_research: {
@ -155,10 +159,60 @@ ingest:
expect(config.ingest.workUnits).toEqual({
stepBudget: 30,
maxConcurrency: 2,
resolverConcurrency: 2,
failureMode: 'abort',
});
});
it('parses ingest source and resolver concurrency knobs', () => {
const config = parseKtxProjectConfig(`
ingest:
sources:
maxConcurrency: 4
workUnits:
maxConcurrency: 6
resolverConcurrency: 3
`);
expect(config.ingest.sources).toEqual({
maxConcurrency: 4,
});
expect(config.ingest.workUnits).toEqual({
stepBudget: 40,
maxConcurrency: 6,
resolverConcurrency: 3,
failureMode: 'continue',
});
});
it('defaults resolver concurrency to work-unit concurrency', () => {
const config = parseKtxProjectConfig(`
ingest:
workUnits:
maxConcurrency: 5
`);
expect(config.ingest.workUnits.resolverConcurrency).toBe(5);
});
it('rejects concurrency values above the configured caps', () => {
const validation = validateKtxProjectConfig(`
ingest:
sources:
maxConcurrency: 9
workUnits:
resolverConcurrency: 9
`);
expect(validation).toEqual({
ok: false,
issues: expect.arrayContaining([
expect.objectContaining({ path: 'ingest.sources.maxConcurrency' }),
expect.objectContaining({ path: 'ingest.workUnits.resolverConcurrency' }),
]),
});
});
it('parses global Vertex LLM config', () => {
const config = parseKtxProjectConfig(`
llm:
@ -535,6 +589,13 @@ describe('generateKtxProjectConfigJsonSchema', () => {
expect(relationships?.properties?.acceptThreshold?.description).toMatch(/auto-accepted/);
});
it('emits ingest concurrency caps in the generated schema', () => {
const serialized = JSON.stringify(schema);
expect(serialized).toContain('"sources"');
expect(serialized).toContain('"resolverConcurrency"');
expect(serialized).toContain('"maximum":8');
});
it('emits the mappings shapes under connections', () => {
const serialized = JSON.stringify(schema);
expect(serialized).toContain('databaseMappings');

View file

@ -93,14 +93,35 @@ const embeddingSchema = z
const workUnitsSchema = z
.strictObject({
stepBudget: z.int().positive().default(40).describe('Maximum number of agent steps allowed per work unit before it is force-terminated.'),
maxConcurrency: z.int().positive().default(1).describe('Maximum number of work units run concurrently during ingest.'),
maxConcurrency: z.int().positive().max(8).default(1).describe('Maximum number of work units run concurrently during ingest.'),
resolverConcurrency: z
.int()
.positive()
.max(8)
.optional()
.describe('Maximum number of textual conflict resolvers run concurrently during ingest. Defaults to maxConcurrency.'),
failureMode: z
.enum(KTX_WORK_UNIT_FAILURE_MODES)
.default('continue')
.describe('Behavior when a work unit fails: "abort" stops the whole ingest run; "continue" records the failure and keeps going.'),
})
.transform((workUnits) => ({
...workUnits,
resolverConcurrency: workUnits.resolverConcurrency ?? workUnits.maxConcurrency,
}))
.describe('Concurrency and failure handling for ingest work units.');
const sourcesSchema = z
.strictObject({
maxConcurrency: z
.int()
.positive()
.max(8)
.default(1)
.describe('Maximum number of ingest sources run concurrently by `ktx ingest --all`.'),
})
.describe('Concurrency policy for top-level ingest sources.');
const ingestSchema = z
.strictObject({
adapters: z
@ -111,6 +132,7 @@ const ingestSchema = z
.prefault({ backend: 'deterministic', model: 'deterministic' })
.describe('Embedding configuration used when ingest adapters need to embed documents.'),
workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'),
sources: sourcesSchema.prefault({}).describe('Concurrency policy for top-level ingest sources.'),
})
.describe('Ingest pipeline configuration: adapters, embeddings, and work-unit policy.');
@ -260,6 +282,7 @@ export type KtxProjectLlmProviderConfig = z.infer<typeof llmProviderSchema>;
export type KtxProjectEmbeddingConfig = z.infer<typeof embeddingSchema>;
export type KtxScanEnrichmentConfig = z.infer<typeof scanEnrichmentSchema>;
export type KtxIngestWorkUnitsConfig = z.infer<typeof workUnitsSchema>;
export type KtxIngestSourcesConfig = z.infer<typeof sourcesSchema>;
export type KtxScanRelationshipConfig = z.infer<typeof scanRelationshipsSchema>;
export type KtxProjectScanConfig = z.infer<typeof scanSchema>;
export type KtxProjectConnectionConfig = z.infer<typeof connectionSchema>;
@ -384,6 +407,7 @@ export function serializeKtxProjectConfig(config: KtxProjectConfig): string {
ingest: {
embeddings: config.ingest.embeddings,
workUnits: config.ingest.workUnits,
sources: config.ingest.sources,
},
}
: config;