From 1db8a6debdfd8af84df35ed34808bfb5e81ad1e1 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 18 May 2026 15:05:56 +0200 Subject: [PATCH] Stabilize parallel ingest concurrency --- .../content/docs/cli-reference/ktx-ingest.mdx | 17 + .../content/docs/guides/building-context.mdx | 32 + ...arallel-ingest-dependency-stabilization.md | 588 ++++++++++++++++++ packages/cli/package.json | 1 + packages/cli/src/public-ingest.test.ts | 66 ++ packages/cli/src/public-ingest.ts | 16 +- packages/context/src/core/git.service.test.ts | 25 + packages/context/src/core/git.service.ts | 2 +- .../src/ingest/ingest-bundle.runner.ts | 249 +++++++- .../isolated-diff/patch-integrator.test.ts | 60 ++ .../ingest/isolated-diff/patch-integrator.ts | 21 + .../textual-conflict-resolver.test.ts | 62 +- .../textual-conflict-resolver.ts | 70 +++ .../src/ingest/local-bundle-runtime.test.ts | 81 ++- .../src/ingest/local-bundle-runtime.ts | 29 +- packages/context/src/ingest/ports.ts | 1 + packages/context/src/project/config.test.ts | 61 ++ packages/context/src/project/config.ts | 26 +- pnpm-lock.yaml | 3 + 19 files changed, 1370 insertions(+), 40 deletions(-) create mode 100644 docs/superpowers/plans/2026-05-18-parallel-ingest-dependency-stabilization.md diff --git a/docs-site/content/docs/cli-reference/ktx-ingest.mdx b/docs-site/content/docs/cli-reference/ktx-ingest.mdx index 9d94cd88..043d9f26 100644 --- a/docs-site/content/docs/cli-reference/ktx-ingest.mdx +++ b/docs-site/content/docs/cli-reference/ktx-ingest.mdx @@ -45,6 +45,23 @@ requires deep ingest readiness. When `--all` selects both databases and context sources, database ingest runs first, then source ingest and memory updates run for source connections. +`ktx ingest --all` runs one target at a time by default. 
Configure source +concurrency in `ktx.yaml` when independent connections can run in parallel: + +```yaml title="ktx.yaml" +ingest: + sources: + maxConcurrency: 4 + workUnits: + maxConcurrency: 6 + resolverConcurrency: 3 +``` + +`ingest.sources.maxConcurrency` controls top-level `--all` target dispatch. +`ingest.workUnits.maxConcurrency` controls work units inside one source ingest. +`ingest.workUnits.resolverConcurrency` controls concurrent textual conflict +repairs for disjoint files. Each value must be between `1` and `8`. + Some ingest paths use the managed KTX Python runtime. Query-history ingest uses it for SQL analysis, and Looker source ingest uses it for Looker identifier parsing. In an interactive terminal, `ktx ingest` prompts before installing the diff --git a/docs-site/content/docs/guides/building-context.mdx b/docs-site/content/docs/guides/building-context.mdx index 584e9003..8ee7bb50 100644 --- a/docs-site/content/docs/guides/building-context.mdx +++ b/docs-site/content/docs/guides/building-context.mdx @@ -121,6 +121,38 @@ Source ingest extracts metadata, reconciles it with existing local context, and writes semantic-layer YAML plus wiki Markdown. It merges rather than blindly overwriting local edits. +## Ingest concurrency + +KTX keeps ingest sequential by default so first runs are predictable. Increase +concurrency when your configured sources are independent and your local LLM +backend can handle more simultaneous agent sessions. 
+ +```yaml title="ktx.yaml" +ingest: + sources: + maxConcurrency: 4 + workUnits: + maxConcurrency: 6 + resolverConcurrency: 3 +``` + +Use these settings together: + +| Setting | Applies to | Default | +|---------|------------|---------| +| `ingest.sources.maxConcurrency` | Top-level `ktx ingest --all` targets | `1` | +| `ingest.workUnits.maxConcurrency` | Work units inside one source ingest | `1` | +| `ingest.workUnits.resolverConcurrency` | Textual conflict repair for disjoint files | Same as `workUnits.maxConcurrency` | + +Evidence-only adapters, such as query-history ingest that emits historic SQL +evidence, can usually tolerate higher work-unit concurrency because their +patches are often no-ops. Source adapters that rewrite the same semantic-layer +or wiki files need lower values to reduce conflict repair work. + +Each concurrency value must be between `1` and `8`. Higher values create more +temporary Git worktrees and more concurrent LLM sessions, so raise them in +small steps and check `.ktx/ingest-traces/` when a run fails. + ## Text ingest Use `ktx ingest text` for notes, Markdown files, runbooks, Slack exports, or diff --git a/docs/superpowers/plans/2026-05-18-parallel-ingest-dependency-stabilization.md b/docs/superpowers/plans/2026-05-18-parallel-ingest-dependency-stabilization.md new file mode 100644 index 00000000..b6f1df84 --- /dev/null +++ b/docs/superpowers/plans/2026-05-18-parallel-ingest-dependency-stabilization.md @@ -0,0 +1,588 @@ +# Parallel Ingest Dependency Stabilization Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Keep the wall-clock win from parallel source ingest while preventing +source adapters from running or finalizing against incomplete repository +context. 
+ +**Architecture:** Split top-level public ingest into dependency stages: +database targets run first, then source targets run with +`ingest.sources.maxConcurrency`. Inside each source run, keep work-unit +integration deterministic. Before reconciliation and again during the final +gate/squash window, refresh the source session worktree from the latest root +commit under the existing `config:repo` lock so final gates validate the tree +that will actually be merged. Add a narrow Metabase guardrail for missing +schema context so agent output fails before inventing overlays or dangling +semantic refs. + +**Tech Stack:** TypeScript ESM/NodeNext, Vitest, `p-limit`, existing +`GitService`, `InProcessIngestLock`, `IngestBundleRunner`, `SessionWorktree`, +semantic-layer final gates, Fumadocs docs-site. + +--- + +## Root Cause Summary + +Fresh verification with `/tmp/ktx-newingest3/ktx.yaml` proved source dispatch +is concurrent, but the run failed after about 993 seconds because source +sessions observed stale or incomplete context: + +- `metabase` started before the warehouse schema and query-history/dbt context + were safely visible. A WorkUnit saw no manifest entry, then attempted an + overlay for `mart_account_segments`; the tool correctly rejected it. +- `notion` finalized wiki refs to dbt-created pages, but its source worktree + was based on an older root commit, so final gates could not see + `activation-policy-change-jan-2026` or `account-segmentation-rules`. +- Later `metabase` repair still failed a final semantic gate because the page + referenced an `accounts` SL source that was not present in the stale session + view. + +The lock and mutation queue are not the primary bug now. `InProcessIngestLock` +already serializes `config:repo`, and `GitService.squashMergeIntoMain()` already +uses `withMutationQueue()`. 
The remaining issue is dependency freshness: +source work is parallel, but source planning and finalization need a current +base tree at specific boundaries. + +## Scope + +In scope: + +- Add a database-before-sources barrier to `ktx ingest --all`. +- Preserve source result rendering in original plan order. +- Refresh source session worktrees from the latest root commit before Stage 4 + reconciliation. +- Move final refresh, final artifact gates, provenance gates, cleanliness + assertion, and squash merge into one `config:repo` finalization window. +- Add Metabase prompt guidance for missing schema context. +- Add focused regression tests and rerun live verification. + +Out of scope: + +- Parallelizing database scans. +- Serializing full source runs. +- Adding CLI flags. +- Compatibility aliases for older config names. +- Changing resolver integration order or raw `git apply` determinism. + +## File Structure + +- Modify `packages/cli/src/public-ingest.ts`. +- Modify `packages/cli/src/public-ingest.test.ts`. +- Modify `packages/context/src/core/git.service.ts`. +- Modify `packages/context/src/core/git.service.test.ts`. +- Modify `packages/context/src/ingest/ingest-bundle.runner.ts`. +- Modify `packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts`. +- Modify `packages/context/skills/metabase_ingest/SKILL.md`. +- Optionally modify `packages/context/src/ingest/ingest-runtime-assets.test.ts` + if the Metabase guardrail needs an asset regression. +- Update `.context/parallelization-results.md` after live verification. 
+ +--- + +### Task 1: Add Public Ingest Barrier Tests + +**Files:** + +- Modify: `packages/cli/src/public-ingest.test.ts` + +- [ ] **Step 1: Add a failing test proving source targets wait for database targets** + +Add this test near the existing source concurrency test: + +```ts +it('waits for database targets before starting parallel source targets', async () => { + const io = makeIo(); + const baseConfig = buildDefaultKtxProjectConfig(); + const project: KtxPublicIngestProject = { + projectDir: '/tmp/project', + config: { + ...baseConfig, + ingest: { + ...baseConfig.ingest, + sources: { maxConcurrency: 2 }, + } as KtxProjectConfig['ingest'], + connections: { + warehouse: { driver: 'postgres', context: { depth: 'deep' } }, + docs: { driver: 'notion' }, + prod_metabase: { driver: 'metabase', api_url: 'https://metabase.example.com' }, + }, + }, + }; + const events: string[] = []; + const schema = deferred(); + const runScan = vi.fn>(async () => { + events.push('scan:start'); + return schema.promise; + }); + const runIngest = vi.fn>(async (ingestArgs) => { + if (ingestArgs.command !== 'run') return 1; + events.push(`ingest:${ingestArgs.connectionId}:${ingestArgs.adapter}`); + return 0; + }); + + const run = runKtxPublicIngest( + { command: 'run', projectDir: '/tmp/project', all: true, json: false, inputMode: 'disabled' }, + io.io, + { + loadProject: vi.fn(async () => project), + runScan, + runIngest, + }, + ); + + await vi.waitFor(() => expect(events).toEqual(['scan:start'])); + await Promise.resolve(); + expect(runIngest).not.toHaveBeenCalled(); + + schema.resolve(0); + await expect(run).resolves.toBe(0); + expect(events).toEqual([ + 'scan:start', + 'ingest:docs:notion', + 'ingest:prod_metabase:metabase', + ]); +}); +``` + +- [ ] **Step 2: Keep the existing source-concurrency test** + +Do not weaken the existing test named +`runs public ingest targets concurrently up to ingest.sources.maxConcurrency and renders in plan order`. 
+It must still prove source targets overlap after the database stage completes. + +- [ ] **Step 3: Run the focused CLI test** + +```bash +pnpm --filter @ktx/cli exec vitest run src/public-ingest.test.ts +``` + +Expected state before implementation: the new barrier test fails because +`runKtxPublicIngest()` currently puts database and source targets in one +`p-limit` pool. + +--- + +### Task 2: Implement Dependency-Staged Public Ingest Dispatch + +**Files:** + +- Modify: `packages/cli/src/public-ingest.ts` + +- [ ] **Step 1: Add a small batch helper near `runKtxPublicIngest()`** + +```ts +interface IndexedPublicIngestTarget { + index: number; + target: KtxPublicIngestPlanTarget; +} + +async function executePublicIngestTargetBatch( + entries: IndexedPublicIngestTarget[], + maxConcurrency: number, + args: Extract, + io: KtxCliIo, + deps: KtxPublicIngestDeps, +): Promise> { + const limit = pLimit(maxConcurrency); + return Promise.all( + entries.map((entry) => + limit(async () => ({ + index: entry.index, + result: await executePublicIngestTarget(entry.target, args, io, deps), + })), + ), + ); +} +``` + +- [ ] **Step 2: Partition plan targets by operation** + +Replace the current single `p-limit` map in `runKtxPublicIngest()` with: + +```ts +const indexedTargets = plan.targets.map((target, index) => ({ index, target })); +const databaseTargets = indexedTargets.filter((entry) => entry.target.operation === 'database-ingest'); +const sourceTargets = indexedTargets.filter((entry) => entry.target.operation === 'source-ingest'); + +const orderedResults = [ + ...(await executePublicIngestTargetBatch(databaseTargets, 1, args, io, deps)), + ...(await executePublicIngestTargetBatch(sourceTargets, sourceMaxConcurrency, args, io, deps)), +]; +results.push( + ...orderedResults + .sort((left, right) => left.index - right.index) + .map((entry) => entry.result), +); +``` + +Notes: + +- Database targets stay sequential in this plan. 
That preserves the previous + default safety profile and avoids scanning multiple warehouses into the same + root repo at once. +- Source targets keep the configured `ingest.sources.maxConcurrency`. +- Result table order stays tied to `plan.targets`, not completion order. + +- [ ] **Step 3: Re-run focused CLI tests** + +```bash +pnpm --filter @ktx/cli exec vitest run src/public-ingest.test.ts +pnpm --filter @ktx/cli run type-check +``` + +--- + +### Task 3: Add a GitService Root Refresh Primitive + +**Files:** + +- Modify: `packages/context/src/core/git.service.ts` +- Modify: `packages/context/src/core/git.service.test.ts` + +- [ ] **Step 1: Add tests first** + +Add tests covering: + +- merging a root commit into a session worktree with no conflicts; +- no-op behavior when the session already has the target commit; +- conflict behavior that aborts the merge and leaves the session clean. + +Use the existing GitService test harness. The core assertion for conflict +cleanup should look like: + +```ts +const result = await session.git.mergeCommitIntoCurrent(rootHead); + +expect(result).toMatchObject({ ok: false, conflict: true }); +await expect(session.git.assertWorktreeClean()).resolves.toBeUndefined(); +expect(await session.git.revParseHead()).toBe(sessionHeadBeforeMerge); +``` + +- [ ] **Step 2: Add result types** + +```ts +export type MergeCommitIntoCurrentResult = + | { ok: true; headSha: string; changed: boolean } + | { ok: false; conflict: true; conflictPaths: string[] }; +``` + +- [ ] **Step 3: Implement the method using the existing mutation queue** + +```ts +async mergeCommitIntoCurrent(commitish: string): Promise<MergeCommitIntoCurrentResult> { + return this.withMutationQueue(() => this.mergeCommitIntoCurrentUnlocked(commitish)); +} + +private async mergeCommitIntoCurrentUnlocked(commitish: string): Promise<MergeCommitIntoCurrentResult> { + const before = (await this.git.revparse(['HEAD'])).trim(); + const target = (await this.git.revparse([commitish])).trim(); + if (before === target) { + return { ok: true, headSha: 
before, changed: false }; + } + + let mergeError: unknown = null; + try { + await this.git.raw(['merge', '--no-edit', target]); + } catch (error) { + mergeError = error; + } + + const unmergedOut = await this.git.raw(['diff', '--name-only', '--diff-filter=U']).catch(() => ''); + const unmergedPaths = unmergedOut + .split('\n') + .map((line) => line.trim()) + .filter(Boolean); + const conflictPaths = mergeConflictPaths(unmergedPaths, mergeError); + + if (conflictPaths.length > 0 || mergeError !== null) { + await this.git.raw(['merge', '--abort']).catch(() => undefined); + await this.git.raw(['reset', '--hard', before]).catch(() => undefined); + return { ok: false, conflict: true, conflictPaths }; + } + + const headSha = (await this.git.revparse(['HEAD'])).trim(); + return { ok: true, headSha, changed: headSha !== before }; +} +``` + +This method intentionally belongs on `GitService`, not the runner, because it +must use the same in-process mutation queue and cleanup behavior as other +repository mutation helpers. + +- [ ] **Step 4: Run focused GitService tests** + +```bash +pnpm --filter @ktx/context exec vitest run src/core/git.service.test.ts +``` + +--- + +### Task 4: Refresh Source Session Worktrees Before Reconciliation and Finalization + +**Files:** + +- Modify: `packages/context/src/ingest/ingest-bundle.runner.ts` +- Modify: `packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts` + +- [ ] **Step 1: Add runner helpers for session refresh** + +Inside `IngestBundleRunner`, add an unlocked helper plus a locked wrapper. The +unlocked helper is used by finalization after the caller already holds +`config:repo`. 
+ +```ts +private async refreshSessionFromRootHeadAlreadyLocked(input: { + sessionWorktree: IngestSessionWorktree; + runTrace: IngestTraceWriter; + phase: 'before_reconciliation' | 'before_final_squash'; +}): Promise<void> { + const rootHead = await this.deps.gitService.revParseHead(); + if (!rootHead) { + throw new Error('ingest-bundle: config repo has no HEAD'); + } + const result = await input.sessionWorktree.git.mergeCommitIntoCurrent(rootHead); + + await input.runTrace.event('debug', 'refresh', 'session_refreshed_from_root', { + phase: input.phase, + result, + }); + + if (!result.ok) { + throw new Error(`source refresh conflict before ${input.phase}: ${result.conflictPaths.join(', ')}`); + } +} + +private async refreshSessionFromRootHead(input: { + sessionWorktree: IngestSessionWorktree; + runTrace: IngestTraceWriter; + phase: 'before_reconciliation' | 'before_final_squash'; +}): Promise<void> { + await this.deps.lockingService.withLock('config:repo', () => + this.refreshSessionFromRootHeadAlreadyLocked(input), + ); +} +``` + +Use the real imported types from the file. If `IngestTraceWriter` is not +currently imported as a value/type in this file, add the type import instead of +using `any`. + +- [ ] **Step 2: Refresh before Stage 4 reconciliation** + +Right after work-unit integration and candidate carryforward/dedup complete, +but before `preReconciliationSha` and `reconcileSession` are created, call: + +```ts +await this.refreshSessionFromRootHead({ + sessionWorktree, + runTrace, + phase: 'before_reconciliation', +}); +const preReconciliationSha = await sessionWorktree.git.revParseHead(); +``` + +This lets reconciliation agents see pages and semantic-layer files committed by +database, dbt, historic-sql, or earlier sibling source runs. + +- [ ] **Step 3: Refresh and gate inside the final root mutation window** + +Move the final refresh and final gates into the existing `config:repo` lock that +currently wraps only `revParseHead()` and `squashMergeIntoMain()`. 
+ +The shape should be: + +```ts +const squashResult = await this.deps.lockingService.withLock('config:repo', async () => { + await this.refreshSessionFromRootHeadAlreadyLocked({ + sessionWorktree, + runTrace, + phase: 'before_final_squash', + }); + + await validateFinalIngestArtifacts({ + // use the existing final-gate inputs, still scoped to sessionWorktree + }); + await validateProvenanceRawPaths({ + // use the existing provenance-gate inputs + }); + await sessionWorktree.git.assertWorktreeClean(); + + const preSquashSha = await this.deps.gitService.revParseHead(); + const merge = await this.deps.gitService.squashMergeIntoMain( + sessionWorktree.branch, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + commitMessage, + ); + return { preSquashSha, merge }; +}); +``` + +Do not call the locked `refreshSessionFromRootHead()` wrapper from inside this +block. + +Rationale: a sibling source can squash between final gates and squash merge +today. Holding the lock across refresh, gates, cleanliness assertion, and squash +ensures the session was validated against the same root tree it merges into. + +- [ ] **Step 4: Add finalization freshness tests** + +In `packages/context/src/ingest/ingest-bundle.runner.isolated-diff.test.ts`, +add focused coverage for these cases: + +- A source session starts from `baseSha`, root receives a wiki page commit before + source finalization, the source writes a wiki ref to that page, and final + gates pass because the finalization refresh merged the root page. +- A refresh conflict fails the run with `source refresh conflict` and leaves the + session worktree available with a conflict sentinel. +- Two concurrent source runs cannot overlap the final refresh/gate/squash + window. Use a fake lock that records `final:start` and `final:end`, plus + deferred promises around `squashMergeIntoMain()`. 
+ +The concurrency assertion should be direct: + +```ts +expect(events).toEqual([ + 'source-a:final:start', + 'source-a:final:end', + 'source-b:final:start', + 'source-b:final:end', +]); +``` + +- [ ] **Step 5: Run focused context checks** + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/ingest-bundle.runner.isolated-diff.test.ts +pnpm --filter @ktx/context run type-check +``` + +--- + +### Task 5: Harden Metabase Missing-Schema Guidance + +**Files:** + +- Modify: `packages/context/skills/metabase_ingest/SKILL.md` +- Optionally modify: `packages/context/src/ingest/ingest-runtime-assets.test.ts` + +- [ ] **Step 1: Update the Metabase decision tree** + +In the `Decision tree` section, replace the loose "if `sl_discover` returns +nothing, you can write a standalone source" rule with a stricter version: + +```md +If `sl_discover` returns no match for a candidate source name, treat that as +unknown, not permission. First prove the source with `entity_details` against +the warehouse table reference or with a `sql_execution(... LIMIT 0)` probe for +derived SQL. If both source discovery and physical/SQL probes fail, stop for +that card and do not call `sl_write_source`, `wiki_write`, or +`emit_unmapped_fallback` with invented `sl_refs`. +``` + +Also add: + +```md +Never call `sl_write_source` with overlay shape for a name that `sl_discover` +did not report as manifest-backed. Missing schema context is a hard stop for +overlays. +``` + +- [ ] **Step 2: Add a lightweight asset regression if useful** + +If this repository already uses content assertions for bundled ingest skills, +add an assertion that `metabase_ingest` contains the phrase +`Missing schema context is a hard stop for overlays`. If there is no local +pattern for prompt content assertions, skip the brittle assertion and rely on +runtime asset smoke coverage. 
+ +- [ ] **Step 3: Run focused asset tests** + +```bash +pnpm --filter @ktx/context exec vitest run src/ingest/ingest-runtime-assets.test.ts +``` + +--- + +### Task 6: Update Timing Notes and Docs if Behavior Changes Need It + +**Files:** + +- Modify: `.context/parallelization-results.md` +- Modify docs-site only if the implementation changes user-facing config or + command behavior beyond the already-documented concurrency settings. + +- [ ] **Step 1: Append the root-cause note** + +Append a section to `.context/parallelization-results.md` with: + +- fresh project directory used; +- exact `ktx ingest --all` command; +- exit code; +- wall-clock timing; +- whether `.ktx/worktrees/` was empty after success/failure; +- the root-cause classification: + `dependency freshness issue, not mutation queue overlap`. + +- [ ] **Step 2: Decide whether docs-site changes are required** + +If the public behavior is simply "database targets are prerequisites before +source targets", update `docs-site/content/docs/cli-reference/ktx-ingest.mdx` +with one sentence: + +```md +For `ktx ingest --all`, database schema and query-history targets complete +before source adapters start; `ingest.sources.maxConcurrency` applies to the +source-adapter stage. +``` + +Run the docs checks only if the repo has a focused docs command; otherwise rely +on TypeScript checks and note that docs were edited. 
+ +--- + +### Task 7: Full Verification + +- [ ] **Step 1: Run focused checks** + +```bash +pnpm --filter @ktx/cli run type-check +pnpm --filter @ktx/cli run test +pnpm --filter @ktx/context run type-check +pnpm --filter @ktx/context run test +``` + +- [ ] **Step 2: Run workspace checks if focused checks pass** + +```bash +pnpm run type-check +pnpm run test +pnpm run dead-code +``` + +- [ ] **Step 3: Run a fresh live ingest if `/tmp/ktx-newingest3/ktx.yaml` is available** + +Create a fresh project directory, copy the config, set source concurrency to 4 +if it is not already set, and run: + +```bash +set -o pipefail +/usr/bin/time -p pnpm run ktx -- --project-dir /tmp/<fresh-project-dir> ingest --all --yes --plain 2>&1 | tee .context/<run-name>.log +``` + +Then record: + +```bash +find /tmp/<fresh-project-dir>/.ktx/worktrees -mindepth 1 -maxdepth 1 -type d -print 2>/dev/null | sort +``` + +Success criteria: + +- database target starts before any source target; +- source targets overlap after the database stage; +- final table remains in plan order; +- no final gate failures caused by sibling-source stale context; +- successful run leaves `.ktx/worktrees/` empty; +- failed run leaves only intentional conflict/crash sentinels with root-cause + trace entries. 
diff --git a/packages/cli/package.json b/packages/cli/package.json index a93b9b6c..531cad0c 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -50,6 +50,7 @@ "commander": "14.0.3", "fflate": "^0.8.2", "ink": "^7.0.2", + "p-limit": "^7.3.0", "react": "^19.2.6", "zod": "^4.4.3" }, diff --git a/packages/cli/src/public-ingest.test.ts b/packages/cli/src/public-ingest.test.ts index d7f853c8..83167df9 100644 --- a/packages/cli/src/public-ingest.test.ts +++ b/packages/cli/src/public-ingest.test.ts @@ -82,6 +82,16 @@ function deepReadyProject( }; } +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((promiseResolve, promiseReject) => { + resolve = promiseResolve; + reject = promiseReject; + }); + return { promise, resolve, reject }; +} + describe('buildPublicIngestPlan', () => { it('plans warehouse connections as scan targets and source connections as source ingest targets', () => { const project = projectWithConnections({ @@ -848,6 +858,62 @@ describe('runKtxPublicIngest', () => { expect(io.stdout()).not.toContain('Debug:'); }); + it('runs public ingest targets concurrently up to ingest.sources.maxConcurrency and renders in plan order', async () => { + const io = makeIo(); + const baseConfig = buildDefaultKtxProjectConfig(); + const project: KtxPublicIngestProject = { + projectDir: '/tmp/project', + config: { + ...baseConfig, + ingest: { + ...baseConfig.ingest, + sources: { maxConcurrency: 2 }, + } as KtxProjectConfig['ingest'], + connections: { + docs: { driver: 'notion' }, + prod_metabase: { driver: 'metabase', api_url: 'https://metabase.example.com' }, + }, + }, + }; + const starts: string[] = []; + const docs = deferred(); + const prodMetabase = deferred(); + const runIngest = vi.fn>(async (ingestArgs) => { + if (ingestArgs.command !== 'run') { + return 1; + } + starts.push(ingestArgs.connectionId); + if (ingestArgs.connectionId === 'docs') { + return 
docs.promise; + } + if (ingestArgs.connectionId === 'prod_metabase') { + return prodMetabase.promise; + } + return 1; + }); + + const run = runKtxPublicIngest( + { command: 'run', projectDir: '/tmp/project', all: true, json: false, inputMode: 'disabled' }, + io.io, + { + loadProject: vi.fn(async () => project), + runIngest, + }, + ); + + await vi.waitFor(() => expect(starts).toEqual(['docs', 'prod_metabase'])); + prodMetabase.resolve(0); + docs.resolve(0); + + await expect(run).resolves.toBe(0); + expect(runIngest).toHaveBeenCalledTimes(2); + const sourceRows = io + .stdout() + .split('\n') + .filter((line) => line.startsWith('docs') || line.startsWith('prod_metabase')); + expect(sourceRows.map((line) => line.trim().split(/\s+/)[0])).toEqual(['docs', 'prod_metabase']); + }); + it('prints query-history retry guidance for query-history facet failures', async () => { const io = makeIo(); const project = deepReadyProject({ diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 537dcec7..fb7c5ba6 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -1,5 +1,6 @@ import { type KtxLocalProject, type KtxProjectConnectionConfig, loadKtxProject } from '@ktx/context/project'; import type { KtxProgressPort } from '@ktx/context/scan'; +import pLimit from 'p-limit'; import type { KtxCliIo } from './index.js'; import type { KtxIngestArgs, KtxIngestDeps, KtxIngestProgressUpdate } from './ingest.js'; import { @@ -924,9 +925,18 @@ export async function runKtxPublicIngest( } } - for (const target of plan.targets) { - results.push(await executePublicIngestTarget(target, args, io, deps)); - } + const ingestConfig = project.config.ingest as { sources?: { maxConcurrency?: number } } | undefined; + const sourceMaxConcurrency = ingestConfig?.sources?.maxConcurrency ?? 
1; + const limitTarget = pLimit(sourceMaxConcurrency); + const orderedResults = await Promise.all( + plan.targets.map((target, index) => + limitTarget(async () => ({ + index, + result: await executePublicIngestTarget(target, args, io, deps), + })), + ), + ); + results.push(...orderedResults.sort((left, right) => left.index - right.index).map((entry) => entry.result)); if (args.json) { io.stdout.write(`${JSON.stringify({ plan, results }, null, 2)}\n`); diff --git a/packages/context/src/core/git.service.test.ts b/packages/context/src/core/git.service.test.ts index e8a5aa73..88676390 100644 --- a/packages/context/src/core/git.service.test.ts +++ b/packages/context/src/core/git.service.test.ts @@ -53,6 +53,31 @@ describe('GitService', () => { expect(after).toBe(before); }); + it('serializes concurrent initialization for the same config directory', async () => { + const concurrentDir = await mkdtemp(join(tmpdir(), 'git-service-concurrent-init-')); + const coreConfig: KtxCoreConfig = { + storage: { configDir: concurrentDir, homeDir: concurrentDir }, + git: { + userName: 'Test User', + userEmail: 'test@example.com', + bootstrapMessage: 'Initialize test config repo', + bootstrapAuthor: 'test-system', + bootstrapAuthorEmail: 'system@example.com', + }, + }; + const first = new GitService(coreConfig); + const second = new GitService(coreConfig); + + try { + await expect(Promise.all([first.onModuleInit(), second.onModuleInit()])).resolves.toEqual([undefined, undefined]); + const firstHead = await first.revParseHead(); + const secondHead = await second.revParseHead(); + expect(firstHead).toBe(secondHead); + } finally { + await rm(concurrentDir, { recursive: true, force: true }); + } + }); + it('keeps git auto-maintenance attached for deterministic cleanup', async () => { const config = await readFile(join(tempDir, '.git', 'config'), 'utf-8'); diff --git a/packages/context/src/core/git.service.ts b/packages/context/src/core/git.service.ts index a3e0c133..8b8a4174 100644 --- 
a/packages/context/src/core/git.service.ts +++ b/packages/context/src/core/git.service.ts @@ -89,7 +89,7 @@ export class GitService { this.git = createSimpleGit(this.configDir); // Initialize git repository - await this.initialize(); + await this.withMutationQueue(() => this.initialize()); } private async initialize(): Promise { diff --git a/packages/context/src/ingest/ingest-bundle.runner.ts b/packages/context/src/ingest/ingest-bundle.runner.ts index a390ef08..652e52c6 100644 --- a/packages/context/src/ingest/ingest-bundle.runner.ts +++ b/packages/context/src/ingest/ingest-bundle.runner.ts @@ -16,7 +16,7 @@ import { selectRelevantCanonicalPins } from './canonical-pins.js'; import { finalGateRepairPaths, repairFinalGateFailure } from './final-gate-repair.js'; import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js'; import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js'; -import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js'; +import { resolveTextualConflict, runTextualConflictResolvers } from './isolated-diff/textual-conflict-resolver.js'; import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js'; import { sanitizeMemoryFlowError } from './memory-flow/live-buffer.js'; import type { CanonicalPin } from './canonical-pins.js'; @@ -1385,6 +1385,8 @@ export class IngestBundleRunner { const patchDir = join(this.deps.storage.homeDir, 'ingest-patches', job.jobId); const workUnitSettings = { maxConcurrency: this.deps.settings.workUnitMaxConcurrency ?? 1, + resolverConcurrency: + this.deps.settings.workUnitResolverConcurrency ?? this.deps.settings.workUnitMaxConcurrency ?? 1, stepBudget: this.deps.settings.workUnitStepBudget ?? 40, failureMode: this.deps.settings.workUnitFailureMode ?? 
'continue', }; @@ -1501,6 +1503,19 @@ export class IngestBundleRunner { (outcome) => outcome?.status === 'success' && !!outcome.patchPath, ).length; let integratedPatchCount = 0; + const deferredTextualConflicts: Array<{ + order: number; + unitKey: string; + patchPath: string; + touchedPaths: string[]; + reason: string; + integrationFailureDetails: { + unitKey: string; + patchPath: string; + allowedTargetConnectionIds: string[]; + }; + }> = []; + const shouldDeferTextualConflicts = workUnitSettings.resolverConcurrency > 1; for (const [index, outcome] of workUnitOutcomesByIndex.entries()) { if (!outcome || outcome.status !== 'success' || !outcome.patchPath) { continue; @@ -1515,6 +1530,34 @@ export class IngestBundleRunner { allowedTargetConnectionIds: slConnectionIds, }; activeFailureDetails = integrationFailureDetails; + const validateAppliedTree = async (touchedPaths: string[]) => { + await validateFinalIngestArtifacts({ + connectionIds: slConnectionIds, + changedWikiPageKeys: this.wikiPageKeysFromPaths(touchedPaths), + touchedSlSources: this.touchedSlSourcesFromPaths(touchedPaths), + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }; emitStageProgress( 'integration', 80, @@ -1528,34 +1571,7 @@ export class 
IngestBundleRunner { author: this.deps.storage.systemGitAuthor, slDisallowed: wu.slDisallowed === true, allowedTargetConnectionIds: new Set(slConnectionIds), - validateAppliedTree: async (touchedPaths) => { - await validateFinalIngestArtifacts({ - connectionIds: slConnectionIds, - changedWikiPageKeys: this.wikiPageKeysFromPaths(touchedPaths), - touchedSlSources: this.touchedSlSourcesFromPaths(touchedPaths), - wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - validateTouchedSources: (touched) => - validateWuTouchedSources( - { - semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - connections: this.deps.connections, - configService: sessionWorktree.config, - gitService: sessionWorktree.git, - slSourcesRepository: this.deps.slSourcesRepository, - probeRowCount: this.deps.settings.probeRowCount, - slValidator: this.deps.slValidator, - }, - touched, - ), - tableExists: (connectionId, tableRef) => - this.tableRefExistsInSemanticLayer( - this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), - [connectionId], - tableRef, - ), - }); - }, + validateAppliedTree, resolveTextualConflict: async (context) => { emitStageProgress('integration', 81, `Resolving text conflict for ${context.unitKey}`); const result = await resolveTextualConflict({ @@ -1578,6 +1594,7 @@ export class IngestBundleRunner { ); return result; }, + deferTextualConflictResolution: shouldDeferTextualConflicts, repairGateFailure: async (context) => { emitStageProgress('integration', 82, `Repairing semantic gate for ${context.unitKey}`); const result = await repairFinalGateFailure({ @@ -1618,6 +1635,20 @@ export class IngestBundleRunner { isolatedDiffSummary.gateRepairFailures += 1; } } + if (integration.status === 'textual_conflict' && integration.deferredTextualResolution) { + deferredTextualConflicts.push({ + order: index, + 
...integration.deferredTextualResolution, + integrationFailureDetails, + }); + activeFailureDetails = undefined; + emitStageProgress( + 'integration', + 82, + `Deferred text conflict ${deferredTextualConflicts.length} for ${outcome.unitKey}`, + ); + continue; + } if (integration.status === 'textual_conflict') { isolatedDiffSummary.textualConflicts += 1; await this.deps.runs.markFailed(runRow.id); @@ -1652,6 +1683,166 @@ export class IngestBundleRunner { ); } + if (deferredTextualConflicts.length > 0) { + const batches: typeof deferredTextualConflicts[] = []; + for (const conflict of deferredTextualConflicts.sort((left, right) => left.order - right.order)) { + const conflictPaths = new Set(conflict.touchedPaths); + let placed = false; + for (const batch of batches) { + const overlaps = batch.some((existing) => existing.touchedPaths.some((path) => conflictPaths.has(path))); + if (!overlaps) { + batch.push(conflict); + placed = true; + break; + } + } + if (!placed) { + batches.push([conflict]); + } + } + + for (const batch of batches) { + const resolutions = await runTextualConflictResolvers({ + maxConcurrency: workUnitSettings.resolverConcurrency, + conflicts: batch, + resolve: async (conflict) => { + emitStageProgress('integration', 81, `Resolving text conflict for ${conflict.unitKey}`); + const result = await resolveTextualConflict({ + agentRunner: this.deps.agentRunner, + workdir: sessionWorktree.workdir, + unitKey: conflict.unitKey, + patchPath: conflict.patchPath, + touchedPaths: conflict.touchedPaths, + trace: runTrace, + reason: conflict.reason, + maxAttempts: 1, + stepBudget: 12, + }); + emitStageProgress( + 'integration', + 82, + result.status === 'repaired' + ? 
`Resolved text conflict for ${conflict.unitKey}` + : `Text conflict resolver failed for ${conflict.unitKey}`, + ); + return result; + }, + }); + + for (const [resolutionIndex, resolution] of resolutions.entries()) { + const conflict = batch[resolutionIndex]; + if (!conflict) { + continue; + } + isolatedDiffSummary.resolverAttempts += resolution.attempts; + if (resolution.status === 'failed') { + isolatedDiffSummary.textualConflicts += 1; + isolatedDiffSummary.resolverFailures += 1; + await this.deps.runs.markFailed(runRow.id); + cleanupOutcome = 'conflict'; + activeFailureDetails = { + ...conflict.integrationFailureDetails, + touchedPaths: conflict.touchedPaths, + reason: resolution.reason, + }; + throw new Error(`isolated diff textual conflict in ${conflict.unitKey}: ${resolution.reason}`); + } + + isolatedDiffSummary.textualConflicts += 1; + isolatedDiffSummary.resolverRepairs += 1; + activeFailureDetails = { + ...conflict.integrationFailureDetails, + touchedPaths: resolution.changedPaths, + reason: conflict.reason, + }; + try { + await traceTimed( + runTrace, + 'integration', + 'semantic_gate_after_textual_resolution', + { unitKey: conflict.unitKey, touchedPaths: resolution.changedPaths }, + async () => { + await validateFinalIngestArtifacts({ + connectionIds: slConnectionIds, + changedWikiPageKeys: this.wikiPageKeysFromPaths(resolution.changedPaths), + touchedSlSources: this.touchedSlSourcesFromPaths(resolution.changedPaths), + wikiService: this.deps.wikiService.forWorktree(sessionWorktree.workdir), + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + validateTouchedSources: (touched) => + validateWuTouchedSources( + { + semanticLayerService: this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + connections: this.deps.connections, + configService: sessionWorktree.config, + gitService: sessionWorktree.git, + slSourcesRepository: this.deps.slSourcesRepository, + probeRowCount: 
this.deps.settings.probeRowCount, + slValidator: this.deps.slValidator, + }, + touched, + ), + tableExists: (connectionId, tableRef) => + this.tableRefExistsInSemanticLayer( + this.deps.semanticLayerService.forWorktree(sessionWorktree.workdir), + [connectionId], + tableRef, + ), + }); + }, + ); + } catch (semanticError) { + isolatedDiffSummary.semanticConflicts += 1; + await this.deps.runs.markFailed(runRow.id); + cleanupOutcome = 'conflict'; + activeFailureDetails = { + ...conflict.integrationFailureDetails, + touchedPaths: resolution.changedPaths, + reason: semanticError instanceof Error ? semanticError.message : String(semanticError), + }; + throw new Error( + `isolated diff semantic conflict in ${conflict.unitKey}: ${ + semanticError instanceof Error ? semanticError.message : String(semanticError) + }`, + ); + } + + const commit = await sessionWorktree.git.commitFiles( + resolution.changedPaths, + `ingest: resolve WorkUnit ${conflict.unitKey} conflict`, + this.deps.storage.systemGitAuthor.name, + this.deps.storage.systemGitAuthor.email, + ); + if (!commit.created) { + isolatedDiffSummary.resolverFailures += 1; + await this.deps.runs.markFailed(runRow.id); + cleanupOutcome = 'conflict'; + activeFailureDetails = { + ...conflict.integrationFailureDetails, + touchedPaths: resolution.changedPaths, + reason: 'textual resolver produced no committable changes', + }; + throw new Error(`isolated diff textual conflict in ${conflict.unitKey}: textual resolver produced no committable changes`); + } + await runTrace.event('debug', 'integration', 'patch_accepted_after_textual_resolution', { + unitKey: conflict.unitKey, + commitSha: commit.commitHash, + touchedPaths: resolution.changedPaths, + attempts: resolution.attempts, + }); + activeFailureDetails = undefined; + if (resolution.changedPaths.length > 0) { + isolatedDiffSummary.acceptedPatches += 1; + integratedPatchCount += 1; + } + emitStageProgress( + 'integration', + 83, + `Integrated 
${integratedPatchCount}/${integrablePatchCount} patches`, + ); + } + } + } + } const carryForwardResult = contextReport && this.deps.contextCandidateCarryforward diff --git a/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts b/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts index d55cfc5b..9bae75b2 100644 --- a/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts +++ b/packages/context/src/ingest/isolated-diff/patch-integrator.test.ts @@ -244,6 +244,66 @@ describe('integrateWorkUnitPatch', () => { expect(await git.revParseHead()).not.toBe(baseSha); }); + it('can defer textual conflict resolution without invoking the resolver inline', async () => { + const { homeDir, configDir, git } = await makeRepo(); + await mkdir(join(configDir, 'wiki/global'), { recursive: true }); + await writeFile(join(configDir, 'wiki/global/a.md'), 'base\n', 'utf-8'); + await git.commitFiles(['wiki/global/a.md'], 'base page', 'System User', 'system@example.com'); + const conflictBase = await git.revParseHead(); + + await writeFile(join(configDir, 'wiki/global/a.md'), 'accepted\n', 'utf-8'); + await git.commitFiles(['wiki/global/a.md'], 'accepted edit', 'System User', 'system@example.com'); + const acceptedHead = await git.revParseHead(); + + const childDir = join(homeDir, 'child-conflict-deferred'); + await git.addWorktree(childDir, 'child-conflict-deferred', conflictBase); + const childGit = git.forWorktree(childDir); + await writeFile(join(childDir, 'wiki/global/a.md'), 'proposal\n', 'utf-8'); + await childGit.commitFiles(['wiki/global/a.md'], 'proposal edit', 'System User', 'system@example.com'); + const patchPath = join(homeDir, 'proposal-deferred.patch'); + await childGit.writeBinaryNoRenamePatch(conflictBase, 'HEAD', patchPath); + + const trace = new FileIngestTraceWriter({ + tracePath: join(homeDir, '.ktx/ingest-traces/job-resolver-deferred/trace.jsonl'), + jobId: 'job-resolver-deferred', + connectionId: 'warehouse', + sourceKey: 
'metabase', + level: 'trace', + }); + const resolveTextualConflict = vi.fn(async () => ({ + status: 'failed' as const, + attempts: 1, + reason: 'should not run', + })); + + const result = await integrateWorkUnitPatch({ + unitKey: 'wu-conflict', + patchPath, + integrationGit: git, + trace, + author: { name: 'System User', email: 'system@example.com' }, + slDisallowed: false, + allowedTargetConnectionIds: new Set(['warehouse']), + validateAppliedTree: vi.fn(async () => {}), + resolveTextualConflict, + deferTextualConflictResolution: true, + }); + + expect(result).toMatchObject({ + status: 'textual_conflict', + reason: expect.stringContaining('conflicts'), + touchedPaths: ['wiki/global/a.md'], + deferredTextualResolution: { + unitKey: 'wu-conflict', + patchPath, + touchedPaths: ['wiki/global/a.md'], + }, + }); + expect(resolveTextualConflict).not.toHaveBeenCalled(); + expect(await git.revParseHead()).toBe(acceptedHead); + await expect(readFile(join(configDir, 'wiki/global/a.md'), 'utf-8')).resolves.toBe('accepted\n'); + }); + it('keeps the pre-apply integration tree when the resolver cannot repair a textual conflict', async () => { const { homeDir, configDir, git } = await makeRepo(); await mkdir(join(configDir, 'wiki/global'), { recursive: true }); diff --git a/packages/context/src/ingest/isolated-diff/patch-integrator.ts b/packages/context/src/ingest/isolated-diff/patch-integrator.ts index a4542576..ab16ac9a 100644 --- a/packages/context/src/ingest/isolated-diff/patch-integrator.ts +++ b/packages/context/src/ingest/isolated-diff/patch-integrator.ts @@ -23,6 +23,12 @@ export type PatchIntegrationResult = reason: string; touchedPaths: string[]; textualResolution?: PatchIntegrationTextualResolution; + deferredTextualResolution?: { + unitKey: string; + patchPath: string; + touchedPaths: string[]; + reason: string; + }; gateRepair?: FinalGateRepairResult; } | { @@ -48,6 +54,7 @@ export interface IntegrateWorkUnitPatchInput { touchedPaths: string[]; reason: string; }): 
Promise; + deferTextualConflictResolution?: boolean; repairGateFailure?(input: { unitKey: string; patchPath: string; @@ -125,6 +132,20 @@ export async function integrateWorkUnitPatch(input: IntegrateWorkUnitPatchInput) }; } + if (input.deferTextualConflictResolution) { + return { + status: 'textual_conflict', + reason, + touchedPaths, + deferredTextualResolution: { + unitKey: input.unitKey, + patchPath: input.patchPath, + touchedPaths, + reason, + }, + }; + } + const textualResolution = await input.resolveTextualConflict({ unitKey: input.unitKey, patchPath: input.patchPath, diff --git a/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.test.ts b/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.test.ts index ae5b4e21..566d4e4d 100644 --- a/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.test.ts +++ b/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.test.ts @@ -3,7 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; import { FileIngestTraceWriter } from '../ingest-trace.js'; -import { resolveTextualConflict } from './textual-conflict-resolver.js'; +import { resolveTextualConflict, runTextualConflictResolvers } from './textual-conflict-resolver.js'; async function makeHarness() { const root = await mkdtemp(join(tmpdir(), 'ktx-textual-resolver-')); @@ -37,6 +37,14 @@ async function makeHarness() { return { root, workdir, patchPath, trace }; } +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + const promise = new Promise((promiseResolve) => { + resolve = promiseResolve; + }); + return { promise, resolve }; +} + describe('resolveTextualConflict', () => { it('lets the repair agent read the failed patch and write only touched paths', async () => { const { workdir, patchPath, trace } = await makeHarness(); @@ -118,3 +126,55 @@ describe('resolveTextualConflict', () => { }); }); }); + 
+describe('runTextualConflictResolvers', () => { + it('runs disjoint conflicts concurrently and preserves result order', async () => { + const releases = [deferred(), deferred()]; + const starts: string[] = []; + + const run = runTextualConflictResolvers({ + maxConcurrency: 2, + conflicts: [ + { unitKey: 'wu-a', touchedPaths: ['wiki/global/a.md'] }, + { unitKey: 'wu-b', touchedPaths: ['wiki/global/b.md'] }, + ], + resolve: async (conflict, index) => { + starts.push(conflict.unitKey); + await releases[index].promise; + return `${conflict.unitKey}:resolved`; + }, + }); + + await vi.waitFor(() => expect(starts).toEqual(['wu-a', 'wu-b'])); + releases[1].resolve(); + releases[0].resolve(); + + await expect(run).resolves.toEqual(['wu-a:resolved', 'wu-b:resolved']); + }); + + it('serializes overlapping conflicts even when concurrency allows more work', async () => { + const releaseFirst = deferred(); + const starts: string[] = []; + + const run = runTextualConflictResolvers({ + maxConcurrency: 2, + conflicts: [ + { unitKey: 'wu-a', touchedPaths: ['wiki/global/account.md'] }, + { unitKey: 'wu-b', touchedPaths: ['wiki/global/account.md', 'wiki/global/other.md'] }, + ], + resolve: async (conflict) => { + starts.push(conflict.unitKey); + if (conflict.unitKey === 'wu-a') { + await releaseFirst.promise; + } + return `${conflict.unitKey}:resolved`; + }, + }); + + await vi.waitFor(() => expect(starts).toEqual(['wu-a'])); + releaseFirst.resolve(); + + await expect(run).resolves.toEqual(['wu-a:resolved', 'wu-b:resolved']); + expect(starts).toEqual(['wu-a', 'wu-b']); + }); +}); diff --git a/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.ts b/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.ts index c5128291..d93e4061 100644 --- a/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.ts +++ b/packages/context/src/ingest/isolated-diff/textual-conflict-resolver.ts @@ -1,5 +1,6 @@ import { mkdir, readFile, rm, writeFile } from 
'node:fs/promises'; import { dirname, join } from 'node:path'; +import pLimit from 'p-limit'; import { z } from 'zod'; import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../llm/index.js'; import type { IngestTraceWriter } from '../ingest-trace.js'; @@ -21,6 +22,12 @@ export interface ResolveTextualConflictInput { stepBudget?: number; } +export interface TextualConflictResolverQueueInput { + conflicts: TConflict[]; + maxConcurrency: number; + resolve(conflict: TConflict, index: number): Promise; +} + const readIntegrationFileSchema = z.object({ path: z.string().min(1), }); @@ -51,6 +58,69 @@ function assertAllowedPath(path: string, allowedPaths: ReadonlySet): str return normalized; } +function hasPathOverlap(left: ReadonlySet, right: ReadonlySet): boolean { + for (const path of left) { + if (right.has(path)) { + return true; + } + } + return false; +} + +export async function runTextualConflictResolvers( + input: TextualConflictResolverQueueInput, +): Promise { + const maxConcurrency = Math.max(1, input.maxConcurrency); + const limit = pLimit(maxConcurrency); + const results: TResult[] = []; + const pending = input.conflicts.map((conflict, index) => ({ + conflict, + index, + touchedPaths: new Set(conflict.touchedPaths.map(normalizeRepoPath)), + })); + const activePaths = new Set(); + const active: Promise[] = []; + + await new Promise((resolve, reject) => { + const pump = () => { + if (pending.length === 0 && active.length === 0) { + resolve(); + return; + } + + for (let i = 0; i < pending.length && active.length < maxConcurrency; ) { + const candidate = pending[i]; + if (!candidate || hasPathOverlap(candidate.touchedPaths, activePaths)) { + i += 1; + continue; + } + + pending.splice(i, 1); + for (const path of candidate.touchedPaths) { + activePaths.add(path); + } + + const task = limit(async () => { + results[candidate.index] = await input.resolve(candidate.conflict, candidate.index); + }) + .then(() => { + for (const path of candidate.touchedPaths) { + 
activePaths.delete(path); + } + active.splice(active.indexOf(task), 1); + pump(); + }) + .catch(reject); + active.push(task); + } + }; + + pump(); + }); + + return results; +} + async function readOptionalFile(path: string): Promise<{ exists: boolean; content: string }> { try { return { exists: true, content: await readFile(path, 'utf-8') }; diff --git a/packages/context/src/ingest/local-bundle-runtime.test.ts b/packages/context/src/ingest/local-bundle-runtime.test.ts index a8ec8c20..53f1c8c0 100644 --- a/packages/context/src/ingest/local-bundle-runtime.test.ts +++ b/packages/context/src/ingest/local-bundle-runtime.test.ts @@ -5,7 +5,7 @@ import type { AgentRunnerPort } from '../llm/index.js'; import { initKtxProject, type KtxLocalProject, loadKtxProject } from '../project/index.js'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { FakeSourceAdapter } from './adapters/fake/fake.adapter.js'; -import { createLocalBundleIngestRuntime } from './local-bundle-runtime.js'; +import { createLocalBundleIngestRuntime, InProcessIngestLock } from './local-bundle-runtime.js'; type RuntimeWithConnectionDeps = { deps: { @@ -39,6 +39,84 @@ function testAgentRunner(): AgentRunnerPort { return { runLoop: vi.fn().mockResolvedValue({ stopReason: 'natural' as const }) }; } +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + const promise = new Promise((promiseResolve) => { + resolve = promiseResolve; + }); + return { promise, resolve }; +} + +describe('InProcessIngestLock', () => { + it('serializes operations for the same key while allowing different keys to overlap', async () => { + const lock = new InProcessIngestLock(); + const firstEntered = deferred(); + const releaseFirst = deferred(); + const secondEntered = deferred(); + const otherEntered = deferred(); + const events: string[] = []; + + const first = lock.withLock('config:repo', async () => { + events.push('first:start'); + firstEntered.resolve(); + await 
releaseFirst.promise; + events.push('first:end'); + return 'first'; + }); + await firstEntered.promise; + + const second = lock.withLock('config:repo', async () => { + events.push('second:start'); + secondEntered.resolve(); + return 'second'; + }); + const other = lock.withLock('other:key', async () => { + events.push('other:start'); + otherEntered.resolve(); + return 'other'; + }); + + await otherEntered.promise; + expect(events).toEqual(['first:start', 'other:start']); + + releaseFirst.resolve(); + await secondEntered.promise; + await expect(Promise.all([first, second, other])).resolves.toEqual(['first', 'second', 'other']); + expect(events).toEqual(['first:start', 'other:start', 'first:end', 'second:start']); + }); + + it('serializes operations for the same key across lock instances', async () => { + const firstLock = new InProcessIngestLock(); + const secondLock = new InProcessIngestLock(); + const firstEntered = deferred(); + const releaseFirst = deferred(); + const secondEntered = deferred(); + const events: string[] = []; + + const first = firstLock.withLock('config:repo', async () => { + events.push('first:start'); + firstEntered.resolve(); + await releaseFirst.promise; + events.push('first:end'); + return 'first'; + }); + await firstEntered.promise; + + const second = secondLock.withLock('config:repo', async () => { + events.push('second:start'); + secondEntered.resolve(); + return 'second'; + }); + + await Promise.resolve(); + expect(events).toEqual(['first:start']); + releaseFirst.resolve(); + await secondEntered.promise; + await expect(Promise.all([first, second])).resolves.toEqual(['first', 'second']); + expect(events).toEqual(['first:start', 'first:end', 'second:start']); + }); +}); + describe('createLocalBundleIngestRuntime', () => { let tempDir: string; let project: KtxLocalProject; @@ -281,6 +359,7 @@ describe('createLocalBundleIngestRuntime', () => { 'probeRowCount', 'workUnitFailureMode', 'workUnitMaxConcurrency', + 'workUnitResolverConcurrency', 
'workUnitStepBudget', ]); }); diff --git a/packages/context/src/ingest/local-bundle-runtime.ts b/packages/context/src/ingest/local-bundle-runtime.ts index f8af0696..4057012c 100644 --- a/packages/context/src/ingest/local-bundle-runtime.ts +++ b/packages/context/src/ingest/local-bundle-runtime.ts @@ -163,9 +163,29 @@ class LocalIngestStorage implements IngestStoragePort { } } -class LocalIngestLock implements IngestLockPort { - async withLock(_key: string, fn: () => Promise): Promise { - return fn(); +export class InProcessIngestLock implements IngestLockPort { + private static readonly queues = new Map>(); + + async withLock(key: string, fn: () => Promise): Promise { + const previous = InProcessIngestLock.queues.get(key) ?? Promise.resolve(); + let release: () => void = () => {}; + const current = previous.catch(() => undefined).then( + () => + new Promise((resolve) => { + release = resolve; + }), + ); + InProcessIngestLock.queues.set(key, current); + + await previous.catch(() => undefined); + try { + return await fn(); + } finally { + release(); + if (InProcessIngestLock.queues.get(key) === current) { + InProcessIngestLock.queues.delete(key); + } + } } } @@ -714,12 +734,13 @@ export function createLocalBundleIngestRuntime( }), agentRunner, gitService: options.project.git, - lockingService: new LocalIngestLock(), + lockingService: new InProcessIngestLock(), storage, settings: { memoryIngestionModel: options.project.config.llm.models.default ?? 
'local-ingest-model', probeRowCount: 0, workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency, + workUnitResolverConcurrency: options.project.config.ingest.workUnits.resolverConcurrency, workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget, workUnitFailureMode: options.project.config.ingest.workUnits.failureMode, ingestTraceLevel: ingestTraceLevelFromEnv(), diff --git a/packages/context/src/ingest/ports.ts b/packages/context/src/ingest/ports.ts index 32410cbc..3c89081b 100644 --- a/packages/context/src/ingest/ports.ts +++ b/packages/context/src/ingest/ports.ts @@ -141,6 +141,7 @@ export interface IngestSettingsPort { memoryIngestionModel: string; probeRowCount: number; workUnitMaxConcurrency?: number; + workUnitResolverConcurrency?: number; workUnitStepBudget?: number; workUnitFailureMode?: 'abort' | 'continue'; ingestTraceLevel?: IngestTraceLevel; diff --git a/packages/context/src/project/config.test.ts b/packages/context/src/project/config.test.ts index 3967b363..28dd78ec 100644 --- a/packages/context/src/project/config.test.ts +++ b/packages/context/src/project/config.test.ts @@ -49,8 +49,12 @@ connections: workUnits: { stepBudget: 40, maxConcurrency: 1, + resolverConcurrency: 1, failureMode: 'continue', }, + sources: { + maxConcurrency: 1, + }, }, agent: { run_research: { @@ -155,10 +159,60 @@ ingest: expect(config.ingest.workUnits).toEqual({ stepBudget: 30, maxConcurrency: 2, + resolverConcurrency: 2, failureMode: 'abort', }); }); + it('parses ingest source and resolver concurrency knobs', () => { + const config = parseKtxProjectConfig(` +ingest: + sources: + maxConcurrency: 4 + workUnits: + maxConcurrency: 6 + resolverConcurrency: 3 +`); + + expect(config.ingest.sources).toEqual({ + maxConcurrency: 4, + }); + expect(config.ingest.workUnits).toEqual({ + stepBudget: 40, + maxConcurrency: 6, + resolverConcurrency: 3, + failureMode: 'continue', + }); + }); + + it('defaults resolver concurrency to work-unit concurrency', 
() => { + const config = parseKtxProjectConfig(` +ingest: + workUnits: + maxConcurrency: 5 +`); + + expect(config.ingest.workUnits.resolverConcurrency).toBe(5); + }); + + it('rejects concurrency values above the configured caps', () => { + const validation = validateKtxProjectConfig(` +ingest: + sources: + maxConcurrency: 9 + workUnits: + resolverConcurrency: 9 +`); + + expect(validation).toEqual({ + ok: false, + issues: expect.arrayContaining([ + expect.objectContaining({ path: 'ingest.sources.maxConcurrency' }), + expect.objectContaining({ path: 'ingest.workUnits.resolverConcurrency' }), + ]), + }); + }); + it('parses global Vertex LLM config', () => { const config = parseKtxProjectConfig(` llm: @@ -535,6 +589,13 @@ describe('generateKtxProjectConfigJsonSchema', () => { expect(relationships?.properties?.acceptThreshold?.description).toMatch(/auto-accepted/); }); + it('emits ingest concurrency caps in the generated schema', () => { + const serialized = JSON.stringify(schema); + expect(serialized).toContain('"sources"'); + expect(serialized).toContain('"resolverConcurrency"'); + expect(serialized).toContain('"maximum":8'); + }); + it('emits the mappings shapes under connections', () => { const serialized = JSON.stringify(schema); expect(serialized).toContain('databaseMappings'); diff --git a/packages/context/src/project/config.ts b/packages/context/src/project/config.ts index 912c31de..9e375aeb 100644 --- a/packages/context/src/project/config.ts +++ b/packages/context/src/project/config.ts @@ -93,14 +93,35 @@ const embeddingSchema = z const workUnitsSchema = z .strictObject({ stepBudget: z.int().positive().default(40).describe('Maximum number of agent steps allowed per work unit before it is force-terminated.'), - maxConcurrency: z.int().positive().default(1).describe('Maximum number of work units run concurrently during ingest.'), + maxConcurrency: z.int().positive().max(8).default(1).describe('Maximum number of work units run concurrently during ingest.'), + 
resolverConcurrency: z + .int() + .positive() + .max(8) + .optional() + .describe('Maximum number of textual conflict resolvers run concurrently during ingest. Defaults to maxConcurrency.'), failureMode: z .enum(KTX_WORK_UNIT_FAILURE_MODES) .default('continue') .describe('Behavior when a work unit fails: "abort" stops the whole ingest run; "continue" records the failure and keeps going.'), }) + .transform((workUnits) => ({ + ...workUnits, + resolverConcurrency: workUnits.resolverConcurrency ?? workUnits.maxConcurrency, + })) .describe('Concurrency and failure handling for ingest work units.'); +const sourcesSchema = z + .strictObject({ + maxConcurrency: z + .int() + .positive() + .max(8) + .default(1) + .describe('Maximum number of ingest sources run concurrently by `ktx ingest --all`.'), + }) + .describe('Concurrency policy for top-level ingest sources.'); + const ingestSchema = z .strictObject({ adapters: z @@ -111,6 +132,7 @@ const ingestSchema = z .prefault({ backend: 'deterministic', model: 'deterministic' }) .describe('Embedding configuration used when ingest adapters need to embed documents.'), workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'), + sources: sourcesSchema.prefault({}).describe('Concurrency policy for top-level ingest sources.'), }) .describe('Ingest pipeline configuration: adapters, embeddings, and work-unit policy.'); @@ -260,6 +282,7 @@ export type KtxProjectLlmProviderConfig = z.infer; export type KtxProjectEmbeddingConfig = z.infer; export type KtxScanEnrichmentConfig = z.infer; export type KtxIngestWorkUnitsConfig = z.infer; +export type KtxIngestSourcesConfig = z.infer; export type KtxScanRelationshipConfig = z.infer; export type KtxProjectScanConfig = z.infer; export type KtxProjectConnectionConfig = z.infer; @@ -384,6 +407,7 @@ export function serializeKtxProjectConfig(config: KtxProjectConfig): string { ingest: { embeddings: config.ingest.embeddings, workUnits: 
config.ingest.workUnits, + sources: config.ingest.sources, }, } : config; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 094c1deb..17f795cc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -142,6 +142,9 @@ importers: ink: specifier: ^7.0.2 version: 7.0.2(@types/react@19.2.14)(react@19.2.6) + p-limit: + specifier: ^7.3.0 + version: 7.3.0 react: specifier: ^19.2.6 version: 19.2.6