From 3b33e9ac56bba44a27dc7067b327e075083bcb3f Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 16:44:50 +0200 Subject: [PATCH] tests: pin branch_create_from swap-restore race (red) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per AGENTS.md rule 8, this commit lands the failing regression test ahead of the fix so the red → green pair is visible in git log. The test demonstrates that two concurrent `POST /branches` calls with distinct `from` parents corrupt coordinator state: with call A forking from `alpha` and call B forking from `beta`, A's "operate" step runs against B's swapped coordinator instead of its own, forking the new branch off the wrong parent's HEAD. The test currently fails on f925ad1 with all 8 gamma branches (declared parent: alpha, 5 rows) reporting 4 rows — beta's row count. The operate step ran against beta's coordinator because B's swap interleaved between A's swap and A's operate step. The fix lands in the next commit: hold a single `coordinator.write().await` guard across the entire swap-operate-restore sequence in `branch_create_from_impl` so the three steps are atomic relative to other callers. That fix closes the bug class "non-atomic three-step coordinator manipulation under &self callers" rather than guarding the specific call site — the right architectural seam (single critical section per swap-restore sequence) eliminates the interleave window for branch_create_from and any future swap-restore caller. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/tests/server.rs | 160 ++++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 9c891b4..41bec34 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2367,6 +2367,166 @@ async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator() { + // Pin the swap-restore atomicity invariant in `branch_create_from`. The + // pre-fix implementation used three separate `coordinator.write().await` + // acquisitions: swap → operate → restore. Under `&self` concurrency, two + // calls `branch_create_from(alpha, gamma)` and `branch_create_from(beta, + // delta)` could interleave such that A's "operate" step sees B's swapped + // coordinator (beta), forking gamma off beta's HEAD instead of alpha's + // HEAD, and the restore step left coordinator pointing at the wrong + // branch for subsequent operations. + // + // Pre-fix symptom (race-dependent, sometimes manifests): gamma's row + // count matches beta's HEAD instead of alpha's, OR delta's row count + // matches alpha's instead of beta's. + // + // Post-fix invariant (correct by design, AGENTS.md rule 9): hold one + // `coordinator.write().await` guard across the entire swap-operate- + // restore sequence so the three steps are atomic relative to other + // `branch_create_from` callers. + // + // Setup: main has 4 Persons (test.jsonl). Create alpha forked from main + // and add a 5th Person to alpha (alpha: 5 Persons). Beta forks from main + // and stays untouched (beta: 4 Persons). Then concurrently fork gamma + // from alpha and delta from beta. Verify each fork inherits its + // declared parent's row count. 
+ let temp = init_loaded_repo().await; + let repo = repo_path(temp.path()); + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + + // Helper: POST /branches { from, name } and assert 200. + async fn create_branch(app: &Router, from: &str, name: &str) { + let body = serde_json::to_vec(&BranchCreateRequest { + from: Some(from.to_string()), + name: name.to_string(), + }) + .unwrap(); + let req = Request::builder() + .uri("/branches") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let response = app.clone().oneshot(req).await.unwrap(); + assert_eq!( + response.status(), + StatusCode::OK, + "branch_create {} -> {} failed", + from, + name, + ); + } + + // Helper: POST /change to add a new Person on a branch. + async fn insert_person(app: &Router, branch: &str, name: &str, age: i32) { + let body = serde_json::to_vec(&ChangeRequest { + query_source: MUTATION_QUERIES.to_string(), + query_name: Some("insert_person".to_string()), + params: Some(json!({ "name": name, "age": age })), + branch: Some(branch.to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let response = app.clone().oneshot(req).await.unwrap(); + assert_eq!( + response.status(), + StatusCode::OK, + "insert_person on {} failed", + branch, + ); + } + + // Helper: GET /snapshot?branch= and return Person row count. 
+ async fn person_row_count(app: &Router, branch: &str) -> u64 { + let uri = format!("/snapshot?branch={}", branch); + let req = Request::builder() + .uri(uri) + .method(Method::GET) + .body(Body::empty()) + .unwrap(); + let response = app.clone().oneshot(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK, "snapshot {} failed", branch); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let value: Value = serde_json::from_slice(&body).unwrap(); + let tables = value["tables"].as_array().unwrap(); + let person_table = tables + .iter() + .find(|t| t["table_key"].as_str() == Some("node:Person")) + .unwrap_or_else(|| panic!("snapshot of {} missing node:Person", branch)); + person_table["row_count"].as_u64().unwrap() + } + + // Setup. Main: 4 Persons (Alice, Bob, Charlie, Diana from test.jsonl). + create_branch(&app, "main", "alpha").await; + insert_person(&app, "alpha", "Eve", 22).await; + create_branch(&app, "main", "beta").await; + + let alpha_count = person_row_count(&app, "alpha").await; + let beta_count = person_row_count(&app, "beta").await; + assert_eq!(alpha_count, 5, "alpha should have 5 Persons after Eve insert"); + assert_eq!(beta_count, 4, "beta should have 4 Persons (untouched main fork)"); + + // Concurrent forks: many gamma_i from alpha, many delta_i from beta. + // M=8 fork pairs to amplify race-catching odds; the race is inherently + // timing-dependent so a single pair would flake on cold runs. 
+ const M: usize = 8; + let mut handles = Vec::with_capacity(M * 2); + for i in 0..M { + let app_a = app.clone(); + let gamma_name = format!("gamma-{i}"); + handles.push(tokio::spawn(async move { + create_branch(&app_a, "alpha", &gamma_name).await; + gamma_name + })); + let app_b = app.clone(); + let delta_name = format!("delta-{i}"); + handles.push(tokio::spawn(async move { + create_branch(&app_b, "beta", &delta_name).await; + delta_name + })); + } + + let mut created = Vec::with_capacity(M * 2); + for h in handles { + created.push(h.await.unwrap()); + } + assert_eq!(created.len(), M * 2); + + // Assertion: every fork inherits its declared parent's row count. + // Pre-fix: under the race, some gamma_i may report 4 (beta's count) or + // some delta_i may report 5 (alpha's count) because the operate step + // ran against the wrong swapped coordinator. + let mut mismatches: Vec<(String, u64, u64)> = Vec::new(); + for i in 0..M { + let gamma = format!("gamma-{i}"); + let count = person_row_count(&app, &gamma).await; + if count != alpha_count { + mismatches.push((gamma, count, alpha_count)); + } + let delta = format!("delta-{i}"); + let count = person_row_count(&app, &delta).await; + if count != beta_count { + mismatches.push((delta, count, beta_count)); + } + } + assert!( + mismatches.is_empty(), + "branches forked off the wrong parent under the swap-restore race; \ + (branch, observed_count, expected_count): {:?}", + mismatches, + ); +} + #[tokio::test(flavor = "multi_thread")] async fn oversized_request_body_returns_payload_too_large() { let (_temp, app) = app_for_loaded_repo().await;