mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-07-03 02:51:04 +02:00
test(engine): red regression for merge-update row-id overlap breaking filtered reads
iss-merge-rowid-overlap-corrupts-filtered-reads / lance#7444: an update-style merge over a merge-written fragment reuses the updated rows' stable row ids (spec-legal overlap); a later delete leaves the overlapping range sparsely tiled and unpatched lance-table 7.0.0's RowIdIndex::new fails every filtered read that builds the id→address map. Two boundaries: - writes.rs::filtered_read_after_merge_update_and_delete_keeps_row_ids_consistent (engine walk: merge-load → same-key merge-load → delete → keyed point lookups), plus the green isolation control filtered_read_after_append_and_delete_is_consistent. - lance_surface_guards.rs::filtered_scan_tolerates_merge_update_row_id_overlap (faithful transcription of lance#7444's minimal repro at the raw Lance API; pins the surface so a future bump that regresses lance#7480 turns red). Both fail on unpatched Lance 7.0.0 with the predicted symptom (rowids/index.rs:50 "Wrong range" via the take path); green arrives with the vendored lance-table patch in the next commit.
This commit is contained in:
parent
98530a0e8a
commit
3b564534a2
2 changed files with 219 additions and 0 deletions
|
|
@ -1136,3 +1136,109 @@ async fn camelcase_index_equality_routes_to_scalar_index() {
|
|||
for camelCase index routing. got plan:\n{err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// --- Guard: filtered scans tolerate merge_insert's overlapping row-id ranges
|
||||
// (lance#7444, fixed by lance#7480; consumed via the vendored lance-table
|
||||
// patch) -------------------------------------------------------------
|
||||
//
|
||||
// An update-style merge_insert over a fragment that was itself merge-written
|
||||
// reuses the updated rows' stable row ids in its rewritten fragments (row-id
|
||||
// lineage spec: updates preserve `_rowid`) while the superseded fragment keeps
|
||||
// its full id sequence plus a deletion vector — legal, overlapping
|
||||
// cross-fragment id ranges. A later delete leaves the overlap sparsely tiled;
|
||||
// unpatched lance-table 7.0.0's `RowIdIndex::new` asserted dense tiling and
|
||||
// failed any filtered read that builds the id→address map: "Wrong range"
|
||||
// debug assert, "all columns in a record batch must have the same length" (or
|
||||
// a silently-wrong batch) in release. Faithful transcription of lance#7444's
|
||||
// minimal repro: merge-seed → merge-update → delete → filter + with_row_id.
|
||||
// This guard turns red if a Lance bump regresses the fix, or if the vendored
|
||||
// patch is dropped before the pinned lance-table ships lance#7480.
|
||||
#[tokio::test]
|
||||
async fn filtered_scan_tolerates_merge_update_row_id_overlap() {
|
||||
use futures::TryStreamExt;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("slug", DataType::Utf8, false),
|
||||
Field::new("title", DataType::Utf8, false),
|
||||
]));
|
||||
let mk_batch = |slugs: Vec<String>, titles: Vec<String>| {
|
||||
RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(StringArray::from(slugs)) as arrow_array::ArrayRef,
|
||||
Arc::new(StringArray::from(titles)) as arrow_array::ArrayRef,
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
// Empty dataset WITH stable row ids; both data writes are merge_inserts
|
||||
// (merge-on-merge is lance#7444's trigger qualifier — a plain
|
||||
// Dataset::write seed does not reproduce).
|
||||
let empty = mk_batch(Vec::new(), Vec::new());
|
||||
let reader = RecordBatchIterator::new(vec![Ok(empty)], schema.clone());
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Create,
|
||||
enable_stable_row_ids: true,
|
||||
data_storage_version: Some(LanceFileVersion::V2_2),
|
||||
..Default::default()
|
||||
};
|
||||
let ds = Dataset::write(reader, uri, Some(params)).await.unwrap();
|
||||
|
||||
let merge = |ds: Dataset, batch: RecordBatch, schema: Arc<Schema>| async move {
|
||||
let job = MergeInsertBuilder::try_new(Arc::new(ds), vec!["slug".to_string()])
|
||||
.unwrap()
|
||||
.when_matched(WhenMatched::UpdateAll)
|
||||
.when_not_matched(WhenNotMatched::InsertAll)
|
||||
.try_build()
|
||||
.unwrap();
|
||||
let source = RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||
let (ds, _stats) = job.execute_reader(source).await.unwrap();
|
||||
(*ds).clone()
|
||||
};
|
||||
|
||||
// Merge #1 seeds 40 rows; merge #2 rewrites 15 of them (keeping their
|
||||
// stable ids — the overlap with the merge-written seed fragment).
|
||||
let seed = mk_batch(
|
||||
(1..=40).map(|i| format!("t{i}")).collect(),
|
||||
(1..=40).map(|i| format!("r{i}")).collect(),
|
||||
);
|
||||
let ds = merge(ds, seed, schema.clone()).await;
|
||||
let updates = mk_batch(
|
||||
(1..=15).map(|i| format!("t{i}")).collect(),
|
||||
(1..=15).map(|i| format!("e{i}")).collect(),
|
||||
);
|
||||
let ds = Arc::new(merge(ds, updates, schema.clone()).await);
|
||||
|
||||
// The delete's deletion vector makes the overlapping id region sparse.
|
||||
let staged = lance::dataset::DeleteBuilder::new(ds.clone(), "slug = 't20'")
|
||||
.execute_uncommitted()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(staged.num_deleted_rows, 1, "expected exactly t20 deleted");
|
||||
let ds = CommitBuilder::new(ds)
|
||||
.execute(staged.transaction)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// filter + with_row_id forces the RowIdIndex build (a full scan does
|
||||
// not). On the broken index this errors/panics; on the fixed one every
|
||||
// live id resolves.
|
||||
for (slug, expected) in [("t3", 1usize), ("t20", 0usize)] {
|
||||
let mut scan = ds.scan();
|
||||
scan.with_row_id();
|
||||
scan.filter(&format!("slug = '{slug}'")).unwrap();
|
||||
let batches: Vec<RecordBatch> = scan
|
||||
.try_into_stream()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
|
||||
assert_eq!(rows, expected, "filtered read for {slug}");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1952,3 +1952,116 @@ async fn post_publish_fold_matches_fresh_reopen() {
|
|||
"post-publish fold diverged from a fresh re-scan (folded {folded} vs scanned {scanned})"
|
||||
);
|
||||
}
|
||||
|
||||
const FIND_PERSON_QUERY: &str = r#"
|
||||
query find_person($name: String) {
|
||||
match { $p: Person { name: $name } }
|
||||
return { $p.name }
|
||||
}
|
||||
"#;
|
||||
|
||||
/// Regression: iss-merge-rowid-overlap-corrupts-filtered-reads / lance#7444.
|
||||
///
|
||||
/// An update-style merge (same-key merge load) reuses the updated rows'
|
||||
/// stable row ids in the rewritten fragments while the superseded fragment
|
||||
/// keeps its full row-id sequence plus a deletion vector — overlapping
|
||||
/// cross-fragment id ranges, legal per the Lance row-id-lineage spec. A
|
||||
/// later delete punches a hole in that overlapping range; on unpatched
|
||||
/// Lance 7.0.0 `RowIdIndex::new` then fails any filtered scan that needs
|
||||
/// the id→address map ("all columns in a record batch must have the same
|
||||
/// length" in release, a "Wrong range" debug assert). Fixed upstream by
|
||||
/// lance#7480; consumed here via the vendored `lance-table` patch.
|
||||
#[tokio::test]
|
||||
async fn filtered_read_after_merge_update_and_delete_keeps_row_ids_consistent() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
|
||||
let seed: String = (1..=40)
|
||||
.map(|i| format!("{{\"type\":\"Person\",\"data\":{{\"name\":\"p{i}\",\"age\":{i}}}}}\n"))
|
||||
.collect();
|
||||
load_jsonl(&mut db, &seed, LoadMode::Merge).await.unwrap();
|
||||
|
||||
// Same-key updates: Lance Operation::Update rewrites these 15 rows into
|
||||
// new fragments that keep their original stable row ids (the overlap).
|
||||
let updates: String = (1..=15)
|
||||
.map(|i| {
|
||||
format!(
|
||||
"{{\"type\":\"Person\",\"data\":{{\"name\":\"p{i}\",\"age\":{}}}}}\n",
|
||||
100 + i
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
load_jsonl(&mut db, &updates, LoadMode::Merge).await.unwrap();
|
||||
|
||||
// The delete adds a deletion vector, so the overlapping region no longer
|
||||
// densely tiles its id range — the shape lance#7444 choked on.
|
||||
mutate_main(
|
||||
&mut db,
|
||||
MUTATION_QUERIES,
|
||||
"remove_person",
|
||||
&mixed_params(&[("$name", "p20")], &[]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Filtered point lookups must still resolve: an updated row, an
|
||||
// untouched row, and the deleted row (absent), each via the key filter.
|
||||
for (name, expected) in [("p3", vec!["p3"]), ("p30", vec!["p30"]), ("p20", vec![])] {
|
||||
let result = query_main(
|
||||
&mut db,
|
||||
FIND_PERSON_QUERY,
|
||||
"find_person",
|
||||
&mixed_params(&[("$name", name)], &[]),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| panic!("filtered read for {name} failed: {e}"));
|
||||
let got = first_column_sorted(&result);
|
||||
assert_eq!(got, expected, "filtered read for {name}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Isolation control for the regression above: the same load/delete/filtered
|
||||
/// read walk WITHOUT same-key updates (append-only merges, disjoint keys)
|
||||
/// never produces overlapping row-id ranges and passes on unpatched Lance.
|
||||
/// If this one fails alongside the merge-update case, the defect is not the
|
||||
/// lance#7444 overlap shape.
|
||||
#[tokio::test]
|
||||
async fn filtered_read_after_append_and_delete_is_consistent() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
|
||||
let seed: String = (1..=40)
|
||||
.map(|i| format!("{{\"type\":\"Person\",\"data\":{{\"name\":\"p{i}\",\"age\":{i}}}}}\n"))
|
||||
.collect();
|
||||
load_jsonl(&mut db, &seed, LoadMode::Merge).await.unwrap();
|
||||
|
||||
// Disjoint keys: plain inserts, no fragment rewrite, no id reuse.
|
||||
let more: String = (41..=55)
|
||||
.map(|i| format!("{{\"type\":\"Person\",\"data\":{{\"name\":\"p{i}\",\"age\":{i}}}}}\n"))
|
||||
.collect();
|
||||
load_jsonl(&mut db, &more, LoadMode::Merge).await.unwrap();
|
||||
|
||||
mutate_main(
|
||||
&mut db,
|
||||
MUTATION_QUERIES,
|
||||
"remove_person",
|
||||
&mixed_params(&[("$name", "p20")], &[]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for (name, expected) in [("p3", vec!["p3"]), ("p50", vec!["p50"]), ("p20", vec![])] {
|
||||
let result = query_main(
|
||||
&mut db,
|
||||
FIND_PERSON_QUERY,
|
||||
"find_person",
|
||||
&mixed_params(&[("$name", name)], &[]),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| panic!("filtered read for {name} failed: {e}"));
|
||||
let got = first_column_sorted(&result);
|
||||
assert_eq!(got, expected, "filtered read for {name}");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue