diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index f2082c4..50d4963 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -106,6 +106,28 @@ pub struct Omnigraph { /// ensure_indices, delete_where) and from future MR-870 recovery /// reconciler. PR 1b adds the field; callers acquire in commits 4+. write_queue: Arc<crate::db::write_queue::WriteQueueManager>, + /// Process-wide mutex held across the swap → operate → restore window + /// in `branch_merge_impl`. Two concurrent merges with distinct targets + /// would otherwise interleave their three separate + /// `coordinator.write().await` acquisitions, leaving each merge's + /// inner body running against the other's swapped coord. Pinned by + /// `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other` + /// in `crates/omnigraph-server/tests/server.rs`. + /// + /// Cost: serializes ALL concurrent branch merges process-wide. + /// Acceptable because branch merges are heavy (table rewrites, index + /// rebuilds), per-(table, branch) queues inside `commit_all` already + /// serialize the data path, and merges are rare relative to /change + /// or /ingest. A finer-grained per-target-branch mutex is a follow-up + /// if telemetry shows merge concurrency matters. + /// + /// The deeper fix — refactor `branch_merge_on_current_target` to take + /// an explicit target coord parameter so `self.coordinator` is never + /// used as scratch space — is the round-1 shape applied to + /// `branch_create_from_impl`. Deferred because it requires unwinding + /// every `self.snapshot()` and `self.ensure_commit_graph_initialized()` + /// call inside the merge body. + merge_exclusive: Arc<tokio::sync::Mutex<()>>, } /// Whether [`Omnigraph::open`] runs the open-time recovery sweep. 
@@ -161,6 +183,7 @@ impl Omnigraph { catalog: Arc::new(ArcSwap::from_pointee(catalog)), schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())), write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), + merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), }) } @@ -247,6 +270,7 @@ impl Omnigraph { catalog: Arc::new(ArcSwap::from_pointee(catalog)), schema_source: Arc::new(ArcSwap::from_pointee(schema_source)), write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), + merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), }) } @@ -333,6 +357,15 @@ impl Omnigraph { Arc::clone(&self.write_queue) } + /// Engine-internal access to the merge-exclusive mutex. Held across + /// the swap → operate → restore window in `branch_merge_impl` so + /// concurrent merges with distinct targets don't corrupt + /// `self.coordinator` mid-operation. See the field doc on + /// `Omnigraph::merge_exclusive` for the full design rationale. + pub(crate) fn merge_exclusive(&self) -> Arc<tokio::sync::Mutex<()>> { + Arc::clone(&self.merge_exclusive) + } + /// Engine-level access to the repo's normalized root URI. Used by /// the recovery sidecar protocol to compute `__recovery/` paths. pub(crate) fn root_uri(&self) -> &str { diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index ec02e83..e81fb0b 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1083,6 +1083,16 @@ impl Omnigraph { )) .await? .snapshot; + // Hold the merge-exclusive mutex across the full swap → operate + // → restore window. Two concurrent branch_merge calls would + // otherwise interleave their three separate `coordinator.write()` + // acquisitions, leaving each merge's body running against the + // other's swapped coord. Pinned by + // `concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other` + // in `crates/omnigraph-server/tests/server.rs`. 
+ let merge_exclusive = self.merge_exclusive(); + let _merge_guard = merge_exclusive.lock().await; + let previous_branch = self.active_branch().await; let previous = self .swap_coordinator_for_branch(target_branch.as_deref())