tests: pin UPDATE RYW under in-process concurrency (red)

Per AGENTS.md rule 8, this commit lands the failing regression test
ahead of the fix so the red → green pair is visible in git log.

The test asserts the RYW invariant for in-process concurrent UPDATEs on
the same row: exactly one writer commits and N-1 receive 409
manifest_conflict. Currently fails on f925ad1 with 1 x 200 + 7 x 500:

> "storage: Retryable commit conflict for version 6: This Update
>  transaction was preempted by concurrent transaction Update at
>  version 6. Please retry."

Lance's transaction conflict resolver correctly detects the Update vs
Update race, but the error wraps as `OmniError::Lance(<string>)` and the
API surfaces it as 500 internal rather than 409 retryable conflict. Users
see "internal server error" for what is documented as a retryable
conflict path.

The fix lands in the next commit: an op-kind-aware drift check at the
commit_all entry that returns 409 ExpectedVersionMismatch for tables
whose first touch was Update / Delete / SchemaRewrite when the staged
dataset version drifts from the manifest pin under the queue.

Closes the bug class "Lance internal conflict surfaces as 500 instead
of 409" rather than mapping the specific Lance error variant — the
right architectural layer (engine boundary, under the queue) catches
the drift before commit_staged ever runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 16:33:53 +02:00
parent 6bd9f1c085
commit ebf5a5769d
No known key found for this signature in database

View file

@ -2248,6 +2248,125 @@ async fn change_concurrent_inserts_same_key_serialize_without_409() {
// would have done pre-Phase-2).
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() {
// Pin Update RYW semantics under in-process concurrency on the same
// `(table, branch)`. With per-table queue serialization and op-kind-aware
// drift detection at commit time, exactly one of N concurrent UPDATEs
// on the same row commits; the rest are rejected as 409 manifest_conflict.
//
// Pre-fix bug class: in `MutationStaging::commit_all`, after queue
// acquisition, the staged Lance transaction is handed straight to
// `commit_staged`. For a writer whose staged dataset is at V0 but
// Lance HEAD has advanced to V1 (because the queue's prior winner
// already published), Lance's transaction conflict resolver fires
// `RetryableCommitConflict` on Update vs Update on the same row.
// That error gets wrapped as `OmniError::Lance(<string>)` and the
// API surfaces it as **500 internal**, not 409. Users see "internal
// server error" instead of a retryable conflict, breaking the
// documented 409 contract for in-process drift.
//
// Post-fix invariant: `commit_all` does an op-kind-aware drift check
// before each `commit_staged`. For tables whose tracked op_kind has
// `strict_pre_stage_version_check() == true` (Update / Delete /
// SchemaRewrite), if the staged dataset's version doesn't match the
// fresh manifest pin, return `OmniError::manifest_expected_version_mismatch`
// → 409 ExpectedVersionMismatch. The N-1 losers see a clean 409
// before Lance's commit_staged ever runs.
//
// Why correct-by-design: closing the class "Lance internal conflict
// surfaces as 500 instead of 409" rather than mapping the specific
// Lance error variant. The drift check fires at the right architectural
// layer (engine boundary, under the queue) and respects the existing
// `MutationOpKind` policy.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
// Spawn N=8 concurrent UPDATEs on Alice (from test.jsonl, age=30 at V0)
// writing distinct ages.
const N: usize = 8;
let mut handles = Vec::with_capacity(N);
for i in 0..N {
let app = app.clone();
let target_age = 100 + i as i32;
handles.push(tokio::spawn(async move {
let body = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("set_age".to_string()),
params: Some(json!({ "name": "Alice", "age": target_age })),
branch: Some("main".to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let response = app.oneshot(req).await.unwrap();
let status = response.status();
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
(status, body.to_vec())
}));
}
let mut results = Vec::with_capacity(N);
for h in handles {
results.push(h.await.unwrap());
}
let statuses: Vec<StatusCode> = results.iter().map(|(s, _)| *s).collect();
let ok_count = statuses
.iter()
.filter(|s| **s == StatusCode::OK)
.count();
let conflict_count = statuses
.iter()
.filter(|s| **s == StatusCode::CONFLICT)
.count();
let other: Vec<_> = statuses
.iter()
.enumerate()
.filter(|(_, s)| **s != StatusCode::OK && **s != StatusCode::CONFLICT)
.collect();
let other_bodies: Vec<(usize, StatusCode, String)> = other
.iter()
.map(|(i, s)| {
let body_str = String::from_utf8_lossy(&results[*i].1).to_string();
(*i, **s, body_str)
})
.collect();
assert!(
other.is_empty(),
"expected only 200 or 409 statuses, got non-200/409 entries: {:?}",
other_bodies
);
assert_eq!(
ok_count + conflict_count,
N,
"all responses must be 200 or 409 to satisfy the RYW invariant; statuses: {:?}",
statuses
);
assert_eq!(
ok_count, 1,
"expected exactly one update to commit and N-1 to receive 409 manifest_conflict \
(op-kind-aware drift check rejects stale-V0 staged datasets at commit_all entry). \
Got {} OK + {} 409 + {} other. \
Pre-fix symptom: 1 OK + (N-1) x 500 because Lance's RetryableCommitConflict for \
Update vs Update on the same row bubbles up as `OmniError::Lance(<string>)` and \
the API maps it to 500 internal, not 409. Statuses: {:?}",
ok_count,
conflict_count,
statuses.len() - ok_count - conflict_count,
statuses,
);
}
#[tokio::test(flavor = "multi_thread")]
async fn oversized_request_body_returns_payload_too_large() {
let (_temp, app) = app_for_loaded_repo().await;