MR-794 step 2: tests — flip partial_failure + add coalesce/D₂/load tests

* Flip partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_table
  → partial_failure_leaves_target_queryable_and_unblocks_next_mutation.
  Asserts the next mutation succeeds (no drift) instead of expecting
  the legacy ExpectedVersionMismatch.
* New: mutation_rejects_mixed_insert_and_delete_at_parse_time —
  D₂ check fires before any I/O.
* New: mixed_insert_and_update_on_same_person_coalesces_to_one_merge —
  verifies dedupe-by-id last-write-wins in MutationStaging::finalize.
* New: multiple_appends_to_same_edge_coalesce_to_one_append —
  two-statement edge insert publishes exactly once.
* New: multi_statement_inserts_publish_exactly_once — manifest
  version advances by exactly 1 per multi-statement query.
* New: load_with_bad_edge_reference_unblocks_next_load — RI
  violation aborts before publish; next load succeeds.
* New: load_with_cardinality_violation_unblocks_next_load — uses
  a custom WorksAt @card(0..1) schema; cardinality violation aborts
  before publish; next load succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-01 10:43:00 +02:00
parent bbf610ea9b
commit 82350bdc4a
No known key found for this signature in database

View file

@ -14,6 +14,7 @@
mod helpers;
use arrow_array::Array;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
@ -160,30 +161,27 @@ async fn multi_statement_mutation_is_atomic_with_read_your_writes() {
assert_eq!(friends.num_rows(), 1);
}
/// Mid-query partial failure: op-1 writes a Lance fragment, op-2 fails.
/// Documents the *current* observable behavior — not the desired one. The
/// publisher never publishes (good — no manifest commit, target state
/// unchanged), but Lance HEAD on the touched table is now ahead of the
/// manifest-recorded version. The next mutation against that table fails
/// loudly with `ExpectedVersionMismatch` (the engine's
/// `ensure_expected_version` strict-equality check refuses the drift).
/// Mid-query partial failure: op-1 stages a Person insert, op-2 fails on
/// referential integrity (validate_edge_insert_endpoints). Under the
/// MR-794 staged-write rewire, op-1's batch lives in the in-memory
/// accumulator and never reaches Lance — Lance HEAD on `node:Person`
/// stays at the pre-mutation version. The publisher never publishes,
/// the manifest never advances, and the next mutation against the same
/// table proceeds normally (no `ExpectedVersionMismatch`).
///
/// **Known limitation, MR-771 follow-up**: a proper rollback requires
/// per-table Lance branches (write to a transient branch, fast-forward
/// main on success, drop on failure). Lance's `restore()` is not a rewind
/// — it appends a new commit, advancing HEAD further. See `docs/runs.md`
/// for the workaround (rare in practice: most validation runs before any
/// Lance write, so this only fires on multi-statement queries where a
/// late op fails after an earlier op committed).
/// This test pins the post-MR-794 contract:
/// - Failed multi-statement mutation surfaces a clear error, no manifest
/// commit, no observable state change.
/// - The touched tables stay queryable and writable from the next
/// query — Lance HEAD has not drifted.
#[tokio::test]
async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_table() {
async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Op-1 inserts Person "Eve" successfully (advancing Lance HEAD on
// node:Person). Op-2 inserts Knows from Eve to "Missing" — fails at
// validate_edge_insert_endpoints because "Missing" doesn't exist.
// The query as a whole errors; the publisher never runs.
// Op-1 stages a Person 'Eve' insert. Op-2 attempts an edge to
// 'Missing' — fails at validate_edge_insert_endpoints because
// 'Missing' doesn't exist (and isn't pending).
let err = db
.mutate(
"main",
@ -205,10 +203,9 @@ async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_
manifest_err.message,
);
// Atomicity at the manifest level: Eve is *not* observable. The Lance
// fragment from op-1 exists on disk but is not referenced by the
// manifest; readers at the manifest's pinned version see the
// pre-mutation state.
// Atomicity at the manifest level: Eve is *not* observable. The
// staged batch never reached Lance, so neither the Lance HEAD nor
// the manifest moved.
let eve = db
.query(
ReadTarget::branch("main"),
@ -218,11 +215,12 @@ async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_
)
.await
.unwrap();
assert_eq!(eve.num_rows(), 0, "partial Lance write must not be visible");
assert_eq!(eve.num_rows(), 0, "partial mutation must not be visible");
// The next mutation against the *same* table fails loudly. Other
// tables are unaffected, and reads still work.
let blocked = db
// The next mutation against the same table SUCCEEDS — staged writes
// never advance Lance HEAD on a failed query, so there is no drift
// to trip the publisher's CAS.
let result = db
.mutate(
"main",
MUTATION_QUERIES,
@ -230,14 +228,23 @@ async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.expect_err("next mutation on the touched table is blocked by orphan Lance HEAD");
let OmniError::Manifest(blocked_err) = blocked else {
panic!("expected Manifest error, got {blocked:?}");
};
assert!(matches!(
blocked_err.details,
Some(omnigraph::error::ManifestConflictDetails::ExpectedVersionMismatch { .. })
));
.expect("next mutation on the touched table must succeed under MR-794");
assert_eq!(
result.affected_nodes, 1,
"follow-up insert should report 1 affected node"
);
// And Frank is observable.
let frank = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "Frank")]),
)
.await
.unwrap();
assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish");
}
/// Concurrent writers to the same `(table, branch)` produce exactly one
@ -472,3 +479,362 @@ async fn public_branch_apis_reject_internal_run_refs() {
err.message
);
}
// ─── MR-794: staged-write rewire — additional contract tests ────────────────
/// Mutation queries used only by the MR-794 tests below. Kept in the test
/// file (not in helpers' shared `MUTATION_QUERIES`) to keep their scope
/// local to the staged-write coverage.
const STAGED_QUERIES: &str = r#"
query insert_two_persons($a_name: String, $a_age: I32, $b_name: String, $b_age: I32) {
insert Person { name: $a_name, age: $a_age }
insert Person { name: $b_name, age: $b_age }
}
query insert_then_update_same_person(
$name: String, $insert_age: I32, $update_age: I32
) {
insert Person { name: $name, age: $insert_age }
update Person set { age: $update_age } where name = $name
}
query insert_two_friends($from: String, $a: String, $b: String) {
insert Knows { from: $from, to: $a }
insert Knows { from: $from, to: $b }
}
query mixed_insert_and_delete($name: String, $age: I32, $victim: String) {
insert Person { name: $name, age: $age }
delete Person where name = $victim
}
"#;
/// D₂: a query mixing inserts/updates with deletes is rejected at parse
/// time, BEFORE any I/O. The error shape directs the user to split the
/// query into two mutations.
#[tokio::test]
async fn mutation_rejects_mixed_insert_and_delete_at_parse_time() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Capture pre-mutation state on touched tables to confirm no I/O.
let persons_before = count_rows(&db, "node:Person").await;
let err = db
.mutate(
"main",
STAGED_QUERIES,
"mixed_insert_and_delete",
&mixed_params(
&[("$name", "Eve"), ("$victim", "Alice")],
&[("$age", 22)],
),
)
.await
.expect_err("D₂ must reject mixed insert+delete");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("inserts/updates and deletes"),
"unexpected error message: {}",
manifest_err.message,
);
assert!(
manifest_err.message.contains("split into separate mutations"),
"error message should direct user to split: {}",
manifest_err.message,
);
// No I/O — counts unchanged, branches unchanged.
let persons_after = count_rows(&db, "node:Person").await;
assert_eq!(
persons_before, persons_after,
"D₂ rejection must fire before any write",
);
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
}
/// `insert Person 'X'; update Person where name='X' set age=...` — both
/// ops produce content on `node:Person` and coalesce into one
/// `stage_merge_insert` at end-of-query. The accumulator's last-write-wins
/// dedupe (in `MutationStaging::finalize`) ensures the update's value
/// wins. Single Lance commit per table per query.
#[tokio::test]
async fn mixed_insert_and_update_on_same_person_coalesces_to_one_merge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_version = version_main(&db).await.unwrap();
let result = db
.mutate(
"main",
STAGED_QUERIES,
"insert_then_update_same_person",
&mixed_params(
&[("$name", "Yves")],
&[("$insert_age", 10), ("$update_age", 99)],
),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 2, "1 insert + 1 update reported");
// The end-state row carries the update value (last-write-wins via
// dedupe in finalize), proving the staged merge_insert ran with the
// correct source dedupe. Read the underlying Person table directly
// and assert age=99 for the row we just inserted+updated.
let batches = read_table(&db, "node:Person").await;
let mut found_age: Option<i32> = None;
for batch in &batches {
let names = batch
.column_by_name("name")
.expect("Person table missing 'name' column")
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.expect("'name' should be Utf8");
let ages = batch
.column_by_name("age")
.expect("Person table missing 'age' column")
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("'age' should be I32");
for i in 0..batch.num_rows() {
if names.is_valid(i) && names.value(i) == "Yves" {
if ages.is_valid(i) {
found_age = Some(ages.value(i));
}
}
}
}
assert_eq!(
found_age,
Some(99),
"dedupe must keep the update's age value, not the insert's",
);
// One-publish guarantee: manifest version advanced by exactly 1.
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"insert+update query must publish exactly once",
);
}
/// `insert Knows from='Alice' to='Bob'; insert Knows from='Alice' to='Eve'`
/// — both append to `edge:Knows`. The accumulator coalesces them into one
/// `stage_append` at end-of-query. Edge IDs are ULID-generated so no
/// dedupe is needed (Append mode).
#[tokio::test]
async fn multiple_appends_to_same_edge_coalesce_to_one_append() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Add Eve so the second edge has a valid endpoint.
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
let edges_before = count_rows(&db, "edge:Knows").await;
let pre_version = version_main(&db).await.unwrap();
let result = db
.mutate(
"main",
STAGED_QUERIES,
"insert_two_friends",
&params(&[
("$from", "Alice"),
("$a", "Bob"),
("$b", "Eve"),
]),
)
.await
.unwrap();
assert_eq!(result.affected_edges, 2);
// Both edges visible.
let edges_after = count_rows(&db, "edge:Knows").await;
assert_eq!(edges_after, edges_before + 2);
// One manifest version bump for the two-edge query (atomic publish).
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"two-statement edge insert must publish exactly once",
);
}
/// A multi-statement insert query touching two Person rows produces a
/// single `stage_*` + `commit_staged` per table — verified by checking
/// that the manifest version advances exactly once across the query.
#[tokio::test]
async fn multi_statement_inserts_publish_exactly_once() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_version = version_main(&db).await.unwrap();
db.mutate(
"main",
STAGED_QUERIES,
"insert_two_persons",
&mixed_params(
&[("$a_name", "Owen"), ("$b_name", "Pat")],
&[("$a_age", 50), ("$b_age", 51)],
),
)
.await
.unwrap();
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"two-statement insert query must publish exactly once",
);
// Both rows visible.
let owen = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "Owen")]),
)
.await
.unwrap();
assert_eq!(owen.num_rows(), 1);
let pat = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "Pat")]),
)
.await
.unwrap();
assert_eq!(pat.num_rows(), 1);
}
/// A load with a mid-input edge RI violation must leave Lance HEAD on
/// the touched node tables untouched (staged loader never commits any
/// fragment when the load fails). The next load on the same tables
/// succeeds — no `ExpectedVersionMismatch` from drift.
#[tokio::test]
async fn load_with_bad_edge_reference_unblocks_next_load() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
// Seed with the standard fixture so we're working from a non-empty
// baseline.
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let pre_persons = count_rows(&db, "node:Person").await;
let pre_edges = count_rows(&db, "edge:Knows").await;
// First load: append a Person + an edge whose `to` points to a
// non-existent Person. RI fails AFTER the staged Person is in the
// accumulator but BEFORE the publish.
let bad = r#"{"type": "Person", "data": {"name": "Mallory", "age": 5}}
{"edge": "Knows", "from": "Mallory", "to": "Ghost"}
"#;
let err = load_jsonl(&mut db, bad, LoadMode::Append)
.await
.expect_err("RI violation must fail the load");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("not found"),
"unexpected error: {}",
manifest_err.message,
);
// No write made it to disk: counts unchanged.
let mid_persons = count_rows(&db, "node:Person").await;
let mid_edges = count_rows(&db, "edge:Knows").await;
assert_eq!(mid_persons, pre_persons, "failed load must not advance Person count");
assert_eq!(mid_edges, pre_edges, "failed load must not advance Knows count");
// Second load against the same tables — succeeds (no HEAD drift).
let good = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}"#;
load_jsonl(&mut db, good, LoadMode::Append).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
pre_persons + 1,
"follow-up load must succeed (no drift)",
);
}
/// Same shape as the RI test above, but driven by a cardinality
/// violation (`@card(0..1)` on `WorksAt`). The staged loader's pending
/// edge accumulator drives the cardinality scan; a violation aborts
/// the load before publish; the next load on the same tables succeeds.
#[tokio::test]
async fn load_with_cardinality_violation_unblocks_next_load() {
// Use a custom schema where WorksAt has a strict 0..1 cardinality
// bound — the default test schema leaves WorksAt unbounded. Seed
// Alice + two companies, then attempt two WorksAt edges from Alice,
// which violates the bound.
const CARD_SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();
let seed = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
"#;
load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap();
let pre_works = count_rows(&db, "edge:WorksAt").await;
// Two WorksAt edges from Alice — exceeds @card(0..1).
let bad = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
{"edge": "WorksAt", "from": "Alice", "to": "Bigco"}
"#;
let err = load_jsonl(&mut db, bad, LoadMode::Append)
.await
.expect_err("cardinality violation must fail the load");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("@card violation"),
"unexpected error: {}",
manifest_err.message,
);
// No edges added; next load on the same edge table succeeds.
let mid_works = count_rows(&db, "edge:WorksAt").await;
assert_eq!(mid_works, pre_works);
let good = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}"#;
load_jsonl(&mut db, good, LoadMode::Append).await.unwrap();
assert_eq!(
count_rows(&db, "edge:WorksAt").await,
pre_works + 1,
"follow-up load must succeed (no drift on edge table)",
);
}