tests: pin branch_merge swap-restore race (red)

Per AGENTS.md rule 8, this commit lands the failing regression test
ahead of the fix.

Cursor Bugbot HIGH on commit 22d76db rediscovered the residual flagged
in the round 1 honest-review note: `branch_merge_impl` at
`crates/omnigraph/src/exec/merge.rs:1085-1100` still uses the
swap_coordinator_for_branch + operate + restore_coordinator pattern
across three separate `coordinator.write().await` acquisitions — the
same shape that branch_create_from_impl shed in commit 4ffbf6e.

The test spawns two concurrent /branches/merge calls A (feature-a →
target-a) and B (feature-b → target-b) aligned at a tokio::sync::Barrier
so both reach swap_coordinator_for_branch close in time. M=4
iterations boost race-catching odds.

Currently fails on 22d76db with target-a=5, target-b=4: B's merge
landed on the wrong coord — target-b never got Frank because A's
swap pushed self.coordinator to target-a, B's swap captured target-a
as B's "previous", and B's restore set self.coordinator back to
target-a (not the original main). Subsequent operations using
self.coordinator point at the wrong branch.

Fix lands in the next commit: serialize concurrent branch merges via
`merge_exclusive: Arc<tokio::sync::Mutex<()>>` held across the entire
swap-operate-restore window. Closes the bug class "non-atomic
three-step coordinator manipulation" for branch_merge by serializing
merges relative to each other; the per-(table, branch) queue inside the
merge body still lets a merge run concurrently with other writers.

A deeper "operate on local coord" refactor (the round-1 fix shape for
branch_create_from) requires unwinding `branch_merge_on_current_target`
and its uses of `self.snapshot()` / `self.ensure_commit_graph_initialized()`;
deferred to a follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 19:12:03 +02:00
parent 22d76dbb40
commit 2b2e723125
No known key found for this signature in database

View file

