mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
MR-854: replace test-only inline-commit append callers with local Lance helpers
After demoting `TableStore::append_batch` from `pub` to `pub(crate)`, the integration tests in tests/recovery.rs and tests/staged_writes.rs that previously called `store.append_batch(...)` directly to simulate HEAD-ahead-of-manifest drift can no longer access the inherent method. Replace those calls with small in-test helpers that do a raw `Dataset::append` (the same body the inherent method runs).

- tests/helpers/mod.rs gains `lance_append_inline` (shared helper).
- tests/staged_writes.rs gets a file-local `lance_append_inline_local` (staged_writes.rs does not import `helpers::`).
- tests/recovery.rs drops the unused `TableStore` import in the one function whose `store` binding became unused after the conversion.

Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com>
This commit is contained in:
parent
6a41028bf1
commit
426c9d57e5
3 changed files with 44 additions and 48 deletions
|
|
@ -249,6 +249,27 @@ pub fn vector_and_string_params(
|
|||
map
|
||||
}
|
||||
|
||||
/// Test-only helper: perform a raw `Dataset::append` against Lance,
|
||||
/// advancing Lance HEAD without going through the manifest. Used by
|
||||
/// `recovery::*` and `staged_writes::*` tests that deliberately set up
|
||||
/// HEAD-ahead-of-manifest drift scenarios.
|
||||
///
|
||||
/// This mirrors the body of the engine's inline-commit
|
||||
/// `TableStore::append_batch` (which is `pub(crate)` after MR-854) —
|
||||
/// kept here as a test helper because integration tests need to
|
||||
/// simulate drift without depending on the demoted crate-internal API.
|
||||
pub async fn lance_append_inline(ds: &mut lance::Dataset, batch: RecordBatch) {
|
||||
use lance::dataset::{WriteMode, WriteParams};
|
||||
let schema = batch.schema();
|
||||
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Append,
|
||||
allow_external_blob_outside_bases: true,
|
||||
..Default::default()
|
||||
};
|
||||
ds.append(reader, Some(params)).await.unwrap();
|
||||
}
|
||||
|
||||
pub fn s3_test_graph_uri(suite: &str) -> Option<String> {
|
||||
let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
|
||||
let prefix = std::env::var("OMNIGRAPH_S3_TEST_PREFIX")
|
||||
|
|
|
|||
|
|
@ -992,7 +992,6 @@ async fn recovery_ensure_indices_handles_empty_tables() {
|
|||
#[tokio::test]
|
||||
async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
use omnigraph::table_store::TableStore;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
|
@ -1011,7 +1010,6 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
|
|||
drop(db);
|
||||
|
||||
let person_uri = node_table_uri(uri, "Person");
|
||||
let store = TableStore::new(uri);
|
||||
let mut ds = Dataset::open(&person_uri).await.unwrap();
|
||||
let v1 = ds.version().version;
|
||||
|
||||
|
|
@ -1025,23 +1023,9 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
|
|||
// Bypassing __manifest is what `delete_where` and `append_batch`
|
||||
// both do (direct on Lance); using a raw append (instead of no-op
|
||||
// deletes) is what makes the fragment-set differ across versions.
|
||||
store
|
||||
.append_batch(
|
||||
&person_uri,
|
||||
&mut ds,
|
||||
person_batch(&[("bob-id", "bob", Some(25))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
helpers::lance_append_inline(&mut ds, person_batch(&[("bob-id", "bob", Some(25))])).await;
|
||||
let v2 = ds.version().version;
|
||||
store
|
||||
.append_batch(
|
||||
&person_uri,
|
||||
&mut ds,
|
||||
person_batch(&[("carol-id", "carol", Some(40))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
helpers::lance_append_inline(&mut ds, person_batch(&[("carol-id", "carol", Some(40))])).await;
|
||||
let v3 = ds.version().version;
|
||||
assert_eq!(v2, v1 + 1);
|
||||
assert_eq!(v3, v2 + 1);
|
||||
|
|
@ -1206,14 +1190,7 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() {
|
|||
.open_dataset_head(&person_uri, feature_branch_name.as_deref())
|
||||
.await
|
||||
.unwrap();
|
||||
store
|
||||
.append_batch(
|
||||
&person_uri,
|
||||
&mut ds,
|
||||
person_batch(&[("carol-id", "carol", Some(40))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
helpers::lance_append_inline(&mut ds, person_batch(&[("carol-id", "carol", Some(40))])).await;
|
||||
let v_head = ds.version().version;
|
||||
assert_eq!(v_head, v_pin + 1, "append must advance HEAD by 1");
|
||||
|
||||
|
|
@ -1328,14 +1305,7 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() {
|
|||
.open_dataset_head(&person_uri, feature_branch_name.as_deref())
|
||||
.await
|
||||
.unwrap();
|
||||
store
|
||||
.append_batch(
|
||||
&person_uri,
|
||||
&mut ds,
|
||||
person_batch(&[("dave-id", "dave", Some(50))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
helpers::lance_append_inline(&mut ds, person_batch(&[("dave-id", "dave", Some(50))])).await;
|
||||
let v_head = ds.version().version;
|
||||
assert_eq!(v_head, v_pin + 1);
|
||||
|
||||
|
|
|
|||
|
|
@ -34,6 +34,22 @@ fn person_schema() -> Arc<Schema> {
|
|||
]))
|
||||
}
|
||||
|
||||
/// Test-only helper: raw `Dataset::append` to advance Lance HEAD without
|
||||
/// going through the manifest. Mirrors `TableStore::append_batch`'s body
|
||||
/// (which is `pub(crate)` after MR-854) — kept local so these
|
||||
/// drift-simulation tests don't depend on the demoted crate-internal API.
|
||||
async fn lance_append_inline_local(ds: &mut Dataset, batch: RecordBatch) {
|
||||
use lance::dataset::{WriteMode, WriteParams};
|
||||
let schema = batch.schema();
|
||||
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Append,
|
||||
allow_external_blob_outside_bases: true,
|
||||
..Default::default()
|
||||
};
|
||||
ds.append(reader, Some(params)).await.unwrap();
|
||||
}
|
||||
|
||||
fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
|
||||
let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect();
|
||||
let ages: Vec<Option<i32>> = rows.iter().map(|(_, age)| *age).collect();
|
||||
|
|
@ -815,7 +831,6 @@ async fn create_vector_index_advances_head_inline_documents_residual() {
|
|||
async fn lance_restore_appends_one_commit_with_checked_out_content() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
// Build version history: v1 = {alice}, v2 = {alice, bob}, v3 = {alice, bob, carol}.
|
||||
let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
|
||||
|
|
@ -823,16 +838,10 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() {
|
|||
.unwrap();
|
||||
assert_eq!(ds.version().version, 1);
|
||||
|
||||
store
|
||||
.append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
|
||||
.await
|
||||
.unwrap();
|
||||
lance_append_inline_local(&mut ds, person_batch(&[("bob", Some(25))])).await;
|
||||
assert_eq!(ds.version().version, 2);
|
||||
|
||||
store
|
||||
.append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))]))
|
||||
.await
|
||||
.unwrap();
|
||||
lance_append_inline_local(&mut ds, person_batch(&[("carol", Some(40))])).await;
|
||||
assert_eq!(ds.version().version, 3);
|
||||
|
||||
let head_before = ds.version().version;
|
||||
|
|
@ -908,7 +917,6 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() {
|
|||
async fn lance_restore_loses_to_concurrent_append_via_orphaning() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
// v1: seed with alice.
|
||||
let _ = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
|
||||
|
|
@ -925,10 +933,7 @@ async fn lance_restore_loses_to_concurrent_append_via_orphaning() {
|
|||
// This simulates a per-table-queue model where another tenant wrote
|
||||
// between recovery's open and recovery's restore call.
|
||||
let mut writer_handle = Dataset::open(&uri).await.unwrap();
|
||||
store
|
||||
.append_batch(&uri, &mut writer_handle, person_batch(&[("bob", Some(25))]))
|
||||
.await
|
||||
.unwrap();
|
||||
lance_append_inline_local(&mut writer_handle, person_batch(&[("bob", Some(25))])).await;
|
||||
assert_eq!(writer_handle.version().version, 2);
|
||||
|
||||
// Recovery now restores. Because restore's `check_restore_txn` returns
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue