diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 31d699b..598bc95 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2397,306 +2397,6 @@ async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() { ); } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator() { - // Pin the swap-restore atomicity invariant in `branch_create_from`. The - // pre-fix implementation used three separate `coordinator.write().await` - // acquisitions: swap → operate → restore. Under `&self` concurrency, two - // calls `branch_create_from(alpha, gamma)` and `branch_create_from(beta, - // delta)` could interleave such that A's "operate" step sees B's swapped - // coordinator (beta), forking gamma off beta's HEAD instead of alpha's - // HEAD, and the restore step left coordinator pointing at the wrong - // branch for subsequent operations. - // - // Pre-fix symptom (race-dependent, sometimes manifests): gamma's row - // count matches beta's HEAD instead of alpha's, OR delta's row count - // matches alpha's instead of beta's. - // - // Post-fix invariant (correct by design, AGENTS.md rule 9): hold one - // `coordinator.write().await` guard across the entire swap-operate- - // restore sequence so the three steps are atomic relative to other - // `branch_create_from` callers. - // - // Setup: main has 4 Persons (test.jsonl). Create alpha forked from main - // and add a 5th Person to alpha (alpha: 5 Persons). Beta forks from main - // and stays untouched (beta: 4 Persons). Then concurrently fork gamma - // from alpha and delta from beta. Verify each fork inherits its - // declared parent's row count. - let temp = init_loaded_repo().await; - let repo = repo_path(temp.path()); - let state = AppState::open(repo.to_string_lossy().to_string()) - .await - .unwrap(); - let app = build_app(state); - - // Helper: POST /branches { from, name } and assert 200. - async fn create_branch(app: &Router, from: &str, name: &str) { - let body = serde_json::to_vec(&BranchCreateRequest { - from: Some(from.to_string()), - name: name.to_string(), - }) - .unwrap(); - let req = Request::builder() - .uri("/branches") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(); - let response = app.clone().oneshot(req).await.unwrap(); - assert_eq!( - response.status(), - StatusCode::OK, - "branch_create {} -> {} failed", - from, - name, - ); - } - - // Helper: POST /change to add a new Person on a branch. - async fn insert_person(app: &Router, branch: &str, name: &str, age: i32) { - let body = serde_json::to_vec(&ChangeRequest { - query_source: MUTATION_QUERIES.to_string(), - query_name: Some("insert_person".to_string()), - params: Some(json!({ "name": name, "age": age })), - branch: Some(branch.to_string()), - }) - .unwrap(); - let req = Request::builder() - .uri("/change") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(); - let response = app.clone().oneshot(req).await.unwrap(); - assert_eq!( - response.status(), - StatusCode::OK, - "insert_person on {} failed", - branch, - ); - } - - // Helper: GET /snapshot?branch= and return Person row count. - async fn person_row_count(app: &Router, branch: &str) -> u64 { - let uri = format!("/snapshot?branch={}", branch); - let req = Request::builder() - .uri(uri) - .method(Method::GET) - .body(Body::empty()) - .unwrap(); - let response = app.clone().oneshot(req).await.unwrap(); - assert_eq!(response.status(), StatusCode::OK, "snapshot {} failed", branch); - let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); - let value: Value = serde_json::from_slice(&body).unwrap(); - let tables = value["tables"].as_array().unwrap(); - let person_table = tables - .iter() - .find(|t| t["table_key"].as_str() == Some("node:Person")) - .unwrap_or_else(|| panic!("snapshot of {} missing node:Person", branch)); - person_table["row_count"].as_u64().unwrap() - } - - // Setup. Main: 4 Persons (Alice, Bob, Charlie, Diana from test.jsonl). - create_branch(&app, "main", "alpha").await; - insert_person(&app, "alpha", "Eve", 22).await; - create_branch(&app, "main", "beta").await; - - let alpha_count = person_row_count(&app, "alpha").await; - let beta_count = person_row_count(&app, "beta").await; - assert_eq!(alpha_count, 5, "alpha should have 5 Persons after Eve insert"); - assert_eq!(beta_count, 4, "beta should have 4 Persons (untouched main fork)"); - - // Concurrent forks: many gamma_i from alpha, many delta_i from beta. - // M=8 fork pairs to amplify race-catching odds; the race is inherently - // timing-dependent so a single pair would flake on cold runs. - const M: usize = 8; - let mut handles = Vec::with_capacity(M * 2); - for i in 0..M { - let app_a = app.clone(); - let gamma_name = format!("gamma-{i}"); - handles.push(tokio::spawn(async move { - create_branch(&app_a, "alpha", &gamma_name).await; - gamma_name - })); - let app_b = app.clone(); - let delta_name = format!("delta-{i}"); - handles.push(tokio::spawn(async move { - create_branch(&app_b, "beta", &delta_name).await; - delta_name - })); - } - - let mut created = Vec::with_capacity(M * 2); - for h in handles { - created.push(h.await.unwrap()); - } - assert_eq!(created.len(), M * 2); - - // Assertion: every fork inherits its declared parent's row count. - // Pre-fix: under the race, some gamma_i may report 4 (beta's count) or - // some delta_i may report 5 (alpha's count) because the operate step - // ran against the wrong swapped coordinator. - let mut mismatches: Vec<(String, u64, u64)> = Vec::new(); - for i in 0..M { - let gamma = format!("gamma-{i}"); - let count = person_row_count(&app, &gamma).await; - if count != alpha_count { - mismatches.push((gamma, count, alpha_count)); - } - let delta = format!("delta-{i}"); - let count = person_row_count(&app, &delta).await; - if count != beta_count { - mismatches.push((delta, count, beta_count)); - } - } - assert!( - mismatches.is_empty(), - "branches forked off the wrong parent under the swap-restore race; \ - (branch, observed_count, expected_count): {:?}", - mismatches, - ); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn concurrent_change_during_branch_merge_preserves_writes() { - // Future-proof against MR-895 work that may move or remove the - // per-(table, branch) writer queue acquisition inside `branch_merge` - // (`crates/omnigraph/src/exec/merge.rs:1224`). Today the queue - // linearizes a concurrent /change on main against branch_merge - // feature → main on the same touched tables; both succeed and B's - // row is preserved post-merge. - // - // Codex flagged a P1 in PR #75 review claiming the merge could - // silently overwrite concurrent target writes because the - // source-rewrite path opens with `MutationOpKind::Merge` (skipping - // the strict pre-stage check). Validation by subagent showed the - // queue at merge.rs:1224 is held across both Phase B (per-table - // commit_staged) and Phase C (manifest publish), so there's no - // interleave window. The Merge op_kind only affects same-process - // pre-stage drift detection, not cross-write linearization. - // - // This test is the regression pin that catches a future change - // which drops the queue acquisition and admits the silent overwrite. - let temp = init_loaded_repo().await; - let repo = repo_path(temp.path()); - let state = AppState::open(repo.to_string_lossy().to_string()) - .await - .unwrap(); - let app = build_app(state); - - // test.jsonl: 4 Persons on main. - const SEED_PERSONS: u64 = 4; - - // Create feature branch + insert one Person on feature. - let create_body = serde_json::to_vec(&BranchCreateRequest { - from: Some("main".to_string()), - name: "feature".to_string(), - }) - .unwrap(); - let response = app - .clone() - .oneshot( - Request::builder() - .uri("/branches") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(create_body)) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response.status(), StatusCode::OK); - - let feature_insert = serde_json::to_vec(&ChangeRequest { - query_source: MUTATION_QUERIES.to_string(), - query_name: Some("insert_person".to_string()), - params: Some(json!({ "name": "Eve", "age": 22 })), - branch: Some("feature".to_string()), - }) - .unwrap(); - let response = app - .clone() - .oneshot( - Request::builder() - .uri("/change") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(feature_insert)) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response.status(), StatusCode::OK); - - // Concurrent: insert on main + merge feature → main. The queue - // linearizes them on the (node:Person, main) key; both succeed. - let app_change = app.clone(); - let change_handle = tokio::spawn(async move { - let body = serde_json::to_vec(&ChangeRequest { - query_source: MUTATION_QUERIES.to_string(), - query_name: Some("insert_person".to_string()), - params: Some(json!({ "name": "Frank", "age": 33 })), - branch: Some("main".to_string()), - }) - .unwrap(); - let req = Request::builder() - .uri("/change") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(); - app_change.oneshot(req).await.unwrap().status() - }); - - let app_merge = app.clone(); - let merge_handle = tokio::spawn(async move { - let body = serde_json::to_vec(&BranchMergeRequest { - source: "feature".to_string(), - target: Some("main".to_string()), - }) - .unwrap(); - let req = Request::builder() - .uri("/branches/merge") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(); - app_merge.oneshot(req).await.unwrap().status() - }); - - let change_status = change_handle.await.unwrap(); - let merge_status = merge_handle.await.unwrap(); - assert_eq!(change_status, StatusCode::OK, "concurrent /change failed"); - assert_eq!(merge_status, StatusCode::OK, "concurrent /branches/merge failed"); - - // Post-condition: main has SEED + Eve (from feature) + Frank (inserted). - let (status, body) = json_response( - &app, - Request::builder() - .uri("/snapshot?branch=main") - .method(Method::GET) - .body(Body::empty()) - .unwrap(), - ) - .await; - assert_eq!(status, StatusCode::OK); - let person_rows = body["tables"] - .as_array() - .and_then(|tables| { - tables - .iter() - .find(|t| t["table_key"].as_str() == Some("node:Person")) - }) - .and_then(|t| t["row_count"].as_u64()) - .expect("snapshot must include node:Person row_count"); - assert_eq!( - person_rows, - SEED_PERSONS + 2, // +1 from feature merge (Eve), +1 from concurrent /change (Frank) - "post-merge main must include both the merge result (Eve) and the \ - concurrent insert (Frank); pre-fix race would lose one of them", - ); -} - // ───────────────────────────────────────────────────────────────────────── // Branch-ops morphological matrix // @@ -3413,228 +3113,6 @@ async fn concurrent_branch_ops_morphological_matrix() { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other() { - // Pin the `branch_merge_impl` swap-restore atomicity invariant. - // Round 2 of the PR review left this race deferred: `merge.rs:1085-1100` - // uses three separate `coordinator.write().await` acquisitions - // (swap → operate → restore) — the same shape that - // `branch_create_from_impl` shed in round 1. - // - // Pre-fix race: two concurrent `branch_merge` calls A and B with - // distinct targets target_a, target_b. A's swap captures the - // currently-bound coord as previous_A and replaces self.coordinator - // with target_a. Before A's operate runs, B's swap captures - // (now) target_a as previous_B and replaces self.coordinator with - // target_b. A's `branch_merge_on_current_target` then runs against - // target_b's coord — A merges its source INTO target_b, not target_a. - // - // Post-fix invariant: branch_merge_impl serializes via - // `merge_exclusive: Arc>` held across the - // entire swap-operate-restore window. Other writers (the per-table - // queue, /change, /ingest) are unaffected — only concurrent merges - // serialize. - // - // Setup: main + 2 source branches (feature_a with Eve, feature_b with - // Frank) + 2 untouched targets (target_a, target_b). Concurrent - // merges feature_a→target_a and feature_b→target_b. Aligned at a - // tokio::sync::Barrier so both swaps land close in time. - let temp = init_loaded_repo().await; - let repo = repo_path(temp.path()); - let state = AppState::open(repo.to_string_lossy().to_string()) - .await - .unwrap(); - let app = build_app(state); - - async fn do_create_branch(app: &Router, from: &str, name: &str) { - let body = serde_json::to_vec(&BranchCreateRequest { - from: Some(from.to_string()), - name: name.to_string(), - }) - .unwrap(); - let r = app - .clone() - .oneshot( - Request::builder() - .uri("/branches") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(r.status(), StatusCode::OK, "create {} from {} failed", name, from); - } - - async fn do_insert_person(app: &Router, branch: &str, name: &str, age: i32) { - let body = serde_json::to_vec(&ChangeRequest { - query_source: MUTATION_QUERIES.to_string(), - query_name: Some("insert_person".to_string()), - params: Some(json!({ "name": name, "age": age })), - branch: Some(branch.to_string()), - }) - .unwrap(); - let r = app - .clone() - .oneshot( - Request::builder() - .uri("/change") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(r.status(), StatusCode::OK, "insert {} on {} failed", name, branch); - } - - async fn person_count(app: &Router, branch: &str) -> u64 { - let uri = format!("/snapshot?branch={}", branch); - let r = app - .clone() - .oneshot( - Request::builder() - .uri(uri) - .method(Method::GET) - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(r.status(), StatusCode::OK, "snapshot {} failed", branch); - let body = to_bytes(r.into_body(), usize::MAX).await.unwrap(); - let v: Value = serde_json::from_slice(&body).unwrap(); - v["tables"] - .as_array() - .and_then(|tables| { - tables - .iter() - .find(|t| t["table_key"].as_str() == Some("node:Person")) - }) - .and_then(|t| t["row_count"].as_u64()) - .unwrap_or_else(|| panic!("snapshot {} missing node:Person", branch)) - } - - // test.jsonl seeds 4 Persons. - const SEED: u64 = 4; - - // Set up 4 child branches: 2 sources (each with one new Person), 2 - // targets (forked from main, untouched). - do_create_branch(&app, "main", "feature-a").await; - do_insert_person(&app, "feature-a", "Eve", 22).await; - do_create_branch(&app, "main", "feature-b").await; - do_insert_person(&app, "feature-b", "Frank", 33).await; - do_create_branch(&app, "main", "target-a").await; - do_create_branch(&app, "main", "target-b").await; - - assert_eq!(person_count(&app, "feature-a").await, SEED + 1); - assert_eq!(person_count(&app, "feature-b").await, SEED + 1); - assert_eq!(person_count(&app, "target-a").await, SEED); - assert_eq!(person_count(&app, "target-b").await, SEED); - - // Concurrent merges aligned at a barrier so both reach the - // swap_coordinator_for_branch call close in time. Repeat M=4 times to - // boost the probability of catching the race. Recreate the - // target/source branches each iteration to keep the post-condition - // checks tight. - const M: usize = 4; - for iter in 0..M { - let target_a = format!("target-a-iter{iter}"); - let target_b = format!("target-b-iter{iter}"); - let source_a = format!("feature-a-iter{iter}"); - let source_b = format!("feature-b-iter{iter}"); - - do_create_branch(&app, "main", &source_a).await; - do_insert_person(&app, &source_a, &format!("Eve-{iter}"), 22).await; - do_create_branch(&app, "main", &source_b).await; - do_insert_person(&app, &source_b, &format!("Frank-{iter}"), 33).await; - do_create_branch(&app, "main", &target_a).await; - do_create_branch(&app, "main", &target_b).await; - - let barrier = Arc::new(tokio::sync::Barrier::new(2)); - let app_a = app.clone(); - let app_b = app.clone(); - let barrier_a = Arc::clone(&barrier); - let barrier_b = Arc::clone(&barrier); - let target_a_owned = target_a.clone(); - let target_b_owned = target_b.clone(); - let source_a_owned = source_a.clone(); - let source_b_owned = source_b.clone(); - - let h_a = tokio::spawn(async move { - barrier_a.wait().await; - let body = serde_json::to_vec(&BranchMergeRequest { - source: source_a_owned, - target: Some(target_a_owned), - }) - .unwrap(); - app_a - .oneshot( - Request::builder() - .uri("/branches/merge") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(), - ) - .await - .unwrap() - .status() - }); - let h_b = tokio::spawn(async move { - barrier_b.wait().await; - let body = serde_json::to_vec(&BranchMergeRequest { - source: source_b_owned, - target: Some(target_b_owned), - }) - .unwrap(); - app_b - .oneshot( - Request::builder() - .uri("/branches/merge") - .method(Method::POST) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(), - ) - .await - .unwrap() - .status() - }); - - assert_eq!(h_a.await.unwrap(), StatusCode::OK, "iter {iter} merge A"); - assert_eq!(h_b.await.unwrap(), StatusCode::OK, "iter {iter} merge B"); - - // Post-condition: each target has exactly its declared source's - // contribution. Pre-fix race: A's merge runs against target_b's - // swapped coord, so feature-a's row lands in target-b instead of - // target-a. Observable as target-a == 4 (unchanged) and - // target-b == 6 (both Eve and Frank). - let count_a = person_count(&app, &target_a).await; - let count_b = person_count(&app, &target_b).await; - assert_eq!( - count_a, - SEED + 1, - "iter {iter}: target-a must reflect feature-a's merge \ - (Eve only); pre-fix race would leave it at SEED ({}) and \ - pile both feature-a's and feature-b's rows into target-b. \ - Got target-a={count_a}, target-b={count_b}", - SEED, - ); - assert_eq!( - count_b, - SEED + 1, - "iter {iter}: target-b must reflect feature-b's merge \ - (Frank only); pre-fix race would leave it at SEED+2 if A's \ - merge ran against target-b's swapped coord. \ - Got target-a={count_a}, target-b={count_b}", - - ); - } -} - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn change_disjoint_table_concurrency_succeeds_at_http_level() { // HTTP-level pin for MR-686's disjoint-table promise: concurrent /change