diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index ef4ca41..9c891b4 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2248,6 +2248,125 @@ async fn change_concurrent_inserts_same_key_serialize_without_409() { // would have done pre-Phase-2). } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() { + // Pin Update RYW semantics under in-process concurrency on the same + // `(table, branch)`. With per-table queue serialization and op-kind-aware + // drift detection at commit time, exactly one of N concurrent UPDATEs + // on the same row commits; the rest are rejected as 409 manifest_conflict. + // + // Pre-fix bug class: in `MutationStaging::commit_all`, after queue + // acquisition, the staged Lance transaction is handed straight to + // `commit_staged`. For a writer whose staged dataset is at V0 but + // Lance HEAD has advanced to V1 (because the queue's prior winner + // already published), Lance's transaction conflict resolver fires + // `RetryableCommitConflict` on Update vs Update on the same row. + // That error gets wrapped as `OmniError::Lance()` and the + // API surfaces it as **500 internal**, not 409. Users see "internal + // server error" instead of a retryable conflict, breaking the + // documented 409 contract for in-process drift. + // + // Post-fix invariant: `commit_all` does an op-kind-aware drift check + // before each `commit_staged`. For tables whose tracked op_kind has + // `strict_pre_stage_version_check() == true` (Update / Delete / + // SchemaRewrite), if the staged dataset's version doesn't match the + // fresh manifest pin, return `OmniError::manifest_expected_version_mismatch` + // → 409 ExpectedVersionMismatch. The N-1 losers see a clean 409 + // before Lance's commit_staged ever runs. + // + // Why correct-by-design: closing the class "Lance internal conflict + // surfaces as 500 instead of 409" rather than mapping the specific + // Lance error variant. The drift check fires at the right architectural + // layer (engine boundary, under the queue) and respects the existing + // `MutationOpKind` policy. + let temp = init_loaded_repo().await; + let repo = repo_path(temp.path()); + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + + // Spawn N=8 concurrent UPDATEs on Alice (from test.jsonl, age=30 at V0) + // writing distinct ages. + const N: usize = 8; + let mut handles = Vec::with_capacity(N); + for i in 0..N { + let app = app.clone(); + let target_age = 100 + i as i32; + handles.push(tokio::spawn(async move { + let body = serde_json::to_vec(&ChangeRequest { + query_source: MUTATION_QUERIES.to_string(), + query_name: Some("set_age".to_string()), + params: Some(json!({ "name": "Alice", "age": target_age })), + branch: Some("main".to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + let status = response.status(); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + (status, body.to_vec()) + })); + } + + let mut results = Vec::with_capacity(N); + for h in handles { + results.push(h.await.unwrap()); + } + let statuses: Vec = results.iter().map(|(s, _)| *s).collect(); + + let ok_count = statuses + .iter() + .filter(|s| **s == StatusCode::OK) + .count(); + let conflict_count = statuses + .iter() + .filter(|s| **s == StatusCode::CONFLICT) + .count(); + let other: Vec<_> = statuses + .iter() + .enumerate() + .filter(|(_, s)| **s != StatusCode::OK && **s != StatusCode::CONFLICT) + .collect(); + + let other_bodies: Vec<(usize, StatusCode, String)> = other + .iter() + .map(|(i, s)| { + let body_str = String::from_utf8_lossy(&results[*i].1).to_string(); + (*i, **s, body_str) + }) + .collect(); + assert!( + other.is_empty(), + "expected only 200 or 409 statuses, got non-200/409 entries: {:?}", + other_bodies + ); + assert_eq!( + ok_count + conflict_count, + N, + "all responses must be 200 or 409 to satisfy the RYW invariant; statuses: {:?}", + statuses + ); + assert_eq!( + ok_count, 1, + "expected exactly one update to commit and N-1 to receive 409 manifest_conflict \ + (op-kind-aware drift check rejects stale-V0 staged datasets at commit_all entry). \ + Got {} OK + {} 409 + {} other. \ + Pre-fix symptom: 1 OK + (N-1) x 500 because Lance's RetryableCommitConflict for \ + Update vs Update on the same row bubbles up as `OmniError::Lance()` and \ + the API maps it to 500 internal, not 409. Statuses: {:?}", + ok_count, + conflict_count, + statuses.len() - ok_count - conflict_count, + statuses, + ); +} + #[tokio::test(flavor = "multi_thread")] async fn oversized_request_body_returns_payload_too_large() { let (_temp, app) = app_for_loaded_repo().await;