From 3e6b2af4e9debac3b63cf89355a79a3ca51d9056 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 19:14:54 +0200 Subject: [PATCH] engine: serialize concurrent branch merges via merge_exclusive mutex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the Cursor Bugbot HIGH on commit 22d76db (round 2 review): `branch_merge_impl` at `crates/omnigraph/src/exec/merge.rs:1085-1100` still used the swap_coordinator_for_branch + operate + restore_coordinator pattern across three separate `coordinator.write().await` acquisitions. Two concurrent merges with distinct targets would interleave their swaps, leaving each merge's body running against the other's swapped coord — A's `feature_a → target_a` would land its rewrite in target_b instead. Adds `merge_exclusive: Arc>` to `Omnigraph`, held across the entire swap → operate → restore window in `branch_merge_impl`. Concurrent branch merges now serialize relative to each other; everything else (per-(table, branch) writer queues, /change, /ingest) is unaffected. Why the mutex rather than the deeper "operate on local coord" refactor (the round-1 fix shape applied to `branch_create_from`): `branch_merge_on_current_target` calls `self.snapshot()` and `self.ensure_commit_graph_initialized()` internally, which use `self.coordinator` directly. Threading an explicit target coord parameter through the merge body would unwind dozens of call sites. The mutex is a smaller intrusion that fully closes the race. Documented as a follow-up if telemetry shows merge concurrency matters. Pinned by `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other` (previous commit). Pre-fix: M=4 iterations of concurrent merges deterministically corrupted target row counts. Post-fix: all M iterations land each merge on its declared target. The two adjacent branch concurrency tests (`concurrent_change_during_branch_merge_preserves_writes`, `concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator`) still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/db/omnigraph.rs | 33 ++++++++++++++++++++++++++++ crates/omnigraph/src/exec/merge.rs | 10 +++++++++ 2 files changed, 43 insertions(+) diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index f2082c4..50d4963 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -106,6 +106,28 @@ pub struct Omnigraph { /// ensure_indices, delete_where) and from future MR-870 recovery /// reconciler. PR 1b adds the field; callers acquire in commits 4+. write_queue: Arc, + /// Process-wide mutex held across the swap → operate → restore window + /// in `branch_merge_impl`. Two concurrent merges with distinct targets + /// would otherwise interleave their three separate + /// `coordinator.write().await` acquisitions, leaving each merge's + /// inner body running against the other's swapped coord. Pinned by + /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other` + /// in `crates/omnigraph-server/tests/server.rs`. + /// + /// Cost: serializes ALL concurrent branch merges process-wide. + /// Acceptable because branch merges are heavy (table rewrites, index + /// rebuilds), per-(table, branch) queues inside `commit_all` already + /// serialize the data path, and merges are rare relative to /change + /// or /ingest. A finer-grained per-target-branch mutex is a follow-up + /// if telemetry shows merge concurrency matters. + /// + /// The deeper fix — refactor `branch_merge_on_current_target` to take + /// an explicit target coord parameter so `self.coordinator` is never + /// used as scratch space — is the round-1 shape applied to + /// `branch_create_from_impl`. Deferred because it requires unwinding + /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()` + /// call inside the merge body. + merge_exclusive: Arc>, } /// Whether [`Omnigraph::open`] runs the open-time recovery sweep. @@ -161,6 +183,7 @@ impl Omnigraph { catalog: Arc::new(ArcSwap::from_pointee(catalog)), schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())), write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), + merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), }) } @@ -247,6 +270,7 @@ impl Omnigraph { catalog: Arc::new(ArcSwap::from_pointee(catalog)), schema_source: Arc::new(ArcSwap::from_pointee(schema_source)), write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), + merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), }) } @@ -333,6 +357,15 @@ impl Omnigraph { Arc::clone(&self.write_queue) } + /// Engine-internal access to the merge-exclusive mutex. Held across + /// the swap → operate → restore window in `branch_merge_impl` so + /// concurrent merges with distinct targets don't corrupt + /// `self.coordinator` mid-operation. See the field doc on + /// `Omnigraph::merge_exclusive` for the full design rationale. + pub(crate) fn merge_exclusive(&self) -> Arc> { + Arc::clone(&self.merge_exclusive) + } + /// Engine-level access to the repo's normalized root URI. Used by /// the recovery sidecar protocol to compute `__recovery/` paths. pub(crate) fn root_uri(&self) -> &str { diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index ec02e83..e81fb0b 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1083,6 +1083,16 @@ impl Omnigraph { )) .await? .snapshot; + // Hold the merge-exclusive mutex across the full swap → operate + // → restore window. Two concurrent branch_merge calls would + // otherwise interleave their three separate `coordinator.write()` + // acquisitions, leaving each merge's body running against the + // other's swapped coord. Pinned by + // `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other` + // in `crates/omnigraph-server/tests/server.rs`. + let merge_exclusive = self.merge_exclusive(); + let _merge_guard = merge_exclusive.lock().await; + let previous_branch = self.active_branch().await; let previous = self .swap_coordinator_for_branch(target_branch.as_deref())