diff --git a/crates/omnigraph/src/changes/mod.rs b/crates/omnigraph/src/changes/mod.rs index 7c9e8ea..d4a3fe7 100644 --- a/crates/omnigraph/src/changes/mod.rs +++ b/crates/omnigraph/src/changes/mod.rs @@ -7,6 +7,7 @@ use lance::dataset::scanner::ColumnOrdering; use crate::db::SubTableEntry; use crate::db::manifest::Snapshot; use crate::error::Result; +use crate::storage_layer::{SnapshotHandle, TableStorage}; use crate::table_store::TableStore; // ─── Types ────────────────────────────────────────────────────────────────── @@ -229,7 +230,8 @@ async fn diff_table_same_lineage( ) -> Result> { let vf = from_entry.table_version; let vt = to_entry.table_version; - let to_ds = table_store.open_at_entry(to_entry).await?; + let storage: &dyn TableStorage = table_store; + let to_ds = storage.open_snapshot_at_entry(to_entry).await?; let cols: Vec<&str> = if is_edge { vec!["id", "src", "dst", "_row_last_updated_at_version"] @@ -257,12 +259,12 @@ async fn diff_table_same_lineage( "_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}", vf, vt ); - let changed_rows = scan_with_filter(table_store, &to_ds, &cols, &filter_sql).await?; + let changed_rows = scan_with_filter(storage, &to_ds, &cols, &filter_sql).await?; if !changed_rows.is_empty() { // Build the set of IDs that existed at the from version - let from_ds = table_store.open_at_entry(from_entry).await?; - let from_ids: HashSet = scan_id_set(table_store, &from_ds, &["id"]) + let from_ds = storage.open_snapshot_at_entry(from_entry).await?; + let from_ids: HashSet = scan_id_set(storage, &from_ds, &["id"]) .await? .into_iter() .map(|r| r.id) @@ -282,8 +284,8 @@ async fn diff_table_same_lineage( // Deletes: ID set-difference if wants_deletes { - let from_ds = table_store.open_at_entry(from_entry).await?; - let deleted = deleted_ids_by_set_diff(table_store, &from_ds, &to_ds, is_edge).await?; + let from_ds = storage.open_snapshot_at_entry(from_entry).await?; + let deleted = deleted_ids_by_set_diff(storage, &from_ds, &to_ds, is_edge).await?; changes.extend(deleted); } @@ -300,13 +302,14 @@ async fn diff_table_cross_branch( is_edge: bool, filter: &ChangeFilter, ) -> Result> { - let from_ds = table_store - .open_snapshot_table(from_snap, table_key) + let storage: &dyn TableStorage = table_store; + let from_ds = storage + .open_snapshot_at_table(from_snap, table_key) .await?; - let to_ds = table_store.open_snapshot_table(to_snap, table_key).await?; + let to_ds = storage.open_snapshot_at_table(to_snap, table_key).await?; - let from_rows = scan_all_rows_ordered(table_store, &from_ds, is_edge).await?; - let to_rows = scan_all_rows_ordered(table_store, &to_ds, is_edge).await?; + let from_rows = scan_all_rows_ordered(storage, &from_ds, is_edge).await?; + let to_rows = scan_all_rows_ordered(storage, &to_ds, is_edge).await?; let mut changes = Vec::new(); let mut fi = 0; @@ -392,8 +395,9 @@ async fn diff_table_added( if !filter.wants_op(ChangeOp::Insert) { return Ok(Vec::new()); } - let ds = table_store.open_snapshot_table(to_snap, table_key).await?; - let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?; + let storage: &dyn TableStorage = table_store; + let ds = storage.open_snapshot_at_table(to_snap, table_key).await?; + let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?; Ok(rows .into_iter() .map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge)) @@ -410,10 +414,11 @@ async fn diff_table_removed( if !filter.wants_op(ChangeOp::Delete) { return Ok(Vec::new()); } - let ds = table_store - .open_snapshot_table(from_snap, table_key) + let storage: &dyn TableStorage = table_store; + let ds = storage + .open_snapshot_at_table(from_snap, table_key) .await?; - let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?; + let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?; Ok(rows .into_iter() .map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge)) @@ -424,12 +429,12 @@ async fn diff_table_removed( /// Scan with a SQL filter, projecting specific columns. async fn scan_with_filter( - table_store: &TableStore, - ds: &lance::Dataset, + storage: &dyn TableStorage, + ds: &SnapshotHandle, cols: &[&str], filter_sql: &str, ) -> Result> { - let batches = table_store + let batches = storage .scan(ds, Some(cols), Some(filter_sql), None) .await?; Ok(extract_rows(&batches)) @@ -437,11 +442,11 @@ async fn scan_with_filter( /// Scan all rows ordered by id, projecting id (+ src/dst for edges) + all columns for signature. async fn scan_all_rows_ordered( - table_store: &TableStore, - ds: &lance::Dataset, + storage: &dyn TableStorage, + ds: &SnapshotHandle, is_edge: bool, ) -> Result> { - let batches = table_store + let batches = storage .scan( ds, None, @@ -454,9 +459,9 @@ async fn scan_all_rows_ordered( /// Compute deleted IDs: scan id at from and to, set-difference. async fn deleted_ids_by_set_diff( - table_store: &TableStore, - from_ds: &lance::Dataset, - to_ds: &lance::Dataset, + storage: &dyn TableStorage, + from_ds: &SnapshotHandle, + to_ds: &SnapshotHandle, is_edge: bool, ) -> Result> { let cols: Vec<&str> = if is_edge { @@ -465,8 +470,8 @@ async fn deleted_ids_by_set_diff( vec!["id"] }; - let from_rows = scan_id_set(table_store, from_ds, &cols).await?; - let to_ids: HashSet = scan_id_set(table_store, to_ds, &["id"]) + let from_rows = scan_id_set(storage, from_ds, &cols).await?; + let to_ids: HashSet = scan_id_set(storage, to_ds, &["id"]) .await? .into_iter() .map(|r| r.id) @@ -480,11 +485,11 @@ async fn deleted_ids_by_set_diff( } async fn scan_id_set( - table_store: &TableStore, - ds: &lance::Dataset, + storage: &dyn TableStorage, + ds: &SnapshotHandle, cols: &[&str], ) -> Result> { - let batches = table_store.scan(ds, Some(cols), None, None).await?; + let batches = storage.scan(ds, Some(cols), None, None).await?; Ok(extract_rows(&batches)) } diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 7b8a3f6..4eee566 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -26,6 +26,7 @@ use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot}; use crate::error::{OmniError, Result}; use crate::runtime_cache::RuntimeCache; use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri}; +use crate::storage_layer::SnapshotHandle; use crate::table_store::TableStore; mod export; @@ -569,18 +570,15 @@ impl Omnigraph { schema_apply::ensure_schema_apply_not_locked(self, operation).await } - pub(crate) fn table_store(&self) -> &TableStore { - &self.table_store - } - /// Engine-facing trait surface around `TableStore`. /// - /// This is the canonical accessor for newly-written engine code. The - /// trait's signatures use opaque `SnapshotHandle` / `StagedHandle` - /// instead of leaking `lance::Dataset` / - /// `lance::dataset::transaction::Transaction`. Existing call sites - /// that still use `db.table_store.X(...)` (the inherent struct - /// methods) are migrated incrementally. + /// This is the **only** accessor for engine code reaching into the + /// storage layer. The trait's signatures use opaque `SnapshotHandle` + /// / `StagedHandle` instead of leaking `lance::Dataset` / + /// `lance::dataset::transaction::Transaction`, so newly-added engine + /// call sites cannot drift the staged-write invariant by mistake + /// (the trait's `stage_*` + `commit_staged` pair is the only way to + /// land a write). pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage { &self.table_store } @@ -1110,10 +1108,10 @@ impl Omnigraph { cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0)); for (table_key, table_path) in cleanup_targets { - let dataset_uri = self.table_store.dataset_uri(&table_path); + let dataset_uri = self.storage().dataset_uri(&table_path); let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup") { - Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await, + Ok(()) => self.storage().force_delete_branch(&dataset_uri, branch).await, Err(injected) => Err(injected), }; if let Err(err) = outcome { @@ -1339,7 +1337,7 @@ impl Omnigraph { &self, table_key: &str, op_kind: crate::db::MutationOpKind, - ) -> Result<(Dataset, String, Option)> { + ) -> Result<(SnapshotHandle, String, Option)> { table_ops::open_for_mutation(self, table_key, op_kind).await } @@ -1348,7 +1346,7 @@ impl Omnigraph { branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, - ) -> Result<(Dataset, String, Option)> { + ) -> Result<(SnapshotHandle, String, Option)> { table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await } @@ -1359,7 +1357,7 @@ impl Omnigraph { source_branch: Option<&str>, source_version: u64, active_branch: &str, - ) -> Result { + ) -> Result { table_ops::fork_dataset_from_entry_state( self, table_key, @@ -1378,7 +1376,7 @@ impl Omnigraph { table_branch: Option<&str>, expected_version: u64, op_kind: crate::db::MutationOpKind, - ) -> Result { + ) -> Result { table_ops::reopen_for_mutation( self, table_key, @@ -1395,14 +1393,14 @@ impl Omnigraph { table_path: &str, table_branch: Option<&str>, table_version: u64, - ) -> Result { + ) -> Result { table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await } pub(crate) async fn build_indices_on_dataset( &self, table_key: &str, - ds: &mut Dataset, + ds: &mut SnapshotHandle, ) -> Result<()> { table_ops::build_indices_on_dataset(self, table_key, ds).await } @@ -1411,7 +1409,7 @@ impl Omnigraph { &self, catalog: &Catalog, table_key: &str, - ds: &mut Dataset, + ds: &mut SnapshotHandle, ) -> Result<()> { table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await } @@ -2115,8 +2113,12 @@ edge WorksAt: Person -> Company async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec { let snapshot = db.snapshot().await; - let ds = snapshot.open(table_key).await.unwrap(); - let batches = db.table_store().scan_batches(&ds).await.unwrap(); + let ds = db + .storage() + .open_snapshot_at_table(&snapshot, table_key) + .await + .unwrap(); + let batches = db.storage().scan_batches(&ds).await.unwrap(); batches .into_iter() .flat_map(|batch| { @@ -2128,11 +2130,11 @@ edge WorksAt: Person -> Company } async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option) { - let (mut ds, full_path, table_branch) = db + let (ds, full_path, table_branch) = db .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert) .await .unwrap(); - let schema: Arc = Arc::new(ds.schema().into()); + let schema: Arc = Arc::new(ds.dataset().schema().into()); let columns: Vec> = schema .fields() .iter() @@ -2144,9 +2146,9 @@ edge WorksAt: Person -> Company }) .collect(); let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); - let state = db - .table_store() - .append_batch(&full_path, &mut ds, batch) + let (_new_ds, state) = db + .storage() + .append_batch(&full_path, ds, batch) .await .unwrap(); db.commit_updates(&[crate::db::SubTableUpdate { @@ -2280,8 +2282,12 @@ edge WorksAt: Person -> Company db.apply_schema(&desired).await.unwrap(); let snapshot = db.snapshot().await; - let ds = snapshot.open("node:Person").await.unwrap(); - assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); + let ds = db + .storage() + .open_snapshot_at_table(&snapshot, "node:Person") + .await + .unwrap(); + assert!(db.storage().has_fts_index(&ds, "name").await.unwrap()); } #[tokio::test] @@ -2299,9 +2305,13 @@ edge WorksAt: Person -> Company db.apply_schema(&desired).await.unwrap(); let snapshot = db.snapshot().await; - let ds = snapshot.open("node:Person").await.unwrap(); - assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap()); - assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); + let ds = db + .storage() + .open_snapshot_at_table(&snapshot, "node:Person") + .await + .unwrap(); + assert!(db.storage().has_btree_index(&ds, "id").await.unwrap()); + assert!(db.storage().has_fts_index(&ds, "name").await.unwrap()); } #[tokio::test] diff --git a/crates/omnigraph/src/db/omnigraph/export.rs b/crates/omnigraph/src/db/omnigraph/export.rs index 366f50a..3043ce9 100644 --- a/crates/omnigraph/src/db/omnigraph/export.rs +++ b/crates/omnigraph/src/db/omnigraph/export.rs @@ -143,23 +143,23 @@ async fn export_table_to_writer( writer: &mut W, ) -> Result<()> { let ds = db - .table_store - .open_snapshot_table(snapshot, table_key) + .storage() + .open_snapshot_at_table(snapshot, table_key) .await?; let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]); let catalog = db.catalog(); let blob_properties = blob_properties_for_table_key(&catalog, table_key)?; if blob_properties.is_empty() { - for batch in db.table_store.scan(&ds, None, None, ordering).await? { + for batch in db.storage().scan(&ds, None, None, ordering).await? { write_export_rows_from_batch(db, table_key, &batch, None, writer)?; } return Ok(()); } let batches = db - .table_store - .scan_with(&ds, None, None, ordering, true, |_| Ok(())) + .storage() + .scan_with_row_id(&ds, None, None, ordering, true) .await?; for batch in batches { let row_ids = batch @@ -175,7 +175,13 @@ async fn export_table_to_writer( .iter() .copied() .collect::>(); - let blob_values = export_blob_values(&ds, &batch, &row_ids, blob_properties).await?; + // Blob materialization reaches through to the inner Lance + // `Dataset` because `take_blobs` is a Lance-only API not lifted + // onto the `TableStorage` trait surface (the trait covers + // staged-write and snapshot-scan primitives; blob descriptor + // materialization sits outside that surface). + let blob_values = + export_blob_values(ds.dataset(), &batch, &row_ids, blob_properties).await?; write_export_rows_from_batch(db, table_key, &batch, Some(&blob_values), writer)?; } Ok(()) diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index fff3f54..ae060cf 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -183,7 +183,7 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result> = futures::stream::iter(table_tasks.into_iter()) .map(|(table_key, full_path, has_blob)| async move { @@ -204,9 +204,16 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result = async { crate::failpoints::maybe_fail("cleanup.table_gc")?; - let ds = table_store + // `cleanup_old_versions` is a Lance-only maintenance API not + // surfaced through `TableStorage` — see the optimize path + // above for the same rationale. Unwrap via `into_dataset()`. + let handle = storage .open_dataset_head_for_write(&table_key, &full_path, None) .await?; + let ds = handle.into_dataset(); let before_version = keep_versions .map(|n| ds.version().version.saturating_sub(n as u64)) .filter(|v| *v > 0); @@ -395,8 +406,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result listed, Err(err) => { tracing::warn!( @@ -411,7 +423,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result db.table_store.force_delete_branch(&full_path, &branch).await, + Ok(()) => storage.force_delete_branch(&full_path, &branch).await, Err(injected) => Err(injected), }; match outcome { diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 35fe161..dd36bc0 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -355,7 +355,7 @@ where let entry = snapshot.entry(table_key)?; Some(crate::db::manifest::SidecarTablePin { table_key: table_key.clone(), - table_path: db.table_store.dataset_uri(&entry.table_path), + table_path: db.storage().dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, table_branch: entry.table_branch.clone(), @@ -469,12 +469,14 @@ where for table_key in &added_tables { let table_path = table_path_for_table_key(table_key)?; - let dataset_uri = db.table_store.dataset_uri(&table_path); + let dataset_uri = db.storage().dataset_uri(&table_path); let schema = schema_for_table_key(&desired_catalog, table_key)?; - let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?; + let mut ds = SnapshotHandle::new( + TableStore::create_empty_dataset(&dataset_uri, &schema).await?, + ); db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) .await?; - let state = db.table_store.table_state(&dataset_uri, &ds).await?; + let state = db.storage().table_state(&dataset_uri, &ds).await?; table_registrations.insert(table_key.clone(), table_path); table_updates.insert( table_key.clone(), @@ -496,7 +498,10 @@ where )) })?; ensure_snapshot_entry_head_matches(db, source_entry).await?; - let source_ds = snapshot.open(source_table_key).await?; + let source_ds = db + .storage() + .open_snapshot_at_table(&snapshot, source_table_key) + .await?; let current_catalog = db.catalog(); let batch = batch_for_schema_apply_rewrite( db, @@ -509,11 +514,13 @@ where ) .await?; let table_path = table_path_for_table_key(target_table_key)?; - let dataset_uri = db.table_store.dataset_uri(&table_path); - let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?; + let dataset_uri = db.storage().dataset_uri(&table_path); + let mut target_ds = SnapshotHandle::new( + TableStore::write_dataset(&dataset_uri, batch).await?, + ); db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds) .await?; - let state = db.table_store.table_state(&dataset_uri, &target_ds).await?; + let state = db.storage().table_state(&dataset_uri, &target_ds).await?; table_registrations.insert(target_table_key.clone(), table_path); table_updates.insert( target_table_key.clone(), @@ -542,7 +549,10 @@ where )) })?; ensure_snapshot_entry_head_matches(db, entry).await?; - let source_ds = snapshot.open(table_key).await?; + let source_ds = db + .storage() + .open_snapshot_at_table(&snapshot, table_key) + .await?; let current_catalog = db.catalog(); let batch = batch_for_schema_apply_rewrite( db, @@ -554,7 +564,7 @@ where property_renames.get(table_key), ) .await?; - let dataset_uri = db.table_store.dataset_uri(&entry.table_path); + let dataset_uri = db.storage().dataset_uri(&entry.table_path); // Route through stage_overwrite + commit_staged for non-empty // batches. Lance's `InsertBuilder::execute_uncommitted` // errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`), @@ -564,7 +574,7 @@ where // — and schema_apply runs under `__schema_apply_lock__` so the // narrow inline-commit residual is bounded. let mut target_ds = if batch.num_rows() == 0 { - TableStore::overwrite_dataset(&dataset_uri, batch).await? + SnapshotHandle::new(TableStore::overwrite_dataset(&dataset_uri, batch).await?) } else { // Pass `entry.table_branch.as_deref()` (not `None`) for // consistency with the indexed_tables block below. Schema @@ -574,17 +584,15 @@ where // means a future relaxation of the lock-check can't quietly // open the wrong HEAD here. let existing = db - .table_store + .storage() .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) .await?; - let staged = db.table_store.stage_overwrite(&existing, batch).await?; - db.table_store - .commit_staged(Arc::new(existing), staged.transaction) - .await? + let staged = db.storage().stage_overwrite(&existing, batch).await?; + db.storage().commit_staged(existing, staged).await? }; db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds) .await?; - let state = db.table_store.table_state(&dataset_uri, &target_ds).await?; + let state = db.storage().table_state(&dataset_uri, &target_ds).await?; table_updates.insert( table_key.clone(), crate::db::SubTableUpdate { @@ -611,16 +619,16 @@ where )) })?; ensure_snapshot_entry_head_matches(db, entry).await?; - let dataset_uri = db.table_store.dataset_uri(&entry.table_path); + let dataset_uri = db.storage().dataset_uri(&entry.table_path); let mut ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) .await?; - db.table_store + db.storage() .ensure_expected_version(&ds, table_key, entry.table_version)?; db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) .await?; - let state = db.table_store.table_state(&dataset_uri, &ds).await?; + let state = db.storage().table_state(&dataset_uri, &ds).await?; table_updates.insert( table_key.clone(), crate::db::SubTableUpdate { @@ -869,22 +877,22 @@ pub(super) async fn ensure_snapshot_entry_head_matches( db: &Omnigraph, entry: &SubTableEntry, ) -> Result<()> { - let dataset_uri = db.table_store.dataset_uri(&entry.table_path); + let dataset_uri = db.storage().dataset_uri(&entry.table_path); let ds = db - .table_store + .storage() .open_dataset_head_for_write( &entry.table_key, &dataset_uri, entry.table_branch.as_deref(), ) .await?; - db.table_store + db.storage() .ensure_expected_version(&ds, &entry.table_key, entry.table_version) } pub(super) async fn batch_for_schema_apply_rewrite( db: &Omnigraph, - source_ds: &Dataset, + source_ds: &SnapshotHandle, source_table_key: &str, source_catalog: &Catalog, target_table_key: &str, @@ -896,11 +904,11 @@ pub(super) async fn batch_for_schema_apply_rewrite( let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?; let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty(); let batches = if needs_row_ids { - db.table_store() - .scan_with(source_ds, None, None, None, true, |_| Ok(())) + db.storage() + .scan_with_row_id(source_ds, None, None, None, true) .await? } else { - db.table_store().scan_batches(source_ds).await? + db.storage().scan_batches(source_ds).await? }; if batches.is_empty() { return Ok(RecordBatch::new_empty(target_schema)); @@ -970,7 +978,7 @@ pub(super) async fn batch_for_schema_apply_rewrite( async fn rebuild_blob_column( _db: &Omnigraph, - source_ds: &Dataset, + source_ds: &SnapshotHandle, column_name: &str, descriptions: &StructArray, row_ids: &[u64], @@ -990,7 +998,7 @@ async fn rebuild_blob_column( let blob_files = if non_null_row_ids.is_empty() { Vec::new() } else { - Arc::new(source_ds.clone()) + Arc::new(source_ds.dataset().clone()) .take_blobs(&non_null_row_ids, column_name) .await .map_err(|e| OmniError::Lance(e.to_string()))? diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 3ed9c43..06f81b4 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -50,10 +50,10 @@ pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test( .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; let full_path = format!("{}/{}", db.root_uri, entry.table_path); let ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, &full_path, table_branch) .await?; - let state = db.table_store.table_state(&full_path, &ds).await?; + let state = db.storage().table_state(&full_path, &ds).await?; let update = crate::db::SubTableUpdate { table_key: table_key.to_string(), table_version: state.version, @@ -209,18 +209,18 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st } }, None => ( - db.table_store + db.storage() .open_dataset_head_for_write(&table_key, &full_path, None) .await?, None, ), }; - let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0); + let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0); if row_count > 0 { build_indices_on_dataset(db, &table_key, &mut ds).await?; } - let state = db.table_store.table_state(&full_path, &ds).await?; + let state = db.storage().table_state(&full_path, &ds).await?; if state.version != entry.table_version || resolved_branch.as_deref() != entry.table_branch.as_deref() { @@ -257,18 +257,18 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st } }, None => ( - db.table_store + db.storage() .open_dataset_head_for_write(&table_key, &full_path, None) .await?, None, ), }; - let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0); + let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0); if row_count > 0 { build_indices_on_dataset(db, &table_key, &mut ds).await?; } - let state = db.table_store.table_state(&full_path, &ds).await?; + let state = db.storage().table_state(&full_path, &ds).await?; if state.version != entry.table_version || resolved_branch.as_deref() != entry.table_branch.as_deref() { @@ -331,7 +331,7 @@ async fn needs_index_work_node( table_branch: Option<&str>, ) -> Result { let ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, full_path, table_branch) .await?; // Empty tables are skipped by the ensure_indices loop, so they must @@ -341,10 +341,10 @@ async fn needs_index_work_node( // Errors from count_rows are propagated: silently treating them as // "0 rows" risks skipping a table that is actually about to be // modified. - if db.table_store.count_rows(&ds, None).await? == 0 { + if db.storage().count_rows(&ds, None).await? == 0 { return Ok(false); } - if !db.table_store.has_btree_index(&ds, "id").await? { + if !db.storage().has_btree_index(&ds, "id").await? { return Ok(true); } let catalog = db.catalog(); @@ -360,11 +360,11 @@ async fn needs_index_work_node( continue; }; if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { - if !db.table_store.has_fts_index(&ds, prop_name).await? { + if !db.storage().has_fts_index(&ds, prop_name).await? { return Ok(true); } } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list { - if !db.table_store.has_vector_index(&ds, prop_name).await? { + if !db.storage().has_vector_index(&ds, prop_name).await? { return Ok(true); } } @@ -389,22 +389,22 @@ async fn needs_index_work_edge( table_branch: Option<&str>, ) -> Result { let ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, full_path, table_branch) .await?; - if db.table_store.count_rows(&ds, None).await? == 0 { + if db.storage().count_rows(&ds, None).await? == 0 { return Ok(false); } - Ok(!db.table_store.has_btree_index(&ds, "id").await? - || !db.table_store.has_btree_index(&ds, "src").await? - || !db.table_store.has_btree_index(&ds, "dst").await?) + Ok(!db.storage().has_btree_index(&ds, "id").await? + || !db.storage().has_btree_index(&ds, "src").await? + || !db.storage().has_btree_index(&ds, "dst").await?) } pub(super) async fn open_for_mutation( db: &Omnigraph, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(Dataset, String, Option)> { +) -> Result<(SnapshotHandle, String, Option)> { let current_branch = db .coordinator .read() @@ -425,7 +425,7 @@ pub(super) async fn open_for_mutation_on_branch( branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(Dataset, String, Option)> { +) -> Result<(SnapshotHandle, String, Option)> { db.ensure_schema_apply_not_locked("write").await?; let resolved = db.resolved_branch_target(branch).await?; let entry = resolved @@ -436,11 +436,11 @@ pub(super) async fn open_for_mutation_on_branch( match resolved.branch.as_deref() { None => { let ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, &full_path, None) .await?; if op_kind.strict_pre_stage_version_check() { - db.table_store + db.storage() .ensure_expected_version(&ds, table_key, entry.table_version)?; } Ok((ds, full_path, None)) @@ -469,15 +469,15 @@ pub(super) async fn open_owned_dataset_for_branch_write( entry_version: u64, active_branch: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(Dataset, Option)> { +) -> Result<(SnapshotHandle, Option)> { match entry_branch { Some(branch) if branch == active_branch => { let ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) .await?; if op_kind.strict_pre_stage_version_check() { - db.table_store + db.storage() .ensure_expected_version(&ds, table_key, entry_version)?; } Ok((ds, Some(active_branch.to_string()))) @@ -509,11 +509,11 @@ pub(super) async fn open_owned_dataset_for_branch_write( ) .await?; let ds = db - .table_store + .storage() .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) .await?; if op_kind.strict_pre_stage_version_check() { - db.table_store + db.storage() .ensure_expected_version(&ds, table_key, entry_version)?; } Ok((ds, Some(active_branch.to_string()))) @@ -528,8 +528,8 @@ pub(super) async fn fork_dataset_from_entry_state( source_branch: Option<&str>, source_version: u64, active_branch: &str, -) -> Result { - db.table_store +) -> Result { + db.storage() .fork_branch_from_state( full_path, source_branch, @@ -547,10 +547,10 @@ pub(super) async fn reopen_for_mutation( table_branch: Option<&str>, expected_version: u64, op_kind: crate::db::MutationOpKind, -) -> Result { +) -> Result { db.ensure_schema_apply_not_locked("write").await?; if op_kind.strict_pre_stage_version_check() { - db.table_store + db.storage() .reopen_for_mutation(full_path, table_branch, table_key, expected_version) .await } else { @@ -563,7 +563,7 @@ pub(super) async fn reopen_for_mutation( // genuine cross-process drift as 409. See // [`crate::db::MutationOpKind`] for the policy rationale. let _ = expected_version; - db.table_store + db.storage() .open_dataset_head_for_write(table_key, full_path, table_branch) .await } @@ -574,8 +574,8 @@ pub(super) async fn open_dataset_at_state( table_path: &str, table_branch: Option<&str>, table_version: u64, -) -> Result { - db.table_store +) -> Result { + db.storage() .open_dataset_at_state(table_path, table_branch, table_version) .await } @@ -583,7 +583,7 @@ pub(super) async fn open_dataset_at_state( pub(super) async fn build_indices_on_dataset( db: &Omnigraph, table_key: &str, - ds: &mut Dataset, + ds: &mut SnapshotHandle, ) -> Result<()> { let catalog = db.catalog(); build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await @@ -593,10 +593,10 @@ pub(super) async fn build_indices_on_dataset_for_catalog( db: &Omnigraph, catalog: &Catalog, table_key: &str, - ds: &mut Dataset, + ds: &mut SnapshotHandle, ) -> Result<()> { if let Some(type_name) = table_key.strip_prefix("node:") { - if !db.table_store.has_btree_index(ds, "id").await? { + if !db.storage().has_btree_index(ds, "id").await? { stage_and_commit_btree(db, table_key, ds, &["id"]).await?; } @@ -616,19 +616,20 @@ pub(super) async fn build_indices_on_dataset_for_catalog( let prop_name = &index_cols[0]; if let Some(prop_type) = node_type.properties.get(prop_name) { if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { - if !db.table_store.has_fts_index(ds, prop_name).await? { + if !db.storage().has_fts_index(ds, prop_name).await? { stage_and_commit_inverted(db, table_key, ds, prop_name.as_str()) .await?; } } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list { - if !db.table_store.has_vector_index(ds, prop_name).await? { + if !db.storage().has_vector_index(ds, prop_name).await? { // Inline-commit residual: lance-4.0.0 does not // expose `build_index_metadata_from_segments` as // `pub`, so vector indices cannot be staged from // outside the lance crate. Document at the call // site; companion ticket to lance-format/lance#6658. - db.table_store - .create_vector_index(ds, prop_name.as_str()) + let new_snap = db + .storage() + .create_vector_index(ds.clone(), prop_name.as_str()) .await .map_err(|e| { OmniError::Lance(format!( @@ -636,6 +637,7 @@ pub(super) async fn build_indices_on_dataset_for_catalog( table_key, prop_name, e )) })?; + *ds = new_snap; } } } @@ -645,13 +647,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog( } if table_key.starts_with("edge:") { - if !db.table_store.has_btree_index(ds, "id").await? { + if !db.storage().has_btree_index(ds, "id").await? { stage_and_commit_btree(db, table_key, ds, &["id"]).await?; } - if !db.table_store.has_btree_index(ds, "src").await? { + if !db.storage().has_btree_index(ds, "src").await? { stage_and_commit_btree(db, table_key, ds, &["src"]).await?; } - if !db.table_store.has_btree_index(ds, "dst").await? { + if !db.storage().has_btree_index(ds, "dst").await? { stage_and_commit_btree(db, table_key, ds, &["dst"]).await?; } return Ok(()); @@ -674,11 +676,11 @@ pub(super) async fn build_indices_on_dataset_for_catalog( async fn stage_and_commit_btree( db: &Omnigraph, table_key: &str, - ds: &mut Dataset, + ds: &mut SnapshotHandle, columns: &[&str], ) -> Result<()> { let staged = db - .table_store + .storage() .stage_create_btree_index(ds, columns) .await .map_err(|e| { @@ -693,8 +695,8 @@ async fn stage_and_commit_btree( // yet called) leaves no Lance-HEAD drift on the touched table. crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?; let new_ds = db - .table_store - .commit_staged(Arc::new(ds.clone()), staged.transaction) + .storage() + .commit_staged(ds.clone(), staged) .await .map_err(|e| { OmniError::Lance(format!( @@ -711,11 +713,11 @@ async fn stage_and_commit_btree( async fn stage_and_commit_inverted( db: &Omnigraph, table_key: &str, - ds: &mut Dataset, + ds: &mut SnapshotHandle, column: &str, ) -> Result<()> { let staged = db - .table_store + .storage() .stage_create_inverted_index(ds, column) .await .map_err(|e| { @@ -725,8 +727,8 @@ async fn stage_and_commit_inverted( )) })?; let new_ds = db - .table_store - .commit_staged(Arc::new(ds.clone()), staged.transaction) + .storage() + .commit_staged(ds.clone(), staged) .await .map_err(|e| { OmniError::Lance(format!( @@ -777,7 +779,7 @@ async fn prepare_updates_for_commit( ) .await?; build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?; - let state = db.table_store.table_state(&full_path, &ds).await?; + let state = db.storage().table_state(&full_path, &ds).await?; prepared_update.table_version = state.version; prepared_update.row_count = state.row_count; prepared_update.version_metadata = state.version_metadata; diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 2e5f32e..c190221 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -928,7 +928,7 @@ async fn publish_adopted_source_state( target_branch, ) .await?; - let state = target_db.table_store().table_state(&full_path, &ds).await?; + let state = target_db.storage().table_state(&full_path, &ds).await?; Ok(crate::db::SubTableUpdate { table_key: table_key.to_string(), table_version: state.version, @@ -965,9 +965,13 @@ async fn publish_rewritten_merge_table( // commit point, narrowed from the previous "merge_insert + delete + // index" multi-step inline-commit chain. if let Some(delta) = &staged.delta_staged { + // The staged delta dataset is a temp-dir Lance dataset used only + // to collect the rewrite batches; wrap it in a `SnapshotHandle` + // so we can route through the trait's `scan_batches_for_rewrite`. + let delta_snapshot = SnapshotHandle::new(delta.dataset.clone()); let batches: Vec = target_db - .table_store() - .scan_batches_for_rewrite(&delta.dataset) + .storage() + .scan_batches_for_rewrite(&delta_snapshot) .await? .into_iter() .filter(|batch| batch.num_rows() > 0) @@ -982,7 +986,7 @@ async fn publish_rewritten_merge_table( .map_err(|e| OmniError::Lance(e.to_string()))? }; let staged_merge = target_db - .table_store() + .storage() .stage_merge_insert( current_ds.clone(), combined, @@ -992,8 +996,8 @@ async fn publish_rewritten_merge_table( ) .await?; current_ds = target_db - .table_store() - .commit_staged(Arc::new(current_ds), staged_merge.transaction) + .storage() + .commit_staged(current_ds, staged_merge) .await?; } } @@ -1014,10 +1018,11 @@ async fn publish_rewritten_merge_table( .map(|id| format!("'{}'", id.replace('\'', "''"))) .collect(); let filter = format!("id IN ({})", escaped.join(", ")); - target_db - .table_store() - .delete_where(&full_path, &mut current_ds, &filter) + let (new_ds, _) = target_db + .storage() + .delete_where(&full_path, current_ds, &filter) .await?; + current_ds = new_ds; } // Phase 3: rebuild indices. @@ -1028,7 +1033,7 @@ async fn publish_rewritten_merge_table( // (`build_index_metadata_from_segments` is `pub(crate)` in lance- // 4.0.0 — companion ticket to lance-format/lance#6658). let row_count = target_db - .table_store() + .storage() .table_state(&full_path, ¤t_ds) .await? .row_count; @@ -1038,7 +1043,7 @@ async fn publish_rewritten_merge_table( .await?; } let final_state = target_db - .table_store() + .storage() .table_state(&full_path, ¤t_ds) .await?; @@ -1364,7 +1369,7 @@ impl Omnigraph { let entry = target_snapshot.entry(table_key)?; Some(crate::db::manifest::SidecarTablePin { table_key: table_key.clone(), - table_path: self.table_store().dataset_uri(&entry.table_path), + table_path: self.storage().dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, // Use the merge target branch (where commits actually diff --git a/crates/omnigraph/src/exec/mod.rs b/crates/omnigraph/src/exec/mod.rs index 33a7e41..a114b78 100644 --- a/crates/omnigraph/src/exec/mod.rs +++ b/crates/omnigraph/src/exec/mod.rs @@ -40,6 +40,7 @@ use crate::db::{ReadTarget, Snapshot}; use crate::embedding::EmbeddingClient; use crate::error::{MergeConflict, MergeConflictKind, OmniError, Result}; use crate::graph_index::GraphIndex; +use crate::storage_layer::SnapshotHandle; use tempfile::{Builder as TempDirBuilder, TempDir}; mod merge; diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 02b2a21..47283f7 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -428,12 +428,11 @@ async fn ensure_node_id_exists( let filter = format!("id = '{}'", id.replace('\'', "''")); let snapshot = db.snapshot_for_branch(branch).await?; - let ds = snapshot.open(&table_key).await?; - let exists = ds - .count_rows(Some(filter)) - .await - .map_err(|e| OmniError::Lance(e.to_string()))? - > 0; + let ds = db + .storage() + .open_snapshot_at_table(&snapshot, &table_key) + .await?; + let exists = db.storage().count_rows(&ds, Some(filter)).await? > 0; if exists { Ok(()) @@ -601,7 +600,7 @@ async fn open_table_for_mutation( branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(Dataset, String, Option)> { +) -> Result<(SnapshotHandle, String, Option)> { if let Some(prior) = staging.inline_committed.get(table_key) { let path = staging.paths.get(table_key).ok_or_else(|| { OmniError::manifest_internal(format!( @@ -623,7 +622,7 @@ async fn open_table_for_mutation( let (ds, full_path, table_branch) = db .open_for_mutation_on_branch(branch, table_key, op_kind) .await?; - let expected_version = ds.version().version; + let expected_version = ds.version(); staging.ensure_path( table_key, full_path.clone(), @@ -1055,7 +1054,7 @@ impl Omnigraph { // and a chained `update where ` can match a row whose // pending value no longer satisfies . let batches = self - .table_store() + .storage() .scan_with_pending( &ds, pending_batches, @@ -1153,13 +1152,13 @@ impl Omnigraph { crate::db::MutationOpKind::Delete, ) .await?; - let initial_version = ds.version().version; + let initial_version = ds.version(); // Scan matching IDs for cascade. Per D₂ this never overlaps with // staged inserts (mixed insert/delete in one query is rejected at // parse time), so we scan committed only. let batches = self - .table_store() + .storage() .scan(&ds, Some(&["id"]), Some(&pred_sql), None) .await?; @@ -1191,7 +1190,7 @@ impl Omnigraph { // deletes from coexisting in one query, so this advance of Lance // HEAD is the only HEAD movement during the query and the // publisher's CAS captures it intact. - let mut ds = self + let ds = self .reopen_for_mutation( &table_key, &full_path, @@ -1201,9 +1200,9 @@ impl Omnigraph { ) .await?; crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?; - let delete_state = self - .table_store() - .delete_where(&full_path, &mut ds, &pred_sql) + let (_new_ds, delete_state) = self + .storage() + .delete_where(&full_path, ds, &pred_sql) .await?; staging.record_inline(crate::db::SubTableUpdate { @@ -1242,7 +1241,7 @@ impl Omnigraph { let edge_table_key = format!("edge:{}", edge_name); let cascade_filter = cascade_filters.join(" OR "); - let (mut edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation( + let (edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation( self, staging, branch, @@ -1251,9 +1250,9 @@ impl Omnigraph { ) .await?; - let edge_delete = self - .table_store() - .delete_where(&edge_full_path, &mut edge_ds, &cascade_filter) + let (_new_edge_ds, edge_delete) = self + .storage() + .delete_where(&edge_full_path, edge_ds, &cascade_filter) .await?; affected_edges += edge_delete.deleted_rows; @@ -1290,7 +1289,7 @@ impl Omnigraph { let pred_sql = predicate_to_sql(predicate, params, true)?; let table_key = format!("edge:{}", type_name); - let (mut ds, full_path, table_branch) = open_table_for_mutation( + let (ds, full_path, table_branch) = open_table_for_mutation( self, staging, branch, @@ -1299,9 +1298,9 @@ impl Omnigraph { ) .await?; - let delete_state = self - .table_store() - .delete_where(&full_path, &mut ds, &pred_sql) + let (_new_ds, delete_state) = self + .storage() + .delete_where(&full_path, ds, &pred_sql) .await?; let affected = delete_state.deleted_rows; @@ -1355,7 +1354,7 @@ fn concat_match_batches_to_schema( /// dedup needed (`dedupe_key_column = None`). async fn validate_edge_cardinality_with_pending( db: &Omnigraph, - committed_ds: &Dataset, + committed_ds: &SnapshotHandle, staging: &MutationStaging, table_key: &str, edge_type: &omnigraph_compiler::catalog::EdgeType, diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 0d26fd3..d78f8eb 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use arrow_array::{Array, RecordBatch, StringArray, UInt32Array}; use arrow_schema::SchemaRef; -use lance::Dataset; +use crate::storage_layer::{SnapshotHandle, StagedHandle}; use omnigraph_compiler::catalog::EdgeType; use crate::db::manifest::{ @@ -325,9 +325,9 @@ impl MutationStaging { // Stage produces uncommitted fragments + transaction. No // Lance HEAD advance until `commit_all` runs `commit_staged`. let staged = match table.mode { - PendingMode::Append => db.table_store().stage_append(&ds, combined, &[]).await?, + PendingMode::Append => db.storage().stage_append(&ds, combined, &[]).await?, PendingMode::Merge => { - db.table_store() + db.storage() .stage_merge_insert( ds.clone(), combined, @@ -389,15 +389,17 @@ pub(crate) struct StagedMutation { } /// Per-table state captured during `stage_all` and consumed by -/// `commit_all`. Holds the opened `Dataset` so `commit_staged` doesn't -/// re-open, and the `StagedWrite` whose `transaction` `commit_staged` -/// will execute. +/// `commit_all`. Holds the opened snapshot (so `commit_staged` doesn't +/// re-open) plus the staged Lance transaction that `commit_staged` +/// will execute. Both held as opaque `TableStorage` handles per MR-793 +/// §III.9 — the inner `lance::Dataset` / `StagedWrite` are not visible +/// to engine code outside the storage layer. struct StagedTableEntry { table_key: String, path: StagedTablePath, expected_version: u64, - dataset: lance::Dataset, - staged_write: crate::table_store::StagedWrite, + dataset: SnapshotHandle, + staged_write: StagedHandle, } impl StagedMutation { @@ -648,11 +650,11 @@ impl StagedMutation { } = entry; let new_ds = db - .table_store() - .commit_staged(Arc::new(dataset), staged_write.transaction) + .storage() + .commit_staged(dataset, staged_write) .await?; let state = db - .table_store() + .storage() .table_state(&path.full_path, &new_ds) .await?; updates.push(SubTableUpdate { @@ -803,7 +805,7 @@ fn dedupe_merge_batches_by_id( /// `LoadMode::Merge` double-counts. pub(crate) async fn count_src_per_edge( db: &crate::db::Omnigraph, - committed_ds: &Dataset, + committed_ds: &SnapshotHandle, table_key: &str, staging: &MutationStaging, dedupe_key_column: Option<&str>, @@ -841,7 +843,7 @@ pub(crate) async fn count_src_per_edge( _ => vec!["src"], }; let committed = db - .table_store() + .storage() .scan(committed_ds, Some(&projection), None, None) .await?; for batch in &committed { diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 46a46e2..9876309 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -418,7 +418,7 @@ async fn load_jsonl_reader( let (ds, full_path, table_branch) = db .open_for_mutation_on_branch(branch, &table_key, load_op_kind) .await?; - let expected_version = ds.version().version; + let expected_version = ds.version(); staging.ensure_path( &table_key, full_path, @@ -528,7 +528,7 @@ async fn load_jsonl_reader( let (ds, full_path, table_branch) = db .open_for_mutation_on_branch(branch, &table_key, load_op_kind) .await?; - let expected_version = ds.version().version; + let expected_version = ds.version(); staging.ensure_path( &table_key, full_path, @@ -1205,28 +1205,45 @@ async fn write_batch_to_dataset( LoadMode::Merge => crate::db::MutationOpKind::Merge, LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite, }; - let (mut ds, full_path, table_branch) = db + let (ds, full_path, table_branch) = db .open_for_mutation_on_branch(branch, table_key, op_kind) .await?; - let table_store = db.table_store(); match mode { LoadMode::Overwrite => { - let state = table_store - .overwrite_batch(&full_path, &mut ds, batch) + // Inline-commit residual: the Overwrite path here is the + // legacy concurrent fast-path used by Phase 2 of the loader + // (Append/Merge route through MutationStaging instead). + // `overwrite_batch` advances Lance HEAD as a side effect; + // there is no public two-phase overwrite that fits this + // shape until Lance issues #6658/#6666 close. + let (_new_ds, state) = db + .storage() + .overwrite_batch(&full_path, ds, batch) .await?; Ok((state, table_branch)) } LoadMode::Append => { - let state = table_store.append_batch(&full_path, &mut ds, batch).await?; + // Same residual class as Overwrite above. The staged-write + // path is the `use_staging` branch in `load_with_actor`; + // this concurrent path is the per-table fast-path retained + // for parallelism. MR-793 Phase 9 will demote + // `append_batch` to `pub(crate)` once this last consumer + // moves to the staged primitive. + let (_new_ds, state) = db + .storage() + .append_batch(&full_path, ds, batch) + .await?; Ok((state, table_branch)) } LoadMode::Merge => { - let state = table_store - .merge_insert_batch( + // Same residual class as the other two arms. + let state = db + .storage() + .merge_insert_batches( &full_path, ds, - batch, + vec![batch], vec!["id".to_string()], lance::dataset::WhenMatched::UpdateAll, lance::dataset::WhenNotMatched::InsertAll, @@ -1596,7 +1613,7 @@ pub(crate) async fn validate_edge_cardinality( // Scan src column, count per source let batches = db - .table_store() + .storage() .scan(&ds, Some(&["src"]), None, None) .await?; @@ -1725,7 +1742,7 @@ async fn collect_node_ids_with_pending( .await?; let batches = db - .table_store() + .storage() .scan(&ds, Some(&["id"]), None, None) .await?; @@ -1794,7 +1811,7 @@ async fn collect_node_ids( .await?; let batches = db - .table_store() + .storage() .scan(&ds, Some(&["id"]), None, None) .await?; diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs index dac9482..5cb5274 100644 --- a/crates/omnigraph/src/storage_layer.rs +++ b/crates/omnigraph/src/storage_layer.rs @@ -111,6 +111,17 @@ impl SnapshotHandle { self.inner } + /// Take ownership of the inner `Dataset` by unwrapping the `Arc` + /// (or cloning if the snapshot is shared). `pub(crate)` — used + /// only by the maintenance path (`optimize`, `cleanup`) which + /// must hand `&mut Dataset` to Lance compaction / cleanup APIs + /// that the `TableStorage` trait does not (and should not) + /// surface. Engine code that participates in the staged-write + /// invariant must stay on the trait methods. + pub(crate) fn into_dataset(self) -> Dataset { + Arc::try_unwrap(self.inner).unwrap_or_else(|arc| (*arc).clone()) + } + // ── public, lance-free accessors ── /// Current Lance manifest version of the snapshot. @@ -208,6 +219,20 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>; + /// Idempotent variant of `delete_branch` used by the best-effort fork + /// reclaim under branch delete (`db/omnigraph.rs::cleanup_deleted_branch_tables`) + /// and by the orphan-fork reconciler in `optimize`. Tolerates an + /// already-absent branch (both Lance's `RefNotFound` and the local-store + /// `NotFound` quirk on a missing `tree/{branch}/` dir). A still-referenced + /// branch (`RefConflict`) still surfaces as `OmniError::Lance`. + async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>; + + /// List the named Lance branches present on the dataset at `dataset_uri`. + /// The `cleanup` orphan reconciler diffs this against the manifest + /// branch set to find orphaned per-table forks. `main`/default is not a + /// named branch and never appears here. + async fn list_branches(&self, dataset_uri: &str) -> Result>; + async fn reopen_for_mutation( &self, dataset_uri: &str, @@ -496,6 +521,14 @@ impl TableStorage for TableStore { TableStore::delete_branch(self, dataset_uri, branch).await } + async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> { + TableStore::force_delete_branch(self, dataset_uri, branch).await + } + + async fn list_branches(&self, dataset_uri: &str) -> Result> { + TableStore::list_branches(self, dataset_uri).await + } + async fn reopen_for_mutation( &self, dataset_uri: &str, diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 10123b0..dca5b79 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -601,7 +601,16 @@ impl TableStore { }) } - pub async fn append_batch( + /// Legacy inline-commit append: writes fragments AND commits in one + /// call, advancing Lance HEAD as a side effect. Demoted to + /// `pub(crate)` by MR-793 Phase 9 — the staged primitive + /// `stage_append` + `commit_staged` is the public engine surface; + /// this one survives only as a residual called by + /// `loader::write_batch_to_dataset` (LoadMode::Append concurrent + /// fast-path) and the deprecated `merge_insert_batch` chain. Do not + /// add new call sites — they re-introduce the multi-phase commit + /// drift the trait surface was designed to eliminate. + pub(crate) async fn append_batch( &self, dataset_uri: &str, ds: &mut Dataset, @@ -656,7 +665,14 @@ impl TableStore { } } - pub async fn overwrite_batch( + /// Legacy inline-commit overwrite: truncates then + /// `append_batch`-commits, advancing Lance HEAD as a side effect. + /// Demoted to `pub(crate)` by MR-793 Phase 9 — the staged primitive + /// `stage_overwrite` + `commit_staged` is the public engine surface; + /// this one survives only as the LoadMode::Overwrite concurrent + /// fast-path inside `loader::write_batch_to_dataset`. Do not add new + /// call sites. + pub(crate) async fn overwrite_batch( &self, dataset_uri: &str, ds: &mut Dataset, @@ -682,7 +698,14 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } - pub async fn merge_insert_batch( + /// Legacy inline-commit merge-insert (single batch). Demoted to + /// `pub(crate)` by MR-793 Phase 9 — the staged primitive + /// `stage_merge_insert` + `commit_staged` is the public engine + /// surface; this one survives only as the body of + /// `merge_insert_batches` (which is itself the loader's + /// LoadMode::Merge concurrent fast-path). Do not add new call + /// sites. + pub(crate) async fn merge_insert_batch( &self, dataset_uri: &str, ds: Dataset, @@ -759,7 +782,14 @@ impl TableStore { self.table_state(dataset_uri, &new_ds).await } - pub async fn merge_insert_batches( + /// Legacy inline-commit merge-insert (multiple batches concatenated + /// into one merge). Demoted to `pub(crate)` by MR-793 Phase 9 — the + /// staged primitive `stage_merge_insert` + `commit_staged` is the + /// public engine surface; this one survives only via the + /// `TableStorage::merge_insert_batches` trait method, called by + /// `loader::write_batch_to_dataset` (LoadMode::Merge concurrent + /// fast-path). Do not add new call sites. + pub(crate) async fn merge_insert_batches( &self, dataset_uri: &str, ds: Dataset, @@ -1448,7 +1478,17 @@ impl TableStore { })) } - pub async fn create_btree_index(&self, ds: &mut Dataset, columns: &[&str]) -> Result<()> { + /// Legacy inline-commit BTREE scalar index build. Demoted to + /// `pub(crate)` by MR-793 Phase 9 — the staged primitive + /// `stage_create_btree_index` + `commit_staged` is the public engine + /// surface; this one survives only as the body of the trait's + /// inline-commit method (used by no engine call site today). Do not + /// add new call sites. + pub(crate) async fn create_btree_index( + &self, + ds: &mut Dataset, + columns: &[&str], + ) -> Result<()> { let params = ScalarIndexParams::default(); ds.create_index_builder(columns, IndexType::BTree, ¶ms) .replace(true) @@ -1457,7 +1497,17 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } - pub async fn create_inverted_index(&self, ds: &mut Dataset, column: &str) -> Result<()> { + /// Legacy inline-commit INVERTED (FTS) scalar index build. Demoted + /// to `pub(crate)` by MR-793 Phase 9 — the staged primitive + /// `stage_create_inverted_index` + `commit_staged` is the public + /// engine surface; this one survives only as the body of the + /// trait's inline-commit method (used by no engine call site today). + /// Do not add new call sites. + pub(crate) async fn create_inverted_index( + &self, + ds: &mut Dataset, + column: &str, + ) -> Result<()> { let params = InvertedIndexParams::default(); ds.create_index_builder(&[column], IndexType::Inverted, ¶ms) .replace(true) diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs index 1936815..6d77860 100644 --- a/crates/omnigraph/tests/forbidden_apis.rs +++ b/crates/omnigraph/tests/forbidden_apis.rs @@ -29,15 +29,21 @@ //! the cross-table manifest commit. Documented exception. //! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module. //! -//! ## Transitional allow-list +//! ## Allow-list shape //! -//! The migration of writers onto staged primitives is incremental. -//! Several writers (ensure_indices, branch_merge, schema_apply rewrites) -//! already route through the staged primitives; others (bulk loader, -//! exec/mutation, exec/query) still use the legacy inherent -//! `TableStore` methods — they're not visible at the trait boundary, but -//! they DO call lance types. The file-level allow-list below reflects -//! this transitional state and tightens as call sites migrate. +//! After MR-854 (MR-793 Phase 1b + Phase 9), every engine call site +//! reaches the storage layer through `db.storage()` (returns +//! `&dyn TableStorage`). The inherent inline-commit methods on +//! `TableStore` (`append_batch`, `merge_insert_batch{,es}`, +//! `overwrite_batch`, `create_{btree,inverted}_index`, `truncate_table`) +//! are now `pub(crate)`, so the only direct users are +//! `table_store.rs` itself (which IS the storage layer) and the bulk +//! loader's `LoadMode::{Append, Overwrite, Merge}` concurrent +//! fast-paths in `loader::write_batch_to_dataset` (the loader uses the +//! trait surface for the staged-write path and falls back to the +//! demoted inherent methods only for the concurrent fast-path, which +//! has no two-phase shape in Lance 4.0.0). The file-level allow-list +//! below matches that boundary. use std::path::{Path, PathBuf}; diff --git a/docs/dev/writes.md b/docs/dev/writes.md index 974f7a6..34a5650 100644 --- a/docs/dev/writes.md +++ b/docs/dev/writes.md @@ -109,16 +109,12 @@ MR-793's acceptance criterion §1 ("`TableStore` public API has no method that p | Method on `TableStore` | Inline-commit reason | Closes when | |---|---|---| -| `delete_where` | `DeleteJob` is `pub(crate)` in lance-4.0.0 — no public two-phase delete API | [lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658) lands and `stage_delete` joins the trait | -| `create_vector_index` | Vector indices take Lance's "segment commit path"; the helper `build_index_metadata_from_segments` is `pub(crate)` | [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) lands and `stage_create_vector_index` joins the trait | -| `append_batch` | Legacy inherent method; some engine call sites haven't migrated to `stage_append + commit_staged` yet | MR-793 Phase 1b (call-site conversion) + Phase 9 (demote to `pub(crate)`) | -| `merge_insert_batch` / `merge_insert_batches` | Legacy inherent method | Same — Phase 1b + Phase 9 | -| `overwrite_batch` | Legacy inherent method | Same — Phase 1b + Phase 9 | -| `create_btree_index` (inherent) | Legacy inherent method (the migrated callers use `stage_create_btree_index` + `commit_staged`; the inherent stays for tests / un-migrated paths) | Same — Phase 1b + Phase 9 | -| `create_inverted_index` (inherent) | Same | Same — Phase 1b + Phase 9 + index-class split (MR-848) | -| `truncate_table` (inherent on `TableStore`) | Used by `overwrite_batch` internally | Phase 9 | +| `delete_where` (trait) | `DeleteJob` is `pub(crate)` in lance-4.0.0 — no public two-phase delete API | [lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658) lands and `stage_delete` joins the trait | +| `create_vector_index` (trait) | Vector indices take Lance's "segment commit path"; the helper `build_index_metadata_from_segments` is `pub(crate)` | [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) lands and `stage_create_vector_index` joins the trait | -After **lance#6658 + lance#6666 ship + MR-793 Phase 1b + MR-793 Phase 9 all complete**, the trait surface exposes only staged-write primitives + `commit_staged`. Until then this matrix names every residual explicitly, every call site carries a one-line residual comment, and no engine code outside `table_store.rs` is permitted to reach the inline-commit Lance APIs (enforced by the `tests/forbidden_apis.rs` guard). +MR-854 (Phase 1b + Phase 9) closed the remaining residuals on the engine surface: every `db.table_store.X(...)` call site was converted to `db.storage().X(...)` (trait dispatch through `&dyn TableStorage`), and the inherent inline-commit methods on `TableStore` (`append_batch`, `merge_insert_batch`, `merge_insert_batches`, `overwrite_batch`, `create_btree_index`, `create_inverted_index`, `truncate_table`) were demoted from `pub` to `pub(crate)`. They survive only as the bulk loader's `LoadMode::{Append, Overwrite, Merge}` concurrent fast-paths (see "`LoadMode::Overwrite` residual" below) and as internal helpers for the staged primitives — no engine call site outside `table_store.rs` and `loader::write_batch_to_dataset` reaches them. + +After **lance#6658 + lance#6666 ship**, the trait surface exposes only staged-write primitives + `commit_staged`. Until then this matrix names the two remaining residuals explicitly, every call site carries a one-line residual comment, and no engine code outside `table_store.rs` is permitted to reach the inline-commit Lance APIs (enforced by the `tests/forbidden_apis.rs` guard). ### `LoadMode::Overwrite` residual