mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
tests: pin concurrent /change + branch_merge interleave preserves writes
Future-proofs against MR-895 work that may move or remove the
per-(table, branch) writer queue acquisition inside `branch_merge`
(`crates/omnigraph/src/exec/merge.rs:1224`). Today the queue
linearizes a concurrent /change on main against a `branch_merge
feature → main` on the same touched tables; both succeed and the
inserted row is preserved post-merge.
Codex flagged this scenario as a P1 in PR #75 review claiming the
merge could silently overwrite concurrent target writes because the
source-rewrite path opens with `MutationOpKind::Merge` (skipping the
strict pre-stage check). Validation showed the queue at merge.rs:1224
is held across both Phase B (per-table commit_staged) and Phase C
(manifest publish), so there's no interleave window. The Merge
op_kind only affects same-process pre-stage drift detection, not
cross-write linearization. The test passes on f925ad1; landing it
as a regression sentinel catches future changes that drop the queue
acquisition.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
5520ab72ff
commit
976aa0ec1d
1 changed files with 140 additions and 0 deletions
|
|
@ -2557,6 +2557,146 @@ async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordin
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn concurrent_change_during_branch_merge_preserves_writes() {
|
||||
// Future-proof against MR-895 work that may move or remove the
|
||||
// per-(table, branch) writer queue acquisition inside `branch_merge`
|
||||
// (`crates/omnigraph/src/exec/merge.rs:1224`). Today the queue
|
||||
// linearizes a concurrent /change on main against branch_merge
|
||||
// feature → main on the same touched tables; both succeed and B's
|
||||
// row is preserved post-merge.
|
||||
//
|
||||
// Codex flagged a P1 in PR #75 review claiming the merge could
|
||||
// silently overwrite concurrent target writes because the
|
||||
// source-rewrite path opens with `MutationOpKind::Merge` (skipping
|
||||
// the strict pre-stage check). Validation by subagent showed the
|
||||
// queue at merge.rs:1224 is held across both Phase B (per-table
|
||||
// commit_staged) and Phase C (manifest publish), so there's no
|
||||
// interleave window. The Merge op_kind only affects same-process
|
||||
// pre-stage drift detection, not cross-write linearization.
|
||||
//
|
||||
// This test is the regression pin that catches a future change
|
||||
// which drops the queue acquisition and admits the silent overwrite.
|
||||
let temp = init_loaded_repo().await;
|
||||
let repo = repo_path(temp.path());
|
||||
let state = AppState::open(repo.to_string_lossy().to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
let app = build_app(state);
|
||||
|
||||
// test.jsonl: 4 Persons on main.
|
||||
const SEED_PERSONS: u64 = 4;
|
||||
|
||||
// Create feature branch + insert one Person on feature.
|
||||
let create_body = serde_json::to_vec(&BranchCreateRequest {
|
||||
from: Some("main".to_string()),
|
||||
name: "feature".to_string(),
|
||||
})
|
||||
.unwrap();
|
||||
let response = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/branches")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(create_body))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
let feature_insert = serde_json::to_vec(&ChangeRequest {
|
||||
query_source: MUTATION_QUERIES.to_string(),
|
||||
query_name: Some("insert_person".to_string()),
|
||||
params: Some(json!({ "name": "Eve", "age": 22 })),
|
||||
branch: Some("feature".to_string()),
|
||||
})
|
||||
.unwrap();
|
||||
let response = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/change")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(feature_insert))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
// Concurrent: insert on main + merge feature → main. The queue
|
||||
// linearizes them on the (node:Person, main) key; both succeed.
|
||||
let app_change = app.clone();
|
||||
let change_handle = tokio::spawn(async move {
|
||||
let body = serde_json::to_vec(&ChangeRequest {
|
||||
query_source: MUTATION_QUERIES.to_string(),
|
||||
query_name: Some("insert_person".to_string()),
|
||||
params: Some(json!({ "name": "Frank", "age": 33 })),
|
||||
branch: Some("main".to_string()),
|
||||
})
|
||||
.unwrap();
|
||||
let req = Request::builder()
|
||||
.uri("/change")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
app_change.oneshot(req).await.unwrap().status()
|
||||
});
|
||||
|
||||
let app_merge = app.clone();
|
||||
let merge_handle = tokio::spawn(async move {
|
||||
let body = serde_json::to_vec(&BranchMergeRequest {
|
||||
source: "feature".to_string(),
|
||||
target: Some("main".to_string()),
|
||||
})
|
||||
.unwrap();
|
||||
let req = Request::builder()
|
||||
.uri("/branches/merge")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
app_merge.oneshot(req).await.unwrap().status()
|
||||
});
|
||||
|
||||
let change_status = change_handle.await.unwrap();
|
||||
let merge_status = merge_handle.await.unwrap();
|
||||
assert_eq!(change_status, StatusCode::OK, "concurrent /change failed");
|
||||
assert_eq!(merge_status, StatusCode::OK, "concurrent /branches/merge failed");
|
||||
|
||||
// Post-condition: main has SEED + Eve (from feature) + Frank (inserted).
|
||||
let (status, body) = json_response(
|
||||
&app,
|
||||
Request::builder()
|
||||
.uri("/snapshot?branch=main")
|
||||
.method(Method::GET)
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK);
|
||||
let person_rows = body["tables"]
|
||||
.as_array()
|
||||
.and_then(|tables| {
|
||||
tables
|
||||
.iter()
|
||||
.find(|t| t["table_key"].as_str() == Some("node:Person"))
|
||||
})
|
||||
.and_then(|t| t["row_count"].as_u64())
|
||||
.expect("snapshot must include node:Person row_count");
|
||||
assert_eq!(
|
||||
person_rows,
|
||||
SEED_PERSONS + 2, // +1 from feature merge (Eve), +1 from concurrent /change (Frank)
|
||||
"post-merge main must include both the merge result (Eve) and the \
|
||||
concurrent insert (Frank); pre-fix race would lose one of them",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn change_disjoint_table_concurrency_succeeds_at_http_level() {
|
||||
// HTTP-level pin for MR-686's disjoint-table promise: concurrent /change
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue