From 82350bdc4a82debf6155b922bac2080b18eff39f Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 1 May 2026 10:43:00 +0200 Subject: [PATCH] =?UTF-8?q?MR-794=20step=202:=20tests=20=E2=80=94=20flip?= =?UTF-8?q?=20partial=5Ffailure=20+=20add=20coalesce/D=E2=82=82/load=20tes?= =?UTF-8?q?ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Flip partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_table → partial_failure_leaves_target_queryable_and_unblocks_next_mutation. Asserts the next mutation succeeds (no drift) instead of expecting the legacy ExpectedVersionMismatch. * New: mutation_rejects_mixed_insert_and_delete_at_parse_time — D₂ check fires before any I/O. * New: mixed_insert_and_update_on_same_person_coalesces_to_one_merge — verifies dedupe-by-id last-write-wins in MutationStaging::finalize. * New: multiple_appends_to_same_edge_coalesce_to_one_append — two-statement edge insert publishes exactly once. * New: multi_statement_inserts_publish_exactly_once — manifest version advances by exactly 1 per multi-statement query. * New: load_with_bad_edge_reference_unblocks_next_load — RI violation aborts before publish; next load succeeds. * New: load_with_cardinality_violation_unblocks_next_load — uses a custom WorksAt @card(0..1) schema; cardinality violation aborts before publish; next load succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/tests/runs.rs | 436 ++++++++++++++++++++++++++++++--- 1 file changed, 401 insertions(+), 35 deletions(-) diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index c83d914..2e9b480 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -14,6 +14,7 @@ mod helpers; +use arrow_array::Array; use omnigraph::db::commit_graph::CommitGraph; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError}; @@ -160,30 +161,27 @@ async fn multi_statement_mutation_is_atomic_with_read_your_writes() { assert_eq!(friends.num_rows(), 1); } -/// Mid-query partial failure: op-1 writes a Lance fragment, op-2 fails. -/// Documents the *current* observable behavior — not the desired one. The -/// publisher never publishes (good — no manifest commit, target state -/// unchanged), but Lance HEAD on the touched table is now ahead of the -/// manifest-recorded version. The next mutation against that table fails -/// loudly with `ExpectedVersionMismatch` (the engine's -/// `ensure_expected_version` strict-equality check refuses the drift). +/// Mid-query partial failure: op-1 stages a Person insert, op-2 fails on +/// referential integrity (validate_edge_insert_endpoints). Under the +/// MR-794 staged-write rewire, op-1's batch lives in the in-memory +/// accumulator and never reaches Lance — Lance HEAD on `node:Person` +/// stays at the pre-mutation version. The publisher never publishes, +/// the manifest never advances, and the next mutation against the same +/// table proceeds normally (no `ExpectedVersionMismatch`). /// -/// **Known limitation, MR-771 follow-up**: a proper rollback requires -/// per-table Lance branches (write to a transient branch, fast-forward -/// main on success, drop on failure). Lance's `restore()` is not a rewind -/// — it appends a new commit, advancing HEAD further. See `docs/runs.md` -/// for the workaround (rare in practice: most validation runs before any -/// Lance write, so this only fires on multi-statement queries where a -/// late op fails after an earlier op committed). +/// This test pins the post-MR-794 contract: +/// - Failed multi-statement mutation surfaces a clear error, no manifest +/// commit, no observable state change. +/// - The touched tables stay queryable and writable from the next +/// query — Lance HEAD has not drifted. #[tokio::test] -async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_table() { +async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; - // Op-1 inserts Person "Eve" successfully (advancing Lance HEAD on - // node:Person). Op-2 inserts Knows from Eve to "Missing" — fails at - // validate_edge_insert_endpoints because "Missing" doesn't exist. - // The query as a whole errors; the publisher never runs. + // Op-1 stages a Person 'Eve' insert. Op-2 attempts an edge to + // 'Missing' — fails at validate_edge_insert_endpoints because + // 'Missing' doesn't exist (and isn't pending). let err = db .mutate( "main", @@ -205,10 +203,9 @@ async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_ manifest_err.message, ); - // Atomicity at the manifest level: Eve is *not* observable. The Lance - // fragment from op-1 exists on disk but is not referenced by the - // manifest; readers at the manifest's pinned version see the - // pre-mutation state. + // Atomicity at the manifest level: Eve is *not* observable. The + // staged batch never reached Lance, so neither the Lance HEAD nor + // the manifest moved. let eve = db .query( ReadTarget::branch("main"), @@ -218,11 +215,12 @@ async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_ ) .await .unwrap(); - assert_eq!(eve.num_rows(), 0, "partial Lance write must not be visible"); + assert_eq!(eve.num_rows(), 0, "partial mutation must not be visible"); - // The next mutation against the *same* table fails loudly. Other - // tables are unaffected, and reads still work. - let blocked = db + // The next mutation against the same table SUCCEEDS — staged writes + // never advance Lance HEAD on a failed query, so there is no drift + // to trip the publisher's CAS. + let result = db .mutate( "main", MUTATION_QUERIES, @@ -230,14 +228,23 @@ async fn partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_ &mixed_params(&[("$name", "Frank")], &[("$age", 33)]), ) .await - .expect_err("next mutation on the touched table is blocked by orphan Lance HEAD"); - let OmniError::Manifest(blocked_err) = blocked else { - panic!("expected Manifest error, got {blocked:?}"); - }; - assert!(matches!( - blocked_err.details, - Some(omnigraph::error::ManifestConflictDetails::ExpectedVersionMismatch { .. }) - )); + .expect("next mutation on the touched table must succeed under MR-794"); + assert_eq!( + result.affected_nodes, 1, + "follow-up insert should report 1 affected node" + ); + + // And Frank is observable. + let frank = db + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "Frank")]), + ) + .await + .unwrap(); + assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish"); } /// Concurrent writers to the same `(table, branch)` produce exactly one @@ -472,3 +479,362 @@ async fn public_branch_apis_reject_internal_run_refs() { err.message ); } + +// ─── MR-794: staged-write rewire — additional contract tests ──────────────── + +/// Mutation queries used only by the MR-794 tests below. Kept in the test +/// file (not in helpers' shared `MUTATION_QUERIES`) to keep their scope +/// local to the staged-write coverage. +const STAGED_QUERIES: &str = r#" +query insert_two_persons($a_name: String, $a_age: I32, $b_name: String, $b_age: I32) { + insert Person { name: $a_name, age: $a_age } + insert Person { name: $b_name, age: $b_age } +} + +query insert_then_update_same_person( + $name: String, $insert_age: I32, $update_age: I32 +) { + insert Person { name: $name, age: $insert_age } + update Person set { age: $update_age } where name = $name +} + +query insert_two_friends($from: String, $a: String, $b: String) { + insert Knows { from: $from, to: $a } + insert Knows { from: $from, to: $b } +} + +query mixed_insert_and_delete($name: String, $age: I32, $victim: String) { + insert Person { name: $name, age: $age } + delete Person where name = $victim +} +"#; + +/// D₂: a query mixing inserts/updates with deletes is rejected at parse +/// time, BEFORE any I/O. The error shape directs the user to split the +/// query into two mutations. +#[tokio::test] +async fn mutation_rejects_mixed_insert_and_delete_at_parse_time() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // Capture pre-mutation state on touched tables to confirm no I/O. + let persons_before = count_rows(&db, "node:Person").await; + + let err = db + .mutate( + "main", + STAGED_QUERIES, + "mixed_insert_and_delete", + &mixed_params( + &[("$name", "Eve"), ("$victim", "Alice")], + &[("$age", 22)], + ), + ) + .await + .expect_err("D₂ must reject mixed insert+delete"); + let OmniError::Manifest(manifest_err) = err else { + panic!("expected Manifest error, got {err:?}"); + }; + assert!( + manifest_err.message.contains("inserts/updates and deletes"), + "unexpected error message: {}", + manifest_err.message, + ); + assert!( + manifest_err.message.contains("split into separate mutations"), + "error message should direct user to split: {}", + manifest_err.message, + ); + + // No I/O — counts unchanged, branches unchanged. + let persons_after = count_rows(&db, "node:Person").await; + assert_eq!( + persons_before, persons_after, + "D₂ rejection must fire before any write", + ); + assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]); +} + +/// `insert Person 'X'; update Person where name='X' set age=...` — both +/// ops produce content on `node:Person` and coalesce into one +/// `stage_merge_insert` at end-of-query. The accumulator's last-write-wins +/// dedupe (in `MutationStaging::finalize`) ensures the update's value +/// wins. Single Lance commit per table per query. +#[tokio::test] +async fn mixed_insert_and_update_on_same_person_coalesces_to_one_merge() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let pre_version = version_main(&db).await.unwrap(); + + let result = db + .mutate( + "main", + STAGED_QUERIES, + "insert_then_update_same_person", + &mixed_params( + &[("$name", "Yves")], + &[("$insert_age", 10), ("$update_age", 99)], + ), + ) + .await + .unwrap(); + assert_eq!(result.affected_nodes, 2, "1 insert + 1 update reported"); + + // The end-state row carries the update value (last-write-wins via + // dedupe in finalize), proving the staged merge_insert ran with the + // correct source dedupe. Read the underlying Person table directly + // and assert age=99 for the row we just inserted+updated. + let batches = read_table(&db, "node:Person").await; + let mut found_age: Option = None; + for batch in &batches { + let names = batch + .column_by_name("name") + .expect("Person table missing 'name' column") + .as_any() + .downcast_ref::() + .expect("'name' should be Utf8"); + let ages = batch + .column_by_name("age") + .expect("Person table missing 'age' column") + .as_any() + .downcast_ref::() + .expect("'age' should be I32"); + for i in 0..batch.num_rows() { + if names.is_valid(i) && names.value(i) == "Yves" { + if ages.is_valid(i) { + found_age = Some(ages.value(i)); + } + } + } + } + assert_eq!( + found_age, + Some(99), + "dedupe must keep the update's age value, not the insert's", + ); + + // One-publish guarantee: manifest version advanced by exactly 1. + let post_version = version_main(&db).await.unwrap(); + assert_eq!( + post_version, + pre_version + 1, + "insert+update query must publish exactly once", + ); +} + +/// `insert Knows from='Alice' to='Bob'; insert Knows from='Alice' to='Eve'` +/// — both append to `edge:Knows`. The accumulator coalesces them into one +/// `stage_append` at end-of-query. Edge IDs are ULID-generated so no +/// dedupe is needed (Append mode). +#[tokio::test] +async fn multiple_appends_to_same_edge_coalesce_to_one_append() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // Add Eve so the second edge has a valid endpoint. + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + let edges_before = count_rows(&db, "edge:Knows").await; + let pre_version = version_main(&db).await.unwrap(); + + let result = db + .mutate( + "main", + STAGED_QUERIES, + "insert_two_friends", + ¶ms(&[ + ("$from", "Alice"), + ("$a", "Bob"), + ("$b", "Eve"), + ]), + ) + .await + .unwrap(); + assert_eq!(result.affected_edges, 2); + + // Both edges visible. + let edges_after = count_rows(&db, "edge:Knows").await; + assert_eq!(edges_after, edges_before + 2); + + // One manifest version bump for the two-edge query (atomic publish). + let post_version = version_main(&db).await.unwrap(); + assert_eq!( + post_version, + pre_version + 1, + "two-statement edge insert must publish exactly once", + ); +} + +/// A multi-statement insert query touching two Person rows produces a +/// single `stage_*` + `commit_staged` per table — verified by checking +/// that the manifest version advances exactly once across the query. +#[tokio::test] +async fn multi_statement_inserts_publish_exactly_once() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let pre_version = version_main(&db).await.unwrap(); + + db.mutate( + "main", + STAGED_QUERIES, + "insert_two_persons", + &mixed_params( + &[("$a_name", "Owen"), ("$b_name", "Pat")], + &[("$a_age", 50), ("$b_age", 51)], + ), + ) + .await + .unwrap(); + + let post_version = version_main(&db).await.unwrap(); + assert_eq!( + post_version, + pre_version + 1, + "two-statement insert query must publish exactly once", + ); + + // Both rows visible. + let owen = db + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "Owen")]), + ) + .await + .unwrap(); + assert_eq!(owen.num_rows(), 1); + let pat = db + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "Pat")]), + ) + .await + .unwrap(); + assert_eq!(pat.num_rows(), 1); +} + +/// A load with a mid-input edge RI violation must leave Lance HEAD on +/// the touched node tables untouched (staged loader never commits any +/// fragment when the load fails). The next load on the same tables +/// succeeds — no `ExpectedVersionMismatch` from drift. +#[tokio::test] +async fn load_with_bad_edge_reference_unblocks_next_load() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + // Seed with the standard fixture so we're working from a non-empty + // baseline. + load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite) + .await + .unwrap(); + + let pre_persons = count_rows(&db, "node:Person").await; + let pre_edges = count_rows(&db, "edge:Knows").await; + + // First load: append a Person + an edge whose `to` points to a + // non-existent Person. RI fails AFTER the staged Person is in the + // accumulator but BEFORE the publish. + let bad = r#"{"type": "Person", "data": {"name": "Mallory", "age": 5}} +{"edge": "Knows", "from": "Mallory", "to": "Ghost"} +"#; + let err = load_jsonl(&mut db, bad, LoadMode::Append) + .await + .expect_err("RI violation must fail the load"); + let OmniError::Manifest(manifest_err) = err else { + panic!("expected Manifest error, got {err:?}"); + }; + assert!( + manifest_err.message.contains("not found"), + "unexpected error: {}", + manifest_err.message, + ); + + // No write made it to disk: counts unchanged. + let mid_persons = count_rows(&db, "node:Person").await; + let mid_edges = count_rows(&db, "edge:Knows").await; + assert_eq!(mid_persons, pre_persons, "failed load must not advance Person count"); + assert_eq!(mid_edges, pre_edges, "failed load must not advance Knows count"); + + // Second load against the same tables — succeeds (no HEAD drift). + let good = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}"#; + load_jsonl(&mut db, good, LoadMode::Append).await.unwrap(); + assert_eq!( + count_rows(&db, "node:Person").await, + pre_persons + 1, + "follow-up load must succeed (no drift)", + ); +} + +/// Same shape as the RI test above, but driven by a cardinality +/// violation (`@card(0..1)` on `WorksAt`). The staged loader's pending +/// edge accumulator drives the cardinality scan; a violation aborts +/// the load before publish; the next load on the same tables succeeds. +#[tokio::test] +async fn load_with_cardinality_violation_unblocks_next_load() { + // Use a custom schema where WorksAt has a strict 0..1 cardinality + // bound — the default test schema leaves WorksAt unbounded. Seed + // Alice + two companies, then attempt two WorksAt edges from Alice, + // which violates the bound. + const CARD_SCHEMA: &str = r#" +node Person { + name: String @key + age: I32? +} +node Company { + name: String @key +} +edge WorksAt: Person -> Company @card(0..1) +"#; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap(); + + let seed = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}} +{"type": "Company", "data": {"name": "Acme"}} +{"type": "Company", "data": {"name": "Bigco"}} +"#; + load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap(); + + let pre_works = count_rows(&db, "edge:WorksAt").await; + + // Two WorksAt edges from Alice — exceeds @card(0..1). + let bad = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"} +{"edge": "WorksAt", "from": "Alice", "to": "Bigco"} +"#; + let err = load_jsonl(&mut db, bad, LoadMode::Append) + .await + .expect_err("cardinality violation must fail the load"); + let OmniError::Manifest(manifest_err) = err else { + panic!("expected Manifest error, got {err:?}"); + }; + assert!( + manifest_err.message.contains("@card violation"), + "unexpected error: {}", + manifest_err.message, + ); + + // No edges added; next load on the same edge table succeeds. + let mid_works = count_rows(&db, "edge:WorksAt").await; + assert_eq!(mid_works, pre_works); + + let good = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}"#; + load_jsonl(&mut db, good, LoadMode::Append).await.unwrap(); + assert_eq!( + count_rows(&db, "edge:WorksAt").await, + pre_works + 1, + "follow-up load must succeed (no drift on edge table)", + ); +}