mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
MR-854: convert engine call sites to &dyn TableStorage; demote legacy methods
Phase 1b: every db.table_store.X(...) call site converts to
db.storage().X(...), reaching the storage layer through the sealed
TableStorage trait (returns &dyn TableStorage). Opaque SnapshotHandle
and StagedHandle replace bare lance::Dataset and Transaction in the
threaded values.
Phase 9: the inherent inline-commit methods on TableStore
(append_batch, merge_insert_batch{,es}, overwrite_batch,
create_btree_index, create_inverted_index) demote from pub to
pub(crate). Their only remaining direct users are table_store.rs
itself and the bulk loader's LoadMode::{Append, Overwrite, Merge}
concurrent fast-paths in loader::write_batch_to_dataset (no
two-phase shape in Lance 4.0.0 — closes after lance#6658 and #6666).
Docs:
- invariants.md §VI.23: drop "at the writer-trait surface"
qualifier; staged primitives are now the only engine surface.
- runs.md: residual matrix shrinks to delete_where and
create_vector_index (the two upstream-blocked residuals).
- forbidden_apis.rs: replace transitional language with the
current allow-list shape (table_store.rs + loader concurrent
fast-path only).
Files touched:
- changes/mod.rs, db/omnigraph.rs (+export/optimize/schema_apply/
table_ops.rs), exec/{merge,mod,mutation,staging}.rs,
loader/mod.rs, storage_layer.rs, table_store.rs,
tests/forbidden_apis.rs, docs/{invariants,runs}.md.
Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com>
This commit is contained in:
parent
c7365bf8ef
commit
6a41028bf1
15 changed files with 393 additions and 241 deletions
|
|
@ -7,6 +7,7 @@ use lance::dataset::scanner::ColumnOrdering;
|
|||
use crate::db::SubTableEntry;
|
||||
use crate::db::manifest::Snapshot;
|
||||
use crate::error::Result;
|
||||
use crate::storage_layer::{SnapshotHandle, TableStorage};
|
||||
use crate::table_store::TableStore;
|
||||
|
||||
// ─── Types ──────────────────────────────────────────────────────────────────
|
||||
|
|
@ -229,7 +230,8 @@ async fn diff_table_same_lineage(
|
|||
) -> Result<Vec<EntityChange>> {
|
||||
let vf = from_entry.table_version;
|
||||
let vt = to_entry.table_version;
|
||||
let to_ds = table_store.open_at_entry(to_entry).await?;
|
||||
let storage: &dyn TableStorage = table_store;
|
||||
let to_ds = storage.open_snapshot_at_entry(to_entry).await?;
|
||||
|
||||
let cols: Vec<&str> = if is_edge {
|
||||
vec!["id", "src", "dst", "_row_last_updated_at_version"]
|
||||
|
|
@ -257,12 +259,12 @@ async fn diff_table_same_lineage(
|
|||
"_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
|
||||
vf, vt
|
||||
);
|
||||
let changed_rows = scan_with_filter(table_store, &to_ds, &cols, &filter_sql).await?;
|
||||
let changed_rows = scan_with_filter(storage, &to_ds, &cols, &filter_sql).await?;
|
||||
|
||||
if !changed_rows.is_empty() {
|
||||
// Build the set of IDs that existed at the from version
|
||||
let from_ds = table_store.open_at_entry(from_entry).await?;
|
||||
let from_ids: HashSet<String> = scan_id_set(table_store, &from_ds, &["id"])
|
||||
let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
|
||||
let from_ids: HashSet<String> = scan_id_set(storage, &from_ds, &["id"])
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|r| r.id)
|
||||
|
|
@ -282,8 +284,8 @@ async fn diff_table_same_lineage(
|
|||
|
||||
// Deletes: ID set-difference
|
||||
if wants_deletes {
|
||||
let from_ds = table_store.open_at_entry(from_entry).await?;
|
||||
let deleted = deleted_ids_by_set_diff(table_store, &from_ds, &to_ds, is_edge).await?;
|
||||
let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
|
||||
let deleted = deleted_ids_by_set_diff(storage, &from_ds, &to_ds, is_edge).await?;
|
||||
changes.extend(deleted);
|
||||
}
|
||||
|
||||
|
|
@ -300,13 +302,14 @@ async fn diff_table_cross_branch(
|
|||
is_edge: bool,
|
||||
filter: &ChangeFilter,
|
||||
) -> Result<Vec<EntityChange>> {
|
||||
let from_ds = table_store
|
||||
.open_snapshot_table(from_snap, table_key)
|
||||
let storage: &dyn TableStorage = table_store;
|
||||
let from_ds = storage
|
||||
.open_snapshot_at_table(from_snap, table_key)
|
||||
.await?;
|
||||
let to_ds = table_store.open_snapshot_table(to_snap, table_key).await?;
|
||||
let to_ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
|
||||
|
||||
let from_rows = scan_all_rows_ordered(table_store, &from_ds, is_edge).await?;
|
||||
let to_rows = scan_all_rows_ordered(table_store, &to_ds, is_edge).await?;
|
||||
let from_rows = scan_all_rows_ordered(storage, &from_ds, is_edge).await?;
|
||||
let to_rows = scan_all_rows_ordered(storage, &to_ds, is_edge).await?;
|
||||
|
||||
let mut changes = Vec::new();
|
||||
let mut fi = 0;
|
||||
|
|
@ -392,8 +395,9 @@ async fn diff_table_added(
|
|||
if !filter.wants_op(ChangeOp::Insert) {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let ds = table_store.open_snapshot_table(to_snap, table_key).await?;
|
||||
let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
|
||||
let storage: &dyn TableStorage = table_store;
|
||||
let ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
|
||||
let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge))
|
||||
|
|
@ -410,10 +414,11 @@ async fn diff_table_removed(
|
|||
if !filter.wants_op(ChangeOp::Delete) {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let ds = table_store
|
||||
.open_snapshot_table(from_snap, table_key)
|
||||
let storage: &dyn TableStorage = table_store;
|
||||
let ds = storage
|
||||
.open_snapshot_at_table(from_snap, table_key)
|
||||
.await?;
|
||||
let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
|
||||
let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
|
||||
|
|
@ -424,12 +429,12 @@ async fn diff_table_removed(
|
|||
|
||||
/// Scan with a SQL filter, projecting specific columns.
|
||||
async fn scan_with_filter(
|
||||
table_store: &TableStore,
|
||||
ds: &lance::Dataset,
|
||||
storage: &dyn TableStorage,
|
||||
ds: &SnapshotHandle,
|
||||
cols: &[&str],
|
||||
filter_sql: &str,
|
||||
) -> Result<Vec<ScannedRow>> {
|
||||
let batches = table_store
|
||||
let batches = storage
|
||||
.scan(ds, Some(cols), Some(filter_sql), None)
|
||||
.await?;
|
||||
Ok(extract_rows(&batches))
|
||||
|
|
@ -437,11 +442,11 @@ async fn scan_with_filter(
|
|||
|
||||
/// Scan all rows ordered by id, projecting id (+ src/dst for edges) + all columns for signature.
|
||||
async fn scan_all_rows_ordered(
|
||||
table_store: &TableStore,
|
||||
ds: &lance::Dataset,
|
||||
storage: &dyn TableStorage,
|
||||
ds: &SnapshotHandle,
|
||||
is_edge: bool,
|
||||
) -> Result<Vec<ScannedRow>> {
|
||||
let batches = table_store
|
||||
let batches = storage
|
||||
.scan(
|
||||
ds,
|
||||
None,
|
||||
|
|
@ -454,9 +459,9 @@ async fn scan_all_rows_ordered(
|
|||
|
||||
/// Compute deleted IDs: scan id at from and to, set-difference.
|
||||
async fn deleted_ids_by_set_diff(
|
||||
table_store: &TableStore,
|
||||
from_ds: &lance::Dataset,
|
||||
to_ds: &lance::Dataset,
|
||||
storage: &dyn TableStorage,
|
||||
from_ds: &SnapshotHandle,
|
||||
to_ds: &SnapshotHandle,
|
||||
is_edge: bool,
|
||||
) -> Result<Vec<EntityChange>> {
|
||||
let cols: Vec<&str> = if is_edge {
|
||||
|
|
@ -465,8 +470,8 @@ async fn deleted_ids_by_set_diff(
|
|||
vec!["id"]
|
||||
};
|
||||
|
||||
let from_rows = scan_id_set(table_store, from_ds, &cols).await?;
|
||||
let to_ids: HashSet<String> = scan_id_set(table_store, to_ds, &["id"])
|
||||
let from_rows = scan_id_set(storage, from_ds, &cols).await?;
|
||||
let to_ids: HashSet<String> = scan_id_set(storage, to_ds, &["id"])
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|r| r.id)
|
||||
|
|
@ -480,11 +485,11 @@ async fn deleted_ids_by_set_diff(
|
|||
}
|
||||
|
||||
async fn scan_id_set(
|
||||
table_store: &TableStore,
|
||||
ds: &lance::Dataset,
|
||||
storage: &dyn TableStorage,
|
||||
ds: &SnapshotHandle,
|
||||
cols: &[&str],
|
||||
) -> Result<Vec<ScannedRow>> {
|
||||
let batches = table_store.scan(ds, Some(cols), None, None).await?;
|
||||
let batches = storage.scan(ds, Some(cols), None, None).await?;
|
||||
Ok(extract_rows(&batches))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
|
|||
use crate::error::{OmniError, Result};
|
||||
use crate::runtime_cache::RuntimeCache;
|
||||
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
|
||||
use crate::storage_layer::SnapshotHandle;
|
||||
use crate::table_store::TableStore;
|
||||
|
||||
mod export;
|
||||
|
|
@ -569,18 +570,15 @@ impl Omnigraph {
|
|||
schema_apply::ensure_schema_apply_not_locked(self, operation).await
|
||||
}
|
||||
|
||||
pub(crate) fn table_store(&self) -> &TableStore {
|
||||
&self.table_store
|
||||
}
|
||||
|
||||
/// Engine-facing trait surface around `TableStore`.
|
||||
///
|
||||
/// This is the canonical accessor for newly-written engine code. The
|
||||
/// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
|
||||
/// instead of leaking `lance::Dataset` /
|
||||
/// `lance::dataset::transaction::Transaction`. Existing call sites
|
||||
/// that still use `db.table_store.X(...)` (the inherent struct
|
||||
/// methods) are migrated incrementally.
|
||||
/// This is the **only** accessor for engine code reaching into the
|
||||
/// storage layer. The trait's signatures use opaque `SnapshotHandle`
|
||||
/// / `StagedHandle` instead of leaking `lance::Dataset` /
|
||||
/// `lance::dataset::transaction::Transaction`, so newly-added engine
|
||||
/// call sites cannot drift the staged-write invariant by mistake
|
||||
/// (the trait's `stage_*` + `commit_staged` pair is the only way to
|
||||
/// land a write).
|
||||
pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
|
||||
&self.table_store
|
||||
}
|
||||
|
|
@ -1110,10 +1108,10 @@ impl Omnigraph {
|
|||
cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
|
||||
|
||||
for (table_key, table_path) in cleanup_targets {
|
||||
let dataset_uri = self.table_store.dataset_uri(&table_path);
|
||||
let dataset_uri = self.storage().dataset_uri(&table_path);
|
||||
let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
|
||||
{
|
||||
Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
|
||||
Ok(()) => self.storage().force_delete_branch(&dataset_uri, branch).await,
|
||||
Err(injected) => Err(injected),
|
||||
};
|
||||
if let Err(err) = outcome {
|
||||
|
|
@ -1339,7 +1337,7 @@ impl Omnigraph {
|
|||
&self,
|
||||
table_key: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(Dataset, String, Option<String>)> {
|
||||
) -> Result<(SnapshotHandle, String, Option<String>)> {
|
||||
table_ops::open_for_mutation(self, table_key, op_kind).await
|
||||
}
|
||||
|
||||
|
|
@ -1348,7 +1346,7 @@ impl Omnigraph {
|
|||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(Dataset, String, Option<String>)> {
|
||||
) -> Result<(SnapshotHandle, String, Option<String>)> {
|
||||
table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
|
||||
}
|
||||
|
||||
|
|
@ -1359,7 +1357,7 @@ impl Omnigraph {
|
|||
source_branch: Option<&str>,
|
||||
source_version: u64,
|
||||
active_branch: &str,
|
||||
) -> Result<Dataset> {
|
||||
) -> Result<SnapshotHandle> {
|
||||
table_ops::fork_dataset_from_entry_state(
|
||||
self,
|
||||
table_key,
|
||||
|
|
@ -1378,7 +1376,7 @@ impl Omnigraph {
|
|||
table_branch: Option<&str>,
|
||||
expected_version: u64,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<Dataset> {
|
||||
) -> Result<SnapshotHandle> {
|
||||
table_ops::reopen_for_mutation(
|
||||
self,
|
||||
table_key,
|
||||
|
|
@ -1395,14 +1393,14 @@ impl Omnigraph {
|
|||
table_path: &str,
|
||||
table_branch: Option<&str>,
|
||||
table_version: u64,
|
||||
) -> Result<Dataset> {
|
||||
) -> Result<SnapshotHandle> {
|
||||
table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
|
||||
}
|
||||
|
||||
pub(crate) async fn build_indices_on_dataset(
|
||||
&self,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
ds: &mut SnapshotHandle,
|
||||
) -> Result<()> {
|
||||
table_ops::build_indices_on_dataset(self, table_key, ds).await
|
||||
}
|
||||
|
|
@ -1411,7 +1409,7 @@ impl Omnigraph {
|
|||
&self,
|
||||
catalog: &Catalog,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
ds: &mut SnapshotHandle,
|
||||
) -> Result<()> {
|
||||
table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
|
||||
}
|
||||
|
|
@ -2115,8 +2113,12 @@ edge WorksAt: Person -> Company
|
|||
|
||||
async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
|
||||
let snapshot = db.snapshot().await;
|
||||
let ds = snapshot.open(table_key).await.unwrap();
|
||||
let batches = db.table_store().scan_batches(&ds).await.unwrap();
|
||||
let ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, table_key)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = db.storage().scan_batches(&ds).await.unwrap();
|
||||
batches
|
||||
.into_iter()
|
||||
.flat_map(|batch| {
|
||||
|
|
@ -2128,11 +2130,11 @@ edge WorksAt: Person -> Company
|
|||
}
|
||||
|
||||
async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
|
||||
let (mut ds, full_path, table_branch) = db
|
||||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
|
||||
.await
|
||||
.unwrap();
|
||||
let schema: Arc<Schema> = Arc::new(ds.schema().into());
|
||||
let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
|
||||
let columns: Vec<Arc<dyn Array>> = schema
|
||||
.fields()
|
||||
.iter()
|
||||
|
|
@ -2144,9 +2146,9 @@ edge WorksAt: Person -> Company
|
|||
})
|
||||
.collect();
|
||||
let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
|
||||
let state = db
|
||||
.table_store()
|
||||
.append_batch(&full_path, &mut ds, batch)
|
||||
let (_new_ds, state) = db
|
||||
.storage()
|
||||
.append_batch(&full_path, ds, batch)
|
||||
.await
|
||||
.unwrap();
|
||||
db.commit_updates(&[crate::db::SubTableUpdate {
|
||||
|
|
@ -2280,8 +2282,12 @@ edge WorksAt: Person -> Company
|
|||
db.apply_schema(&desired).await.unwrap();
|
||||
|
||||
let snapshot = db.snapshot().await;
|
||||
let ds = snapshot.open("node:Person").await.unwrap();
|
||||
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
|
||||
let ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, "node:Person")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -2299,9 +2305,13 @@ edge WorksAt: Person -> Company
|
|||
db.apply_schema(&desired).await.unwrap();
|
||||
|
||||
let snapshot = db.snapshot().await;
|
||||
let ds = snapshot.open("node:Person").await.unwrap();
|
||||
assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
|
||||
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
|
||||
let ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, "node:Person")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
|
||||
assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
|
|
@ -143,23 +143,23 @@ async fn export_table_to_writer<W: Write>(
|
|||
writer: &mut W,
|
||||
) -> Result<()> {
|
||||
let ds = db
|
||||
.table_store
|
||||
.open_snapshot_table(snapshot, table_key)
|
||||
.storage()
|
||||
.open_snapshot_at_table(snapshot, table_key)
|
||||
.await?;
|
||||
let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]);
|
||||
let catalog = db.catalog();
|
||||
let blob_properties = blob_properties_for_table_key(&catalog, table_key)?;
|
||||
|
||||
if blob_properties.is_empty() {
|
||||
for batch in db.table_store.scan(&ds, None, None, ordering).await? {
|
||||
for batch in db.storage().scan(&ds, None, None, ordering).await? {
|
||||
write_export_rows_from_batch(db, table_key, &batch, None, writer)?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let batches = db
|
||||
.table_store
|
||||
.scan_with(&ds, None, None, ordering, true, |_| Ok(()))
|
||||
.storage()
|
||||
.scan_with_row_id(&ds, None, None, ordering, true)
|
||||
.await?;
|
||||
for batch in batches {
|
||||
let row_ids = batch
|
||||
|
|
@ -175,7 +175,13 @@ async fn export_table_to_writer<W: Write>(
|
|||
.iter()
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
let blob_values = export_blob_values(&ds, &batch, &row_ids, blob_properties).await?;
|
||||
// Blob materialization reaches through to the inner Lance
|
||||
// `Dataset` because `take_blobs` is a Lance-only API not lifted
|
||||
// onto the `TableStorage` trait surface (the trait covers
|
||||
// staged-write and snapshot-scan primitives; blob descriptor
|
||||
// materialization sits outside that surface).
|
||||
let blob_values =
|
||||
export_blob_values(ds.dataset(), &batch, &row_ids, blob_properties).await?;
|
||||
write_export_rows_from_batch(db, table_key, &batch, Some(&blob_values), writer)?;
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
|
|||
}
|
||||
|
||||
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
|
||||
let table_store = &db.table_store;
|
||||
let storage = db.storage();
|
||||
|
||||
let stats: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
|
||||
.map(|(table_key, full_path, has_blob)| async move {
|
||||
|
|
@ -204,9 +204,16 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
|
|||
SkipReason::BlobColumnsUnsupportedByLance,
|
||||
));
|
||||
}
|
||||
let mut ds = table_store
|
||||
// `compact_files` is a Lance-only maintenance API that needs
|
||||
// `&mut Dataset`. The `TableStorage` trait deliberately does
|
||||
// not surface it (the staged-write invariant covers writes;
|
||||
// compaction is a separate concern). Unwrap the opaque
|
||||
// `SnapshotHandle` via `into_dataset()` (`pub(crate)` and
|
||||
// gated to the maintenance path).
|
||||
let handle = storage
|
||||
.open_dataset_head_for_write(&table_key, &full_path, None)
|
||||
.await?;
|
||||
let mut ds = handle.into_dataset();
|
||||
let version_before = ds.version().version;
|
||||
let metrics: CompactionMetrics =
|
||||
compact_files(&mut ds, CompactionOptions::default(), None)
|
||||
|
|
@ -282,7 +289,7 @@ pub async fn cleanup_all_tables(
|
|||
}
|
||||
|
||||
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
|
||||
let table_store = &db.table_store;
|
||||
let storage = db.storage();
|
||||
|
||||
// Fault-isolated per table: a single table's GC failure is recorded on its
|
||||
// stats row (`error: Some`) and logged, never aborting the healthy tables.
|
||||
|
|
@ -292,9 +299,13 @@ pub async fn cleanup_all_tables(
|
|||
.map(|(table_key, full_path)| async move {
|
||||
let outcome: Result<RemovalStats> = async {
|
||||
crate::failpoints::maybe_fail("cleanup.table_gc")?;
|
||||
let ds = table_store
|
||||
// `cleanup_old_versions` is a Lance-only maintenance API not
|
||||
// surfaced through `TableStorage` — see the optimize path
|
||||
// above for the same rationale. Unwrap via `into_dataset()`.
|
||||
let handle = storage
|
||||
.open_dataset_head_for_write(&table_key, &full_path, None)
|
||||
.await?;
|
||||
let ds = handle.into_dataset();
|
||||
let before_version = keep_versions
|
||||
.map(|n| ds.version().version.saturating_sub(n as u64))
|
||||
.filter(|v| *v > 0);
|
||||
|
|
@ -395,8 +406,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
|
|||
|
||||
// Per-table fault isolation: one table's transient failure is recorded and
|
||||
// logged, never aborting the rest of the sweep.
|
||||
let storage = db.storage();
|
||||
for (table_key, full_path) in table_targets {
|
||||
let listed = match db.table_store.list_branches(&full_path).await {
|
||||
let listed = match storage.list_branches(&full_path).await {
|
||||
Ok(listed) => listed,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
|
|
@ -411,7 +423,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
|
|||
};
|
||||
for branch in orphan_branches(listed, &keep) {
|
||||
let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
|
||||
Ok(()) => db.table_store.force_delete_branch(&full_path, &branch).await,
|
||||
Ok(()) => storage.force_delete_branch(&full_path, &branch).await,
|
||||
Err(injected) => Err(injected),
|
||||
};
|
||||
match outcome {
|
||||
|
|
|
|||
|
|
@ -355,7 +355,7 @@ where
|
|||
let entry = snapshot.entry(table_key)?;
|
||||
Some(crate::db::manifest::SidecarTablePin {
|
||||
table_key: table_key.clone(),
|
||||
table_path: db.table_store.dataset_uri(&entry.table_path),
|
||||
table_path: db.storage().dataset_uri(&entry.table_path),
|
||||
expected_version: entry.table_version,
|
||||
post_commit_pin: entry.table_version + 1,
|
||||
table_branch: entry.table_branch.clone(),
|
||||
|
|
@ -469,12 +469,14 @@ where
|
|||
|
||||
for table_key in &added_tables {
|
||||
let table_path = table_path_for_table_key(table_key)?;
|
||||
let dataset_uri = db.table_store.dataset_uri(&table_path);
|
||||
let dataset_uri = db.storage().dataset_uri(&table_path);
|
||||
let schema = schema_for_table_key(&desired_catalog, table_key)?;
|
||||
let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
|
||||
let mut ds = SnapshotHandle::new(
|
||||
TableStore::create_empty_dataset(&dataset_uri, &schema).await?,
|
||||
);
|
||||
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
|
||||
.await?;
|
||||
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
|
||||
let state = db.storage().table_state(&dataset_uri, &ds).await?;
|
||||
table_registrations.insert(table_key.clone(), table_path);
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
|
|
@ -496,7 +498,10 @@ where
|
|||
))
|
||||
})?;
|
||||
ensure_snapshot_entry_head_matches(db, source_entry).await?;
|
||||
let source_ds = snapshot.open(source_table_key).await?;
|
||||
let source_ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, source_table_key)
|
||||
.await?;
|
||||
let current_catalog = db.catalog();
|
||||
let batch = batch_for_schema_apply_rewrite(
|
||||
db,
|
||||
|
|
@ -509,11 +514,13 @@ where
|
|||
)
|
||||
.await?;
|
||||
let table_path = table_path_for_table_key(target_table_key)?;
|
||||
let dataset_uri = db.table_store.dataset_uri(&table_path);
|
||||
let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
|
||||
let dataset_uri = db.storage().dataset_uri(&table_path);
|
||||
let mut target_ds = SnapshotHandle::new(
|
||||
TableStore::write_dataset(&dataset_uri, batch).await?,
|
||||
);
|
||||
db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds)
|
||||
.await?;
|
||||
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
|
||||
let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
|
||||
table_registrations.insert(target_table_key.clone(), table_path);
|
||||
table_updates.insert(
|
||||
target_table_key.clone(),
|
||||
|
|
@ -542,7 +549,10 @@ where
|
|||
))
|
||||
})?;
|
||||
ensure_snapshot_entry_head_matches(db, entry).await?;
|
||||
let source_ds = snapshot.open(table_key).await?;
|
||||
let source_ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, table_key)
|
||||
.await?;
|
||||
let current_catalog = db.catalog();
|
||||
let batch = batch_for_schema_apply_rewrite(
|
||||
db,
|
||||
|
|
@ -554,7 +564,7 @@ where
|
|||
property_renames.get(table_key),
|
||||
)
|
||||
.await?;
|
||||
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
|
||||
let dataset_uri = db.storage().dataset_uri(&entry.table_path);
|
||||
// Route through stage_overwrite + commit_staged for non-empty
|
||||
// batches. Lance's `InsertBuilder::execute_uncommitted`
|
||||
// errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`),
|
||||
|
|
@ -564,7 +574,7 @@ where
|
|||
// — and schema_apply runs under `__schema_apply_lock__` so the
|
||||
// narrow inline-commit residual is bounded.
|
||||
let mut target_ds = if batch.num_rows() == 0 {
|
||||
TableStore::overwrite_dataset(&dataset_uri, batch).await?
|
||||
SnapshotHandle::new(TableStore::overwrite_dataset(&dataset_uri, batch).await?)
|
||||
} else {
|
||||
// Pass `entry.table_branch.as_deref()` (not `None`) for
|
||||
// consistency with the indexed_tables block below. Schema
|
||||
|
|
@ -574,17 +584,15 @@ where
|
|||
// means a future relaxation of the lock-check can't quietly
|
||||
// open the wrong HEAD here.
|
||||
let existing = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
|
||||
.await?;
|
||||
let staged = db.table_store.stage_overwrite(&existing, batch).await?;
|
||||
db.table_store
|
||||
.commit_staged(Arc::new(existing), staged.transaction)
|
||||
.await?
|
||||
let staged = db.storage().stage_overwrite(&existing, batch).await?;
|
||||
db.storage().commit_staged(existing, staged).await?
|
||||
};
|
||||
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
|
||||
.await?;
|
||||
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
|
||||
let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
|
|
@ -611,16 +619,16 @@ where
|
|||
))
|
||||
})?;
|
||||
ensure_snapshot_entry_head_matches(db, entry).await?;
|
||||
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
|
||||
let dataset_uri = db.storage().dataset_uri(&entry.table_path);
|
||||
let mut ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
|
||||
.await?;
|
||||
db.table_store
|
||||
db.storage()
|
||||
.ensure_expected_version(&ds, table_key, entry.table_version)?;
|
||||
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
|
||||
.await?;
|
||||
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
|
||||
let state = db.storage().table_state(&dataset_uri, &ds).await?;
|
||||
table_updates.insert(
|
||||
table_key.clone(),
|
||||
crate::db::SubTableUpdate {
|
||||
|
|
@ -869,22 +877,22 @@ pub(super) async fn ensure_snapshot_entry_head_matches(
|
|||
db: &Omnigraph,
|
||||
entry: &SubTableEntry,
|
||||
) -> Result<()> {
|
||||
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
|
||||
let dataset_uri = db.storage().dataset_uri(&entry.table_path);
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(
|
||||
&entry.table_key,
|
||||
&dataset_uri,
|
||||
entry.table_branch.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
db.table_store
|
||||
db.storage()
|
||||
.ensure_expected_version(&ds, &entry.table_key, entry.table_version)
|
||||
}
|
||||
|
||||
pub(super) async fn batch_for_schema_apply_rewrite(
|
||||
db: &Omnigraph,
|
||||
source_ds: &Dataset,
|
||||
source_ds: &SnapshotHandle,
|
||||
source_table_key: &str,
|
||||
source_catalog: &Catalog,
|
||||
target_table_key: &str,
|
||||
|
|
@ -896,11 +904,11 @@ pub(super) async fn batch_for_schema_apply_rewrite(
|
|||
let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
|
||||
let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
|
||||
let batches = if needs_row_ids {
|
||||
db.table_store()
|
||||
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
|
||||
db.storage()
|
||||
.scan_with_row_id(source_ds, None, None, None, true)
|
||||
.await?
|
||||
} else {
|
||||
db.table_store().scan_batches(source_ds).await?
|
||||
db.storage().scan_batches(source_ds).await?
|
||||
};
|
||||
if batches.is_empty() {
|
||||
return Ok(RecordBatch::new_empty(target_schema));
|
||||
|
|
@ -970,7 +978,7 @@ pub(super) async fn batch_for_schema_apply_rewrite(
|
|||
|
||||
async fn rebuild_blob_column(
|
||||
_db: &Omnigraph,
|
||||
source_ds: &Dataset,
|
||||
source_ds: &SnapshotHandle,
|
||||
column_name: &str,
|
||||
descriptions: &StructArray,
|
||||
row_ids: &[u64],
|
||||
|
|
@ -990,7 +998,7 @@ async fn rebuild_blob_column(
|
|||
let blob_files = if non_null_row_ids.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
Arc::new(source_ds.clone())
|
||||
Arc::new(source_ds.dataset().clone())
|
||||
.take_blobs(&non_null_row_ids, column_name)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
|
|
|
|||
|
|
@ -50,10 +50,10 @@ pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
|
|||
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
|
||||
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, &full_path, table_branch)
|
||||
.await?;
|
||||
let state = db.table_store.table_state(&full_path, &ds).await?;
|
||||
let state = db.storage().table_state(&full_path, &ds).await?;
|
||||
let update = crate::db::SubTableUpdate {
|
||||
table_key: table_key.to_string(),
|
||||
table_version: state.version,
|
||||
|
|
@ -209,18 +209,18 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
|
|||
}
|
||||
},
|
||||
None => (
|
||||
db.table_store
|
||||
db.storage()
|
||||
.open_dataset_head_for_write(&table_key, &full_path, None)
|
||||
.await?,
|
||||
None,
|
||||
),
|
||||
};
|
||||
let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0);
|
||||
let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
|
||||
if row_count > 0 {
|
||||
build_indices_on_dataset(db, &table_key, &mut ds).await?;
|
||||
}
|
||||
|
||||
let state = db.table_store.table_state(&full_path, &ds).await?;
|
||||
let state = db.storage().table_state(&full_path, &ds).await?;
|
||||
if state.version != entry.table_version
|
||||
|| resolved_branch.as_deref() != entry.table_branch.as_deref()
|
||||
{
|
||||
|
|
@ -257,18 +257,18 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
|
|||
}
|
||||
},
|
||||
None => (
|
||||
db.table_store
|
||||
db.storage()
|
||||
.open_dataset_head_for_write(&table_key, &full_path, None)
|
||||
.await?,
|
||||
None,
|
||||
),
|
||||
};
|
||||
let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0);
|
||||
let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
|
||||
if row_count > 0 {
|
||||
build_indices_on_dataset(db, &table_key, &mut ds).await?;
|
||||
}
|
||||
|
||||
let state = db.table_store.table_state(&full_path, &ds).await?;
|
||||
let state = db.storage().table_state(&full_path, &ds).await?;
|
||||
if state.version != entry.table_version
|
||||
|| resolved_branch.as_deref() != entry.table_branch.as_deref()
|
||||
{
|
||||
|
|
@ -331,7 +331,7 @@ async fn needs_index_work_node(
|
|||
table_branch: Option<&str>,
|
||||
) -> Result<bool> {
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, full_path, table_branch)
|
||||
.await?;
|
||||
// Empty tables are skipped by the ensure_indices loop, so they must
|
||||
|
|
@ -341,10 +341,10 @@ async fn needs_index_work_node(
|
|||
// Errors from count_rows are propagated: silently treating them as
|
||||
// "0 rows" risks skipping a table that is actually about to be
|
||||
// modified.
|
||||
if db.table_store.count_rows(&ds, None).await? == 0 {
|
||||
if db.storage().count_rows(&ds, None).await? == 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
if !db.table_store.has_btree_index(&ds, "id").await? {
|
||||
if !db.storage().has_btree_index(&ds, "id").await? {
|
||||
return Ok(true);
|
||||
}
|
||||
let catalog = db.catalog();
|
||||
|
|
@ -360,11 +360,11 @@ async fn needs_index_work_node(
|
|||
continue;
|
||||
};
|
||||
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
|
||||
if !db.table_store.has_fts_index(&ds, prop_name).await? {
|
||||
if !db.storage().has_fts_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
}
|
||||
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
|
||||
if !db.table_store.has_vector_index(&ds, prop_name).await? {
|
||||
if !db.storage().has_vector_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
|
@ -389,22 +389,22 @@ async fn needs_index_work_edge(
|
|||
table_branch: Option<&str>,
|
||||
) -> Result<bool> {
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, full_path, table_branch)
|
||||
.await?;
|
||||
if db.table_store.count_rows(&ds, None).await? == 0 {
|
||||
if db.storage().count_rows(&ds, None).await? == 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
Ok(!db.table_store.has_btree_index(&ds, "id").await?
|
||||
|| !db.table_store.has_btree_index(&ds, "src").await?
|
||||
|| !db.table_store.has_btree_index(&ds, "dst").await?)
|
||||
Ok(!db.storage().has_btree_index(&ds, "id").await?
|
||||
|| !db.storage().has_btree_index(&ds, "src").await?
|
||||
|| !db.storage().has_btree_index(&ds, "dst").await?)
|
||||
}
|
||||
|
||||
pub(super) async fn open_for_mutation(
|
||||
db: &Omnigraph,
|
||||
table_key: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(Dataset, String, Option<String>)> {
|
||||
) -> Result<(SnapshotHandle, String, Option<String>)> {
|
||||
let current_branch = db
|
||||
.coordinator
|
||||
.read()
|
||||
|
|
@ -425,7 +425,7 @@ pub(super) async fn open_for_mutation_on_branch(
|
|||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(Dataset, String, Option<String>)> {
|
||||
) -> Result<(SnapshotHandle, String, Option<String>)> {
|
||||
db.ensure_schema_apply_not_locked("write").await?;
|
||||
let resolved = db.resolved_branch_target(branch).await?;
|
||||
let entry = resolved
|
||||
|
|
@ -436,11 +436,11 @@ pub(super) async fn open_for_mutation_on_branch(
|
|||
match resolved.branch.as_deref() {
|
||||
None => {
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, &full_path, None)
|
||||
.await?;
|
||||
if op_kind.strict_pre_stage_version_check() {
|
||||
db.table_store
|
||||
db.storage()
|
||||
.ensure_expected_version(&ds, table_key, entry.table_version)?;
|
||||
}
|
||||
Ok((ds, full_path, None))
|
||||
|
|
@ -469,15 +469,15 @@ pub(super) async fn open_owned_dataset_for_branch_write(
|
|||
entry_version: u64,
|
||||
active_branch: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(Dataset, Option<String>)> {
|
||||
) -> Result<(SnapshotHandle, Option<String>)> {
|
||||
match entry_branch {
|
||||
Some(branch) if branch == active_branch => {
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
|
||||
.await?;
|
||||
if op_kind.strict_pre_stage_version_check() {
|
||||
db.table_store
|
||||
db.storage()
|
||||
.ensure_expected_version(&ds, table_key, entry_version)?;
|
||||
}
|
||||
Ok((ds, Some(active_branch.to_string())))
|
||||
|
|
@ -509,11 +509,11 @@ pub(super) async fn open_owned_dataset_for_branch_write(
|
|||
)
|
||||
.await?;
|
||||
let ds = db
|
||||
.table_store
|
||||
.storage()
|
||||
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
|
||||
.await?;
|
||||
if op_kind.strict_pre_stage_version_check() {
|
||||
db.table_store
|
||||
db.storage()
|
||||
.ensure_expected_version(&ds, table_key, entry_version)?;
|
||||
}
|
||||
Ok((ds, Some(active_branch.to_string())))
|
||||
|
|
@ -528,8 +528,8 @@ pub(super) async fn fork_dataset_from_entry_state(
|
|||
source_branch: Option<&str>,
|
||||
source_version: u64,
|
||||
active_branch: &str,
|
||||
) -> Result<Dataset> {
|
||||
db.table_store
|
||||
) -> Result<SnapshotHandle> {
|
||||
db.storage()
|
||||
.fork_branch_from_state(
|
||||
full_path,
|
||||
source_branch,
|
||||
|
|
@ -547,10 +547,10 @@ pub(super) async fn reopen_for_mutation(
|
|||
table_branch: Option<&str>,
|
||||
expected_version: u64,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<Dataset> {
|
||||
) -> Result<SnapshotHandle> {
|
||||
db.ensure_schema_apply_not_locked("write").await?;
|
||||
if op_kind.strict_pre_stage_version_check() {
|
||||
db.table_store
|
||||
db.storage()
|
||||
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
|
||||
.await
|
||||
} else {
|
||||
|
|
@ -563,7 +563,7 @@ pub(super) async fn reopen_for_mutation(
|
|||
// genuine cross-process drift as 409. See
|
||||
// [`crate::db::MutationOpKind`] for the policy rationale.
|
||||
let _ = expected_version;
|
||||
db.table_store
|
||||
db.storage()
|
||||
.open_dataset_head_for_write(table_key, full_path, table_branch)
|
||||
.await
|
||||
}
|
||||
|
|
@ -574,8 +574,8 @@ pub(super) async fn open_dataset_at_state(
|
|||
table_path: &str,
|
||||
table_branch: Option<&str>,
|
||||
table_version: u64,
|
||||
) -> Result<Dataset> {
|
||||
db.table_store
|
||||
) -> Result<SnapshotHandle> {
|
||||
db.storage()
|
||||
.open_dataset_at_state(table_path, table_branch, table_version)
|
||||
.await
|
||||
}
|
||||
|
|
@ -583,7 +583,7 @@ pub(super) async fn open_dataset_at_state(
|
|||
pub(super) async fn build_indices_on_dataset(
|
||||
db: &Omnigraph,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
ds: &mut SnapshotHandle,
|
||||
) -> Result<()> {
|
||||
let catalog = db.catalog();
|
||||
build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await
|
||||
|
|
@ -593,10 +593,10 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
db: &Omnigraph,
|
||||
catalog: &Catalog,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
ds: &mut SnapshotHandle,
|
||||
) -> Result<()> {
|
||||
if let Some(type_name) = table_key.strip_prefix("node:") {
|
||||
if !db.table_store.has_btree_index(ds, "id").await? {
|
||||
if !db.storage().has_btree_index(ds, "id").await? {
|
||||
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
|
||||
}
|
||||
|
||||
|
|
@ -616,19 +616,20 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
let prop_name = &index_cols[0];
|
||||
if let Some(prop_type) = node_type.properties.get(prop_name) {
|
||||
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
|
||||
if !db.table_store.has_fts_index(ds, prop_name).await? {
|
||||
if !db.storage().has_fts_index(ds, prop_name).await? {
|
||||
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
|
||||
.await?;
|
||||
}
|
||||
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
|
||||
if !db.table_store.has_vector_index(ds, prop_name).await? {
|
||||
if !db.storage().has_vector_index(ds, prop_name).await? {
|
||||
// Inline-commit residual: lance-4.0.0 does not
|
||||
// expose `build_index_metadata_from_segments` as
|
||||
// `pub`, so vector indices cannot be staged from
|
||||
// outside the lance crate. Document at the call
|
||||
// site; companion ticket to lance-format/lance#6658.
|
||||
db.table_store
|
||||
.create_vector_index(ds, prop_name.as_str())
|
||||
let new_snap = db
|
||||
.storage()
|
||||
.create_vector_index(ds.clone(), prop_name.as_str())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
|
|
@ -636,6 +637,7 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
table_key, prop_name, e
|
||||
))
|
||||
})?;
|
||||
*ds = new_snap;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -645,13 +647,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
}
|
||||
|
||||
if table_key.starts_with("edge:") {
|
||||
if !db.table_store.has_btree_index(ds, "id").await? {
|
||||
if !db.storage().has_btree_index(ds, "id").await? {
|
||||
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
|
||||
}
|
||||
if !db.table_store.has_btree_index(ds, "src").await? {
|
||||
if !db.storage().has_btree_index(ds, "src").await? {
|
||||
stage_and_commit_btree(db, table_key, ds, &["src"]).await?;
|
||||
}
|
||||
if !db.table_store.has_btree_index(ds, "dst").await? {
|
||||
if !db.storage().has_btree_index(ds, "dst").await? {
|
||||
stage_and_commit_btree(db, table_key, ds, &["dst"]).await?;
|
||||
}
|
||||
return Ok(());
|
||||
|
|
@ -674,11 +676,11 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
async fn stage_and_commit_btree(
|
||||
db: &Omnigraph,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
ds: &mut SnapshotHandle,
|
||||
columns: &[&str],
|
||||
) -> Result<()> {
|
||||
let staged = db
|
||||
.table_store
|
||||
.storage()
|
||||
.stage_create_btree_index(ds, columns)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
|
@ -693,8 +695,8 @@ async fn stage_and_commit_btree(
|
|||
// yet called) leaves no Lance-HEAD drift on the touched table.
|
||||
crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
|
||||
let new_ds = db
|
||||
.table_store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.storage()
|
||||
.commit_staged(ds.clone(), staged)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
|
|
@ -711,11 +713,11 @@ async fn stage_and_commit_btree(
|
|||
async fn stage_and_commit_inverted(
|
||||
db: &Omnigraph,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
ds: &mut SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<()> {
|
||||
let staged = db
|
||||
.table_store
|
||||
.storage()
|
||||
.stage_create_inverted_index(ds, column)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
|
@ -725,8 +727,8 @@ async fn stage_and_commit_inverted(
|
|||
))
|
||||
})?;
|
||||
let new_ds = db
|
||||
.table_store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.storage()
|
||||
.commit_staged(ds.clone(), staged)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
|
|
@ -777,7 +779,7 @@ async fn prepare_updates_for_commit(
|
|||
)
|
||||
.await?;
|
||||
build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
|
||||
let state = db.table_store.table_state(&full_path, &ds).await?;
|
||||
let state = db.storage().table_state(&full_path, &ds).await?;
|
||||
prepared_update.table_version = state.version;
|
||||
prepared_update.row_count = state.row_count;
|
||||
prepared_update.version_metadata = state.version_metadata;
|
||||
|
|
|
|||
|
|
@ -928,7 +928,7 @@ async fn publish_adopted_source_state(
|
|||
target_branch,
|
||||
)
|
||||
.await?;
|
||||
let state = target_db.table_store().table_state(&full_path, &ds).await?;
|
||||
let state = target_db.storage().table_state(&full_path, &ds).await?;
|
||||
Ok(crate::db::SubTableUpdate {
|
||||
table_key: table_key.to_string(),
|
||||
table_version: state.version,
|
||||
|
|
@ -965,9 +965,13 @@ async fn publish_rewritten_merge_table(
|
|||
// commit point, narrowed from the previous "merge_insert + delete +
|
||||
// index" multi-step inline-commit chain.
|
||||
if let Some(delta) = &staged.delta_staged {
|
||||
// The staged delta dataset is a temp-dir Lance dataset used only
|
||||
// to collect the rewrite batches; wrap it in a `SnapshotHandle`
|
||||
// so we can route through the trait's `scan_batches_for_rewrite`.
|
||||
let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
|
||||
let batches: Vec<RecordBatch> = target_db
|
||||
.table_store()
|
||||
.scan_batches_for_rewrite(&delta.dataset)
|
||||
.storage()
|
||||
.scan_batches_for_rewrite(&delta_snapshot)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|batch| batch.num_rows() > 0)
|
||||
|
|
@ -982,7 +986,7 @@ async fn publish_rewritten_merge_table(
|
|||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
};
|
||||
let staged_merge = target_db
|
||||
.table_store()
|
||||
.storage()
|
||||
.stage_merge_insert(
|
||||
current_ds.clone(),
|
||||
combined,
|
||||
|
|
@ -992,8 +996,8 @@ async fn publish_rewritten_merge_table(
|
|||
)
|
||||
.await?;
|
||||
current_ds = target_db
|
||||
.table_store()
|
||||
.commit_staged(Arc::new(current_ds), staged_merge.transaction)
|
||||
.storage()
|
||||
.commit_staged(current_ds, staged_merge)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
|
@ -1014,10 +1018,11 @@ async fn publish_rewritten_merge_table(
|
|||
.map(|id| format!("'{}'", id.replace('\'', "''")))
|
||||
.collect();
|
||||
let filter = format!("id IN ({})", escaped.join(", "));
|
||||
target_db
|
||||
.table_store()
|
||||
.delete_where(&full_path, &mut current_ds, &filter)
|
||||
let (new_ds, _) = target_db
|
||||
.storage()
|
||||
.delete_where(&full_path, current_ds, &filter)
|
||||
.await?;
|
||||
current_ds = new_ds;
|
||||
}
|
||||
|
||||
// Phase 3: rebuild indices.
|
||||
|
|
@ -1028,7 +1033,7 @@ async fn publish_rewritten_merge_table(
|
|||
// (`build_index_metadata_from_segments` is `pub(crate)` in lance-
|
||||
// 4.0.0 — companion ticket to lance-format/lance#6658).
|
||||
let row_count = target_db
|
||||
.table_store()
|
||||
.storage()
|
||||
.table_state(&full_path, &current_ds)
|
||||
.await?
|
||||
.row_count;
|
||||
|
|
@ -1038,7 +1043,7 @@ async fn publish_rewritten_merge_table(
|
|||
.await?;
|
||||
}
|
||||
let final_state = target_db
|
||||
.table_store()
|
||||
.storage()
|
||||
.table_state(&full_path, &current_ds)
|
||||
.await?;
|
||||
|
||||
|
|
@ -1364,7 +1369,7 @@ impl Omnigraph {
|
|||
let entry = target_snapshot.entry(table_key)?;
|
||||
Some(crate::db::manifest::SidecarTablePin {
|
||||
table_key: table_key.clone(),
|
||||
table_path: self.table_store().dataset_uri(&entry.table_path),
|
||||
table_path: self.storage().dataset_uri(&entry.table_path),
|
||||
expected_version: entry.table_version,
|
||||
post_commit_pin: entry.table_version + 1,
|
||||
// Use the merge target branch (where commits actually
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ use crate::db::{ReadTarget, Snapshot};
|
|||
use crate::embedding::EmbeddingClient;
|
||||
use crate::error::{MergeConflict, MergeConflictKind, OmniError, Result};
|
||||
use crate::graph_index::GraphIndex;
|
||||
use crate::storage_layer::SnapshotHandle;
|
||||
use tempfile::{Builder as TempDirBuilder, TempDir};
|
||||
|
||||
mod merge;
|
||||
|
|
|
|||
|
|
@ -428,12 +428,11 @@ async fn ensure_node_id_exists(
|
|||
|
||||
let filter = format!("id = '{}'", id.replace('\'', "''"));
|
||||
let snapshot = db.snapshot_for_branch(branch).await?;
|
||||
let ds = snapshot.open(&table_key).await?;
|
||||
let exists = ds
|
||||
.count_rows(Some(filter))
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
> 0;
|
||||
let ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, &table_key)
|
||||
.await?;
|
||||
let exists = db.storage().count_rows(&ds, Some(filter)).await? > 0;
|
||||
|
||||
if exists {
|
||||
Ok(())
|
||||
|
|
@ -601,7 +600,7 @@ async fn open_table_for_mutation(
|
|||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(Dataset, String, Option<String>)> {
|
||||
) -> Result<(SnapshotHandle, String, Option<String>)> {
|
||||
if let Some(prior) = staging.inline_committed.get(table_key) {
|
||||
let path = staging.paths.get(table_key).ok_or_else(|| {
|
||||
OmniError::manifest_internal(format!(
|
||||
|
|
@ -623,7 +622,7 @@ async fn open_table_for_mutation(
|
|||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, table_key, op_kind)
|
||||
.await?;
|
||||
let expected_version = ds.version().version;
|
||||
let expected_version = ds.version();
|
||||
staging.ensure_path(
|
||||
table_key,
|
||||
full_path.clone(),
|
||||
|
|
@ -1055,7 +1054,7 @@ impl Omnigraph {
|
|||
// and a chained `update where <pred>` can match a row whose
|
||||
// pending value no longer satisfies <pred>.
|
||||
let batches = self
|
||||
.table_store()
|
||||
.storage()
|
||||
.scan_with_pending(
|
||||
&ds,
|
||||
pending_batches,
|
||||
|
|
@ -1153,13 +1152,13 @@ impl Omnigraph {
|
|||
crate::db::MutationOpKind::Delete,
|
||||
)
|
||||
.await?;
|
||||
let initial_version = ds.version().version;
|
||||
let initial_version = ds.version();
|
||||
|
||||
// Scan matching IDs for cascade. Per D₂ this never overlaps with
|
||||
// staged inserts (mixed insert/delete in one query is rejected at
|
||||
// parse time), so we scan committed only.
|
||||
let batches = self
|
||||
.table_store()
|
||||
.storage()
|
||||
.scan(&ds, Some(&["id"]), Some(&pred_sql), None)
|
||||
.await?;
|
||||
|
||||
|
|
@ -1191,7 +1190,7 @@ impl Omnigraph {
|
|||
// deletes from coexisting in one query, so this advance of Lance
|
||||
// HEAD is the only HEAD movement during the query and the
|
||||
// publisher's CAS captures it intact.
|
||||
let mut ds = self
|
||||
let ds = self
|
||||
.reopen_for_mutation(
|
||||
&table_key,
|
||||
&full_path,
|
||||
|
|
@ -1201,9 +1200,9 @@ impl Omnigraph {
|
|||
)
|
||||
.await?;
|
||||
crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?;
|
||||
let delete_state = self
|
||||
.table_store()
|
||||
.delete_where(&full_path, &mut ds, &pred_sql)
|
||||
let (_new_ds, delete_state) = self
|
||||
.storage()
|
||||
.delete_where(&full_path, ds, &pred_sql)
|
||||
.await?;
|
||||
|
||||
staging.record_inline(crate::db::SubTableUpdate {
|
||||
|
|
@ -1242,7 +1241,7 @@ impl Omnigraph {
|
|||
|
||||
let edge_table_key = format!("edge:{}", edge_name);
|
||||
let cascade_filter = cascade_filters.join(" OR ");
|
||||
let (mut edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
|
||||
let (edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
|
|
@ -1251,9 +1250,9 @@ impl Omnigraph {
|
|||
)
|
||||
.await?;
|
||||
|
||||
let edge_delete = self
|
||||
.table_store()
|
||||
.delete_where(&edge_full_path, &mut edge_ds, &cascade_filter)
|
||||
let (_new_edge_ds, edge_delete) = self
|
||||
.storage()
|
||||
.delete_where(&edge_full_path, edge_ds, &cascade_filter)
|
||||
.await?;
|
||||
|
||||
affected_edges += edge_delete.deleted_rows;
|
||||
|
|
@ -1290,7 +1289,7 @@ impl Omnigraph {
|
|||
let pred_sql = predicate_to_sql(predicate, params, true)?;
|
||||
|
||||
let table_key = format!("edge:{}", type_name);
|
||||
let (mut ds, full_path, table_branch) = open_table_for_mutation(
|
||||
let (ds, full_path, table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
|
|
@ -1299,9 +1298,9 @@ impl Omnigraph {
|
|||
)
|
||||
.await?;
|
||||
|
||||
let delete_state = self
|
||||
.table_store()
|
||||
.delete_where(&full_path, &mut ds, &pred_sql)
|
||||
let (_new_ds, delete_state) = self
|
||||
.storage()
|
||||
.delete_where(&full_path, ds, &pred_sql)
|
||||
.await?;
|
||||
let affected = delete_state.deleted_rows;
|
||||
|
||||
|
|
@ -1355,7 +1354,7 @@ fn concat_match_batches_to_schema(
|
|||
/// dedup needed (`dedupe_key_column = None`).
|
||||
async fn validate_edge_cardinality_with_pending(
|
||||
db: &Omnigraph,
|
||||
committed_ds: &Dataset,
|
||||
committed_ds: &SnapshotHandle,
|
||||
staging: &MutationStaging,
|
||||
table_key: &str,
|
||||
edge_type: &omnigraph_compiler::catalog::EdgeType,
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ use std::sync::Arc;
|
|||
|
||||
use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
|
||||
use arrow_schema::SchemaRef;
|
||||
use lance::Dataset;
|
||||
use crate::storage_layer::{SnapshotHandle, StagedHandle};
|
||||
use omnigraph_compiler::catalog::EdgeType;
|
||||
|
||||
use crate::db::manifest::{
|
||||
|
|
@ -325,9 +325,9 @@ impl MutationStaging {
|
|||
// Stage produces uncommitted fragments + transaction. No
|
||||
// Lance HEAD advance until `commit_all` runs `commit_staged`.
|
||||
let staged = match table.mode {
|
||||
PendingMode::Append => db.table_store().stage_append(&ds, combined, &[]).await?,
|
||||
PendingMode::Append => db.storage().stage_append(&ds, combined, &[]).await?,
|
||||
PendingMode::Merge => {
|
||||
db.table_store()
|
||||
db.storage()
|
||||
.stage_merge_insert(
|
||||
ds.clone(),
|
||||
combined,
|
||||
|
|
@ -389,15 +389,17 @@ pub(crate) struct StagedMutation {
|
|||
}
|
||||
|
||||
/// Per-table state captured during `stage_all` and consumed by
|
||||
/// `commit_all`. Holds the opened `Dataset` so `commit_staged` doesn't
|
||||
/// re-open, and the `StagedWrite` whose `transaction` `commit_staged`
|
||||
/// will execute.
|
||||
/// `commit_all`. Holds the opened snapshot (so `commit_staged` doesn't
|
||||
/// re-open) plus the staged Lance transaction that `commit_staged`
|
||||
/// will execute. Both held as opaque `TableStorage` handles per MR-793
|
||||
/// §III.9 — the inner `lance::Dataset` / `StagedWrite` are not visible
|
||||
/// to engine code outside the storage layer.
|
||||
struct StagedTableEntry {
|
||||
table_key: String,
|
||||
path: StagedTablePath,
|
||||
expected_version: u64,
|
||||
dataset: lance::Dataset,
|
||||
staged_write: crate::table_store::StagedWrite,
|
||||
dataset: SnapshotHandle,
|
||||
staged_write: StagedHandle,
|
||||
}
|
||||
|
||||
impl StagedMutation {
|
||||
|
|
@ -648,11 +650,11 @@ impl StagedMutation {
|
|||
} = entry;
|
||||
|
||||
let new_ds = db
|
||||
.table_store()
|
||||
.commit_staged(Arc::new(dataset), staged_write.transaction)
|
||||
.storage()
|
||||
.commit_staged(dataset, staged_write)
|
||||
.await?;
|
||||
let state = db
|
||||
.table_store()
|
||||
.storage()
|
||||
.table_state(&path.full_path, &new_ds)
|
||||
.await?;
|
||||
updates.push(SubTableUpdate {
|
||||
|
|
@ -803,7 +805,7 @@ fn dedupe_merge_batches_by_id(
|
|||
/// `LoadMode::Merge` double-counts.
|
||||
pub(crate) async fn count_src_per_edge(
|
||||
db: &crate::db::Omnigraph,
|
||||
committed_ds: &Dataset,
|
||||
committed_ds: &SnapshotHandle,
|
||||
table_key: &str,
|
||||
staging: &MutationStaging,
|
||||
dedupe_key_column: Option<&str>,
|
||||
|
|
@ -841,7 +843,7 @@ pub(crate) async fn count_src_per_edge(
|
|||
_ => vec!["src"],
|
||||
};
|
||||
let committed = db
|
||||
.table_store()
|
||||
.storage()
|
||||
.scan(committed_ds, Some(&projection), None, None)
|
||||
.await?;
|
||||
for batch in &committed {
|
||||
|
|
|
|||
|
|
@ -418,7 +418,7 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
|
||||
.await?;
|
||||
let expected_version = ds.version().version;
|
||||
let expected_version = ds.version();
|
||||
staging.ensure_path(
|
||||
&table_key,
|
||||
full_path,
|
||||
|
|
@ -528,7 +528,7 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
|
||||
.await?;
|
||||
let expected_version = ds.version().version;
|
||||
let expected_version = ds.version();
|
||||
staging.ensure_path(
|
||||
&table_key,
|
||||
full_path,
|
||||
|
|
@ -1205,28 +1205,45 @@ async fn write_batch_to_dataset(
|
|||
LoadMode::Merge => crate::db::MutationOpKind::Merge,
|
||||
LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
|
||||
};
|
||||
let (mut ds, full_path, table_branch) = db
|
||||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, table_key, op_kind)
|
||||
.await?;
|
||||
let table_store = db.table_store();
|
||||
|
||||
match mode {
|
||||
LoadMode::Overwrite => {
|
||||
let state = table_store
|
||||
.overwrite_batch(&full_path, &mut ds, batch)
|
||||
// Inline-commit residual: the Overwrite path here is the
|
||||
// legacy concurrent fast-path used by Phase 2 of the loader
|
||||
// (Append/Merge route through MutationStaging instead).
|
||||
// `overwrite_batch` advances Lance HEAD as a side effect;
|
||||
// there is no public two-phase overwrite that fits this
|
||||
// shape until Lance issues #6658/#6666 close.
|
||||
let (_new_ds, state) = db
|
||||
.storage()
|
||||
.overwrite_batch(&full_path, ds, batch)
|
||||
.await?;
|
||||
Ok((state, table_branch))
|
||||
}
|
||||
LoadMode::Append => {
|
||||
let state = table_store.append_batch(&full_path, &mut ds, batch).await?;
|
||||
// Same residual class as Overwrite above. The staged-write
|
||||
// path is the `use_staging` branch in `load_with_actor`;
|
||||
// this concurrent path is the per-table fast-path retained
|
||||
// for parallelism. MR-793 Phase 9 will demote
|
||||
// `append_batch` to `pub(crate)` once this last consumer
|
||||
// moves to the staged primitive.
|
||||
let (_new_ds, state) = db
|
||||
.storage()
|
||||
.append_batch(&full_path, ds, batch)
|
||||
.await?;
|
||||
Ok((state, table_branch))
|
||||
}
|
||||
LoadMode::Merge => {
|
||||
let state = table_store
|
||||
.merge_insert_batch(
|
||||
// Same residual class as the other two arms.
|
||||
let state = db
|
||||
.storage()
|
||||
.merge_insert_batches(
|
||||
&full_path,
|
||||
ds,
|
||||
batch,
|
||||
vec![batch],
|
||||
vec!["id".to_string()],
|
||||
lance::dataset::WhenMatched::UpdateAll,
|
||||
lance::dataset::WhenNotMatched::InsertAll,
|
||||
|
|
@ -1596,7 +1613,7 @@ pub(crate) async fn validate_edge_cardinality(
|
|||
|
||||
// Scan src column, count per source
|
||||
let batches = db
|
||||
.table_store()
|
||||
.storage()
|
||||
.scan(&ds, Some(&["src"]), None, None)
|
||||
.await?;
|
||||
|
||||
|
|
@ -1725,7 +1742,7 @@ async fn collect_node_ids_with_pending(
|
|||
.await?;
|
||||
|
||||
let batches = db
|
||||
.table_store()
|
||||
.storage()
|
||||
.scan(&ds, Some(&["id"]), None, None)
|
||||
.await?;
|
||||
|
||||
|
|
@ -1794,7 +1811,7 @@ async fn collect_node_ids(
|
|||
.await?;
|
||||
|
||||
let batches = db
|
||||
.table_store()
|
||||
.storage()
|
||||
.scan(&ds, Some(&["id"]), None, None)
|
||||
.await?;
|
||||
|
||||
|
|
|
|||
|
|
@ -111,6 +111,17 @@ impl SnapshotHandle {
|
|||
self.inner
|
||||
}
|
||||
|
||||
/// Take ownership of the inner `Dataset` by unwrapping the `Arc`
|
||||
/// (or cloning if the snapshot is shared). `pub(crate)` — used
|
||||
/// only by the maintenance path (`optimize`, `cleanup`) which
|
||||
/// must hand `&mut Dataset` to Lance compaction / cleanup APIs
|
||||
/// that the `TableStorage` trait does not (and should not)
|
||||
/// surface. Engine code that participates in the staged-write
|
||||
/// invariant must stay on the trait methods.
|
||||
pub(crate) fn into_dataset(self) -> Dataset {
|
||||
Arc::try_unwrap(self.inner).unwrap_or_else(|arc| (*arc).clone())
|
||||
}
|
||||
|
||||
// ── public, lance-free accessors ──
|
||||
|
||||
/// Current Lance manifest version of the snapshot.
|
||||
|
|
@ -208,6 +219,20 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
|
|||
|
||||
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
|
||||
|
||||
/// Idempotent variant of `delete_branch` used by the best-effort fork
|
||||
/// reclaim under branch delete (`db/omnigraph.rs::cleanup_deleted_branch_tables`)
|
||||
/// and by the orphan-fork reconciler in `optimize`. Tolerates an
|
||||
/// already-absent branch (both Lance's `RefNotFound` and the local-store
|
||||
/// `NotFound` quirk on a missing `tree/{branch}/` dir). A still-referenced
|
||||
/// branch (`RefConflict`) still surfaces as `OmniError::Lance`.
|
||||
async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
|
||||
|
||||
/// List the named Lance branches present on the dataset at `dataset_uri`.
|
||||
/// The `cleanup` orphan reconciler diffs this against the manifest
|
||||
/// branch set to find orphaned per-table forks. `main`/default is not a
|
||||
/// named branch and never appears here.
|
||||
async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>>;
|
||||
|
||||
async fn reopen_for_mutation(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
|
|
@ -496,6 +521,14 @@ impl TableStorage for TableStore {
|
|||
TableStore::delete_branch(self, dataset_uri, branch).await
|
||||
}
|
||||
|
||||
async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
|
||||
TableStore::force_delete_branch(self, dataset_uri, branch).await
|
||||
}
|
||||
|
||||
async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
|
||||
TableStore::list_branches(self, dataset_uri).await
|
||||
}
|
||||
|
||||
async fn reopen_for_mutation(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
|
|
|
|||
|
|
@ -601,7 +601,16 @@ impl TableStore {
|
|||
})
|
||||
}
|
||||
|
||||
pub async fn append_batch(
|
||||
/// Legacy inline-commit append: writes fragments AND commits in one
|
||||
/// call, advancing Lance HEAD as a side effect. Demoted to
|
||||
/// `pub(crate)` by MR-793 Phase 9 — the staged primitive
|
||||
/// `stage_append` + `commit_staged` is the public engine surface;
|
||||
/// this one survives only as a residual called by
|
||||
/// `loader::write_batch_to_dataset` (LoadMode::Append concurrent
|
||||
/// fast-path) and the deprecated `merge_insert_batch` chain. Do not
|
||||
/// add new call sites — they re-introduce the multi-phase commit
|
||||
/// drift the trait surface was designed to eliminate.
|
||||
pub(crate) async fn append_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
ds: &mut Dataset,
|
||||
|
|
@ -656,7 +665,14 @@ impl TableStore {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn overwrite_batch(
|
||||
/// Legacy inline-commit overwrite: truncates then
|
||||
/// `append_batch`-commits, advancing Lance HEAD as a side effect.
|
||||
/// Demoted to `pub(crate)` by MR-793 Phase 9 — the staged primitive
|
||||
/// `stage_overwrite` + `commit_staged` is the public engine surface;
|
||||
/// this one survives only as the LoadMode::Overwrite concurrent
|
||||
/// fast-path inside `loader::write_batch_to_dataset`. Do not add new
|
||||
/// call sites.
|
||||
pub(crate) async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
ds: &mut Dataset,
|
||||
|
|
@ -682,7 +698,14 @@ impl TableStore {
|
|||
.map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
pub async fn merge_insert_batch(
|
||||
/// Legacy inline-commit merge-insert (single batch). Demoted to
|
||||
/// `pub(crate)` by MR-793 Phase 9 — the staged primitive
|
||||
/// `stage_merge_insert` + `commit_staged` is the public engine
|
||||
/// surface; this one survives only as the body of
|
||||
/// `merge_insert_batches` (which is itself the loader's
|
||||
/// LoadMode::Merge concurrent fast-path). Do not add new call
|
||||
/// sites.
|
||||
pub(crate) async fn merge_insert_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
ds: Dataset,
|
||||
|
|
@ -759,7 +782,14 @@ impl TableStore {
|
|||
self.table_state(dataset_uri, &new_ds).await
|
||||
}
|
||||
|
||||
pub async fn merge_insert_batches(
|
||||
/// Legacy inline-commit merge-insert (multiple batches concatenated
|
||||
/// into one merge). Demoted to `pub(crate)` by MR-793 Phase 9 — the
|
||||
/// staged primitive `stage_merge_insert` + `commit_staged` is the
|
||||
/// public engine surface; this one survives only via the
|
||||
/// `TableStorage::merge_insert_batches` trait method, called by
|
||||
/// `loader::write_batch_to_dataset` (LoadMode::Merge concurrent
|
||||
/// fast-path). Do not add new call sites.
|
||||
pub(crate) async fn merge_insert_batches(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
ds: Dataset,
|
||||
|
|
@ -1448,7 +1478,17 @@ impl TableStore {
|
|||
}))
|
||||
}
|
||||
|
||||
pub async fn create_btree_index(&self, ds: &mut Dataset, columns: &[&str]) -> Result<()> {
|
||||
/// Legacy inline-commit BTREE scalar index build. Demoted to
|
||||
/// `pub(crate)` by MR-793 Phase 9 — the staged primitive
|
||||
/// `stage_create_btree_index` + `commit_staged` is the public engine
|
||||
/// surface; this one survives only as the body of the trait's
|
||||
/// inline-commit method (used by no engine call site today). Do not
|
||||
/// add new call sites.
|
||||
pub(crate) async fn create_btree_index(
|
||||
&self,
|
||||
ds: &mut Dataset,
|
||||
columns: &[&str],
|
||||
) -> Result<()> {
|
||||
let params = ScalarIndexParams::default();
|
||||
ds.create_index_builder(columns, IndexType::BTree, &params)
|
||||
.replace(true)
|
||||
|
|
@ -1457,7 +1497,17 @@ impl TableStore {
|
|||
.map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
pub async fn create_inverted_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
|
||||
/// Legacy inline-commit INVERTED (FTS) scalar index build. Demoted
|
||||
/// to `pub(crate)` by MR-793 Phase 9 — the staged primitive
|
||||
/// `stage_create_inverted_index` + `commit_staged` is the public
|
||||
/// engine surface; this one survives only as the body of the
|
||||
/// trait's inline-commit method (used by no engine call site today).
|
||||
/// Do not add new call sites.
|
||||
pub(crate) async fn create_inverted_index(
|
||||
&self,
|
||||
ds: &mut Dataset,
|
||||
column: &str,
|
||||
) -> Result<()> {
|
||||
let params = InvertedIndexParams::default();
|
||||
ds.create_index_builder(&[column], IndexType::Inverted, &params)
|
||||
.replace(true)
|
||||
|
|
|
|||
|
|
@ -29,15 +29,21 @@
|
|||
//! the cross-table manifest commit. Documented exception.
|
||||
//! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module.
|
||||
//!
|
||||
//! ## Transitional allow-list
|
||||
//! ## Allow-list shape
|
||||
//!
|
||||
//! The migration of writers onto staged primitives is incremental.
|
||||
//! Several writers (ensure_indices, branch_merge, schema_apply rewrites)
|
||||
//! already route through the staged primitives; others (bulk loader,
|
||||
//! exec/mutation, exec/query) still use the legacy inherent
|
||||
//! `TableStore` methods — they're not visible at the trait boundary, but
|
||||
//! they DO call lance types. The file-level allow-list below reflects
|
||||
//! this transitional state and tightens as call sites migrate.
|
||||
//! After MR-854 (MR-793 Phase 1b + Phase 9), every engine call site
|
||||
//! reaches the storage layer through `db.storage()` (returns
|
||||
//! `&dyn TableStorage`). The inherent inline-commit methods on
|
||||
//! `TableStore` (`append_batch`, `merge_insert_batch{,es}`,
|
||||
//! `overwrite_batch`, `create_{btree,inverted}_index`, `truncate_table`)
|
||||
//! are now `pub(crate)`, so the only direct users are
|
||||
//! `table_store.rs` itself (which IS the storage layer) and the bulk
|
||||
//! loader's `LoadMode::{Append, Overwrite, Merge}` concurrent
|
||||
//! fast-paths in `loader::write_batch_to_dataset` (the loader uses the
|
||||
//! trait surface for the staged-write path and falls back to the
|
||||
//! demoted inherent methods only for the concurrent fast-path, which
|
||||
//! has no two-phase shape in Lance 4.0.0). The file-level allow-list
|
||||
//! below matches that boundary.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
|
|
|
|||
|
|
@ -109,16 +109,12 @@ MR-793's acceptance criterion §1 ("`TableStore` public API has no method that p
|
|||
|
||||
| Method on `TableStore` | Inline-commit reason | Closes when |
|
||||
|---|---|---|
|
||||
| `delete_where` | `DeleteJob` is `pub(crate)` in lance-4.0.0 — no public two-phase delete API | [lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658) lands and `stage_delete` joins the trait |
|
||||
| `create_vector_index` | Vector indices take Lance's "segment commit path"; the helper `build_index_metadata_from_segments` is `pub(crate)` | [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) lands and `stage_create_vector_index` joins the trait |
|
||||
| `append_batch` | Legacy inherent method; some engine call sites haven't migrated to `stage_append + commit_staged` yet | MR-793 Phase 1b (call-site conversion) + Phase 9 (demote to `pub(crate)`) |
|
||||
| `merge_insert_batch` / `merge_insert_batches` | Legacy inherent method | Same — Phase 1b + Phase 9 |
|
||||
| `overwrite_batch` | Legacy inherent method | Same — Phase 1b + Phase 9 |
|
||||
| `create_btree_index` (inherent) | Legacy inherent method (the migrated callers use `stage_create_btree_index` + `commit_staged`; the inherent stays for tests / un-migrated paths) | Same — Phase 1b + Phase 9 |
|
||||
| `create_inverted_index` (inherent) | Same | Same — Phase 1b + Phase 9 + index-class split (MR-848) |
|
||||
| `truncate_table` (inherent on `TableStore`) | Used by `overwrite_batch` internally | Phase 9 |
|
||||
| `delete_where` (trait) | `DeleteJob` is `pub(crate)` in lance-4.0.0 — no public two-phase delete API | [lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658) lands and `stage_delete` joins the trait |
|
||||
| `create_vector_index` (trait) | Vector indices take Lance's "segment commit path"; the helper `build_index_metadata_from_segments` is `pub(crate)` | [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) lands and `stage_create_vector_index` joins the trait |
|
||||
|
||||
After **lance#6658 + lance#6666 ship + MR-793 Phase 1b + MR-793 Phase 9 all complete**, the trait surface exposes only staged-write primitives + `commit_staged`. Until then this matrix names every residual explicitly, every call site carries a one-line residual comment, and no engine code outside `table_store.rs` is permitted to reach the inline-commit Lance APIs (enforced by the `tests/forbidden_apis.rs` guard).
|
||||
MR-854 (Phase 1b + Phase 9) closed the remaining residuals on the engine surface: every `db.table_store.X(...)` call site was converted to `db.storage().X(...)` (trait dispatch through `&dyn TableStorage`), and the inherent inline-commit methods on `TableStore` (`append_batch`, `merge_insert_batch`, `merge_insert_batches`, `overwrite_batch`, `create_btree_index`, `create_inverted_index`, `truncate_table`) were demoted from `pub` to `pub(crate)`. They survive only as the bulk loader's `LoadMode::{Append, Overwrite, Merge}` concurrent fast-paths (see "`LoadMode::Overwrite` residual" below) and as internal helpers for the staged primitives — no engine call site outside `table_store.rs` and `loader::write_batch_to_dataset` reaches them.
|
||||
|
||||
After **lance#6658 + lance#6666 ship**, the trait surface exposes only staged-write primitives + `commit_staged`. Until then this matrix names the two remaining residuals explicitly, every call site carries a one-line residual comment, and no engine code outside `table_store.rs` and the bulk loader's concurrent fast-path in `loader::write_batch_to_dataset` is permitted to reach the inline-commit Lance APIs (enforced by the `tests/forbidden_apis.rs` guard).
|
||||
|
||||
### `LoadMode::Overwrite` residual
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue