From 426c9d57e5e1502bd1ebb69aa0c3eff1fa258b4b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 02:17:30 +0000 Subject: [PATCH] MR-854: replace test-only inline-commit append callers with local Lance helpers After demoting TableStore::append_batch from pub to pub(crate), the integration tests in tests/recovery.rs and tests/staged_writes.rs that previously called store.append_batch(...) directly to simulate HEAD-ahead-of-manifest drift can no longer access the inherent method. Replace those calls with small in-test helpers that do a raw Dataset::append (the same body the inherent method runs). - tests/helpers/mod.rs gains lance_append_inline (shared helper). - tests/staged_writes.rs gets a file-local lance_append_inline_local (staged_writes.rs does not import helpers::). - tests/recovery.rs drops the unused TableStore import in the one function whose store binding became unused after the conversion. Co-Authored-By: Ragnor Comerford --- crates/omnigraph/tests/helpers/mod.rs | 21 ++++++++++++++ crates/omnigraph/tests/recovery.rs | 38 +++---------------------- crates/omnigraph/tests/staged_writes.rs | 33 ++++++++++++--------- 3 files changed, 44 insertions(+), 48 deletions(-) diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index c97ff72..27d55e6 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -249,6 +249,27 @@ pub fn vector_and_string_params( map } +/// Test-only helper: perform a raw `Dataset::append` against Lance, +/// advancing Lance HEAD without going through the manifest. Used by +/// `recovery::*` and `staged_writes::*` tests that deliberately set up +/// HEAD-ahead-of-manifest drift scenarios. +/// +/// This mirrors the body of the engine's inline-commit +/// `TableStore::append_batch` (which is `pub(crate)` after MR-854) — +/// kept here as a test helper because integration tests need to +/// simulate drift without depending on the demoted crate-internal API. +pub async fn lance_append_inline(ds: &mut lance::Dataset, batch: RecordBatch) { + use lance::dataset::{WriteMode, WriteParams}; + let schema = batch.schema(); + let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema); + let params = WriteParams { + mode: WriteMode::Append, + allow_external_blob_outside_bases: true, + ..Default::default() + }; + ds.append(reader, Some(params)).await.unwrap(); +} + pub fn s3_test_graph_uri(suite: &str) -> Option { let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?; let prefix = std::env::var("OMNIGRAPH_S3_TEST_PREFIX") diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index a090178..ec00f40 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -992,7 +992,6 @@ async fn recovery_ensure_indices_handles_empty_tables() { #[tokio::test] async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() { use omnigraph::loader::{LoadMode, load_jsonl}; - use omnigraph::table_store::TableStore; let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1011,7 +1010,6 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() { drop(db); let person_uri = node_table_uri(uri, "Person"); - let store = TableStore::new(uri); let mut ds = Dataset::open(&person_uri).await.unwrap(); let v1 = ds.version().version; @@ -1025,23 +1023,9 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() { // Bypassing __manifest is what `delete_where` and `append_batch` // both do (direct on Lance); using append_batch (instead of no-op // deletes) is what makes the fragment-set differ across versions. - store - .append_batch( - &person_uri, - &mut ds, - person_batch(&[("bob-id", "bob", Some(25))]), - ) - .await - .unwrap(); + helpers::lance_append_inline(&mut ds, person_batch(&[("bob-id", "bob", Some(25))])).await; let v2 = ds.version().version; - store - .append_batch( - &person_uri, - &mut ds, - person_batch(&[("carol-id", "carol", Some(40))]), - ) - .await - .unwrap(); + helpers::lance_append_inline(&mut ds, person_batch(&[("carol-id", "carol", Some(40))])).await; let v3 = ds.version().version; assert_eq!(v2, v1 + 1); assert_eq!(v3, v2 + 1); @@ -1206,14 +1190,7 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() { .open_dataset_head(&person_uri, feature_branch_name.as_deref()) .await .unwrap(); - store - .append_batch( - &person_uri, - &mut ds, - person_batch(&[("carol-id", "carol", Some(40))]), - ) - .await - .unwrap(); + helpers::lance_append_inline(&mut ds, person_batch(&[("carol-id", "carol", Some(40))])).await; let v_head = ds.version().version; assert_eq!(v_head, v_pin + 1, "append must advance HEAD by 1"); @@ -1328,14 +1305,7 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() { .open_dataset_head(&person_uri, feature_branch_name.as_deref()) .await .unwrap(); - store - .append_batch( - &person_uri, - &mut ds, - person_batch(&[("dave-id", "dave", Some(50))]), - ) - .await - .unwrap(); + helpers::lance_append_inline(&mut ds, person_batch(&[("dave-id", "dave", Some(50))])).await; let v_head = ds.version().version; assert_eq!(v_head, v_pin + 1); diff --git a/crates/omnigraph/tests/staged_writes.rs b/crates/omnigraph/tests/staged_writes.rs index 5335057..1d153bc 100644 --- a/crates/omnigraph/tests/staged_writes.rs +++ b/crates/omnigraph/tests/staged_writes.rs @@ -34,6 +34,22 @@ fn person_schema() -> Arc { ])) } +/// Test-only helper: raw `Dataset::append` to advance Lance HEAD without +/// going through the manifest. Mirrors `TableStore::append_batch`'s body +/// (which is `pub(crate)` after MR-854) — kept local so these +/// drift-simulation tests don't depend on the demoted crate-internal API. +async fn lance_append_inline_local(ds: &mut Dataset, batch: RecordBatch) { + use lance::dataset::{WriteMode, WriteParams}; + let schema = batch.schema(); + let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema); + let params = WriteParams { + mode: WriteMode::Append, + allow_external_blob_outside_bases: true, + ..Default::default() + }; + ds.append(reader, Some(params)).await.unwrap(); +} + fn person_batch(rows: &[(&str, Option)]) -> RecordBatch { let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect(); let ages: Vec> = rows.iter().map(|(_, age)| *age).collect(); @@ -815,7 +831,6 @@ async fn create_vector_index_advances_head_inline_documents_residual() { async fn lance_restore_appends_one_commit_with_checked_out_content() { let dir = tempfile::tempdir().unwrap(); let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); - let store = TableStore::new(dir.path().to_str().unwrap()); // Build version history: v1 = {alice}, v2 = {alice, bob}, v3 = {alice, bob, carol}. let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) @@ -823,16 +838,10 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() { .unwrap(); assert_eq!(ds.version().version, 1); - store - .append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))])) - .await - .unwrap(); + lance_append_inline_local(&mut ds, person_batch(&[("bob", Some(25))])).await; assert_eq!(ds.version().version, 2); - store - .append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))])) - .await - .unwrap(); + lance_append_inline_local(&mut ds, person_batch(&[("carol", Some(40))])).await; assert_eq!(ds.version().version, 3); let head_before = ds.version().version; @@ -908,7 +917,6 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() { async fn lance_restore_loses_to_concurrent_append_via_orphaning() { let dir = tempfile::tempdir().unwrap(); let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); - let store = TableStore::new(dir.path().to_str().unwrap()); // v1: seed with alice. let _ = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) @@ -925,10 +933,7 @@ async fn lance_restore_loses_to_concurrent_append_via_orphaning() { // This simulates a per-table-queue model where another tenant wrote // between recovery's open and recovery's restore call. let mut writer_handle = Dataset::open(&uri).await.unwrap(); - store - .append_batch(&uri, &mut writer_handle, person_batch(&[("bob", Some(25))])) - .await - .unwrap(); + lance_append_inline_local(&mut writer_handle, person_batch(&[("bob", Some(25))])).await; assert_eq!(writer_handle.version().version, 2); // Recovery now restores. Because restore's `check_restore_txn` returns