@ -2697,6 +2697,228 @@ async fn concurrent_change_during_branch_merge_preserves_writes() {
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other() {
// Pin the `branch_merge_impl` swap-restore atomicity invariant.
// Round 2 of the PR review left this race deferred: `merge.rs:1085-1100`
// uses three separate `coordinator.write().await` acquisitions
// (swap → operate → restore) — the same shape that
// `branch_create_from_impl` shed in round 1.
//
// Pre-fix race: two concurrent `branch_merge` calls A and B with
// distinct targets target_a, target_b. A's swap captures the
// currently-bound coord as previous_A and replaces self.coordinator
// with target_a. Before A's operate runs, B's swap captures
// (now) target_a as previous_B and replaces self.coordinator with
// target_b. A's `branch_merge_on_current_target` then runs against
// target_b's coord — A merges its source INTO target_b, not target_a.
//
// Post-fix invariant: branch_merge_impl serializes via
// `merge_exclusive: Arc<tokio::sync::Mutex<()>>` held across the
// entire swap-operate-restore window. Other writers (the per-table
// queue, /change, /ingest) are unaffected — only concurrent merges
// serialize.
//
// Setup: main + 2 source branches (feature_a with Eve, feature_b with
// Frank) + 2 untouched targets (target_a, target_b). Concurrent
// merges feature_a→target_a and feature_b→target_b. Aligned at a
// tokio::sync::Barrier so both swaps land close in time.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
async fn do_create_branch(app: &Router, from: &str, name: &str) {
let body = serde_json::to_vec(&BranchCreateRequest {
from: Some(from.to_string()),
name: name.to_string(),
})
.unwrap();
let r = app
.clone()
.oneshot(
Request::builder()
.uri("/branches")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::OK, "create {} from {} failed", name, from);
}
async fn do_insert_person(app: &Router, branch: &str, name: &str, age: i32) {
let body = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("insert_person".to_string()),
params: Some(json!({ "name": name, "age": age })),
branch: Some(branch.to_string()),
})
.unwrap();
let r = app
.clone()
.oneshot(
Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::OK, "insert {} on {} failed", name, branch);
}
async fn person_count(app: &Router, branch: &str) -> u64 {
let uri = format!("/snapshot?branch={}", branch);
let r = app
.clone()
.oneshot(
Request::builder()
.uri(uri)
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::OK, "snapshot {} failed", branch);
let body = to_bytes(r.into_body(), usize::MAX).await.unwrap();
let v: Value = serde_json::from_slice(&body).unwrap();
v["tables"]
.as_array()
.and_then(|tables| {
tables
.iter()
.find(|t| t["table_key"].as_str() == Some("node:Person"))
})
.and_then(|t| t["row_count"].as_u64())
.unwrap_or_else(|| panic!("snapshot {} missing node:Person", branch))
}
// test.jsonl seeds 4 Persons.
const SEED: u64 = 4;
// Set up 4 child branches: 2 sources (each with one new Person), 2
// targets (forked from main, untouched).
do_create_branch(&app, "main", "feature-a").await;
do_insert_person(&app, "feature-a", "Eve", 22).await;
do_create_branch(&app, "main", "feature-b").await;
do_insert_person(&app, "feature-b", "Frank", 33).await;
do_create_branch(&app, "main", "target-a").await;
do_create_branch(&app, "main", "target-b").await;
assert_eq!(person_count(&app, "feature-a").await, SEED + 1);
assert_eq!(person_count(&app, "feature-b").await, SEED + 1);
assert_eq!(person_count(&app, "target-a").await, SEED);
assert_eq!(person_count(&app, "target-b").await, SEED);
// Concurrent merges aligned at a barrier so both reach the
// swap_coordinator_for_branch call close in time. Repeat M=4 times to
// boost the probability of catching the race. Recreate the
// target/source branches each iteration to keep the post-condition
// checks tight.
const M: usize = 4;
for iter in 0..M {
let target_a = format!("target-a-iter{iter}");
let target_b = format!("target-b-iter{iter}");
let source_a = format!("feature-a-iter{iter}");
let source_b = format!("feature-b-iter{iter}");
do_create_branch(&app, "main", &source_a).await;
do_insert_person(&app, &source_a, &format!("Eve-{iter}"), 22).await;
do_create_branch(&app, "main", &source_b).await;
do_insert_person(&app, &source_b, &format!("Frank-{iter}"), 33).await;
do_create_branch(&app, "main", &target_a).await;
do_create_branch(&app, "main", &target_b).await;
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let app_a = app.clone();
let app_b = app.clone();
let barrier_a = Arc::clone(&barrier);
let barrier_b = Arc::clone(&barrier);
let target_a_owned = target_a.clone();
let target_b_owned = target_b.clone();
let source_a_owned = source_a.clone();
let source_b_owned = source_b.clone();
let h_a = tokio::spawn(async move {
barrier_a.wait().await;
let body = serde_json::to_vec(&BranchMergeRequest {
source: source_a_owned,
target: Some(target_a_owned),
})
.unwrap();
app_a
.oneshot(
Request::builder()
.uri("/branches/merge")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap()
.status()
});
let h_b = tokio::spawn(async move {
barrier_b.wait().await;
let body = serde_json::to_vec(&BranchMergeRequest {
source: source_b_owned,
target: Some(target_b_owned),
})
.unwrap();
app_b
.oneshot(
Request::builder()
.uri("/branches/merge")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap()
.status()
});
assert_eq!(h_a.await.unwrap(), StatusCode::OK, "iter {iter} merge A");
assert_eq!(h_b.await.unwrap(), StatusCode::OK, "iter {iter} merge B");
// Post-condition: each target has exactly its declared source's
// contribution. Pre-fix race: A's merge runs against target_b's
// swapped coord, so feature-a's row lands in target-b instead of
// target-a. Observable as target-a == 4 (unchanged) and
// target-b == 6 (both Eve and Frank).
let count_a = person_count(&app, &target_a).await;
let count_b = person_count(&app, &target_b).await;
assert_eq!(
count_a,
SEED + 1,
"iter {iter}: target-a must reflect feature-a's merge \
(Eve only); pre-fix race would leave it at SEED ({}) and \
pile both feature-a's and feature-b's rows into target-b. \
Got target-a={count_a}, target-b={count_b}",
SEED,
);
assert_eq!(
count_b,
SEED + 1,
"iter {iter}: target-b must reflect feature-b's merge \
(Frank only); pre-fix race would leave it at SEED+2 if A's \
merge ran against target-b's swapped coord. \
Got target-a={count_a}, target-b={count_b}",
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_disjoint_table_concurrency_succeeds_at_http_level() {
// HTTP-level pin for MR-686's disjoint-table promise: concurrent /change