From 2b2e72312510e57af2272d55aaf238a4c804dde9 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 19:12:03 +0200 Subject: [PATCH] tests: pin branch_merge swap-restore race (red) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per AGENTS.md rule 8, this commit lands the failing regression test ahead of the fix. Cursor Bugbot HIGH on commit 22d76db rediscovered the residual flagged in the round 1 honest-review note: `branch_merge_impl` at `crates/omnigraph/src/exec/merge.rs:1085-1100` still uses the swap_coordinator_for_branch + operate + restore_coordinator pattern across three separate `coordinator.write().await` acquisitions. The same shape that branch_create_from_impl shed in commit 4ffbf6e. The test spawns two concurrent /branches/merge calls A (feature-a → target-a) and B (feature-b → target-b) aligned at a tokio::sync::Barrier so both reach swap_coordinator_for_branch close in time. M=4 iterations boost race-catching odds. Currently fails on 22d76db with target-a=5, target-b=4: B's merge landed on the wrong coord — target-b never got Frank because A's swap pushed self.coordinator to target-a, B's swap captured target-a as B's "previous", and B's restore set self.coordinator back to target-a (not the original main). Subsequent operations using self.coordinator point at the wrong branch. Fix lands in the next commit: serialize concurrent branch merges via `merge_exclusive: Arc>` held across the entire swap-operate-restore window. Closes the bug class "non-atomic three-step coordinator manipulation" for branch_merge by serializing merges relative to each other; per-(table, branch) queue inside the merge body still lets merges and other writers run concurrently. A deeper "operate on local coord" refactor (the round-1 fix shape for branch_create_from) requires unwinding `branch_merge_on_current_target` and its uses of `self.snapshot()` / `self.ensure_commit_graph_initialized()`; deferred to a follow-up. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/tests/server.rs | 222 ++++++++++++++++++++++++ 1 file changed, 222 insertions(+) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 0cfab94..91743c7 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2697,6 +2697,228 @@ async fn concurrent_change_during_branch_merge_preserves_writes() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other() { + // Pin the `branch_merge_impl` swap-restore atomicity invariant. + // Round 2 of the PR review left this race deferred: `merge.rs:1085-1100` + // uses three separate `coordinator.write().await` acquisitions + // (swap → operate → restore) — the same shape that + // `branch_create_from_impl` shed in round 1. + // + // Pre-fix race: two concurrent `branch_merge` calls A and B with + // distinct targets target_a, target_b. A's swap captures the + // currently-bound coord as previous_A and replaces self.coordinator + // with target_a. Before A's operate runs, B's swap captures + // (now) target_a as previous_B and replaces self.coordinator with + // target_b. A's `branch_merge_on_current_target` then runs against + // target_b's coord — A merges its source INTO target_b, not target_a. + // + // Post-fix invariant: branch_merge_impl serializes via + // `merge_exclusive: Arc>` held across the + // entire swap-operate-restore window. Other writers (the per-table + // queue, /change, /ingest) are unaffected — only concurrent merges + // serialize. + // + // Setup: main + 2 source branches (feature_a with Eve, feature_b with + // Frank) + 2 untouched targets (target_a, target_b). Concurrent + // merges feature_a→target_a and feature_b→target_b. Aligned at a + // tokio::sync::Barrier so both swaps land close in time. + let temp = init_loaded_repo().await; + let repo = repo_path(temp.path()); + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + + async fn do_create_branch(app: &Router, from: &str, name: &str) { + let body = serde_json::to_vec(&BranchCreateRequest { + from: Some(from.to_string()), + name: name.to_string(), + }) + .unwrap(); + let r = app + .clone() + .oneshot( + Request::builder() + .uri("/branches") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(r.status(), StatusCode::OK, "create {} from {} failed", name, from); + } + + async fn do_insert_person(app: &Router, branch: &str, name: &str, age: i32) { + let body = serde_json::to_vec(&ChangeRequest { + query_source: MUTATION_QUERIES.to_string(), + query_name: Some("insert_person".to_string()), + params: Some(json!({ "name": name, "age": age })), + branch: Some(branch.to_string()), + }) + .unwrap(); + let r = app + .clone() + .oneshot( + Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(r.status(), StatusCode::OK, "insert {} on {} failed", name, branch); + } + + async fn person_count(app: &Router, branch: &str) -> u64 { + let uri = format!("/snapshot?branch={}", branch); + let r = app + .clone() + .oneshot( + Request::builder() + .uri(uri) + .method(Method::GET) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(r.status(), StatusCode::OK, "snapshot {} failed", branch); + let body = to_bytes(r.into_body(), usize::MAX).await.unwrap(); + let v: Value = serde_json::from_slice(&body).unwrap(); + v["tables"] + .as_array() + .and_then(|tables| { + tables + .iter() + .find(|t| t["table_key"].as_str() == Some("node:Person")) + }) + .and_then(|t| t["row_count"].as_u64()) + .unwrap_or_else(|| panic!("snapshot {} missing node:Person", branch)) + } + + // test.jsonl seeds 4 Persons. + const SEED: u64 = 4; + + // Set up 4 child branches: 2 sources (each with one new Person), 2 + // targets (forked from main, untouched). + do_create_branch(&app, "main", "feature-a").await; + do_insert_person(&app, "feature-a", "Eve", 22).await; + do_create_branch(&app, "main", "feature-b").await; + do_insert_person(&app, "feature-b", "Frank", 33).await; + do_create_branch(&app, "main", "target-a").await; + do_create_branch(&app, "main", "target-b").await; + + assert_eq!(person_count(&app, "feature-a").await, SEED + 1); + assert_eq!(person_count(&app, "feature-b").await, SEED + 1); + assert_eq!(person_count(&app, "target-a").await, SEED); + assert_eq!(person_count(&app, "target-b").await, SEED); + + // Concurrent merges aligned at a barrier so both reach the + // swap_coordinator_for_branch call close in time. Repeat M=4 times to + // boost the probability of catching the race. Recreate the + // target/source branches each iteration to keep the post-condition + // checks tight. + const M: usize = 4; + for iter in 0..M { + let target_a = format!("target-a-iter{iter}"); + let target_b = format!("target-b-iter{iter}"); + let source_a = format!("feature-a-iter{iter}"); + let source_b = format!("feature-b-iter{iter}"); + + do_create_branch(&app, "main", &source_a).await; + do_insert_person(&app, &source_a, &format!("Eve-{iter}"), 22).await; + do_create_branch(&app, "main", &source_b).await; + do_insert_person(&app, &source_b, &format!("Frank-{iter}"), 33).await; + do_create_branch(&app, "main", &target_a).await; + do_create_branch(&app, "main", &target_b).await; + + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + let app_a = app.clone(); + let app_b = app.clone(); + let barrier_a = Arc::clone(&barrier); + let barrier_b = Arc::clone(&barrier); + let target_a_owned = target_a.clone(); + let target_b_owned = target_b.clone(); + let source_a_owned = source_a.clone(); + let source_b_owned = source_b.clone(); + + let h_a = tokio::spawn(async move { + barrier_a.wait().await; + let body = serde_json::to_vec(&BranchMergeRequest { + source: source_a_owned, + target: Some(target_a_owned), + }) + .unwrap(); + app_a + .oneshot( + Request::builder() + .uri("/branches/merge") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap() + .status() + }); + let h_b = tokio::spawn(async move { + barrier_b.wait().await; + let body = serde_json::to_vec(&BranchMergeRequest { + source: source_b_owned, + target: Some(target_b_owned), + }) + .unwrap(); + app_b + .oneshot( + Request::builder() + .uri("/branches/merge") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap() + .status() + }); + + assert_eq!(h_a.await.unwrap(), StatusCode::OK, "iter {iter} merge A"); + assert_eq!(h_b.await.unwrap(), StatusCode::OK, "iter {iter} merge B"); + + // Post-condition: each target has exactly its declared source's + // contribution. Pre-fix race: A's merge runs against target_b's + // swapped coord, so feature-a's row lands in target-b instead of + // target-a. Observable as target-a == 4 (unchanged) and + // target-b == 6 (both Eve and Frank). + let count_a = person_count(&app, &target_a).await; + let count_b = person_count(&app, &target_b).await; + assert_eq!( + count_a, + SEED + 1, + "iter {iter}: target-a must reflect feature-a's merge \ + (Eve only); pre-fix race would leave it at SEED ({}) and \ + pile both feature-a's and feature-b's rows into target-b. \ + Got target-a={count_a}, target-b={count_b}", + SEED, + ); + assert_eq!( + count_b, + SEED + 1, + "iter {iter}: target-b must reflect feature-b's merge \ + (Frank only); pre-fix race would leave it at SEED+2 if A's \ + merge ran against target-b's swapped coord. \ + Got target-a={count_a}, target-b={count_b}", + + ); + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn change_disjoint_table_concurrency_succeeds_at_http_level() { // HTTP-level pin for MR-686's disjoint-table promise: concurrent /change