tests: pin disjoint /change concurrency at HTTP level

Closes the cubic acceptance-criteria gap ( "Integration test: two
/change requests targeting different (table_key, branch) execute
concurrently end-to-end"). The bench harness measures the throughput
side; this test is the regression sentinel that catches a future
change which accidentally re-introduces graph-wide serialization on
the disjoint path.

Spawns 4 concurrent /change inserts on node:Person and 4 on
node:Company. All 8 must return 200, and the post-test row counts
on each table must reflect every insert.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 17:01:52 +02:00
parent 6ef07386d3
commit 5520ab72ff
No known key found for this signature in database

View file

@ -2557,6 +2557,124 @@ async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordin
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_disjoint_table_concurrency_succeeds_at_http_level() {
// HTTP-level pin for MR-686's disjoint-table promise: concurrent /change
// requests touching different node types must coexist without admission
// rejection or publisher-CAS conflict. The bench harness measures
// throughput; this test is the regression sentinel that catches a
// future change which accidentally re-introduces graph-wide
// serialization on the disjoint path.
//
// Setup: test.jsonl seeds 4 Persons + 2 Companies. Spawn N=4 concurrent
// /change inserts on `node:Person` and N=4 concurrent inserts on
// `node:Company`. All 8 must return 200, and the post-test row counts
// must reflect every insert.
const PERSON_QUERY: &str = r#"
query insert_p($name: String, $age: I32) {
insert Person { name: $name, age: $age }
}
"#;
const COMPANY_QUERY: &str = r#"
query insert_c($name: String) {
insert Company { name: $name }
}
"#;
const SEED_PERSONS: u64 = 4;
const SEED_COMPANIES: u64 = 2;
const PER_TYPE: usize = 4;
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
let mut handles = Vec::with_capacity(PER_TYPE * 2);
for i in 0..PER_TYPE {
let app_p = app.clone();
handles.push(tokio::spawn(async move {
let body = serde_json::to_vec(&ChangeRequest {
query_source: PERSON_QUERY.to_string(),
query_name: Some("insert_p".to_string()),
params: Some(json!({ "name": format!("p-{i}"), "age": i as i32 })),
branch: Some("main".to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
app_p.oneshot(req).await.unwrap().status()
}));
let app_c = app.clone();
handles.push(tokio::spawn(async move {
let body = serde_json::to_vec(&ChangeRequest {
query_source: COMPANY_QUERY.to_string(),
query_name: Some("insert_c".to_string()),
params: Some(json!({ "name": format!("c-{i}") })),
branch: Some("main".to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
app_c.oneshot(req).await.unwrap().status()
}));
}
let mut statuses = Vec::with_capacity(PER_TYPE * 2);
for h in handles {
statuses.push(h.await.unwrap());
}
let bad: Vec<_> = statuses
.iter()
.enumerate()
.filter(|(_, s)| **s != StatusCode::OK)
.collect();
assert!(
bad.is_empty(),
"expected every disjoint /change insert to return 200, got non-200 for: {:?}",
bad,
);
// Verify both tables landed every insert.
let (status, body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::OK);
let lookup_count = |table_key: &str| -> u64 {
body["tables"]
.as_array()
.and_then(|tables| tables.iter().find(|t| t["table_key"].as_str() == Some(table_key)))
.and_then(|t| t["row_count"].as_u64())
.unwrap_or_else(|| panic!("snapshot missing {}", table_key))
};
assert_eq!(
lookup_count("node:Person"),
SEED_PERSONS + PER_TYPE as u64,
"Person row count after concurrent inserts",
);
assert_eq!(
lookup_count("node:Company"),
SEED_COMPANIES + PER_TYPE as u64,
"Company row count after concurrent inserts",
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn ingest_per_actor_admission_cap_returns_429() {