From 976aa0ec1d7c7c6afea58e9df30ae43733b65da5 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 17:03:05 +0200 Subject: [PATCH] tests: pin concurrent /change + branch_merge interleave preserves writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Future-proofs against MR-895 work that may move or remove the per-(table, branch) writer queue acquisition inside `branch_merge` (`crates/omnigraph/src/exec/merge.rs:1224`). Today the queue linearizes a concurrent /change on main against a `branch_merge feature → main` on the same touched tables; both succeed and the inserted row is preserved post-merge. Codex flagged this scenario as a P1 in PR #75 review claiming the merge could silently overwrite concurrent target writes because the source-rewrite path opens with `MutationOpKind::Merge` (skipping the strict pre-stage check). Validation showed the queue at merge.rs:1224 is held across both Phase B (per-table commit_staged) and Phase C (manifest publish), so there's no interleave window. The Merge op_kind only affects same-process pre-stage drift detection, not cross-write linearization. The test passes on f925ad1; landing it as a regression sentinel catches future changes that drop the queue acquisition. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/tests/server.rs | 140 ++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 0ebe652..0cfab94 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2557,6 +2557,146 @@ async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordin ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_change_during_branch_merge_preserves_writes() { + // Future-proof against MR-895 work that may move or remove the + // per-(table, branch) writer queue acquisition inside `branch_merge` + // (`crates/omnigraph/src/exec/merge.rs:1224`). Today the queue + // linearizes a concurrent /change on main against branch_merge + // feature → main on the same touched tables; both succeed and B's + // row is preserved post-merge. + // + // Codex flagged a P1 in PR #75 review claiming the merge could + // silently overwrite concurrent target writes because the + // source-rewrite path opens with `MutationOpKind::Merge` (skipping + // the strict pre-stage check). Validation by subagent showed the + // queue at merge.rs:1224 is held across both Phase B (per-table + // commit_staged) and Phase C (manifest publish), so there's no + // interleave window. The Merge op_kind only affects same-process + // pre-stage drift detection, not cross-write linearization. + // + // This test is the regression pin that catches a future change + // which drops the queue acquisition and admits the silent overwrite. + let temp = init_loaded_repo().await; + let repo = repo_path(temp.path()); + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + + // test.jsonl: 4 Persons on main. + const SEED_PERSONS: u64 = 4; + + // Create feature branch + insert one Person on feature. + let create_body = serde_json::to_vec(&BranchCreateRequest { + from: Some("main".to_string()), + name: "feature".to_string(), + }) + .unwrap(); + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/branches") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(create_body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let feature_insert = serde_json::to_vec(&ChangeRequest { + query_source: MUTATION_QUERIES.to_string(), + query_name: Some("insert_person".to_string()), + params: Some(json!({ "name": "Eve", "age": 22 })), + branch: Some("feature".to_string()), + }) + .unwrap(); + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(feature_insert)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Concurrent: insert on main + merge feature → main. The queue + // linearizes them on the (node:Person, main) key; both succeed. + let app_change = app.clone(); + let change_handle = tokio::spawn(async move { + let body = serde_json::to_vec(&ChangeRequest { + query_source: MUTATION_QUERIES.to_string(), + query_name: Some("insert_person".to_string()), + params: Some(json!({ "name": "Frank", "age": 33 })), + branch: Some("main".to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + app_change.oneshot(req).await.unwrap().status() + }); + + let app_merge = app.clone(); + let merge_handle = tokio::spawn(async move { + let body = serde_json::to_vec(&BranchMergeRequest { + source: "feature".to_string(), + target: Some("main".to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/branches/merge") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + app_merge.oneshot(req).await.unwrap().status() + }); + + let change_status = change_handle.await.unwrap(); + let merge_status = merge_handle.await.unwrap(); + assert_eq!(change_status, StatusCode::OK, "concurrent /change failed"); + assert_eq!(merge_status, StatusCode::OK, "concurrent /branches/merge failed"); + + // Post-condition: main has SEED + Eve (from feature) + Frank (inserted). + let (status, body) = json_response( + &app, + Request::builder() + .uri("/snapshot?branch=main") + .method(Method::GET) + .body(Body::empty()) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK); + let person_rows = body["tables"] + .as_array() + .and_then(|tables| { + tables + .iter() + .find(|t| t["table_key"].as_str() == Some("node:Person")) + }) + .and_then(|t| t["row_count"].as_u64()) + .expect("snapshot must include node:Person row_count"); + assert_eq!( + person_rows, + SEED_PERSONS + 2, // +1 from feature merge (Eve), +1 from concurrent /change (Frank) + "post-merge main must include both the merge result (Eve) and the \ + concurrent insert (Frank); pre-fix race would lose one of them", + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn change_disjoint_table_concurrency_succeeds_at_http_level() { // HTTP-level pin for MR-686's disjoint-table promise: concurrent /change