From 5520ab72ffb3e02c2222cf64643e57d8bad70bcc Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 17:01:52 +0200 Subject: [PATCH] tests: pin disjoint /change concurrency at HTTP level MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the cubic acceptance-criteria gap (❌ "Integration test: two /change requests targeting different (table_key, branch) execute concurrently end-to-end"). The bench harness measures the throughput side; this test is the regression sentinel that catches a future change which accidentally re-introduces graph-wide serialization on the disjoint path. Spawns 4 concurrent /change inserts on node:Person and 4 on node:Company. All 8 must return 200, and the post-test row counts on each table must reflect every insert. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/tests/server.rs | 118 ++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 9c17e2f..0ebe652 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2557,6 +2557,124 @@ async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordin ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn change_disjoint_table_concurrency_succeeds_at_http_level() { + // HTTP-level pin for MR-686's disjoint-table promise: concurrent /change + // requests touching different node types must coexist without admission + // rejection or publisher-CAS conflict. The bench harness measures + // throughput; this test is the regression sentinel that catches a + // future change which accidentally re-introduces graph-wide + // serialization on the disjoint path. + // + // Setup: test.jsonl seeds 4 Persons + 2 Companies. Spawn N=4 concurrent + // /change inserts on `node:Person` and N=4 concurrent inserts on + // `node:Company`. All 8 must return 200, and the post-test row counts + // must reflect every insert. + const PERSON_QUERY: &str = r#" +query insert_p($name: String, $age: I32) { + insert Person { name: $name, age: $age } +} +"#; + const COMPANY_QUERY: &str = r#" +query insert_c($name: String) { + insert Company { name: $name } +} +"#; + const SEED_PERSONS: u64 = 4; + const SEED_COMPANIES: u64 = 2; + const PER_TYPE: usize = 4; + + let temp = init_loaded_repo().await; + let repo = repo_path(temp.path()); + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + + let mut handles = Vec::with_capacity(PER_TYPE * 2); + for i in 0..PER_TYPE { + let app_p = app.clone(); + handles.push(tokio::spawn(async move { + let body = serde_json::to_vec(&ChangeRequest { + query_source: PERSON_QUERY.to_string(), + query_name: Some("insert_p".to_string()), + params: Some(json!({ "name": format!("p-{i}"), "age": i as i32 })), + branch: Some("main".to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + app_p.oneshot(req).await.unwrap().status() + })); + let app_c = app.clone(); + handles.push(tokio::spawn(async move { + let body = serde_json::to_vec(&ChangeRequest { + query_source: COMPANY_QUERY.to_string(), + query_name: Some("insert_c".to_string()), + params: Some(json!({ "name": format!("c-{i}") })), + branch: Some("main".to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + app_c.oneshot(req).await.unwrap().status() + })); + } + + let mut statuses = Vec::with_capacity(PER_TYPE * 2); + for h in handles { + statuses.push(h.await.unwrap()); + } + + let bad: Vec<_> = statuses + .iter() + .enumerate() + .filter(|(_, s)| **s != StatusCode::OK) + .collect(); + assert!( + bad.is_empty(), + "expected every disjoint /change insert to return 200, got non-200 for: {:?}", + bad, + ); + + // Verify both tables landed every insert. + let (status, body) = json_response( + &app, + Request::builder() + .uri("/snapshot?branch=main") + .method(Method::GET) + .body(Body::empty()) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK); + let lookup_count = |table_key: &str| -> u64 { + body["tables"] + .as_array() + .and_then(|tables| tables.iter().find(|t| t["table_key"].as_str() == Some(table_key))) + .and_then(|t| t["row_count"].as_u64()) + .unwrap_or_else(|| panic!("snapshot missing {}", table_key)) + }; + assert_eq!( + lookup_count("node:Person"), + SEED_PERSONS + PER_TYPE as u64, + "Person row count after concurrent inserts", + ); + assert_eq!( + lookup_count("node:Company"), + SEED_COMPANIES + PER_TYPE as u64, + "Company row count after concurrent inserts", + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] async fn ingest_per_actor_admission_cap_returns_429() {