tests: remove three narrow concurrent_branch_* tests subsumed by T1

The previous commit added `concurrent_branch_ops_morphological_matrix`
covering 11 cells with stronger assertions (identity + post-op /change
+ reopen). The three narrow tests it replaces:

- concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator
  → matrix cell f, with identity assertions added
- concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other
  → matrix cells a + b + c, with identity assertions that close the
    symmetric-swap blind spot cubic flagged on commit 64f2b99
- concurrent_change_during_branch_merge_preserves_writes
  → matrix cell d

The matrix retains the original tests' diagnostic granularity through
named cell labels in every assertion message ("[a:merge×merge:distinct-targets]
merge a"), so a CI failure points to the exact cell + invariant.

Net: 522 lines removed, 0 coverage lost. All other server tests pass
unchanged (44 total).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 20:09:21 +02:00
parent ac8594462e
commit 99b0941478
No known key found for this signature in database

View file

@ -2397,306 +2397,6 @@ async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() {
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator() {
// Pin the swap-restore atomicity invariant in `branch_create_from`. The
// pre-fix implementation used three separate `coordinator.write().await`
// acquisitions: swap → operate → restore. Under `&self` concurrency, two
// calls `branch_create_from(alpha, gamma)` and `branch_create_from(beta,
// delta)` could interleave such that A's "operate" step sees B's swapped
// coordinator (beta), forking gamma off beta's HEAD instead of alpha's
// HEAD, and the restore step left coordinator pointing at the wrong
// branch for subsequent operations.
//
// Pre-fix symptom (race-dependent, sometimes manifests): gamma's row
// count matches beta's HEAD instead of alpha's, OR delta's row count
// matches alpha's instead of beta's.
//
// Post-fix invariant (correct by design, AGENTS.md rule 9): hold one
// `coordinator.write().await` guard across the entire swap-operate-
// restore sequence so the three steps are atomic relative to other
// `branch_create_from` callers.
//
// Setup: main has 4 Persons (test.jsonl). Create alpha forked from main
// and add a 5th Person to alpha (alpha: 5 Persons). Beta forks from main
// and stays untouched (beta: 4 Persons). Then concurrently fork gamma
// from alpha and delta from beta. Verify each fork inherits its
// declared parent's row count.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
// Helper: POST /branches { from, name } and assert 200.
async fn create_branch(app: &Router, from: &str, name: &str) {
let body = serde_json::to_vec(&BranchCreateRequest {
from: Some(from.to_string()),
name: name.to_string(),
})
.unwrap();
let req = Request::builder()
.uri("/branches")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let response = app.clone().oneshot(req).await.unwrap();
assert_eq!(
response.status(),
StatusCode::OK,
"branch_create {} -> {} failed",
from,
name,
);
}
// Helper: POST /change to add a new Person on a branch.
async fn insert_person(app: &Router, branch: &str, name: &str, age: i32) {
let body = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("insert_person".to_string()),
params: Some(json!({ "name": name, "age": age })),
branch: Some(branch.to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let response = app.clone().oneshot(req).await.unwrap();
assert_eq!(
response.status(),
StatusCode::OK,
"insert_person on {} failed",
branch,
);
}
// Helper: GET /snapshot?branch=<branch> and return Person row count.
async fn person_row_count(app: &Router, branch: &str) -> u64 {
let uri = format!("/snapshot?branch={}", branch);
let req = Request::builder()
.uri(uri)
.method(Method::GET)
.body(Body::empty())
.unwrap();
let response = app.clone().oneshot(req).await.unwrap();
assert_eq!(response.status(), StatusCode::OK, "snapshot {} failed", branch);
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let value: Value = serde_json::from_slice(&body).unwrap();
let tables = value["tables"].as_array().unwrap();
let person_table = tables
.iter()
.find(|t| t["table_key"].as_str() == Some("node:Person"))
.unwrap_or_else(|| panic!("snapshot of {} missing node:Person", branch));
person_table["row_count"].as_u64().unwrap()
}
// Setup. Main: 4 Persons (Alice, Bob, Charlie, Diana from test.jsonl).
create_branch(&app, "main", "alpha").await;
insert_person(&app, "alpha", "Eve", 22).await;
create_branch(&app, "main", "beta").await;
let alpha_count = person_row_count(&app, "alpha").await;
let beta_count = person_row_count(&app, "beta").await;
assert_eq!(alpha_count, 5, "alpha should have 5 Persons after Eve insert");
assert_eq!(beta_count, 4, "beta should have 4 Persons (untouched main fork)");
// Concurrent forks: many gamma_i from alpha, many delta_i from beta.
// M=8 fork pairs to amplify race-catching odds; the race is inherently
// timing-dependent so a single pair would flake on cold runs.
const M: usize = 8;
let mut handles = Vec::with_capacity(M * 2);
for i in 0..M {
let app_a = app.clone();
let gamma_name = format!("gamma-{i}");
handles.push(tokio::spawn(async move {
create_branch(&app_a, "alpha", &gamma_name).await;
gamma_name
}));
let app_b = app.clone();
let delta_name = format!("delta-{i}");
handles.push(tokio::spawn(async move {
create_branch(&app_b, "beta", &delta_name).await;
delta_name
}));
}
let mut created = Vec::with_capacity(M * 2);
for h in handles {
created.push(h.await.unwrap());
}
assert_eq!(created.len(), M * 2);
// Assertion: every fork inherits its declared parent's row count.
// Pre-fix: under the race, some gamma_i may report 4 (beta's count) or
// some delta_i may report 5 (alpha's count) because the operate step
// ran against the wrong swapped coordinator.
let mut mismatches: Vec<(String, u64, u64)> = Vec::new();
for i in 0..M {
let gamma = format!("gamma-{i}");
let count = person_row_count(&app, &gamma).await;
if count != alpha_count {
mismatches.push((gamma, count, alpha_count));
}
let delta = format!("delta-{i}");
let count = person_row_count(&app, &delta).await;
if count != beta_count {
mismatches.push((delta, count, beta_count));
}
}
assert!(
mismatches.is_empty(),
"branches forked off the wrong parent under the swap-restore race; \
(branch, observed_count, expected_count): {:?}",
mismatches,
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_change_during_branch_merge_preserves_writes() {
// Future-proof against MR-895 work that may move or remove the
// per-(table, branch) writer queue acquisition inside `branch_merge`
// (`crates/omnigraph/src/exec/merge.rs:1224`). Today the queue
// linearizes a concurrent /change on main against branch_merge
// feature → main on the same touched tables; both succeed and B's
// row is preserved post-merge.
//
// Codex flagged a P1 in PR #75 review claiming the merge could
// silently overwrite concurrent target writes because the
// source-rewrite path opens with `MutationOpKind::Merge` (skipping
// the strict pre-stage check). Validation by subagent showed the
// queue at merge.rs:1224 is held across both Phase B (per-table
// commit_staged) and Phase C (manifest publish), so there's no
// interleave window. The Merge op_kind only affects same-process
// pre-stage drift detection, not cross-write linearization.
//
// This test is the regression pin that catches a future change
// which drops the queue acquisition and admits the silent overwrite.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
// test.jsonl: 4 Persons on main.
const SEED_PERSONS: u64 = 4;
// Create feature branch + insert one Person on feature.
let create_body = serde_json::to_vec(&BranchCreateRequest {
from: Some("main".to_string()),
name: "feature".to_string(),
})
.unwrap();
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/branches")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(create_body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let feature_insert = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("insert_person".to_string()),
params: Some(json!({ "name": "Eve", "age": 22 })),
branch: Some("feature".to_string()),
})
.unwrap();
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(feature_insert))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
// Concurrent: insert on main + merge feature → main. The queue
// linearizes them on the (node:Person, main) key; both succeed.
let app_change = app.clone();
let change_handle = tokio::spawn(async move {
let body = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("insert_person".to_string()),
params: Some(json!({ "name": "Frank", "age": 33 })),
branch: Some("main".to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
app_change.oneshot(req).await.unwrap().status()
});
let app_merge = app.clone();
let merge_handle = tokio::spawn(async move {
let body = serde_json::to_vec(&BranchMergeRequest {
source: "feature".to_string(),
target: Some("main".to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/branches/merge")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
app_merge.oneshot(req).await.unwrap().status()
});
let change_status = change_handle.await.unwrap();
let merge_status = merge_handle.await.unwrap();
assert_eq!(change_status, StatusCode::OK, "concurrent /change failed");
assert_eq!(merge_status, StatusCode::OK, "concurrent /branches/merge failed");
// Post-condition: main has SEED + Eve (from feature) + Frank (inserted).
let (status, body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::OK);
let person_rows = body["tables"]
.as_array()
.and_then(|tables| {
tables
.iter()
.find(|t| t["table_key"].as_str() == Some("node:Person"))
})
.and_then(|t| t["row_count"].as_u64())
.expect("snapshot must include node:Person row_count");
assert_eq!(
person_rows,
SEED_PERSONS + 2, // +1 from feature merge (Eve), +1 from concurrent /change (Frank)
"post-merge main must include both the merge result (Eve) and the \
concurrent insert (Frank); pre-fix race would lose one of them",
);
}
// ─────────────────────────────────────────────────────────────────────────
// Branch-ops morphological matrix
//
@ -3413,228 +3113,6 @@ async fn concurrent_branch_ops_morphological_matrix() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_branch_merges_distinct_targets_do_not_swap_into_each_other() {
// Pin the `branch_merge_impl` swap-restore atomicity invariant.
// Round 2 of the PR review left this race deferred: `merge.rs:1085-1100`
// uses three separate `coordinator.write().await` acquisitions
// (swap → operate → restore) — the same shape that
// `branch_create_from_impl` shed in round 1.
//
// Pre-fix race: two concurrent `branch_merge` calls A and B with
// distinct targets target_a, target_b. A's swap captures the
// currently-bound coord as previous_A and replaces self.coordinator
// with target_a. Before A's operate runs, B's swap captures
// (now) target_a as previous_B and replaces self.coordinator with
// target_b. A's `branch_merge_on_current_target` then runs against
// target_b's coord — A merges its source INTO target_b, not target_a.
//
// Post-fix invariant: branch_merge_impl serializes via
// `merge_exclusive: Arc<tokio::sync::Mutex<()>>` held across the
// entire swap-operate-restore window. Other writers (the per-table
// queue, /change, /ingest) are unaffected — only concurrent merges
// serialize.
//
// Setup: main + 2 source branches (feature_a with Eve, feature_b with
// Frank) + 2 untouched targets (target_a, target_b). Concurrent
// merges feature_a→target_a and feature_b→target_b. Aligned at a
// tokio::sync::Barrier so both swaps land close in time.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
async fn do_create_branch(app: &Router, from: &str, name: &str) {
let body = serde_json::to_vec(&BranchCreateRequest {
from: Some(from.to_string()),
name: name.to_string(),
})
.unwrap();
let r = app
.clone()
.oneshot(
Request::builder()
.uri("/branches")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::OK, "create {} from {} failed", name, from);
}
async fn do_insert_person(app: &Router, branch: &str, name: &str, age: i32) {
let body = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("insert_person".to_string()),
params: Some(json!({ "name": name, "age": age })),
branch: Some(branch.to_string()),
})
.unwrap();
let r = app
.clone()
.oneshot(
Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::OK, "insert {} on {} failed", name, branch);
}
async fn person_count(app: &Router, branch: &str) -> u64 {
let uri = format!("/snapshot?branch={}", branch);
let r = app
.clone()
.oneshot(
Request::builder()
.uri(uri)
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::OK, "snapshot {} failed", branch);
let body = to_bytes(r.into_body(), usize::MAX).await.unwrap();
let v: Value = serde_json::from_slice(&body).unwrap();
v["tables"]
.as_array()
.and_then(|tables| {
tables
.iter()
.find(|t| t["table_key"].as_str() == Some("node:Person"))
})
.and_then(|t| t["row_count"].as_u64())
.unwrap_or_else(|| panic!("snapshot {} missing node:Person", branch))
}
// test.jsonl seeds 4 Persons.
const SEED: u64 = 4;
// Set up 4 child branches: 2 sources (each with one new Person), 2
// targets (forked from main, untouched).
do_create_branch(&app, "main", "feature-a").await;
do_insert_person(&app, "feature-a", "Eve", 22).await;
do_create_branch(&app, "main", "feature-b").await;
do_insert_person(&app, "feature-b", "Frank", 33).await;
do_create_branch(&app, "main", "target-a").await;
do_create_branch(&app, "main", "target-b").await;
assert_eq!(person_count(&app, "feature-a").await, SEED + 1);
assert_eq!(person_count(&app, "feature-b").await, SEED + 1);
assert_eq!(person_count(&app, "target-a").await, SEED);
assert_eq!(person_count(&app, "target-b").await, SEED);
// Concurrent merges aligned at a barrier so both reach the
// swap_coordinator_for_branch call close in time. Repeat M=4 times to
// boost the probability of catching the race. Recreate the
// target/source branches each iteration to keep the post-condition
// checks tight.
const M: usize = 4;
for iter in 0..M {
let target_a = format!("target-a-iter{iter}");
let target_b = format!("target-b-iter{iter}");
let source_a = format!("feature-a-iter{iter}");
let source_b = format!("feature-b-iter{iter}");
do_create_branch(&app, "main", &source_a).await;
do_insert_person(&app, &source_a, &format!("Eve-{iter}"), 22).await;
do_create_branch(&app, "main", &source_b).await;
do_insert_person(&app, &source_b, &format!("Frank-{iter}"), 33).await;
do_create_branch(&app, "main", &target_a).await;
do_create_branch(&app, "main", &target_b).await;
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let app_a = app.clone();
let app_b = app.clone();
let barrier_a = Arc::clone(&barrier);
let barrier_b = Arc::clone(&barrier);
let target_a_owned = target_a.clone();
let target_b_owned = target_b.clone();
let source_a_owned = source_a.clone();
let source_b_owned = source_b.clone();
let h_a = tokio::spawn(async move {
barrier_a.wait().await;
let body = serde_json::to_vec(&BranchMergeRequest {
source: source_a_owned,
target: Some(target_a_owned),
})
.unwrap();
app_a
.oneshot(
Request::builder()
.uri("/branches/merge")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap()
.status()
});
let h_b = tokio::spawn(async move {
barrier_b.wait().await;
let body = serde_json::to_vec(&BranchMergeRequest {
source: source_b_owned,
target: Some(target_b_owned),
})
.unwrap();
app_b
.oneshot(
Request::builder()
.uri("/branches/merge")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap()
.status()
});
assert_eq!(h_a.await.unwrap(), StatusCode::OK, "iter {iter} merge A");
assert_eq!(h_b.await.unwrap(), StatusCode::OK, "iter {iter} merge B");
// Post-condition: each target has exactly its declared source's
// contribution. Pre-fix race: A's merge runs against target_b's
// swapped coord, so feature-a's row lands in target-b instead of
// target-a. Observable as target-a == 4 (unchanged) and
// target-b == 6 (both Eve and Frank).
let count_a = person_count(&app, &target_a).await;
let count_b = person_count(&app, &target_b).await;
assert_eq!(
count_a,
SEED + 1,
"iter {iter}: target-a must reflect feature-a's merge \
(Eve only); pre-fix race would leave it at SEED ({}) and \
pile both feature-a's and feature-b's rows into target-b. \
Got target-a={count_a}, target-b={count_b}",
SEED,
);
assert_eq!(
count_b,
SEED + 1,
"iter {iter}: target-b must reflect feature-b's merge \
(Frank only); pre-fix race would leave it at SEED+2 if A's \
merge ran against target-b's swapped coord. \
Got target-a={count_a}, target-b={count_b}",
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_disjoint_table_concurrency_succeeds_at_http_level() {
// HTTP-level pin for MR-686's disjoint-table promise: concurrent /change