mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
tests: pin branch_create_from swap-restore race (red)
Per AGENTS.md rule 8, this commit lands the failing regression test
ahead of the fix so the red → green pair is visible in git log.
The test demonstrates that two concurrent `POST /branches` calls with
distinct `from` parents corrupt coordinator state: A's "operate" step
runs against B's swapped coordinator instead of its own, forking the
new branch off the wrong parent's HEAD.
Currently fails on f925ad1 with all 8 gamma branches (declared
parent: alpha, 5 rows) reporting 4 rows — beta's row count. The
operate step ran against beta's coord because B's swap interleaved
between A's swap and A's operate.
Fix lands in the next commit: hold a single `coordinator.write().await`
guard across the entire swap-operate-restore sequence in
`branch_create_from_impl` so the three steps are atomic relative to
other callers.
Closes the bug class "non-atomic three-step coordinator manipulation
under &self callers" rather than guarding the specific call site —
the right architectural seam (single critical section per swap-restore
sequence) eliminates the interleave window for branch_create_from and
any future swap-restore caller.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
4ca527cc53
commit
3b33e9ac56
1 changed files with 160 additions and 0 deletions
|
|
@ -2367,6 +2367,166 @@ async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordinator() {
|
||||
// Pin the swap-restore atomicity invariant in `branch_create_from`. The
|
||||
// pre-fix implementation used three separate `coordinator.write().await`
|
||||
// acquisitions: swap → operate → restore. Under `&self` concurrency, two
|
||||
// calls `branch_create_from(alpha, gamma)` and `branch_create_from(beta,
|
||||
// delta)` could interleave such that A's "operate" step sees B's swapped
|
||||
// coordinator (beta), forking gamma off beta's HEAD instead of alpha's
|
||||
// HEAD, and the restore step left coordinator pointing at the wrong
|
||||
// branch for subsequent operations.
|
||||
//
|
||||
// Pre-fix symptom (race-dependent, sometimes manifests): gamma's row
|
||||
// count matches beta's HEAD instead of alpha's, OR delta's row count
|
||||
// matches alpha's instead of beta's.
|
||||
//
|
||||
// Post-fix invariant (correct by design, AGENTS.md rule 9): hold one
|
||||
// `coordinator.write().await` guard across the entire swap-operate-
|
||||
// restore sequence so the three steps are atomic relative to other
|
||||
// `branch_create_from` callers.
|
||||
//
|
||||
// Setup: main has 4 Persons (test.jsonl). Create alpha forked from main
|
||||
// and add a 5th Person to alpha (alpha: 5 Persons). Beta forks from main
|
||||
// and stays untouched (beta: 4 Persons). Then concurrently fork gamma
|
||||
// from alpha and delta from beta. Verify each fork inherits its
|
||||
// declared parent's row count.
|
||||
let temp = init_loaded_repo().await;
|
||||
let repo = repo_path(temp.path());
|
||||
let state = AppState::open(repo.to_string_lossy().to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let app = build_app(state);
|
||||
|
||||
// Helper: POST /branches { from, name } and assert 200.
|
||||
async fn create_branch(app: &Router, from: &str, name: &str) {
|
||||
let body = serde_json::to_vec(&BranchCreateRequest {
|
||||
from: Some(from.to_string()),
|
||||
name: name.to_string(),
|
||||
})
|
||||
.unwrap();
|
||||
let req = Request::builder()
|
||||
.uri("/branches")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
let response = app.clone().oneshot(req).await.unwrap();
|
||||
assert_eq!(
|
||||
response.status(),
|
||||
StatusCode::OK,
|
||||
"branch_create {} -> {} failed",
|
||||
from,
|
||||
name,
|
||||
);
|
||||
}
|
||||
|
||||
// Helper: POST /change to add a new Person on a branch.
|
||||
async fn insert_person(app: &Router, branch: &str, name: &str, age: i32) {
|
||||
let body = serde_json::to_vec(&ChangeRequest {
|
||||
query_source: MUTATION_QUERIES.to_string(),
|
||||
query_name: Some("insert_person".to_string()),
|
||||
params: Some(json!({ "name": name, "age": age })),
|
||||
branch: Some(branch.to_string()),
|
||||
})
|
||||
.unwrap();
|
||||
let req = Request::builder()
|
||||
.uri("/change")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
let response = app.clone().oneshot(req).await.unwrap();
|
||||
assert_eq!(
|
||||
response.status(),
|
||||
StatusCode::OK,
|
||||
"insert_person on {} failed",
|
||||
branch,
|
||||
);
|
||||
}
|
||||
|
||||
// Helper: GET /snapshot?branch=<branch> and return Person row count.
|
||||
async fn person_row_count(app: &Router, branch: &str) -> u64 {
|
||||
let uri = format!("/snapshot?branch={}", branch);
|
||||
let req = Request::builder()
|
||||
.uri(uri)
|
||||
.method(Method::GET)
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
let response = app.clone().oneshot(req).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::OK, "snapshot {} failed", branch);
|
||||
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
||||
let value: Value = serde_json::from_slice(&body).unwrap();
|
||||
let tables = value["tables"].as_array().unwrap();
|
||||
let person_table = tables
|
||||
.iter()
|
||||
.find(|t| t["table_key"].as_str() == Some("node:Person"))
|
||||
.unwrap_or_else(|| panic!("snapshot of {} missing node:Person", branch));
|
||||
person_table["row_count"].as_u64().unwrap()
|
||||
}
|
||||
|
||||
// Setup. Main: 4 Persons (Alice, Bob, Charlie, Diana from test.jsonl).
|
||||
create_branch(&app, "main", "alpha").await;
|
||||
insert_person(&app, "alpha", "Eve", 22).await;
|
||||
create_branch(&app, "main", "beta").await;
|
||||
|
||||
let alpha_count = person_row_count(&app, "alpha").await;
|
||||
let beta_count = person_row_count(&app, "beta").await;
|
||||
assert_eq!(alpha_count, 5, "alpha should have 5 Persons after Eve insert");
|
||||
assert_eq!(beta_count, 4, "beta should have 4 Persons (untouched main fork)");
|
||||
|
||||
// Concurrent forks: many gamma_i from alpha, many delta_i from beta.
|
||||
// M=8 fork pairs to amplify race-catching odds; the race is inherently
|
||||
// timing-dependent so a single pair would flake on cold runs.
|
||||
const M: usize = 8;
|
||||
let mut handles = Vec::with_capacity(M * 2);
|
||||
for i in 0..M {
|
||||
let app_a = app.clone();
|
||||
let gamma_name = format!("gamma-{i}");
|
||||
handles.push(tokio::spawn(async move {
|
||||
create_branch(&app_a, "alpha", &gamma_name).await;
|
||||
gamma_name
|
||||
}));
|
||||
let app_b = app.clone();
|
||||
let delta_name = format!("delta-{i}");
|
||||
handles.push(tokio::spawn(async move {
|
||||
create_branch(&app_b, "beta", &delta_name).await;
|
||||
delta_name
|
||||
}));
|
||||
}
|
||||
|
||||
let mut created = Vec::with_capacity(M * 2);
|
||||
for h in handles {
|
||||
created.push(h.await.unwrap());
|
||||
}
|
||||
assert_eq!(created.len(), M * 2);
|
||||
|
||||
// Assertion: every fork inherits its declared parent's row count.
|
||||
// Pre-fix: under the race, some gamma_i may report 4 (beta's count) or
|
||||
// some delta_i may report 5 (alpha's count) because the operate step
|
||||
// ran against the wrong swapped coordinator.
|
||||
let mut mismatches: Vec<(String, u64, u64)> = Vec::new();
|
||||
for i in 0..M {
|
||||
let gamma = format!("gamma-{i}");
|
||||
let count = person_row_count(&app, &gamma).await;
|
||||
if count != alpha_count {
|
||||
mismatches.push((gamma, count, alpha_count));
|
||||
}
|
||||
let delta = format!("delta-{i}");
|
||||
let count = person_row_count(&app, &delta).await;
|
||||
if count != beta_count {
|
||||
mismatches.push((delta, count, beta_count));
|
||||
}
|
||||
}
|
||||
assert!(
|
||||
mismatches.is_empty(),
|
||||
"branches forked off the wrong parent under the swap-restore race; \
|
||||
(branch, observed_count, expected_count): {:?}",
|
||||
mismatches,
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn oversized_request_body_returns_payload_too_large() {
|
||||
let (_temp, app) = app_for_loaded_repo().await;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue