diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index d1ac9c3..e58b718 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -1206,6 +1206,7 @@ impl Omnigraph { crate::db::MutationOpKind::Delete, ) .await?; + crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?; let delete_state = self .table_store() .delete_where(&full_path, &mut ds, &pred_sql) diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 8054e9c..ad39bc0 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -549,36 +549,13 @@ impl StagedMutation { entry.expected_version = current; expected_versions.insert(entry.table_key.clone(), current); } - for (table_key, _update) in inline_committed.iter() { - let current = snapshot - .entry(table_key) - .map(|e| e.table_version) - .ok_or_else(|| { - OmniError::manifest_conflict(format!( - "table '{}' missing from manifest at commit time", - table_key, - )) - })?; - let expected = expected_versions.get(table_key).copied().ok_or_else(|| { - OmniError::manifest_internal(format!( - "StagedMutation::commit_all: missing expected version for inline-committed table '{}'", - table_key - )) - })?; - if expected != current { - return Err(OmniError::manifest_expected_version_mismatch( - table_key.clone(), - expected, - current, - )); - } - expected_versions.insert(table_key.clone(), current); - } - // Sidecar protocol: build the per-table pin list and write the - // sidecar BEFORE any Lance commit_staged runs, so a crash - // between Phase B (this loop) and Phase C (the caller's manifest - // publish) is recoverable on next open. + // sidecar BEFORE any later error can return after Lance HEAD has + // already moved. For staged tables this still happens before any + // Lance commit_staged runs. For inline-committed delete tables, + // Lance HEAD moved inside delete_where before commit_all, so the + // sidecar must also exist before the inline manifest-version check + // below can reject a stale query. // // Pins cover BOTH staged tables (Lance HEAD will advance below // when `commit_staged` runs) AND inline-committed tables @@ -627,8 +604,6 @@ impl StagedMutation { }); } - let mut updates: Vec = inline_committed.into_values().collect(); - let sidecar_handle = if pins.is_empty() { None } else { @@ -641,6 +616,34 @@ impl StagedMutation { Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?) }; + for (table_key, _update) in inline_committed.iter() { + let current = snapshot + .entry(table_key) + .map(|e| e.table_version) + .ok_or_else(|| { + OmniError::manifest_conflict(format!( + "table '{}' missing from manifest at commit time", + table_key, + )) + })?; + let expected = expected_versions.get(table_key).copied().ok_or_else(|| { + OmniError::manifest_internal(format!( + "StagedMutation::commit_all: missing expected version for inline-committed table '{}'", + table_key + )) + })?; + if expected != current { + return Err(OmniError::manifest_expected_version_mismatch( + table_key.clone(), + expected, + current, + )); + } + expected_versions.insert(table_key.clone(), current); + } + + let mut updates: Vec = inline_committed.into_values().collect(); + for entry in staged { let StagedTableEntry { table_key, diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 72190b2..e8de05e 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -3,6 +3,7 @@ mod helpers; use fail::FailScenario; +use futures::FutureExt; use omnigraph::db::Omnigraph; use omnigraph::failpoints::ScopedFailPoint; @@ -25,31 +26,6 @@ fn node_table_uri(root: &str, type_name: &str) -> String { format!("{}/nodes/{hash:016x}", root.trim_end_matches('/')) } -fn person_batch(rows: &[(&str, &str, Option)]) -> arrow_array::RecordBatch { - use std::sync::Arc; - - use arrow_array::{Int32Array, StringArray}; - use arrow_schema::{DataType, Field, Schema}; - - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Utf8, false), - Field::new("age", DataType::Int32, true), - Field::new("name", DataType::Utf8, false), - ])); - let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect(); - let names: Vec<&str> = rows.iter().map(|(_, name, _)| *name).collect(); - let ages: Vec> = rows.iter().map(|(_, _, age)| *age).collect(); - arrow_array::RecordBatch::try_new( - schema, - vec![ - Arc::new(StringArray::from(ids)), - Arc::new(Int32Array::from(ages)), - Arc::new(StringArray::from(names)), - ], - ) - .unwrap() -} - #[tokio::test] async fn branch_create_failpoint_triggers() { let _scenario = FailScenario::setup(); @@ -65,7 +41,7 @@ async fn branch_create_failpoint_triggers() { ); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn graph_publish_failpoint_triggers_before_commit_append() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -312,6 +288,85 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { ); } +#[tokio::test] +async fn inline_delete_conflict_writes_sidecar_before_rejecting() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let db = helpers::init_and_load(&dir).await; + + let pre_snapshot = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap(); + let pre_person_pin = pre_snapshot.entry("node:Person").unwrap().table_version; + let person_uri = node_table_uri(&uri, "Person"); + + { + let _pause_delete = ScopedFailPoint::new("mutation.delete_node_pre_primary_delete", "pause"); + let delete_params = helpers::params(&[("$name", "Alice")]); + let delete = db.mutate( + "main", + MUTATION_QUERIES, + "remove_person", + &delete_params, + ); + tokio::pin!(delete); + + let mut concurrent_update_succeeded = false; + for _ in 0..50 { + if delete.as_mut().now_or_never().is_some() { + panic!("delete mutation completed before primary-delete failpoint was released"); + } + let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap(); + if mutate_main( + &mut concurrent, + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "Bob")], &[("$age", 26)]), + ) + .await + .is_ok() + { + concurrent_update_succeeded = true; + break; + } + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + } + assert!(concurrent_update_succeeded, "concurrent update must land while delete is paused"); + fail::remove("mutation.delete_node_pre_primary_delete"); + + let err = delete.await.unwrap_err(); + assert!( + err.to_string().contains("stale view of 'node:Person'") + || err.to_string().contains("ExpectedVersionMismatch") + || err.to_string().contains("expected version mismatch"), + "unexpected error: {err}", + ); + } + + let person_head = lance::Dataset::open(&person_uri) + .await + .unwrap() + .version() + .version; + assert!( + person_head > pre_person_pin, + "primary inline delete must have advanced node:Person before rejecting" + ); + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 4, + "manifest-conflicted delete must not remove net Person rows after recovery" + ); + assert_eq!( + helpers::count_rows(&db, "edge:Knows").await, + 3, + "manifest-conflicted delete must not remove net Knows rows after recovery" + ); +} + #[tokio::test] async fn recovery_rolls_forward_load_on_feature_branch() { use omnigraph::loader::LoadMode;