engine: inline-delete sidecar covers version-mismatch check

This commit is contained in:
Devin AI 2026-05-10 10:37:46 +00:00
parent 6a3f0677ae
commit 31b8ffe7b5
3 changed files with 116 additions and 57 deletions

View file

@ -1206,6 +1206,7 @@ impl Omnigraph {
crate::db::MutationOpKind::Delete,
)
.await?;
crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?;
let delete_state = self
.table_store()
.delete_where(&full_path, &mut ds, &pred_sql)

View file

@ -549,36 +549,13 @@ impl StagedMutation {
entry.expected_version = current;
expected_versions.insert(entry.table_key.clone(), current);
}
for (table_key, _update) in inline_committed.iter() {
let current = snapshot
.entry(table_key)
.map(|e| e.table_version)
.ok_or_else(|| {
OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
table_key,
))
})?;
let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
))
})?;
if expected != current {
return Err(OmniError::manifest_expected_version_mismatch(
table_key.clone(),
expected,
current,
));
}
expected_versions.insert(table_key.clone(), current);
}
// Sidecar protocol: build the per-table pin list and write the
// sidecar BEFORE any Lance commit_staged runs, so a crash
// between Phase B (this loop) and Phase C (the caller's manifest
// publish) is recoverable on next open.
// sidecar BEFORE any later error can return after Lance HEAD has
// already moved. For staged tables this still happens before any
// Lance commit_staged runs. For inline-committed delete tables,
// Lance HEAD moved inside delete_where before commit_all, so the
// sidecar must also exist before the inline manifest-version check
// below can reject a stale query.
//
// Pins cover BOTH staged tables (Lance HEAD will advance below
// when `commit_staged` runs) AND inline-committed tables
@ -627,8 +604,6 @@ impl StagedMutation {
});
}
let mut updates: Vec<SubTableUpdate> = inline_committed.into_values().collect();
let sidecar_handle = if pins.is_empty() {
None
} else {
@ -641,6 +616,34 @@ impl StagedMutation {
Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?)
};
for (table_key, _update) in inline_committed.iter() {
let current = snapshot
.entry(table_key)
.map(|e| e.table_version)
.ok_or_else(|| {
OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
table_key,
))
})?;
let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
))
})?;
if expected != current {
return Err(OmniError::manifest_expected_version_mismatch(
table_key.clone(),
expected,
current,
));
}
expected_versions.insert(table_key.clone(), current);
}
let mut updates: Vec<SubTableUpdate> = inline_committed.into_values().collect();
for entry in staged {
let StagedTableEntry {
table_key,

View file

@ -3,6 +3,7 @@
mod helpers;
use fail::FailScenario;
use futures::FutureExt;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;
@ -25,31 +26,6 @@ fn node_table_uri(root: &str, type_name: &str) -> String {
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
fn person_batch(rows: &[(&str, &str, Option<i32>)]) -> arrow_array::RecordBatch {
use std::sync::Arc;
use arrow_array::{Int32Array, StringArray};
use arrow_schema::{DataType, Field, Schema};
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Int32, true),
Field::new("name", DataType::Utf8, false),
]));
let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect();
let names: Vec<&str> = rows.iter().map(|(_, name, _)| *name).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, _, age)| *age).collect();
arrow_array::RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
Arc::new(StringArray::from(names)),
],
)
.unwrap()
}
#[tokio::test]
async fn branch_create_failpoint_triggers() {
let _scenario = FailScenario::setup();
@ -65,7 +41,7 @@ async fn branch_create_failpoint_triggers() {
);
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn graph_publish_failpoint_triggers_before_commit_append() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
@ -312,6 +288,85 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
);
}
#[tokio::test]
async fn inline_delete_conflict_writes_sidecar_before_rejecting() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let db = helpers::init_and_load(&dir).await;
let pre_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
let pre_person_pin = pre_snapshot.entry("node:Person").unwrap().table_version;
let person_uri = node_table_uri(&uri, "Person");
{
let _pause_delete = ScopedFailPoint::new("mutation.delete_node_pre_primary_delete", "pause");
let delete_params = helpers::params(&[("$name", "Alice")]);
let delete = db.mutate(
"main",
MUTATION_QUERIES,
"remove_person",
&delete_params,
);
tokio::pin!(delete);
let mut concurrent_update_succeeded = false;
for _ in 0..50 {
if delete.as_mut().now_or_never().is_some() {
panic!("delete mutation completed before primary-delete failpoint was released");
}
let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap();
if mutate_main(
&mut concurrent,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Bob")], &[("$age", 26)]),
)
.await
.is_ok()
{
concurrent_update_succeeded = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
assert!(concurrent_update_succeeded, "concurrent update must land while delete is paused");
fail::remove("mutation.delete_node_pre_primary_delete");
let err = delete.await.unwrap_err();
assert!(
err.to_string().contains("stale view of 'node:Person'")
|| err.to_string().contains("ExpectedVersionMismatch")
|| err.to_string().contains("expected version mismatch"),
"unexpected error: {err}",
);
}
let person_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
assert!(
person_head > pre_person_pin,
"primary inline delete must have advanced node:Person before rejecting"
);
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
4,
"manifest-conflicted delete must not remove net Person rows after recovery"
);
assert_eq!(
helpers::count_rows(&db, "edge:Knows").await,
3,
"manifest-conflicted delete must not remove net Knows rows after recovery"
);
}
#[tokio::test]
async fn recovery_rolls_forward_load_on_feature_branch() {
use omnigraph::loader::LoadMode;