Merge remote-tracking branch 'origin/main' into feat/cluster-apply-stage3a

This commit is contained in:
aaltshuler 2026-06-10 00:45:03 +03:00
commit 69b63c33ac
33 changed files with 1250 additions and 1249 deletions

View file

@ -7,6 +7,7 @@ use lance::dataset::scanner::ColumnOrdering;
use crate::db::SubTableEntry;
use crate::db::manifest::Snapshot;
use crate::error::Result;
use crate::storage_layer::{SnapshotHandle, TableStorage};
use crate::table_store::TableStore;
// ─── Types ──────────────────────────────────────────────────────────────────
@ -229,7 +230,8 @@ async fn diff_table_same_lineage(
) -> Result<Vec<EntityChange>> {
let vf = from_entry.table_version;
let vt = to_entry.table_version;
let to_ds = table_store.open_at_entry(to_entry).await?;
let storage: &dyn TableStorage = table_store;
let to_ds = storage.open_snapshot_at_entry(to_entry).await?;
let cols: Vec<&str> = if is_edge {
vec!["id", "src", "dst", "_row_last_updated_at_version"]
@ -257,12 +259,12 @@ async fn diff_table_same_lineage(
"_row_last_updated_at_version > {} AND _row_last_updated_at_version <= {}",
vf, vt
);
let changed_rows = scan_with_filter(table_store, &to_ds, &cols, &filter_sql).await?;
let changed_rows = scan_with_filter(storage, &to_ds, &cols, &filter_sql).await?;
if !changed_rows.is_empty() {
// Build the set of IDs that existed at the from version
let from_ds = table_store.open_at_entry(from_entry).await?;
let from_ids: HashSet<String> = scan_id_set(table_store, &from_ds, &["id"])
let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
let from_ids: HashSet<String> = scan_id_set(storage, &from_ds, &["id"])
.await?
.into_iter()
.map(|r| r.id)
@ -282,8 +284,8 @@ async fn diff_table_same_lineage(
// Deletes: ID set-difference
if wants_deletes {
let from_ds = table_store.open_at_entry(from_entry).await?;
let deleted = deleted_ids_by_set_diff(table_store, &from_ds, &to_ds, is_edge).await?;
let from_ds = storage.open_snapshot_at_entry(from_entry).await?;
let deleted = deleted_ids_by_set_diff(storage, &from_ds, &to_ds, is_edge).await?;
changes.extend(deleted);
}
@ -300,13 +302,14 @@ async fn diff_table_cross_branch(
is_edge: bool,
filter: &ChangeFilter,
) -> Result<Vec<EntityChange>> {
let from_ds = table_store
.open_snapshot_table(from_snap, table_key)
let storage: &dyn TableStorage = table_store;
let from_ds = storage
.open_snapshot_at_table(from_snap, table_key)
.await?;
let to_ds = table_store.open_snapshot_table(to_snap, table_key).await?;
let to_ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
let from_rows = scan_all_rows_ordered(table_store, &from_ds, is_edge).await?;
let to_rows = scan_all_rows_ordered(table_store, &to_ds, is_edge).await?;
let from_rows = scan_all_rows_ordered(storage, &from_ds, is_edge).await?;
let to_rows = scan_all_rows_ordered(storage, &to_ds, is_edge).await?;
let mut changes = Vec::new();
let mut fi = 0;
@ -392,8 +395,9 @@ async fn diff_table_added(
if !filter.wants_op(ChangeOp::Insert) {
return Ok(Vec::new());
}
let ds = table_store.open_snapshot_table(to_snap, table_key).await?;
let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
let storage: &dyn TableStorage = table_store;
let ds = storage.open_snapshot_at_table(to_snap, table_key).await?;
let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
Ok(rows
.into_iter()
.map(|r| entity_change_from_row(&r, ChangeOp::Insert, is_edge))
@ -410,10 +414,11 @@ async fn diff_table_removed(
if !filter.wants_op(ChangeOp::Delete) {
return Ok(Vec::new());
}
let ds = table_store
.open_snapshot_table(from_snap, table_key)
let storage: &dyn TableStorage = table_store;
let ds = storage
.open_snapshot_at_table(from_snap, table_key)
.await?;
let rows = scan_all_rows_ordered(table_store, &ds, is_edge).await?;
let rows = scan_all_rows_ordered(storage, &ds, is_edge).await?;
Ok(rows
.into_iter()
.map(|r| entity_change_from_row(&r, ChangeOp::Delete, is_edge))
@ -424,12 +429,12 @@ async fn diff_table_removed(
/// Scan with a SQL filter, projecting specific columns.
async fn scan_with_filter(
table_store: &TableStore,
ds: &lance::Dataset,
storage: &dyn TableStorage,
ds: &SnapshotHandle,
cols: &[&str],
filter_sql: &str,
) -> Result<Vec<ScannedRow>> {
let batches = table_store
let batches = storage
.scan(ds, Some(cols), Some(filter_sql), None)
.await?;
Ok(extract_rows(&batches))
@ -437,11 +442,11 @@ async fn scan_with_filter(
/// Scan all rows ordered by id, projecting id (+ src/dst for edges) + all columns for signature.
async fn scan_all_rows_ordered(
table_store: &TableStore,
ds: &lance::Dataset,
storage: &dyn TableStorage,
ds: &SnapshotHandle,
is_edge: bool,
) -> Result<Vec<ScannedRow>> {
let batches = table_store
let batches = storage
.scan(
ds,
None,
@ -454,9 +459,9 @@ async fn scan_all_rows_ordered(
/// Compute deleted IDs: scan id at from and to, set-difference.
async fn deleted_ids_by_set_diff(
table_store: &TableStore,
from_ds: &lance::Dataset,
to_ds: &lance::Dataset,
storage: &dyn TableStorage,
from_ds: &SnapshotHandle,
to_ds: &SnapshotHandle,
is_edge: bool,
) -> Result<Vec<EntityChange>> {
let cols: Vec<&str> = if is_edge {
@ -465,8 +470,8 @@ async fn deleted_ids_by_set_diff(
vec!["id"]
};
let from_rows = scan_id_set(table_store, from_ds, &cols).await?;
let to_ids: HashSet<String> = scan_id_set(table_store, to_ds, &["id"])
let from_rows = scan_id_set(storage, from_ds, &cols).await?;
let to_ids: HashSet<String> = scan_id_set(storage, to_ds, &["id"])
.await?
.into_iter()
.map(|r| r.id)
@ -480,11 +485,11 @@ async fn deleted_ids_by_set_diff(
}
async fn scan_id_set(
table_store: &TableStore,
ds: &lance::Dataset,
storage: &dyn TableStorage,
ds: &SnapshotHandle,
cols: &[&str],
) -> Result<Vec<ScannedRow>> {
let batches = table_store.scan(ds, Some(cols), None, None).await?;
let batches = storage.scan(ds, Some(cols), None, None).await?;
Ok(extract_rows(&batches))
}

View file

@ -25,7 +25,7 @@
//! version. Pinned by
//! `tests/staged_writes.rs::lance_restore_appends_one_commit_with_checked_out_content`.
//! - `Dataset::restore` "wins" against concurrent Append/Update/Delete/
//! CreateIndex/Merge — see `check_restore_txn` at lance-4.0.0
//! CreateIndex/Merge — see `check_restore_txn` at lance-6.0.1
//! `src/io/commit/conflict_resolver.rs:986`. The hazard is documented
//! by `tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`.
//! This module sidesteps the hazard by running recovery only at

View file

@ -26,6 +26,7 @@ use crate::db::graph_coordinator::{GraphCoordinator, PublishedSnapshot};
use crate::error::{OmniError, Result};
use crate::runtime_cache::RuntimeCache;
use crate::storage::{StorageAdapter, join_uri, normalize_root_uri, storage_for_uri};
use crate::storage_layer::SnapshotHandle;
use crate::table_store::TableStore;
mod export;
@ -583,19 +584,30 @@ impl Omnigraph {
schema_apply::ensure_schema_apply_not_locked(self, operation).await
}
pub(crate) fn table_store(&self) -> &TableStore {
/// Engine-facing trait surface around `TableStore`.
///
/// This is the default accessor for engine code reaching into the
/// storage layer. The trait's signatures use opaque `SnapshotHandle`
/// / `StagedHandle` instead of leaking `lance::Dataset` /
/// `lance::dataset::transaction::Transaction`, so newly-added engine
/// call sites cannot drift the staged-write invariant by mistake
/// (on this surface the `stage_*` + `commit_staged` pair is the only
/// way to land a write; the inline-commit residual writes are reached
/// separately through `storage_inline_residual`).
pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
&self.table_store
}
/// Engine-facing trait surface around `TableStore`.
///
/// This is the canonical accessor for newly-written engine code. The
/// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
/// instead of leaking `lance::Dataset` /
/// `lance::dataset::transaction::Transaction`. Existing call sites
/// that still use `db.table_store.X(...)` (the inherent struct
/// methods) are migrated incrementally.
pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
/// Inline-commit residual surface (`delete_where`,
/// `create_vector_index`) — the writes Lance cannot yet express as a
/// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
/// the default storage surface is staged-only and a new writer cannot couple
/// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
/// the handful of documented residual call sites (mutation/merge deletes,
/// vector-index build) use this accessor. See
/// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
pub(crate) fn storage_inline_residual(
&self,
) -> &dyn crate::storage_layer::InlineCommitResidual {
&self.table_store
}
@ -1055,19 +1067,24 @@ impl Omnigraph {
let snapshot = self.snapshot().await;
let table_key = format!("node:{}", type_name);
let ds = snapshot.open(&table_key).await?;
let handle = self
.storage()
.open_snapshot_at_table(&snapshot, &table_key)
.await?;
let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
let row_id = self
.table_store
.first_row_id_for_filter(&ds, &filter_sql)
.storage()
.first_row_id_for_filter(&handle, &filter_sql)
.await?
.ok_or_else(|| {
OmniError::manifest(format!("no {} with id '{}' found", type_name, id))
})?;
// Use take_blobs to get the BlobFile handle
let ds = Arc::new(ds);
// `take_blobs` is a Lance-specific blob accessor not surfaced
// through the `TableStorage` trait — reach the inner `Arc<Dataset>`
// via the `pub(crate)` accessor for this read-only call.
let ds = handle.into_arc();
let mut blobs = ds
.take_blobs(&[row_id], property)
.await
@ -1141,10 +1158,14 @@ impl Omnigraph {
cleanup_targets.sort_by(|left, right| left.0.cmp(&right.0));
for (table_key, table_path) in cleanup_targets {
let dataset_uri = self.table_store.dataset_uri(&table_path);
let dataset_uri = self.storage().dataset_uri(&table_path);
let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
{
Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
Ok(()) => {
self.storage()
.force_delete_branch(&dataset_uri, branch)
.await
}
Err(injected) => Err(injected),
};
if let Err(err) = outcome {
@ -1370,7 +1391,7 @@ impl Omnigraph {
&self,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
) -> Result<(SnapshotHandle, String, Option<String>)> {
table_ops::open_for_mutation(self, table_key, op_kind).await
}
@ -1379,7 +1400,7 @@ impl Omnigraph {
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
) -> Result<(SnapshotHandle, String, Option<String>)> {
table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
}
@ -1390,7 +1411,7 @@ impl Omnigraph {
source_branch: Option<&str>,
source_version: u64,
active_branch: &str,
) -> Result<Dataset> {
) -> Result<SnapshotHandle> {
table_ops::fork_dataset_from_entry_state(
self,
table_key,
@ -1409,7 +1430,7 @@ impl Omnigraph {
table_branch: Option<&str>,
expected_version: u64,
op_kind: crate::db::MutationOpKind,
) -> Result<Dataset> {
) -> Result<SnapshotHandle> {
table_ops::reopen_for_mutation(
self,
table_key,
@ -1426,14 +1447,14 @@ impl Omnigraph {
table_path: &str,
table_branch: Option<&str>,
table_version: u64,
) -> Result<Dataset> {
) -> Result<SnapshotHandle> {
table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
}
pub(crate) async fn build_indices_on_dataset(
&self,
table_key: &str,
ds: &mut Dataset,
ds: &mut SnapshotHandle,
) -> Result<()> {
table_ops::build_indices_on_dataset(self, table_key, ds).await
}
@ -1442,7 +1463,7 @@ impl Omnigraph {
&self,
catalog: &Catalog,
table_key: &str,
ds: &mut Dataset,
ds: &mut SnapshotHandle,
) -> Result<()> {
table_ops::build_indices_on_dataset_for_catalog(self, catalog, table_key, ds).await
}
@ -2139,8 +2160,12 @@ edge WorksAt: Person -> Company
async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
let snapshot = db.snapshot().await;
let ds = snapshot.open(table_key).await.unwrap();
let batches = db.table_store().scan_batches(&ds).await.unwrap();
let ds = db
.storage()
.open_snapshot_at_table(&snapshot, table_key)
.await
.unwrap();
let batches = db.storage().scan_batches(&ds).await.unwrap();
batches
.into_iter()
.flat_map(|batch| {
@ -2152,11 +2177,11 @@ edge WorksAt: Person -> Company
}
async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
let (mut ds, full_path, table_branch) = db
let (ds, full_path, table_branch) = db
.open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
.await
.unwrap();
let schema: Arc<Schema> = Arc::new(ds.schema().into());
let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
let columns: Vec<Arc<dyn Array>> = schema
.fields()
.iter()
@ -2168,9 +2193,11 @@ edge WorksAt: Person -> Company
})
.collect();
let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
let committed = db.storage().commit_staged(ds, staged).await.unwrap();
let state = db
.table_store()
.append_batch(&full_path, &mut ds, batch)
.storage()
.table_state(&full_path, &committed)
.await
.unwrap();
db.commit_updates(&[crate::db::SubTableUpdate {
@ -2354,8 +2381,12 @@ edge WorksAt: Person -> Company
db.apply_schema(&desired).await.unwrap();
let snapshot = db.snapshot().await;
let ds = snapshot.open("node:Person").await.unwrap();
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
let ds = db
.storage()
.open_snapshot_at_table(&snapshot, "node:Person")
.await
.unwrap();
assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
}
#[tokio::test]
@ -2373,9 +2404,13 @@ edge WorksAt: Person -> Company
db.apply_schema(&desired).await.unwrap();
let snapshot = db.snapshot().await;
let ds = snapshot.open("node:Person").await.unwrap();
assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
let ds = db
.storage()
.open_snapshot_at_table(&snapshot, "node:Person")
.await
.unwrap();
assert!(db.storage().has_btree_index(&ds, "id").await.unwrap());
assert!(db.storage().has_fts_index(&ds, "name").await.unwrap());
}
#[tokio::test]

View file

@ -60,12 +60,12 @@ async fn entity_from_snapshot(
}
let ds = db
.table_store
.open_snapshot_table(snapshot, table_key)
.storage()
.open_snapshot_at_table(snapshot, table_key)
.await?;
let filter_sql = format!("id = '{}'", id.replace('\'', "''"));
let batches = db
.table_store
.storage()
.scan(&ds, None, Some(&filter_sql), None)
.await?;
let Some(batch) = batches.iter().find(|batch| batch.num_rows() > 0) else {
@ -143,23 +143,23 @@ async fn export_table_to_writer<W: Write>(
writer: &mut W,
) -> Result<()> {
let ds = db
.table_store
.open_snapshot_table(snapshot, table_key)
.storage()
.open_snapshot_at_table(snapshot, table_key)
.await?;
let ordering = Some(vec![ColumnOrdering::asc_nulls_last("id".to_string())]);
let catalog = db.catalog();
let blob_properties = blob_properties_for_table_key(&catalog, table_key)?;
if blob_properties.is_empty() {
for batch in db.table_store.scan(&ds, None, None, ordering).await? {
for batch in db.storage().scan(&ds, None, None, ordering).await? {
write_export_rows_from_batch(db, table_key, &batch, None, writer)?;
}
return Ok(());
}
let batches = db
.table_store
.scan_with(&ds, None, None, ordering, true, |_| Ok(()))
.storage()
.scan_with_row_id(&ds, None, None, ordering, true)
.await?;
for batch in batches {
let row_ids = batch
@ -175,7 +175,13 @@ async fn export_table_to_writer<W: Write>(
.iter()
.copied()
.collect::<Vec<_>>();
let blob_values = export_blob_values(&ds, &batch, &row_ids, blob_properties).await?;
// Blob materialization reaches through to the inner Lance
// `Dataset` because `take_blobs` is a Lance-only API not lifted
// onto the `TableStorage` trait surface (the trait covers
// staged-write and snapshot-scan primitives; blob descriptor
// materialization sits outside that surface).
let blob_values =
export_blob_values(ds.dataset(), &batch, &row_ids, blob_properties).await?;
write_export_rows_from_batch(db, table_key, &batch, Some(&blob_values), writer)?;
}
Ok(())

View file

@ -317,10 +317,16 @@ async fn optimize_one_table(
.acquire_many(&[(table_key.clone(), None)])
.await;
let mut ds = db
.table_store
// `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`.
// The `TableStorage` trait deliberately does not surface it (the staged-write
// invariant covers writes; compaction is a separate concern). Unwrap the
// opaque `SnapshotHandle` via `into_dataset()` (`pub(crate)`, gated to the
// maintenance path).
let handle = db
.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let mut ds = handle.into_dataset();
// CAS baseline: the table's current manifest version, read under the queue
// (in-memory coordinator snapshot, no storage I/O — stable for this section).
@ -408,7 +414,10 @@ async fn optimize_one_table(
// expected = the version observed under the queue). On failure the sidecar
// is intentionally left for the open-time recovery sweep to roll forward.
if committed {
let state = db.table_store.table_state(&full_path, &ds).await?;
// Re-wrap the post-compaction dataset to read its state through the
// trait surface (`table_state` is a read; no HEAD advance).
let snapshot = crate::storage_layer::SnapshotHandle::new(ds);
let state = db.storage().table_state(&full_path, &snapshot).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
@ -493,7 +502,7 @@ pub async fn cleanup_all_tables(
}
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
let table_store = &db.table_store;
let storage = db.storage();
// Fault-isolated per table: a single table's GC failure is recorded on its
// stats row (`error: Some`) and logged, never aborting the healthy tables.
@ -503,9 +512,13 @@ pub async fn cleanup_all_tables(
.map(|(table_key, full_path)| async move {
let outcome: Result<RemovalStats> = async {
crate::failpoints::maybe_fail("cleanup.table_gc")?;
let ds = table_store
// `cleanup_old_versions` is a Lance-only maintenance API not
// surfaced through `TableStorage` — see the optimize path
// above for the same rationale. Unwrap via `into_dataset()`.
let handle = storage
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let ds = handle.into_dataset();
let before_version = keep_versions
.map(|n| ds.version().version.saturating_sub(n as u64))
.filter(|v| *v > 0);
@ -606,8 +619,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
// Per-table fault isolation: one table's transient failure is recorded and
// logged, never aborting the rest of the sweep.
let storage = db.storage();
for (table_key, full_path) in table_targets {
let listed = match db.table_store.list_branches(&full_path).await {
let listed = match storage.list_branches(&full_path).await {
Ok(listed) => listed,
Err(err) => {
tracing::warn!(
@ -622,7 +636,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
};
for branch in orphan_branches(listed, &keep) {
let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
Ok(()) => db.table_store.force_delete_branch(&full_path, &branch).await,
Ok(()) => storage.force_delete_branch(&full_path, &branch).await,
Err(injected) => Err(injected),
};
match outcome {

View file

@ -165,10 +165,15 @@ pub async fn repair_all_tables(db: &Omnigraph, options: RepairOptions) -> Result
let mut any_forced = false;
for (table_key, full_path) in table_tasks {
// `classify_drift` inspects raw Lance transaction history
// (`read_transaction_by_version`), a Lance-only maintenance read the
// staged-write trait does not surface. Open via `db.storage()` and
// unwrap the opaque handle (mirrors optimize / cleanup).
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
.await?
.into_dataset();
let manifest_version = snapshot
.entry(&table_key)
.map(|e| e.table_version)
@ -214,7 +219,10 @@ pub async fn repair_all_tables(db: &Omnigraph, options: RepairOptions) -> Result
};
if matches!(action, RepairAction::Healed | RepairAction::Forced) {
let state = db.table_store.table_state(&full_path, &ds).await?;
// Re-wrap the opened dataset to read its state through the trait
// surface (`table_state` is a read; no HEAD advance).
let snapshot = crate::storage_layer::SnapshotHandle::new(ds);
let state = db.storage().table_state(&full_path, &snapshot).await?;
updates.push(crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,

View file

@ -355,7 +355,7 @@ where
let entry = snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: db.table_store.dataset_uri(&entry.table_path),
table_path: db.storage().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
@ -469,12 +469,13 @@ where
for table_key in &added_tables {
let table_path = table_path_for_table_key(table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
let dataset_uri = db.storage().dataset_uri(&table_path);
let schema = schema_for_table_key(&desired_catalog, table_key)?;
let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
let mut ds =
SnapshotHandle::new(TableStore::create_empty_dataset(&dataset_uri, &schema).await?);
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
let state = db.storage().table_state(&dataset_uri, &ds).await?;
table_registrations.insert(table_key.clone(), table_path);
table_updates.insert(
table_key.clone(),
@ -496,7 +497,10 @@ where
))
})?;
ensure_snapshot_entry_head_matches(db, source_entry).await?;
let source_ds = snapshot.open(source_table_key).await?;
let source_ds = db
.storage()
.open_snapshot_at_table(&snapshot, source_table_key)
.await?;
let current_catalog = db.catalog();
let batch = batch_for_schema_apply_rewrite(
db,
@ -509,11 +513,12 @@ where
)
.await?;
let table_path = table_path_for_table_key(target_table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
let dataset_uri = db.storage().dataset_uri(&table_path);
let mut target_ds =
SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?);
db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
table_registrations.insert(target_table_key.clone(), table_path);
table_updates.insert(
target_table_key.clone(),
@ -542,7 +547,10 @@ where
))
})?;
ensure_snapshot_entry_head_matches(db, entry).await?;
let source_ds = snapshot.open(table_key).await?;
let source_ds = db
.storage()
.open_snapshot_at_table(&snapshot, table_key)
.await?;
let current_catalog = db.catalog();
let batch = batch_for_schema_apply_rewrite(
db,
@ -554,37 +562,22 @@ where
property_renames.get(table_key),
)
.await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
// Route through stage_overwrite + commit_staged for non-empty
// batches. Lance's `InsertBuilder::execute_uncommitted`
// errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`),
// so the empty-rewrite case stays on `overwrite_dataset` (which
// accepts empty input). The empty case is rare in schema_apply
// — it only fires when the source table itself was already empty
// — and schema_apply runs under `__schema_apply_lock__` so the
// narrow inline-commit residual is bounded.
let mut target_ds = if batch.num_rows() == 0 {
TableStore::overwrite_dataset(&dataset_uri, batch).await?
} else {
// Pass `entry.table_branch.as_deref()` (not `None`) for
// consistency with the indexed_tables block below. Schema
// apply runs under `__schema_apply_lock__` which today
// rejects non-main branches, so `entry.table_branch` is
// expected to be `None`. But the defensive passthrough
// means a future relaxation of the lock-check can't quietly
// open the wrong HEAD here.
let existing = db
.table_store
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
let staged = db.table_store.stage_overwrite(&existing, batch).await?;
db.table_store
.commit_staged(Arc::new(existing), staged.transaction)
.await?
};
let dataset_uri = db.storage().dataset_uri(&entry.table_path);
// Pass `entry.table_branch.as_deref()` (not `None`) for
// consistency with the indexed_tables block below. Schema
// apply runs under `__schema_apply_lock__` which today rejects
// non-main branches, so `entry.table_branch` is expected to be
// `None`. But the defensive passthrough means a future relaxation
// of the lock-check can't quietly open the wrong HEAD here.
let existing = db
.storage()
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
let staged = db.storage().stage_overwrite(&existing, batch).await?;
let mut target_ds = db.storage().commit_staged(existing, staged).await?;
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
@ -611,16 +604,16 @@ where
))
})?;
ensure_snapshot_entry_head_matches(db, entry).await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let dataset_uri = db.storage().dataset_uri(&entry.table_path);
let mut ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
db.table_store
db.storage()
.ensure_expected_version(&ds, table_key, entry.table_version)?;
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
let state = db.storage().table_state(&dataset_uri, &ds).await?;
table_updates.insert(
table_key.clone(),
crate::db::SubTableUpdate {
@ -869,22 +862,22 @@ pub(super) async fn ensure_snapshot_entry_head_matches(
db: &Omnigraph,
entry: &SubTableEntry,
) -> Result<()> {
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let dataset_uri = db.storage().dataset_uri(&entry.table_path);
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(
&entry.table_key,
&dataset_uri,
entry.table_branch.as_deref(),
)
.await?;
db.table_store
db.storage()
.ensure_expected_version(&ds, &entry.table_key, entry.table_version)
}
pub(super) async fn batch_for_schema_apply_rewrite(
db: &Omnigraph,
source_ds: &Dataset,
source_ds: &SnapshotHandle,
source_table_key: &str,
source_catalog: &Catalog,
target_table_key: &str,
@ -896,11 +889,11 @@ pub(super) async fn batch_for_schema_apply_rewrite(
let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
let batches = if needs_row_ids {
db.table_store()
.scan_with(source_ds, None, None, None, true, |_| Ok(()))
db.storage()
.scan_with_row_id(source_ds, None, None, None, true)
.await?
} else {
db.table_store().scan_batches(source_ds).await?
db.storage().scan_batches(source_ds).await?
};
if batches.is_empty() {
return Ok(RecordBatch::new_empty(target_schema));
@ -970,7 +963,7 @@ pub(super) async fn batch_for_schema_apply_rewrite(
async fn rebuild_blob_column(
_db: &Omnigraph,
source_ds: &Dataset,
source_ds: &SnapshotHandle,
column_name: &str,
descriptions: &StructArray,
row_ids: &[u64],
@ -990,7 +983,7 @@ async fn rebuild_blob_column(
let blob_files = if non_null_row_ids.is_empty() {
Vec::new()
} else {
Arc::new(source_ds.clone())
Arc::new(source_ds.dataset().clone())
.take_blobs(&non_null_row_ids, column_name)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?

View file

@ -50,10 +50,10 @@ pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, &full_path, table_branch)
.await?;
let state = db.table_store.table_state(&full_path, &ds).await?;
let state = db.storage().table_state(&full_path, &ds).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: state.version,
@ -209,18 +209,18 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
}
},
None => (
db.table_store
db.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?,
None,
),
};
let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0);
let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
if row_count > 0 {
build_indices_on_dataset(db, &table_key, &mut ds).await?;
}
let state = db.table_store.table_state(&full_path, &ds).await?;
let state = db.storage().table_state(&full_path, &ds).await?;
if state.version != entry.table_version
|| resolved_branch.as_deref() != entry.table_branch.as_deref()
{
@ -257,18 +257,18 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
}
},
None => (
db.table_store
db.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?,
None,
),
};
let row_count = db.table_store.count_rows(&ds, None).await.unwrap_or(0);
let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
if row_count > 0 {
build_indices_on_dataset(db, &table_key, &mut ds).await?;
}
let state = db.table_store.table_state(&full_path, &ds).await?;
let state = db.storage().table_state(&full_path, &ds).await?;
if state.version != entry.table_version
|| resolved_branch.as_deref() != entry.table_branch.as_deref()
{
@ -331,7 +331,7 @@ async fn needs_index_work_node(
table_branch: Option<&str>,
) -> Result<bool> {
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, full_path, table_branch)
.await?;
// Empty tables are skipped by the ensure_indices loop, so they must
@ -341,10 +341,10 @@ async fn needs_index_work_node(
// Errors from count_rows are propagated: silently treating them as
// "0 rows" risks skipping a table that is actually about to be
// modified.
if db.table_store.count_rows(&ds, None).await? == 0 {
if db.storage().count_rows(&ds, None).await? == 0 {
return Ok(false);
}
if !db.table_store.has_btree_index(&ds, "id").await? {
if !db.storage().has_btree_index(&ds, "id").await? {
return Ok(true);
}
let catalog = db.catalog();
@ -360,11 +360,11 @@ async fn needs_index_work_node(
continue;
};
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.table_store.has_fts_index(&ds, prop_name).await? {
if !db.storage().has_fts_index(&ds, prop_name).await? {
return Ok(true);
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.table_store.has_vector_index(&ds, prop_name).await? {
if !db.storage().has_vector_index(&ds, prop_name).await? {
return Ok(true);
}
}
@ -389,22 +389,22 @@ async fn needs_index_work_edge(
table_branch: Option<&str>,
) -> Result<bool> {
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, full_path, table_branch)
.await?;
if db.table_store.count_rows(&ds, None).await? == 0 {
if db.storage().count_rows(&ds, None).await? == 0 {
return Ok(false);
}
Ok(!db.table_store.has_btree_index(&ds, "id").await?
|| !db.table_store.has_btree_index(&ds, "src").await?
|| !db.table_store.has_btree_index(&ds, "dst").await?)
Ok(!db.storage().has_btree_index(&ds, "id").await?
|| !db.storage().has_btree_index(&ds, "src").await?
|| !db.storage().has_btree_index(&ds, "dst").await?)
}
pub(super) async fn open_for_mutation(
db: &Omnigraph,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
) -> Result<(SnapshotHandle, String, Option<String>)> {
let current_branch = db
.coordinator
.read()
@ -425,7 +425,7 @@ pub(super) async fn open_for_mutation_on_branch(
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
) -> Result<(SnapshotHandle, String, Option<String>)> {
db.ensure_schema_apply_not_locked("write").await?;
let resolved = db.resolved_branch_target(branch).await?;
let entry = resolved
@ -436,11 +436,11 @@ pub(super) async fn open_for_mutation_on_branch(
match resolved.branch.as_deref() {
None => {
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, &full_path, None)
.await?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
db.storage()
.ensure_expected_version(&ds, table_key, entry.table_version)?;
}
Ok((ds, full_path, None))
@ -469,15 +469,15 @@ pub(super) async fn open_owned_dataset_for_branch_write(
entry_version: u64,
active_branch: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, Option<String>)> {
) -> Result<(SnapshotHandle, Option<String>)> {
match entry_branch {
Some(branch) if branch == active_branch => {
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
db.storage()
.ensure_expected_version(&ds, table_key, entry_version)?;
}
Ok((ds, Some(active_branch.to_string())))
@ -509,11 +509,11 @@ pub(super) async fn open_owned_dataset_for_branch_write(
)
.await?;
let ds = db
.table_store
.storage()
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
db.storage()
.ensure_expected_version(&ds, table_key, entry_version)?;
}
Ok((ds, Some(active_branch.to_string())))
@ -528,8 +528,8 @@ pub(super) async fn fork_dataset_from_entry_state(
source_branch: Option<&str>,
source_version: u64,
active_branch: &str,
) -> Result<Dataset> {
db.table_store
) -> Result<SnapshotHandle> {
db.storage()
.fork_branch_from_state(
full_path,
source_branch,
@ -547,10 +547,10 @@ pub(super) async fn reopen_for_mutation(
table_branch: Option<&str>,
expected_version: u64,
op_kind: crate::db::MutationOpKind,
) -> Result<Dataset> {
) -> Result<SnapshotHandle> {
db.ensure_schema_apply_not_locked("write").await?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
db.storage()
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
.await
} else {
@ -563,7 +563,7 @@ pub(super) async fn reopen_for_mutation(
// genuine cross-process drift as 409. See
// [`crate::db::MutationOpKind`] for the policy rationale.
let _ = expected_version;
db.table_store
db.storage()
.open_dataset_head_for_write(table_key, full_path, table_branch)
.await
}
@ -574,8 +574,8 @@ pub(super) async fn open_dataset_at_state(
table_path: &str,
table_branch: Option<&str>,
table_version: u64,
) -> Result<Dataset> {
db.table_store
) -> Result<SnapshotHandle> {
db.storage()
.open_dataset_at_state(table_path, table_branch, table_version)
.await
}
@ -583,7 +583,7 @@ pub(super) async fn open_dataset_at_state(
pub(super) async fn build_indices_on_dataset(
db: &Omnigraph,
table_key: &str,
ds: &mut Dataset,
ds: &mut SnapshotHandle,
) -> Result<()> {
let catalog = db.catalog();
build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await
@ -593,10 +593,10 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
db: &Omnigraph,
catalog: &Catalog,
table_key: &str,
ds: &mut Dataset,
ds: &mut SnapshotHandle,
) -> Result<()> {
if let Some(type_name) = table_key.strip_prefix("node:") {
if !db.table_store.has_btree_index(ds, "id").await? {
if !db.storage().has_btree_index(ds, "id").await? {
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
}
@ -616,19 +616,20 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
let prop_name = &index_cols[0];
if let Some(prop_type) = node_type.properties.get(prop_name) {
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.table_store.has_fts_index(ds, prop_name).await? {
if !db.storage().has_fts_index(ds, prop_name).await? {
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
.await?;
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.table_store.has_vector_index(ds, prop_name).await? {
// Inline-commit residual: lance-4.0.0 does not
if !db.storage().has_vector_index(ds, prop_name).await? {
// Inline-commit residual: lance-6.0.1 does not
// expose `build_index_metadata_from_segments` as
// `pub`, so vector indices cannot be staged from
// outside the lance crate. Document at the call
// site; companion ticket to lance-format/lance#6658.
db.table_store
.create_vector_index(ds, prop_name.as_str())
let new_snap = db
.storage_inline_residual()
.create_vector_index(ds.clone(), prop_name.as_str())
.await
.map_err(|e| {
OmniError::Lance(format!(
@ -636,6 +637,7 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
table_key, prop_name, e
))
})?;
*ds = new_snap;
}
}
}
@ -645,13 +647,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
}
if table_key.starts_with("edge:") {
if !db.table_store.has_btree_index(ds, "id").await? {
if !db.storage().has_btree_index(ds, "id").await? {
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
}
if !db.table_store.has_btree_index(ds, "src").await? {
if !db.storage().has_btree_index(ds, "src").await? {
stage_and_commit_btree(db, table_key, ds, &["src"]).await?;
}
if !db.table_store.has_btree_index(ds, "dst").await? {
if !db.storage().has_btree_index(ds, "dst").await? {
stage_and_commit_btree(db, table_key, ds, &["dst"]).await?;
}
return Ok(());
@ -674,11 +676,11 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
async fn stage_and_commit_btree(
db: &Omnigraph,
table_key: &str,
ds: &mut Dataset,
ds: &mut SnapshotHandle,
columns: &[&str],
) -> Result<()> {
let staged = db
.table_store
.storage()
.stage_create_btree_index(ds, columns)
.await
.map_err(|e| {
@ -693,8 +695,8 @@ async fn stage_and_commit_btree(
// yet called) leaves no Lance-HEAD drift on the touched table.
crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
let new_ds = db
.table_store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.storage()
.commit_staged(ds.clone(), staged)
.await
.map_err(|e| {
OmniError::Lance(format!(
@ -711,11 +713,11 @@ async fn stage_and_commit_btree(
async fn stage_and_commit_inverted(
db: &Omnigraph,
table_key: &str,
ds: &mut Dataset,
ds: &mut SnapshotHandle,
column: &str,
) -> Result<()> {
let staged = db
.table_store
.storage()
.stage_create_inverted_index(ds, column)
.await
.map_err(|e| {
@ -725,8 +727,8 @@ async fn stage_and_commit_inverted(
))
})?;
let new_ds = db
.table_store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.storage()
.commit_staged(ds.clone(), staged)
.await
.map_err(|e| {
OmniError::Lance(format!(
@ -777,7 +779,7 @@ async fn prepare_updates_for_commit(
)
.await?;
build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
let state = db.table_store.table_state(&full_path, &ds).await?;
let state = db.storage().table_state(&full_path, &ds).await?;
prepared_update.table_version = state.version;
prepared_update.row_count = state.row_count;
prepared_update.version_metadata = state.version_metadata;

View file

@ -926,7 +926,7 @@ async fn publish_adopted_source_state(
target_branch,
)
.await?;
let state = target_db.table_store().table_state(&full_path, &ds).await?;
let state = target_db.storage().table_state(&full_path, &ds).await?;
Ok(crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: state.version,
@ -963,9 +963,13 @@ async fn publish_rewritten_merge_table(
// commit point, narrowed from the previous "merge_insert + delete +
// index" multi-step inline-commit chain.
if let Some(delta) = &staged.delta_staged {
// The staged delta dataset is a temp-dir Lance dataset used only
// to collect the rewrite batches; wrap it in a `SnapshotHandle`
// so we can route through the trait's `scan_batches_for_rewrite`.
let delta_snapshot = SnapshotHandle::new(delta.dataset.clone());
let batches: Vec<RecordBatch> = target_db
.table_store()
.scan_batches_for_rewrite(&delta.dataset)
.storage()
.scan_batches_for_rewrite(&delta_snapshot)
.await?
.into_iter()
.filter(|batch| batch.num_rows() > 0)
@ -980,7 +984,7 @@ async fn publish_rewritten_merge_table(
.map_err(|e| OmniError::Lance(e.to_string()))?
};
let staged_merge = target_db
.table_store()
.storage()
.stage_merge_insert(
current_ds.clone(),
combined,
@ -990,15 +994,15 @@ async fn publish_rewritten_merge_table(
)
.await?;
current_ds = target_db
.table_store()
.commit_staged(Arc::new(current_ds), staged_merge.transaction)
.storage()
.commit_staged(current_ds, staged_merge)
.await?;
}
}
// Phase 2: delete removed rows via deletion vectors.
//
// INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public
// INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
// two-phase delete API (DeleteJob is `pub(crate)` —
// lance-format/lance#6658 is open with no PRs). We deliberately do
// NOT introduce a `stage_delete` wrapper that would secretly
@ -1012,10 +1016,11 @@ async fn publish_rewritten_merge_table(
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
target_db
.table_store()
.delete_where(&full_path, &mut current_ds, &filter)
let (new_ds, _) = target_db
.storage_inline_residual()
.delete_where(&full_path, current_ds, &filter)
.await?;
current_ds = new_ds;
}
// Phase 3: rebuild indices.
@ -1024,9 +1029,9 @@ async fn publish_rewritten_merge_table(
// `stage_create_inverted_index` + `commit_staged` for scalar
// indices. Vector indices remain inline-commit
// (`build_index_metadata_from_segments` is `pub(crate)` in lance-
// 4.0.0 — companion ticket to lance-format/lance#6658).
// 6.0.1 — companion ticket to lance-format/lance#6658).
let row_count = target_db
.table_store()
.storage()
.table_state(&full_path, &current_ds)
.await?
.row_count;
@ -1036,7 +1041,7 @@ async fn publish_rewritten_merge_table(
.await?;
}
let final_state = target_db
.table_store()
.storage()
.table_state(&full_path, &current_ds)
.await?;
@ -1362,7 +1367,7 @@ impl Omnigraph {
let entry = target_snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: self.table_store().dataset_uri(&entry.table_path),
table_path: self.storage().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// Use the merge target branch (where commits actually

View file

@ -40,6 +40,7 @@ use crate::db::{ReadTarget, Snapshot};
use crate::embedding::EmbeddingClient;
use crate::error::{MergeConflict, MergeConflictKind, OmniError, Result};
use crate::graph_index::GraphIndex;
use crate::storage_layer::SnapshotHandle;
use tempfile::{Builder as TempDirBuilder, TempDir};
mod merge;

View file

@ -428,12 +428,11 @@ async fn ensure_node_id_exists(
let filter = format!("id = '{}'", id.replace('\'', "''"));
let snapshot = db.snapshot_for_branch(branch).await?;
let ds = snapshot.open(&table_key).await?;
let exists = ds
.count_rows(Some(filter))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
> 0;
let ds = db
.storage()
.open_snapshot_at_table(&snapshot, &table_key)
.await?;
let exists = db.storage().count_rows(&ds, Some(filter)).await? > 0;
if exists {
Ok(())
@ -602,7 +601,7 @@ async fn open_table_for_mutation(
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
) -> Result<(SnapshotHandle, String, Option<String>)> {
if let Some(prior) = staging.inline_committed.get(table_key) {
let path = staging.paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
@ -624,7 +623,7 @@ async fn open_table_for_mutation(
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, table_key, op_kind)
.await?;
let expected_version = ds.version().version;
let expected_version = ds.version();
staging.ensure_path(
table_key,
full_path.clone(),
@ -640,7 +639,7 @@ async fn open_table_for_mutation(
///
/// Reason: under the staged-write writer, inserts and updates
/// accumulate in memory and commit at end-of-query, while deletes still
/// inline-commit (Lance lacks a public two-phase delete in 4.0.0).
/// inline-commit (Lance lacks a public two-phase delete in 6.0.1).
/// Mixing creates ordering hazards (same-row insert→delete becomes a no-op
/// because the staged insert isn't visible to delete; cascading deletes
/// of just-inserted edges break referential integrity by silent design).
@ -1056,7 +1055,7 @@ impl Omnigraph {
// and a chained `update where <pred>` can match a row whose
// pending value no longer satisfies <pred>.
let batches = self
.table_store()
.storage()
.scan_with_pending(
&ds,
pending_batches,
@ -1154,13 +1153,13 @@ impl Omnigraph {
crate::db::MutationOpKind::Delete,
)
.await?;
let initial_version = ds.version().version;
let initial_version = ds.version();
// Scan matching IDs for cascade. Per D₂ this never overlaps with
// staged inserts (mixed insert/delete in one query is rejected at
// parse time), so we scan committed only.
let batches = self
.table_store()
.storage()
.scan(&ds, Some(&["id"]), Some(&pred_sql), None)
.await?;
@ -1188,11 +1187,11 @@ impl Omnigraph {
let affected_nodes = deleted_ids.len();
// Delete nodes — still inline-commit (Lance's `Dataset::delete` is
// not exposed as a two-phase op in 4.0.0). D₂ keeps inserts and
// not exposed as a two-phase op in 6.0.1). D₂ keeps inserts and
// deletes from coexisting in one query, so this advance of Lance
// HEAD is the only HEAD movement during the query and the
// publisher's CAS captures it intact.
let mut ds = self
let ds = self
.reopen_for_mutation(
&table_key,
&full_path,
@ -1202,9 +1201,9 @@ impl Omnigraph {
)
.await?;
crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?;
let delete_state = self
.table_store()
.delete_where(&full_path, &mut ds, &pred_sql)
let (_new_ds, delete_state) = self
.storage_inline_residual()
.delete_where(&full_path, ds, &pred_sql)
.await?;
staging.record_inline(crate::db::SubTableUpdate {
@ -1243,7 +1242,7 @@ impl Omnigraph {
let edge_table_key = format!("edge:{}", edge_name);
let cascade_filter = cascade_filters.join(" OR ");
let (mut edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
let (edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
self,
staging,
branch,
@ -1252,9 +1251,9 @@ impl Omnigraph {
)
.await?;
let edge_delete = self
.table_store()
.delete_where(&edge_full_path, &mut edge_ds, &cascade_filter)
let (_new_edge_ds, edge_delete) = self
.storage_inline_residual()
.delete_where(&edge_full_path, edge_ds, &cascade_filter)
.await?;
affected_edges += edge_delete.deleted_rows;
@ -1291,7 +1290,7 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, true)?;
let table_key = format!("edge:{}", type_name);
let (mut ds, full_path, table_branch) = open_table_for_mutation(
let (ds, full_path, table_branch) = open_table_for_mutation(
self,
staging,
branch,
@ -1300,9 +1299,9 @@ impl Omnigraph {
)
.await?;
let delete_state = self
.table_store()
.delete_where(&full_path, &mut ds, &pred_sql)
let (_new_ds, delete_state) = self
.storage_inline_residual()
.delete_where(&full_path, ds, &pred_sql)
.await?;
let affected = delete_state.deleted_rows;
@ -1356,7 +1355,7 @@ fn concat_match_batches_to_schema(
/// dedup needed (`dedupe_key_column = None`).
async fn validate_edge_cardinality_with_pending(
db: &Omnigraph,
committed_ds: &Dataset,
committed_ds: &SnapshotHandle,
staging: &MutationStaging,
table_key: &str,
edge_type: &omnigraph_compiler::catalog::EdgeType,

View file

@ -21,9 +21,10 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::storage_layer::{SnapshotHandle, StagedHandle};
use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
use arrow_schema::SchemaRef;
use lance::Dataset;
use futures::stream::StreamExt;
use omnigraph_compiler::catalog::EdgeType;
use crate::db::manifest::{
@ -32,15 +33,13 @@ use crate::db::manifest::{
use crate::db::{MutationOpKind, SubTableUpdate};
use crate::error::{OmniError, Result};
/// Whether the per-table accumulator should commit via `stage_append`
/// (no @key inserts, edge inserts) or `stage_merge_insert` (any @key insert
/// or update). Once set to `Merge` for a table within a query, subsequent
/// inserts on that table are rolled into the same merge — a `WhenNotMatched
/// = InsertAll` merge is correct for both cases.
/// Whether the per-table accumulator should commit via `stage_append`,
/// `stage_merge_insert`, or `stage_overwrite`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PendingMode {
    /// Accumulated batches are concatenated and staged with `stage_append`.
    Append,
    /// Accumulated batches are deduplicated by `id` and staged with
    /// `stage_merge_insert` (matched rows updated, unmatched rows inserted).
    Merge,
    /// Accumulated batches are staged with `stage_overwrite`. Cannot be mixed
    /// with `Append`/`Merge` staging on the same table, and an empty batch is
    /// kept rather than dropped: it means "replace this table with zero rows".
    Overwrite,
}
/// Per-table accumulator. Each insert/update op pushes a `RecordBatch` into
@ -158,9 +157,9 @@ impl MutationStaging {
mode: PendingMode,
batch: RecordBatch,
) -> Result<()> {
if batch.num_rows() == 0 {
// No-op — staging is purely additive; an empty batch should not
// be appended.
if batch.num_rows() == 0 && mode != PendingMode::Overwrite {
// No-op for additive modes. For Overwrite, an empty batch is
// observable: it means "replace this table with zero rows".
return Ok(());
}
// If we've already accumulated a batch on this table, the new
@ -174,6 +173,14 @@ impl MutationStaging {
// caller a clearer point of failure attached to the specific
// op that introduced the drift.
if let Some(existing) = self.pending.get(table_key) {
if existing.mode == PendingMode::Overwrite || mode == PendingMode::Overwrite {
if existing.mode != mode {
return Err(OmniError::manifest_internal(format!(
"table '{}' cannot mix overwrite staging with append/merge staging",
table_key
)));
}
}
if !schemas_compatible(&existing.schema, &batch.schema()) {
return Err(OmniError::manifest(format!(
"table '{}' accumulated mutation batches with mismatched schemas: \
@ -194,8 +201,9 @@ impl MutationStaging {
.pending
.entry(table_key.to_string())
.or_insert_with(|| PendingTable::new(schema.clone(), mode));
// Upgrade Append -> Merge if any op needs merge semantics.
if mode == PendingMode::Merge {
// Upgrade Append -> Merge if any op needs merge semantics. Overwrite
// is never mixed with additive modes (guarded above).
if mode == PendingMode::Merge && entry.mode == PendingMode::Append {
entry.mode = PendingMode::Merge;
}
entry.batches.push(batch);
@ -217,6 +225,11 @@ impl MutationStaging {
.unwrap_or(&[])
}
/// Returns the [`PendingMode`] recorded for `table_key`, or `None` when no
/// batches have been accumulated for that table in this query.
pub(crate) fn pending_mode(&self, table_key: &str) -> Option<PendingMode> {
    let accumulator = self.pending.get(table_key)?;
    Some(accumulator.mode)
}
/// Schema of the accumulated batches for `table_key`, or `None` if no
/// op has touched the table. Used by `scan_with_pending` to construct
/// the in-memory `MemTable`.
@ -249,9 +262,21 @@ impl MutationStaging {
/// Lance datasets is a perf follow-up; same loop structure as the
/// pre-split `finalize`.
pub(crate) async fn stage_all(
    self,
    db: &crate::db::Omnigraph,
    branch: Option<&str>,
) -> Result<StagedMutation> {
    // Delegate with a concurrency of 1, i.e. tables are staged one at a time.
    let one_at_a_time: usize = 1;
    let staged = self
        .stage_all_with_concurrency(db, branch, one_at_a_time)
        .await?;
    Ok(staged)
}
/// Loader-facing variant of [`stage_all`] that preserves
/// `OMNIGRAPH_LOAD_CONCURRENCY` for the fragment-writing stage while
/// still leaving all Lance HEAD movement to [`StagedMutation::commit_all`].
pub(crate) async fn stage_all_with_concurrency(
self,
db: &crate::db::Omnigraph,
_branch: Option<&str>,
concurrency: usize,
) -> Result<StagedMutation> {
let MutationStaging {
expected_versions,
@ -261,7 +286,8 @@ impl MutationStaging {
op_kinds,
} = self;
let mut staged_entries: Vec<StagedTableEntry> = Vec::with_capacity(pending.len());
let mut stage_inputs: Vec<(String, PendingTable, StagedTablePath, u64)> =
Vec::with_capacity(pending.len());
for (table_key, table) in pending {
let path = paths.get(&table_key).cloned().ok_or_else(|| {
OmniError::manifest_internal(format!(
@ -275,77 +301,22 @@ impl MutationStaging {
table_key
))
})?;
// Reopen the dataset for staging. The op_kind reflects the
// accumulated PendingTable's mode: Append-mode batches are
// INSERT-shaped (no key-based dedup at commit_staged); Merge-
// mode batches are MERGE-shaped (key-dedup at commit_staged).
// Both skip the strict pre-stage version check under the
// [`MutationOpKind`] policy: Lance's natural rebase + the
// per-(table, branch) queue + the publisher CAS in
// `commit_all` handle drift; the strict check would
// over-reject in-process concurrent inserts (PR 2 / MR-686
// Phase 2).
let stage_kind = match table.mode {
PendingMode::Append => crate::db::MutationOpKind::Insert,
PendingMode::Merge => crate::db::MutationOpKind::Merge,
};
let ds = db
.reopen_for_mutation(
&table_key,
&path.full_path,
path.table_branch.as_deref(),
expected,
stage_kind,
)
.await?;
if table.batches.is_empty() {
continue;
}
// For Merge mode, dedupe accumulated batches by `id`, keeping
// the LAST occurrence (last-write-wins for the query). This
// is required because Lance's `MergeInsertBuilder` produces
// arbitrary results on duplicate keys in the source. Append
// mode is exempt because no-key node and edge inserts use
// ULID-generated ids that are unique within a query.
let combined = match table.mode {
PendingMode::Merge => dedupe_merge_batches_by_id(&table.schema, table.batches)?,
PendingMode::Append => {
if table.batches.len() == 1 {
table.batches.into_iter().next().unwrap()
} else {
arrow_select::concat::concat_batches(&table.schema, &table.batches)
.map_err(|e| OmniError::Lance(e.to_string()))?
}
}
};
// Stage produces uncommitted fragments + transaction. No
// Lance HEAD advance until `commit_all` runs `commit_staged`.
let staged = match table.mode {
PendingMode::Append => db.table_store().stage_append(&ds, combined, &[]).await?,
PendingMode::Merge => {
db.table_store()
.stage_merge_insert(
ds.clone(),
combined,
vec!["id".to_string()],
lance::dataset::WhenMatched::UpdateAll,
lance::dataset::WhenNotMatched::InsertAll,
)
.await?
}
};
staged_entries.push(StagedTableEntry {
table_key,
path,
expected_version: expected,
dataset: ds,
staged_write: staged,
});
stage_inputs.push((table_key, table, path, expected));
}
let concurrency = concurrency.min(stage_inputs.len()).max(1);
let staged_entries = futures::stream::iter(stage_inputs.into_iter().map(
|(table_key, table, path, expected)| async move {
stage_pending_table(db, table_key, table, path, expected).await
},
))
.buffered(concurrency)
.collect::<Vec<Result<Option<StagedTableEntry>>>>()
.await
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
Ok(StagedMutation {
inline_committed,
@ -357,6 +328,73 @@ impl MutationStaging {
}
}
/// Stage the accumulated batches of a single table as an uncommitted Lance
/// transaction.
///
/// The table is reopened first, then the pending batches are folded into one
/// `RecordBatch` and staged with the write primitive that matches the
/// accumulator's [`PendingMode`].
///
/// Returns `Ok(None)` when the accumulator holds no batches; otherwise
/// `Ok(Some(entry))` carrying the reopened snapshot and the staged handle.
///
/// # Errors
///
/// Propagates failures from reopening, de-duplicating, and staging; a failed
/// batch concatenation is reported as `OmniError::Lance`.
async fn stage_pending_table(
    db: &crate::db::Omnigraph,
    table_key: String,
    table: PendingTable,
    path: StagedTablePath,
    expected: u64,
) -> Result<Option<StagedTableEntry>> {
    let mode = table.mode;

    // Append/Merge can be rebased later by Lance + publisher CAS; Overwrite
    // is a strict replacement and uses the same SchemaRewrite policy as
    // schema apply.
    let op_kind = match mode {
        PendingMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
        PendingMode::Merge => crate::db::MutationOpKind::Merge,
        PendingMode::Append => crate::db::MutationOpKind::Insert,
    };
    let target_branch = path.table_branch.as_deref();
    let snapshot = db
        .reopen_for_mutation(&table_key, &path.full_path, target_branch, expected, op_kind)
        .await?;

    // The reopen above happens even for an empty accumulator; only the
    // staging step is skipped.
    if table.batches.is_empty() {
        return Ok(None);
    }

    // Collapse the accumulated batches into the single batch handed to Lance.
    let schema = &table.schema;
    let mut batches = table.batches;
    let payload = if mode == PendingMode::Merge {
        dedupe_merge_batches_by_id(schema, batches)?
    } else if batches.len() == 1 {
        // Exactly one batch: hand it over as-is instead of concatenating.
        batches.remove(0)
    } else {
        arrow_select::concat::concat_batches(schema, &batches)
            .map_err(|e| OmniError::Lance(e.to_string()))?
    };

    // Staging writes uncommitted fragments + a transaction. Lance HEAD does
    // not advance until `commit_all` runs `commit_staged`.
    let staged_write = match mode {
        PendingMode::Overwrite => db.storage().stage_overwrite(&snapshot, payload).await?,
        PendingMode::Merge => {
            db.storage()
                .stage_merge_insert(
                    snapshot.clone(),
                    payload,
                    vec!["id".to_string()],
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?
        }
        PendingMode::Append => db.storage().stage_append(&snapshot, payload, &[]).await?,
    };

    let entry = StagedTableEntry {
        table_key,
        path,
        expected_version: expected,
        dataset: snapshot,
        staged_write,
    };
    Ok(Some(entry))
}
/// Output of [`MutationStaging::stage_all`]. Carries the staged Lance
/// transactions (Phase A complete; uncommitted fragments written) plus
/// the per-table metadata needed to write the recovery sidecar, run
@ -389,15 +427,17 @@ pub(crate) struct StagedMutation {
}
/// Per-table state captured during `stage_all` and consumed by
/// `commit_all`. Holds the opened `Dataset` so `commit_staged` doesn't
/// re-open, and the `StagedWrite` whose `transaction` `commit_staged`
/// will execute.
/// `commit_all`. Holds the opened snapshot (so `commit_staged` doesn't
/// re-open) plus the staged Lance transaction that `commit_staged`
/// will execute. Both held as opaque `TableStorage` handles per MR-793
/// §III.9 — the inner `lance::Dataset` / `StagedWrite` are not visible
/// to engine code outside the storage layer.
struct StagedTableEntry {
table_key: String,
path: StagedTablePath,
expected_version: u64,
dataset: lance::Dataset,
staged_write: crate::table_store::StagedWrite,
dataset: SnapshotHandle,
staged_write: StagedHandle,
}
impl StagedMutation {
@ -544,15 +584,14 @@ impl StagedMutation {
// raw Lance write or a pre-fix maintenance path moved HEAD without
// publishing `__manifest`, this write must not silently fold it.
let head = db
.table_store()
.storage()
.open_dataset_head_for_write(
&entry.table_key,
&entry.path.full_path,
entry.path.table_branch.as_deref(),
)
.await?
.version()
.version;
.version();
if head < current {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
@ -672,14 +711,8 @@ impl StagedMutation {
staged_write,
} = entry;
let new_ds = db
.table_store()
.commit_staged(Arc::new(dataset), staged_write.transaction)
.await?;
let state = db
.table_store()
.table_state(&path.full_path, &new_ds)
.await?;
let new_ds = db.storage().commit_staged(dataset, staged_write).await?;
let state = db.storage().table_state(&path.full_path, &new_ds).await?;
updates.push(SubTableUpdate {
table_key,
table_version: state.version,
@ -813,7 +846,9 @@ fn dedupe_merge_batches_by_id(
/// Count edges per `src` value across committed (Lance scan) + pending
/// (in-memory). Caller supplies an opened committed dataset so the
/// mutation path (which already has one) and the loader path (which
/// opens via snapshot) share the same body.
/// opens via snapshot) share the same body. For overwrite staging, the
/// pending batches are the replacement table image, so committed rows are
/// intentionally skipped.
///
/// `dedupe_key_column` controls whether committed rows are shadowed by
/// pending:
@ -828,7 +863,7 @@ fn dedupe_merge_batches_by_id(
/// `LoadMode::Merge` double-counts.
pub(crate) async fn count_src_per_edge(
db: &crate::db::Omnigraph,
committed_ds: &Dataset,
committed_ds: &SnapshotHandle,
table_key: &str,
staging: &MutationStaging,
dedupe_key_column: Option<&str>,
@ -859,41 +894,44 @@ pub(crate) async fn count_src_per_edge(
_ => None,
};
// Committed side: scan `src` plus the dedupe key column when set, so
// we can both count and shadow in one pass.
let projection: Vec<&str> = match dedupe_key_column {
Some(col) if pending_keys.as_ref().is_some_and(|s| !s.is_empty()) => vec!["src", col],
_ => vec!["src"],
};
let committed = db
.table_store()
.scan(committed_ds, Some(&projection), None, None)
.await?;
for batch in &committed {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
// Optional shadow-key column (only present when dedupe is on).
let key_arr = match (&pending_keys, dedupe_key_column) {
(Some(set), Some(col)) if !set.is_empty() => batch
.column_by_name(col)
.and_then(|c| c.as_any().downcast_ref::<StringArray>()),
_ => None,
let replace_committed = staging.pending_mode(table_key) == Some(PendingMode::Overwrite);
if !replace_committed {
// Committed side: scan `src` plus the dedupe key column when set, so
// we can both count and shadow in one pass.
let projection: Vec<&str> = match dedupe_key_column {
Some(col) if pending_keys.as_ref().is_some_and(|s| !s.is_empty()) => vec!["src", col],
_ => vec!["src"],
};
for i in 0..srcs.len() {
if !srcs.is_valid(i) {
continue;
}
// Shadow this committed row if its key is in pending.
if let (Some(arr), Some(set)) = (key_arr, pending_keys.as_ref()) {
if arr.is_valid(i) && set.contains(arr.value(i)) {
let committed = db
.storage()
.scan(committed_ds, Some(&projection), None, None)
.await?;
for batch in &committed {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
// Optional shadow-key column (only present when dedupe is on).
let key_arr = match (&pending_keys, dedupe_key_column) {
(Some(set), Some(col)) if !set.is_empty() => batch
.column_by_name(col)
.and_then(|c| c.as_any().downcast_ref::<StringArray>()),
_ => None,
};
for i in 0..srcs.len() {
if !srcs.is_valid(i) {
continue;
}
// Shadow this committed row if its key is in pending.
if let (Some(arr), Some(set)) = (key_arr, pending_keys.as_ref()) {
if arr.is_valid(i) && set.contains(arr.value(i)) {
continue;
}
}
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}

View file

@ -305,8 +305,7 @@ async fn load_jsonl_reader<R: BufRead>(
if !catalog.node_types.contains_key(type_name) {
return Err(OmniError::manifest(format!(
"record {}: unknown node type '{}'",
record_num,
type_name
record_num, type_name
)));
}
let data = value
@ -321,8 +320,7 @@ async fn load_jsonl_reader<R: BufRead>(
if catalog.lookup_edge_by_name(edge_name).is_none() {
return Err(OmniError::manifest(format!(
"record {}: unknown edge type '{}'",
record_num,
edge_name
record_num, edge_name
)));
}
let from = value
@ -357,27 +355,23 @@ async fn load_jsonl_reader<R: BufRead>(
}
// Phase 2: Build per-type RecordBatches and accumulate into the
// staging pipeline. For Append/Merge, batches go into an in-memory
// accumulator and a single `stage_*` + `commit_staged` per touched
// table runs at end-of-load — a mid-load failure (RI / cardinality
// violation) leaves Lance HEAD untouched. For Overwrite, the legacy
// inline-commit path is preserved (truncate+append doesn't fit the
// staged shape cleanly, and overwrite has no in-flight read-your-writes
// requirement).
// staging pipeline. Batches go into an in-memory accumulator and a
// single `stage_*` + `commit_staged` per touched table runs at
// end-of-load — a mid-load failure (RI / cardinality violation) leaves
// Lance HEAD untouched. `LoadMode::Overwrite` uses Lance's staged
// `Overwrite` transaction rather than the former truncate-then-append
// inline path.
let mut result = LoadResult::default();
let snapshot = db.snapshot_for_branch(branch).await?;
let use_staging = !matches!(mode, LoadMode::Overwrite);
let mut staging = MutationStaging::default();
let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
let pending_mode = match mode {
LoadMode::Merge => PendingMode::Merge,
// Append-mode loads accumulate as Append. Edge tables (no @key)
// and no-key node tables stay safe on the stage_append path. The
// Merge mode applies dedupe-by-id; Append assumes unique inputs.
LoadMode::Append => PendingMode::Append,
LoadMode::Overwrite => PendingMode::Append, // unused
LoadMode::Overwrite => PendingMode::Overwrite,
};
// Map LoadMode to MutationOpKind for the version-check policy.
// Append/Merge skip the strict pre-stage check (concurrency-safe
@ -405,81 +399,43 @@ async fn load_jsonl_reader<R: BufRead>(
}
let loaded_count = batch.num_rows();
let table_key = format!("node:{}", type_name);
let entry = snapshot
let _entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
if !use_staging {
overwrite_expected.insert(table_key.clone(), entry.table_version);
}
prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
}
// Phase 2b: write every node type. Append/Merge → in-memory
// accumulator. Overwrite → concurrent inline-commit (legacy path).
if use_staging {
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
load_op_kind,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.nodes_loaded.insert(type_name, loaded_count);
}
} else {
let node_write_results =
write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.nodes_loaded.insert(type_name, loaded_count);
}
// Phase 2b: accumulate every node type in memory. Fragment writes are
// delayed until after all validation succeeds.
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version();
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
load_op_kind,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.nodes_loaded.insert(type_name, loaded_count);
}
// Phase 2c: Validate edge referential integrity — every src/dst must
// reference an existing node ID in the appropriate type. For staged
// loads, the lookup unions snapshot-committed IDs with the in-memory
// pending batches (which carry the just-staged node inserts).
// reference an existing node ID in the appropriate type. For
// Append/Merge the lookup unions snapshot-committed IDs with the
// in-memory pending batches. For Overwrite, a touched node table's
// pending batch is the replacement image, so committed rows are not
// included for that table.
for (edge_name, rows) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let from_ids = if use_staging {
collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?
} else {
collect_node_ids(
db,
branch,
&edge_type.from_type,
&node_rows,
&catalog,
&overwrite_updates,
)
.await?
};
let to_ids = if use_staging {
collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?
} else {
collect_node_ids(
db,
branch,
&edge_type.to_type,
&node_rows,
&catalog,
&overwrite_updates,
)
.await?
};
let from_ids =
collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?;
let to_ids =
collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?;
for (i, (src, dst, _)) in rows.iter().enumerate() {
if !from_ids.contains(src.as_str()) {
@ -516,118 +472,72 @@ async fn load_jsonl_reader<R: BufRead>(
}
let loaded_count = batch.num_rows();
let table_key = format!("edge:{}", edge_name);
let entry = snapshot
let _entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
if !use_staging {
overwrite_expected.insert(table_key.clone(), entry.table_version);
}
prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
}
// Phase 2e: write every edge type. Same dispatch as Phase 2b.
if use_staging {
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
load_op_kind,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.edges_loaded.insert(edge_name, loaded_count);
}
} else {
let edge_write_results =
write_batches_concurrently(db, branch, mode, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.edges_loaded.insert(edge_name, loaded_count);
}
// Phase 2e: accumulate every edge type in memory, mirroring Phase 2b.
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version();
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
load_op_kind,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.edges_loaded.insert(edge_name, loaded_count);
}
// Phase 3: Validate edge cardinality constraints (before commit —
// invalid data must not be committed). Staged path scans committed
// edges via Lance + iterates pending edges in-memory. Overwrite path
// opens the just-written version (legacy behavior).
// invalid data must not be committed). The helper scans committed
// edges via Lance + iterates pending edges in-memory; for Overwrite it
// treats the pending edge batches as the replacement table image.
for (edge_name, _) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let table_key = format!("edge:{}", edge_name);
if use_staging {
validate_edge_cardinality_with_pending_loader(
db, branch, edge_type, &table_key, &staging, mode,
)
.await?;
} else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
validate_edge_cardinality(
db,
branch,
edge_name,
update.table_version,
update.table_branch.as_deref(),
)
.await?;
}
validate_edge_cardinality_with_pending_loader(
db, branch, edge_type, &table_key, &staging, mode,
)
.await?;
}
// Phase 4: Atomic manifest commit with publisher-level OCC.
if use_staging {
let staged = staging.stage_all(db, branch).await?;
// `_queue_guards` holds per-(table_key, branch) write queues
// across the manifest publish below — see exec/mutation.rs for
// the rationale (interleaving prevention).
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
.commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
.await?;
// Same finalize → publisher residual as mutations: per-table
// staged commits have advanced Lance HEAD, but the manifest
// publish has not run yet. Reuse the mutation failpoint name so
// one failpoint pins the shared `MutationStaging` boundary.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
.await?;
// The recovery sidecar protects the per-table commit_staged →
// manifest publish window. Phase C succeeded — clean up
// best-effort: failing the user here would error out a write
// that already landed durably.
if let Some(handle) = sidecar_handle {
if let Err(err) =
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
{
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
} else {
// LoadMode::Overwrite keeps the legacy inline-commit path —
// truncate-then-append doesn't fit the staged shape (see
// `docs/dev/writes.md` "LoadMode::Overwrite residual"). The recovery
// sidecar is not applicable here because the writer doesn't go
// through MutationStaging; per-table inline commits + a final
// manifest publish handle their own residual via the documented
// operator workflow (re-run overwrite to recover).
db.commit_updates_on_branch_with_expected(
branch,
&overwrite_updates,
&overwrite_expected,
actor_id,
)
let staged = staging
.stage_all_with_concurrency(db, branch, load_write_concurrency())
.await?;
// `_queue_guards` holds per-(table_key, branch) write queues
// across the manifest publish below — see exec/mutation.rs for
// the rationale (interleaving prevention).
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
.commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
.await?;
// Same finalize → publisher residual as mutations: per-table
// staged commits have advanced Lance HEAD, but the manifest
// publish has not run yet. Reuse the mutation failpoint name so
// one failpoint pins the shared `MutationStaging` boundary.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
.await?;
// The recovery sidecar protects the per-table commit_staged →
// manifest publish window. Phase C succeeded — clean up
// best-effort: failing the user here would error out a write
// that already landed durably.
if let Some(handle) = sidecar_handle {
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
Ok(result)
@ -1157,89 +1067,6 @@ fn load_write_concurrency() -> usize {
.unwrap_or(DEFAULT_LOAD_WRITE_CONCURRENCY)
}
/// Run `write_batch_to_dataset` for every prepared
/// `(type_name, table_key, batch, row_count)` tuple, keeping several writes in
/// flight at once.
///
/// The number of in-flight writes is `load_write_concurrency()`, capped at the
/// number of tuples and never below one. Outputs come back in the same order
/// as `prepared`, so callers can zip them against per-type metadata.
///
/// Every write is driven to completion before the outcomes are folded into a
/// single `Result`; if any write failed, an error is returned.
async fn write_batches_concurrently(
    db: &Omnigraph,
    branch: Option<&str>,
    mode: LoadMode,
    prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
    Vec<(
        String,
        String,
        usize,
        crate::table_store::TableState,
        Option<String>,
    )>,
> {
    use futures::stream::StreamExt;

    // Nothing to write: skip building the stream entirely.
    if prepared.is_empty() {
        return Ok(Vec::new());
    }

    let in_flight = load_write_concurrency().min(prepared.len()).max(1);

    // One lazy future per table; nothing runs until the stream polls it.
    let writes = prepared
        .into_iter()
        .map(|(type_name, table_key, batch, loaded_count)| async move {
            let (state, table_branch) =
                write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
            Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
        });

    // `buffered` (not `buffer_unordered`) keeps the input ordering.
    let outcomes: Vec<Result<_>> = futures::stream::iter(writes)
        .buffered(in_flight)
        .collect()
        .await;

    outcomes.into_iter().collect()
}
/// Write one batch into the table identified by `table_key` using the
/// inline-commit primitive that matches `mode`.
///
/// The dataset is opened for mutation with an op kind derived from `mode`
/// (`Append` → `Insert`, `Merge` → `Merge`, `Overwrite` → `SchemaRewrite`).
/// `Merge` upserts on the `id` column: matched rows are fully updated and
/// unmatched rows are inserted.
///
/// Returns the resulting table state together with the table branch reported
/// by `open_for_mutation_on_branch`.
async fn write_batch_to_dataset(
    db: &Omnigraph,
    branch: Option<&str>,
    table_key: &str,
    batch: RecordBatch,
    mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
    let op_kind = match mode {
        LoadMode::Append => crate::db::MutationOpKind::Insert,
        LoadMode::Merge => crate::db::MutationOpKind::Merge,
        LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
    };

    let (mut ds, full_path, table_branch) = db
        .open_for_mutation_on_branch(branch, table_key, op_kind)
        .await?;
    let store = db.table_store();

    // Each arm yields the post-write table state; the branch is attached once below.
    let state = match mode {
        LoadMode::Overwrite => store.overwrite_batch(&full_path, &mut ds, batch).await?,
        LoadMode::Append => store.append_batch(&full_path, &mut ds, batch).await?,
        LoadMode::Merge => {
            let key_columns = vec!["id".to_string()];
            store
                .merge_insert_batch(
                    &full_path,
                    ds,
                    batch,
                    key_columns,
                    lance::dataset::WhenMatched::UpdateAll,
                    lance::dataset::WhenNotMatched::InsertAll,
                )
                .await?
        }
    };

    Ok((state, table_branch))
}
/// Produce a fresh identifier: a newly generated ULID rendered as a string.
fn generate_id() -> String {
ulid::Ulid::new().to_string()
}
@ -1672,10 +1499,7 @@ pub(crate) async fn validate_edge_cardinality(
.await?;
// Scan src column, count per source
let batches = db
.table_store()
.scan(&ds, Some(&["src"]), None, None)
.await?;
let batches = db.storage().scan(&ds, Some(&["src"]), None, None).await?;
let mut counts: HashMap<String, u32> = HashMap::new();
for batch in &batches {
@ -1766,6 +1590,11 @@ async fn validate_edge_cardinality_with_pending_loader(
/// - IDs from the staged loader's pending batches (in-memory; just-staged
/// inserts of this type)
/// - IDs from the committed sub-table at the pre-load snapshot version
///
/// For `LoadMode::Overwrite`, if the node table is touched then the pending
/// batches are the replacement image. In that case committed IDs are not
/// included, so edge RI is validated against exactly what the overwrite will
/// publish.
async fn collect_node_ids_with_pending(
db: &Omnigraph,
branch: Option<&str>,
@ -1788,6 +1617,10 @@ async fn collect_node_ids_with_pending(
}
}
if staging.pending_mode(&table_key) == Some(PendingMode::Overwrite) {
return Ok(ids);
}
// From the committed Lance sub-table at the pre-load snapshot version.
let snapshot = db.snapshot_for_branch(branch).await?;
let Some(entry) = snapshot.entry(&table_key) else {
@ -1801,10 +1634,7 @@ async fn collect_node_ids_with_pending(
)
.await?;
let batches = db
.table_store()
.scan(&ds, Some(&["id"]), None, None)
.await?;
let batches = db.storage().scan(&ds, Some(&["id"]), None, None).await?;
for batch in &batches {
let id_col = batch
@ -1827,72 +1657,6 @@ async fn collect_node_ids_with_pending(
Ok(ids)
}
/// Gather every node ID of `type_name` that an edge endpoint may reference.
///
/// The result is the union of:
/// - the string-valued key property of each row in `node_rows` for this type
///   (the rows being loaded by the current operation), and
/// - the `id` column of the type's Lance sub-table, read at the version and
///   branch recorded in `updates` when this load already wrote the table, or
///   at the snapshot's manifest entry otherwise.
///
/// A type without a manifest entry contributes only its in-memory IDs.
///
/// # Panics
///
/// Panics if a scanned batch lacks an `id` column or that column is not a
/// `StringArray`.
async fn collect_node_ids(
    db: &Omnigraph,
    branch: Option<&str>,
    type_name: &str,
    node_rows: &HashMap<String, Vec<JsonValue>>,
    catalog: &omnigraph_compiler::catalog::Catalog,
    updates: &[crate::db::SubTableUpdate],
) -> Result<HashSet<String>> {
    let mut ids = HashSet::new();

    // In-memory rows from this load; rows whose key is absent or not a
    // string are ignored.
    if let Some(rows) = node_rows.get(type_name) {
        let key_prop = catalog
            .node_types
            .get(type_name)
            .and_then(|node_type| node_type.key_property());
        if let Some(key_prop) = key_prop {
            ids.extend(
                rows.iter()
                    .filter_map(|row| row.get(key_prop)?.as_str().map(str::to_owned)),
            );
        }
    }

    // Persisted rows from the Lance sub-table.
    let table_key = format!("node:{}", type_name);
    let snapshot = db.snapshot_for_branch(branch).await?;
    let Some(entry) = snapshot.entry(&table_key) else {
        return Ok(ids);
    };

    // Prefer the version this load just wrote; fall back to the snapshot pin.
    let (version, table_branch) = updates
        .iter()
        .find(|update| update.table_key == table_key)
        .map(|update| (update.table_version, update.table_branch.as_deref()))
        .unwrap_or((entry.table_version, entry.table_branch.as_deref()));

    let ds = db
        .open_dataset_at_state(&entry.table_path, table_branch, version)
        .await?;
    let batches = db
        .table_store()
        .scan(&ds, Some(&["id"]), None, None)
        .await?;

    for batch in &batches {
        let id_col = batch
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        // `flatten` drops null slots, matching an explicit validity check.
        ids.extend(id_col.iter().flatten().map(str::to_owned));
    }

    Ok(ids)
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -7,30 +7,32 @@
//! way for new engine writers to advance Lance HEAD without coupling
//! "write bytes" with "advance HEAD" in one Lance API call.
//!
//! ## Transitional residuals on the trait
//! ## Inline-commit residuals live on a separate trait
//!
//! Several inline-commit methods remain on the trait surface as
//! documented residuals: `delete_where`
//! ([#6658](https://github.com/lance-format/lance/issues/6658) closed
//! 2026-05-14, but the public `DeleteBuilder::execute_uncommitted` API
//! did not backport to the 6.x release line — it first ships in
//! `v7.0.0-beta.10`. Migration to staged two-phase delete is tracked as
//! MR-A and is gated on the Lance v7.x bump, not the current v6.0.1 pin),
//! `create_vector_index` (segment-commit-path requires
//! `build_index_metadata_from_segments` which is `pub(crate)` — see
//! [#6666](https://github.com/lance-format/lance/issues/6666), still open), and the
//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
//! `create_btree_index` / `create_inverted_index` paths kept while
//! engine call sites finish migrating off of them (Phase 1b / Phase 9
//! of MR-793). These are named honestly at every call site; the
//! forbidden-API guard test catches direct lance::* misuse outside the
//! storage layer.
//! The inline-commit writes that Lance cannot yet express as
//! stage-then-commit are NOT on `TableStorage`. They sit on
//! [`InlineCommitResidual`], reachable only via
//! `Omnigraph::storage_inline_residual()`, so the default `db.storage()`
//! surface is staged-only and cannot couple "write bytes" with "advance
//! HEAD" — MR-793 acceptance §1 closes by construction. The residuals:
//!
//! * `delete_where` — Lance #6658 (`DeleteBuilder::execute_uncommitted`)
//! did not backport to the 6.x line; it first ships in `v7.0.0-beta.10`.
//! Migration to staged two-phase delete is tracked as MR-A, gated on the
//! Lance v7.x bump.
//! * `create_vector_index` — segment-commit-path needs
//! `build_index_metadata_from_segments`, still `pub(crate)` in Lance
//! 6.0.1 ([#6666](https://github.com/lance-format/lance/issues/6666),
//! open). Scalar indices already stage.
//!
//! Each is named honestly at its call site; the forbidden-API guard test
//! catches direct lance::* misuse outside the storage layer.
//!
//! ## Sealed
//!
//! `TableStorage: sealed::Sealed`. Only types in this crate can implement
//! the trait, so a downstream crate cannot subvert the contract by
//! providing its own impl.
//! Both `TableStorage` and `InlineCommitResidual` are `: sealed::Sealed`.
//! Only types in this crate can implement them, so a downstream crate
//! cannot subvert the contract by providing its own impl.
//!
//! ## Opaque handles
//!
@ -40,15 +42,15 @@
//! through. This aligns with the storage-boundary invariant:
//! `lance::Dataset` does not appear in trait signatures.
//!
//! ## Migration status (MR-793 PR #70)
//! ## Migration status
//!
//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new
//! staged primitives (`stage_overwrite`, scalar index staging), and
//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto
//! the staged surface. Phase 1b (call-site conversion to
//! `Arc<dyn TableStorage>`), Phase 9 (demote unused inline-commit
//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847),
//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups.
//! Phases 1a / 2 / 4 / 5 / 6 landed in MR-793 PR #70 (trait scaffolding,
//! staged primitives, migration of `ensure_indices` / `branch_merge` /
//! `schema_apply` onto the staged surface). Phase 1b (call-site
//! conversion) and Phase 9 landed in MR-854, which also split the
//! inline-commit residuals onto `InlineCommitResidual` so `db.storage()`
//! is staged-only. Phase 7 (recovery reconciler) shipped as MR-847;
//! Phase 8 (index reconciler) is tracked as MR-848.
use std::fmt::Debug;
use std::sync::Arc;
@ -105,12 +107,37 @@ impl SnapshotHandle {
&self.inner
}
/// Take ownership of the inner `Arc<Dataset>`. Used when committing
/// staged writes (the call needs to consume the snapshot).
/// Take ownership of the inner `Arc<Dataset>`. Used by the
/// `TableStorage` impl when an op needs to mutate the dataset in
/// place (commit a staged write, append, overwrite, …).
///
/// Performance note: callers consume the returned `Arc` via
/// `Arc::try_unwrap(...).unwrap_or_else(|arc| (*arc).clone())`. The
/// fast path (no clone) only fires when the snapshot is single-ref
/// — i.e. the caller dropped every other `SnapshotHandle` clone
/// before calling. Holding parallel clones (e.g. across an `await`
/// point or stashed in a struct) forces a deep `Dataset` clone on
/// every mutating op. Engine callers should pass `SnapshotHandle`
/// by value into the mutating method, not keep a side copy.
pub(crate) fn into_arc(self) -> Arc<Dataset> {
self.inner
}
/// Consume the handle and yield the `Dataset` it wraps.
///
/// When this handle holds the only reference, the dataset is moved out
/// of the `Arc` without copying; if other references exist, the dataset
/// is cloned instead. Drop sibling `SnapshotHandle` clones before
/// calling to stay on the no-copy path.
///
/// `pub(crate)` — intended for the maintenance path (`optimize`,
/// `cleanup`), which must hand `&mut Dataset` to Lance compaction /
/// cleanup APIs that the `TableStorage` trait does not (and should
/// not) surface. Engine code that participates in the staged-write
/// invariant must stay on the trait methods.
pub(crate) fn into_dataset(self) -> Dataset {
    match Arc::try_unwrap(self.inner) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    }
}
// ── public, lance-free accessors ──
/// Current Lance manifest version of the snapshot.
@ -208,6 +235,20 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
/// Idempotent variant of `delete_branch` used by the best-effort fork
/// reclaim under branch delete (`db/omnigraph.rs::cleanup_deleted_branch_tables`)
/// and by the orphan-fork reconciler in `optimize`. Tolerates an
/// already-absent branch (both Lance's `RefNotFound` and the local-store
/// `NotFound` quirk on a missing `tree/{branch}/` dir). A still-referenced
/// branch (`RefConflict`) still surfaces as `OmniError::Lance`.
async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
/// List the named Lance branches present on the dataset at `dataset_uri`.
/// The `cleanup` orphan reconciler diffs this against the manifest
/// branch set to find orphaned per-table forks. `main`/default is not a
/// named branch and never appears here.
async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>>;
async fn reopen_for_mutation(
&self,
dataset_uri: &str,
@ -328,74 +369,19 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
column: &str,
) -> Result<StagedHandle>;
// ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
// ── Index presence (reads, no HEAD advance) ──────────────────────
//
// These methods advance Lance HEAD as a side effect of writing.
// They stay on the trait until the corresponding upstream Lance API
// ships:
//
// * `delete_where` — Lance #6658 (two-phase delete).
// * `create_*_index` — `build_index_metadata_from_segments` is
// `pub(crate)` for vector indices in lance-4.0.0; scalar indices
// migrate to staged in MR-793 Phase 2.
// * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
// legacy paths that will be demoted to `pub(crate)` in MR-793
// Phase 9 once all engine sites route through the staged
// primitives.
async fn append_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)>;
async fn merge_insert_batches(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState>;
async fn overwrite_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)>;
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<(SnapshotHandle, DeleteState)>;
// The inline-commit writes (`delete_where`, `create_vector_index`) are
// deliberately NOT on this trait. They live on
// the separate `InlineCommitResidual` trait, reachable only through
// `Omnigraph::storage_inline_residual()`. As a result the default
// `db.storage()` surface cannot couple "write bytes" with "advance HEAD"
// — closing MR-793 acceptance §1 by construction rather than by review.
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn create_btree_index(
&self,
snapshot: SnapshotHandle,
columns: &[&str],
) -> Result<SnapshotHandle>;
async fn create_inverted_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle>;
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle>;
// ── URI helpers ────────────────────────────────────────────────────
//
// These are pure string formatting; they live on the trait so engine
@ -422,6 +408,38 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
) -> Result<DatasetRecordBatchStream>;
}
// ─── InlineCommitResidual trait ────────────────────────────────────────────
/// Inline-commit residual surface: the writes Lance cannot yet express as a
/// stage-then-commit pair, so they advance Lance HEAD as a side effect of
/// writing. Kept OFF `TableStorage` and reachable only through
/// `Omnigraph::storage_inline_residual()`, so the default `db.storage()` path
/// is staged-only and a new writer cannot reintroduce the write+commit coupling
/// by accident (MR-793 acceptance §1, by construction).
///
/// Residual reasons (each is named honestly at its call site):
/// * `delete_where` — Lance has no public two-phase delete on the 6.x line
/// (`DeleteBuilder::execute_uncommitted` first ships in v7.x; MR-A / Lance
/// #6658). The D2 parse-time rule + recovery sidecars cover the gap meanwhile.
/// * `create_vector_index` — vector-index segment-commit needs
/// `build_index_metadata_from_segments`, still `pub(crate)` in Lance 6.0.1
/// (Lance #6666). Scalar indices already stage.
#[async_trait]
pub(crate) trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
/// Inline-commit delete against the dataset at `dataset_uri`.
///
/// Consumes `snapshot` and returns a replacement handle together with a
/// `DeleteState` describing the outcome.
///
/// NOTE(review): `filter` is presumably a SQL predicate selecting the rows
/// to delete — confirm against `TableStore::delete_where`.
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<(SnapshotHandle, DeleteState)>;
/// Inline-commit creation of a vector index on `column`.
///
/// Consumes `snapshot` and returns a replacement handle for the dataset
/// after the index operation.
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle>;
}
// ─── single impl: TableStore ──────────────────────────────────────────────
#[async_trait]
@ -496,6 +514,14 @@ impl TableStorage for TableStore {
TableStore::delete_branch(self, dataset_uri, branch).await
}
// Trait-surface forwarder: hands the call to `TableStore::force_delete_branch`
// with the arguments unchanged.
async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
TableStore::force_delete_branch(self, dataset_uri, branch).await
}
// Trait-surface forwarder: hands the call to `TableStore::list_branches`
// with the arguments unchanged.
async fn list_branches(&self, dataset_uri: &str) -> Result<Vec<String>> {
TableStore::list_branches(self, dataset_uri).await
}
async fn reopen_for_mutation(
&self,
dataset_uri: &str,
@ -689,61 +715,6 @@ impl TableStorage for TableStore {
.map(StagedHandle::new)
}
/// Inline-commit append exposed on the trait surface: unwraps the snapshot
/// into a mutable `Dataset`, forwards to `TableStore::append_batch`, and
/// re-wraps the mutated dataset in a fresh handle.
async fn append_batch(
    &self,
    dataset_uri: &str,
    snapshot: SnapshotHandle,
    batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    let state = TableStore::append_batch(self, dataset_uri, &mut dataset, batch).await?;
    Ok((SnapshotHandle::new(dataset), state))
}
/// Inline-commit merge-insert exposed on the trait surface: unwraps the
/// snapshot into an owned `Dataset` and forwards every argument to
/// `TableStore::merge_insert_batches`, returning its table state.
async fn merge_insert_batches(
    &self,
    dataset_uri: &str,
    snapshot: SnapshotHandle,
    batches: Vec<RecordBatch>,
    key_columns: Vec<String>,
    when_matched: WhenMatched,
    when_not_matched: WhenNotMatched,
) -> Result<TableState> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    let merge = TableStore::merge_insert_batches(
        self,
        dataset_uri,
        dataset,
        batches,
        key_columns,
        when_matched,
        when_not_matched,
    );
    merge.await
}
/// Inline-commit overwrite exposed on the trait surface: unwraps the snapshot
/// into a mutable `Dataset`, forwards to `TableStore::overwrite_batch`, and
/// re-wraps the mutated dataset in a fresh handle.
async fn overwrite_batch(
    &self,
    dataset_uri: &str,
    snapshot: SnapshotHandle,
    batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    let state = TableStore::overwrite_batch(self, dataset_uri, &mut dataset, batch).await?;
    Ok((SnapshotHandle::new(dataset), state))
}
/// Inline-commit delete exposed on the trait surface: unwraps the snapshot
/// into a mutable `Dataset`, forwards to `TableStore::delete_where`, and
/// re-wraps the mutated dataset in a fresh handle alongside the delete state.
async fn delete_where(
    &self,
    dataset_uri: &str,
    snapshot: SnapshotHandle,
    filter: &str,
) -> Result<(SnapshotHandle, DeleteState)> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    let state = TableStore::delete_where(self, dataset_uri, &mut dataset, filter).await?;
    Ok((SnapshotHandle::new(dataset), state))
}
// Index-presence read: borrows the snapshot's dataset and forwards to
// `TableStore::has_btree_index` for `column`.
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_btree_index(self, snapshot.dataset(), column).await
}
@ -756,36 +727,6 @@ impl TableStorage for TableStore {
TableStore::has_vector_index(self, snapshot.dataset(), column).await
}
/// Inline index build exposed on the trait surface: unwraps the snapshot into
/// a mutable `Dataset`, forwards to `TableStore::create_btree_index` for
/// `columns`, and returns a handle over the mutated dataset.
async fn create_btree_index(
    &self,
    snapshot: SnapshotHandle,
    columns: &[&str],
) -> Result<SnapshotHandle> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    TableStore::create_btree_index(self, &mut dataset, columns).await?;
    Ok(SnapshotHandle::new(dataset))
}
/// Inline index build exposed on the trait surface: unwraps the snapshot into
/// a mutable `Dataset`, forwards to `TableStore::create_inverted_index` for
/// `column`, and returns a handle over the mutated dataset.
async fn create_inverted_index(
    &self,
    snapshot: SnapshotHandle,
    column: &str,
) -> Result<SnapshotHandle> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    TableStore::create_inverted_index(self, &mut dataset, column).await?;
    Ok(SnapshotHandle::new(dataset))
}
/// Inline index build exposed on the trait surface: unwraps the snapshot into
/// a mutable `Dataset`, forwards to `TableStore::create_vector_index` for
/// `column`, and returns a handle over the mutated dataset.
async fn create_vector_index(
    &self,
    snapshot: SnapshotHandle,
    column: &str,
) -> Result<SnapshotHandle> {
    // Move the dataset out when this is the only reference; clone otherwise.
    let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
        Ok(owned) => owned,
        Err(shared) => (*shared).clone(),
    };
    TableStore::create_vector_index(self, &mut dataset, column).await?;
    Ok(SnapshotHandle::new(dataset))
}
// Trait-surface forwarder: returns whatever `TableStore::root_uri` reports.
fn root_uri(&self) -> &str {
TableStore::root_uri(self)
}
@ -815,3 +756,27 @@ impl TableStorage for TableStore {
.await
}
}
#[async_trait]
impl InlineCommitResidual for TableStore {
    /// Unwraps the snapshot into a mutable `Dataset`, forwards to
    /// `TableStore::delete_where`, and returns a handle over the mutated
    /// dataset together with the delete state.
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)> {
        // Move the dataset out when this is the only reference; clone otherwise.
        let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
            Ok(owned) => owned,
            Err(shared) => (*shared).clone(),
        };
        let state = TableStore::delete_where(self, dataset_uri, &mut dataset, filter).await?;
        Ok((SnapshotHandle::new(dataset), state))
    }

    /// Unwraps the snapshot into a mutable `Dataset`, forwards to
    /// `TableStore::create_vector_index` for `column`, and returns a handle
    /// over the mutated dataset.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle> {
        // Move the dataset out when this is the only reference; clone otherwise.
        let mut dataset = match Arc::try_unwrap(snapshot.into_arc()) {
            Ok(owned) => owned,
            Err(shared) => (*shared).clone(),
        };
        TableStore::create_vector_index(self, &mut dataset, column).await?;
        Ok(SnapshotHandle::new(dataset))
    }
}

View file

@ -2,7 +2,6 @@ use arrow_array::{
Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
};
use arrow_schema::SchemaRef;
use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use lance::Dataset;
use lance::blob::BlobArrayBuilder;
@ -13,7 +12,7 @@ use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
};
use lance::datatypes::BlobKind;
use lance::datatypes::{BlobKind, Schema as LanceSchema};
use lance::index::DatasetIndexExt;
use lance::index::scalar::IndexDetails;
use lance_file::version::LanceFileVersion;
@ -725,7 +724,14 @@ impl TableStore {
})
}
pub async fn append_batch(
/// Legacy inline-commit append: writes fragments AND commits in one
/// call, advancing Lance HEAD as a side effect. Not on the
/// `TableStorage` trait surface — the staged primitive `stage_append`
/// + `commit_staged` is the engine write path. This inherent
/// `pub(crate)` method survives only for recovery test setup. Do not
/// add new engine call sites — they re-introduce the multi-phase
/// commit drift the trait surface was designed to eliminate.
pub(crate) async fn append_batch(
&self,
dataset_uri: &str,
ds: &mut Dataset,
@ -780,139 +786,7 @@ impl TableStore {
}
}
/// Replace the table's contents with `batch` in two separate steps: truncate
/// the dataset, then append `batch` through `append_batch`.
///
/// A Lance error from the truncate is returned as `OmniError::Lance` and the
/// append is not attempted.
///
/// NOTE(review): truncate and append are distinct calls, so a failure in the
/// append presumably leaves the table truncated — confirm before adding call
/// sites.
pub async fn overwrite_batch(
&self,
dataset_uri: &str,
ds: &mut Dataset,
batch: RecordBatch,
) -> Result<TableState> {
// Step 1: drop the existing rows.
ds.truncate_table()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Step 2: write the replacement rows; its table state is the result.
self.append_batch(dataset_uri, ds, batch).await
}
/// Write `batch` to `dataset_uri` with Lance's `WriteMode::Overwrite` and
/// return the resulting `Dataset`.
///
/// The write enables stable row IDs, pins the data storage version to
/// `LanceFileVersion::V2_2`, and allows external blobs outside the dataset
/// bases. Any Lance failure is surfaced as `OmniError::Lance`.
pub async fn overwrite_dataset(dataset_uri: &str, batch: RecordBatch) -> Result<Dataset> {
    let write_params = WriteParams {
        mode: WriteMode::Overwrite,
        enable_stable_row_ids: true,
        data_storage_version: Some(LanceFileVersion::V2_2),
        allow_external_blob_outside_bases: true,
        ..Default::default()
    };

    // Capture the schema first so the batch itself can be moved into the reader.
    let schema = batch.schema();
    let source = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);

    Dataset::write(source, dataset_uri, Some(write_params))
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))
}
/// Merge-inserts `batch` into `ds`, matching rows on `key_columns`, and
/// returns the table state of the dataset produced by the merge.
///
/// An empty `batch` performs no merge and returns the table state of `ds`
/// as passed in. The batch must be unique by `key_columns`:
/// `check_batch_unique_by_keys` runs before the merge job is built and its
/// error is propagated. `when_matched` / `when_not_matched` are forwarded
/// to Lance's `MergeInsertBuilder`. Lance failures while building or
/// executing the job are returned as `OmniError::Lance`.
pub async fn merge_insert_batch(
&self,
dataset_uri: &str,
ds: Dataset,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState> {
// Nothing to merge: report the current state without building a job.
if batch.num_rows() == 0 {
return self.table_state(dataset_uri, &ds).await;
}
// Precondition for the FirstSeen workaround below: every caller of
// this primitive must hand in a source batch that is unique by
// `key_columns`. Without this check, `SourceDedupeBehavior::FirstSeen`
// would silently collapse genuine duplicates instead of erroring.
check_batch_unique_by_keys(&batch, &key_columns, "merge_insert_batch")?;
// TODO(lance-upstream): MergeInsertBuilder does not accept WriteParams,
// so allow_external_blob_outside_bases cannot be set here. External URI
// blobs via merge_insert (LoadMode::Merge, mutations) are unsupported
// until Lance exposes WriteParams on MergeInsertBuilder.
let ds = Arc::new(ds);
let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?;
builder.when_matched(when_matched);
builder.when_not_matched(when_not_matched);
// Workaround for a Lance 4.0.x bug class where sequential
// merge_insert calls against rows previously rewritten by
// merge_insert produce a spurious "Ambiguous merge inserts:
// multiple source rows match the same target row on (id = ...)"
// error. Lance's `processed_row_ids: Mutex<HashSet<u64>>`
// (lance-4.0.0 `src/dataset/write/merge_insert.rs:2099`)
// double-processes the same source/target match against
// datasets previously rewritten by merge_insert, and the default
// `SourceDedupeBehavior::Fail` errors on the second insertion.
// `FirstSeen` makes Lance skip the duplicate match instead.
//
// Covers both observed surfaces:
// - PR #98 (sequential `load --mode merge` against same keys).
// - MR-920 (sequential `update T set {f} where x=y` on same row).
//
// Correctness-preserving for OmniGraph because every call path
// that reaches this primitive either pre-dedupes the source batch
// by id, or surfaces a real source dup via the
// `check_batch_unique_by_keys` precondition above (which fires
// before the FirstSeen setter has a chance to silently collapse
// anything):
// - Load path: `enforce_unique_constraints_intra_batch`
// (`loader/mod.rs:1442`) errors on intra-batch `@key` dups.
// - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
// accumulates and dedupes by `id`.
// - Branch-merge path: `compute_source_delta` /
// `compute_three_way_delta` (`exec/merge.rs`) walk via
// `OrderedTableCursor` and `push_row` each id at most once.
// So FirstSeen only suppresses the spurious Lance behavior, never
// user data. Pinned by `loader_rejects_intra_batch_duplicate_keys`
// in `tests/consistency.rs` plus the
// `check_batch_unique_by_keys` precondition.
//
// Retire when upstream Lance fixes the bug class. Tracked at
// MR-957; upstream: lance-format/lance#6877.
builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
let job = builder
.try_build()
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Feed the single batch to the merge job as a record-batch stream.
let schema = batch.schema();
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
let (new_ds, _stats) = job
.execute(lance_datafusion::utils::reader_to_stream(Box::new(reader)))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Report the state of the dataset returned by the merge, not the input.
self.table_state(dataset_uri, &new_ds).await
}
/// Merge-inserts several batches as a single operation.
///
/// With no batches this only reports the current table state of `ds`. A
/// single batch is forwarded as-is; two or more are concatenated under the
/// first batch's schema (a concatenation failure becomes
/// `OmniError::Lance`). The merge itself is performed by
/// `merge_insert_batch`.
pub async fn merge_insert_batches(
    &self,
    dataset_uri: &str,
    ds: Dataset,
    batches: Vec<RecordBatch>,
    key_columns: Vec<String>,
    when_matched: WhenMatched,
    when_not_matched: WhenNotMatched,
) -> Result<TableState> {
    let combined = match batches.len() {
        0 => return self.table_state(dataset_uri, &ds).await,
        1 => batches.into_iter().next().unwrap(),
        _ => {
            let first_schema = batches[0].schema();
            concat_batches(&first_schema, &batches)
                .map_err(|e| OmniError::Lance(e.to_string()))?
        }
    };
    self.merge_insert_batch(
        dataset_uri,
        ds,
        combined,
        key_columns,
        when_matched,
        when_not_matched,
    )
    .await
}
pub async fn delete_where(
pub(crate) async fn delete_where(
&self,
dataset_uri: &str,
ds: &mut Dataset,
@ -1011,7 +885,7 @@ impl TableStore {
}
};
// Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
// returns fragments with `id = 0` ("Temporary ID" — see lance-4.0.0
// returns fragments with `id = 0` ("Temporary ID" — see lance-6.0.1
// `dataset/write.rs:1044/1712`); the real assignment happens during
// commit via `Transaction::fragments_with_ids`. Because we expose
// these fragments to `scan_with_staged` *before* commit, two staged
@ -1082,11 +956,12 @@ impl TableStore {
));
}
// Precondition for FirstSeen below. See the comment on
// `merge_insert_batch` for why this check is here, not on the caller:
// every call path that reaches stage_merge_insert (load,
// MutationStaging::finalize, branch_merge::publish_rewritten_merge_table)
// must hand in a source batch that is unique by `key_columns`.
// Precondition for the FirstSeen workaround below: every call path that
// reaches stage_merge_insert (load, MutationStaging::finalize,
// branch_merge::publish_rewritten_merge_table) must hand in a source
// batch that is unique by `key_columns`. Without this check,
// `SourceDedupeBehavior::FirstSeen` would silently collapse genuine
// duplicates instead of erroring.
check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
let ds = Arc::new(ds);
@ -1094,11 +969,21 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))?;
builder.when_matched(when_matched);
builder.when_not_matched(when_not_matched);
// See `merge_insert_batch` for the FirstSeen rationale. Workaround
// for the Lance 4.0.x bug class where sequential merge_insert /
// update against rows previously rewritten by merge_insert trips
// Lance's `processed_row_ids` HashSet and errors under the default
// `SourceDedupeBehavior::Fail`. Retire when upstream Lance is fixed.
// Workaround for a Lance bug class where sequential merge_insert calls
// against rows previously rewritten by merge_insert produce a spurious
// "Ambiguous merge inserts: multiple source rows match the same target
// row on (id = ...)" error. Lance's `processed_row_ids:
// Mutex<HashSet<u64>>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`)
// double-processes the same source/target match against datasets
// previously rewritten by merge_insert, and the default
// `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen
// makes Lance skip the duplicate match instead. Correctness-preserving
// because every call path pre-dedupes the source batch by id or surfaces
// a real source dup via `check_batch_unique_by_keys` above (load:
// `enforce_unique_constraints_intra_batch`; mutate:
// `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor`
// walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug
// class. Tracked at MR-957; upstream: lance-format/lance#6877.
builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
let job = builder
.try_build()
@ -1174,40 +1059,51 @@ impl TableStore {
/// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
/// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
pub async fn stage_overwrite(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
"stage_overwrite called with empty batch".to_string(),
));
}
// `enable_stable_row_ids: true` is defensive — empirically Lance 4.0.0
// `enable_stable_row_ids: true` is defensive — empirically Lance 6.0.1
// preserves the source dataset's flag through `Operation::Overwrite`
// when WriteParams omits it (pinned by
// `stage_overwrite_preserves_stable_row_ids` in tests/staged_writes.rs),
// but setting it explicitly matches the public `overwrite_dataset`
// path and keeps the invariant documented at every Overwrite site
// but setting it explicitly keeps the invariant documented at every Overwrite site
// (see docs/storage.md "Stable row IDs"). Setting it on an existing
// dataset that was created without stable row IDs is a no-op per
// Lance's row-id-lineage spec, so this stays correct for legacy
// datasets.
let params = WriteParams {
mode: WriteMode::Overwrite,
enable_stable_row_ids: true,
allow_external_blob_outside_bases: true,
..Default::default()
};
let transaction = InsertBuilder::new(Arc::new(ds.clone()))
.with_params(&params)
.execute_uncommitted(vec![batch])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let mut new_fragments = match &transaction.operation {
Operation::Overwrite { fragments, .. } => fragments.clone(),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_overwrite: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
let (transaction, mut new_fragments) = if batch.num_rows() == 0 {
let schema = LanceSchema::try_from(batch.schema().as_ref())
.map_err(|e| OmniError::Lance(e.to_string()))?;
let transaction = TransactionBuilder::new(
ds.manifest.version,
Operation::Overwrite {
fragments: Vec::new(),
schema,
config_upsert_values: None,
initial_bases: None,
},
)
.build();
(transaction, Vec::new())
} else {
let params = WriteParams {
mode: WriteMode::Overwrite,
enable_stable_row_ids: true,
allow_external_blob_outside_bases: true,
..Default::default()
};
let transaction = InsertBuilder::new(Arc::new(ds.clone()))
.with_params(&params)
.execute_uncommitted(vec![batch])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let new_fragments = match &transaction.operation {
Operation::Overwrite { fragments, .. } => fragments.clone(),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_overwrite: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
};
(transaction, new_fragments)
};
// Overwrite REPLACES every committed fragment, and Lance restarts
// fragment-ID and row-ID counters at the post-commit version.
@ -1220,7 +1116,7 @@ impl TableStore {
// 2) For stable-row-id datasets, assign row_id_meta starting
// at 0 (Overwrite is a fresh-start) so `scan_with_staged`
// doesn't hit the "Missing row id meta" panic in
// lance-4.0.0 dataset/rowids.rs:22.
// lance-6.0.1 dataset/rowids.rs:22.
assign_fragment_ids(&mut new_fragments, 1);
if ds.manifest.uses_stable_row_ids() {
assign_row_id_meta(&mut new_fragments, 0)?;
@ -1244,7 +1140,7 @@ impl TableStore {
/// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
/// { new_indices, removed_indices }` via the public `TransactionBuilder`,
/// replicating the simple (non-segment-commit-path) branch of Lance's
/// `CreateIndexBuilder::execute` (lance-4.0.0 `src/index/create.rs:502-512`).
/// `CreateIndexBuilder::execute` (lance-6.0.1 `src/index/create.rs:502-512`).
///
/// `removed_indices` mirrors `execute()` lines 466-476: when the
/// build replaces an existing same-named index, those entries are
@ -1253,7 +1149,7 @@ impl TableStore {
/// MR-793 Phase 2: scalar index types (BTree, Inverted) are
/// stage-able. Vector indices are NOT (segment-commit-path requires
/// `build_index_metadata_from_segments` which is `pub(crate)` in
/// lance-4.0.0); see `create_vector_index` and Appendix A.3.
/// lance-6.0.1); see `create_vector_index` and Appendix A.3.
pub async fn stage_create_btree_index(
&self,
ds: &Dataset,
@ -1348,7 +1244,7 @@ impl TableStore {
/// committed fragments carry; Lance's optimizer drops them from the
/// filtered scan even when their data would match. Staged-fragment
/// rows are silently absent from the result. `scanner.use_stats(false)`
/// does not fix this in lance 4.0.0. Callers needing correct filtered
/// does not fix this in lance 6.0.1. Callers needing correct filtered
/// reads against staged data should use a different strategy — the
/// engine's `MutationStaging` accumulator unions in-memory pending
/// batches with the committed scan via DataFusion `MemTable` (see
@ -1572,25 +1468,7 @@ impl TableStore {
}))
}
/// Builds a BTree scalar index over `columns` with default
/// `ScalarIndexParams`, using the index builder's `replace(true)` option.
/// Lance failures are returned as `OmniError::Lance`.
pub async fn create_btree_index(&self, ds: &mut Dataset, columns: &[&str]) -> Result<()> {
    let index_params = ScalarIndexParams::default();
    let outcome = ds
        .create_index_builder(columns, IndexType::BTree, &index_params)
        .replace(true)
        .await;
    match outcome {
        Ok(_) => Ok(()),
        Err(e) => Err(OmniError::Lance(e.to_string())),
    }
}
/// Builds an Inverted index over the single `column` with default
/// `InvertedIndexParams`, using the index builder's `replace(true)` option.
/// Lance failures are returned as `OmniError::Lance`.
pub async fn create_inverted_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
    let index_params = InvertedIndexParams::default();
    let outcome = ds
        .create_index_builder(&[column], IndexType::Inverted, &index_params)
        .replace(true)
        .await;
    match outcome {
        Ok(_) => Ok(()),
        Err(e) => Err(OmniError::Lance(e.to_string())),
    }
}
pub async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
pub(crate) async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
ds.create_index_builder(&[column], IndexType::Vector, &params)
.replace(true)
@ -1674,7 +1552,7 @@ fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
}
/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
/// commit-time `Transaction::fragments_with_ids` (lance-4.0.0
/// commit-time `Transaction::fragments_with_ids` (lance-6.0.1
/// `dataset/transaction.rs:1456`) — fragments produced by
/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
/// placeholder; we renumber here so they don't collide with committed
@ -1705,7 +1583,7 @@ fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
/// Assign sequential row IDs to fragments that lack them, starting from
/// `start_row_id`. Mirrors the relevant arm of Lance's
/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`)
/// `Transaction::assign_row_ids` (lance-6.0.1 `dataset/transaction.rs:2682`)
/// for the `row_id_meta = None` case — fragments produced by
/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
///
@ -1878,7 +1756,7 @@ fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fr
combined
}
/// Precondition guard for `merge_insert_batch` and `stage_merge_insert`.
/// Precondition guard for `stage_merge_insert`.
/// It opts into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
/// collapse genuine duplicate source keys; this check restores fail-fast

View file

@ -126,7 +126,7 @@ async fn load_merge_upserts_existing_and_inserts_new() {
/// source batch had one row per key.
///
/// Triggered by Lance's `processed_row_ids: Mutex<HashSet<u64>>`
/// (lance-4.0.0 `src/dataset/write/merge_insert.rs:2099`) double-
/// (lance-6.0.1 `src/dataset/write/merge_insert.rs:2099`) double-
/// processing the same source/target match against datasets previously
/// rewritten by merge_insert. Worked around by opting
/// `MergeInsertBuilder` into `SourceDedupeBehavior::FirstSeen` in

View file

@ -907,6 +907,76 @@ async fn recovery_rolls_forward_load_on_feature_branch() {
);
}
/// An overwrite-mode load that is interrupted at the
/// `mutation.post_finalize_pre_publisher` failpoint must be rolled forward
/// once the graph is reopened, and the table must accept a follow-up
/// mutation afterwards.
#[tokio::test]
async fn recovery_rolls_forward_load_overwrite() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Assigned inside the scoped block below so that `db` and the failpoint
// guard are dropped before the graph is reopened.
let operation_id;
let parent_commit_id;
{
// Seed the graph with the shared test data.
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, helpers::TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
// Head of `main` before the interrupted load; passed to the recovery
// expectations below as the expected recovery parent commit.
parent_commit_id = branch_head_commit_id(dir.path(), "main").await.unwrap();
// Arm the failpoint for the rest of this block, then attempt a
// one-row overwrite load that is expected to fail with the injected error.
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = db
.load(
"main",
r#"{"type":"Person","data":{"name":"OverwriteLoad","age":41}}
"#,
LoadMode::Overwrite,
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
// Capture the operation id of the sidecar left behind by the failed load.
operation_id = single_sidecar_operation_id(dir.path());
}
// Reopen the graph. Per the assertion message, recovery is expected to
// have rolled the overwrite forward, leaving exactly the one loaded row.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"overwrite row must be visible after recovery rolls the load forward"
);
// Release the handle before the invariant checker works on the same directory.
drop(db);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::main("node:Person")
.expected_recovery_parent_commit_id(parent_commit_id)
.follow_up_mutation(FollowUpMutation::new(
"main",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "AfterOverwriteLoad")], &[("$age", 42)]),
)),
],
},
)
.await
.unwrap();
// The follow-up `insert_person` mutation should have added a second row.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
2,
"follow-up mutation must succeed after overwrite load recovery"
);
}
#[tokio::test]
async fn recovery_rolls_forward_ensure_indices_on_feature_branch() {
use lance::index::DatasetIndexExt;
@ -1132,7 +1202,6 @@ async fn refresh_runs_roll_forward_recovery_in_process() {
#[tokio::test]
async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
@ -1162,12 +1231,8 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
// touching the manifest) so the classifier can reach UnexpectedAtP1
// / UnexpectedMultistep / RolledPastExpected paths that require
// a real restore on rollback.
let store = TableStore::new(&uri);
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
helpers::lance_delete_inline(&mut ds, "1 = 2").await;
let head_after_drift = ds.version().version;
assert_eq!(head_after_drift, manifest_pin + 1);
@ -1697,8 +1762,9 @@ async fn optimize_phase_b_failure_recovered_on_next_open() {
ScopedFailPoint::new("optimize.post_phase_b_pre_manifest_commit", "return");
let err = db.optimize().await.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: optimize.post_phase_b_pre_manifest_commit"),
err.to_string().contains(
"injected failpoint triggered: optimize.post_phase_b_pre_manifest_commit"
),
"unexpected error: {err}"
);

View file

@ -29,15 +29,21 @@
//! the cross-table manifest commit. Documented exception.
//! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module.
//!
//! ## Transitional allow-list
//! ## Allow-list shape
//!
//! The migration of writers onto staged primitives is incremental.
//! Several writers (ensure_indices, branch_merge, schema_apply rewrites)
//! already route through the staged primitives; others (bulk loader,
//! exec/mutation, exec/query) still use the legacy inherent
//! `TableStore` methods — they're not visible at the trait boundary, but
//! they DO call lance types. The file-level allow-list below reflects
//! this transitional state and tightens as call sites migrate.
//! After MR-854, `db.storage()` (`&dyn TableStorage`) exposes only staged
//! primitives + reads. The inline-commit writes live on a separate
//! `InlineCommitResidual` trait reached via
//! `Omnigraph::storage_inline_residual()`, so the default storage surface
//! cannot couple "write bytes" with "advance HEAD" — engine code that
//! wants an inline residual must name the residual accessor explicitly.
//! The only residuals are `delete_where` (Lance #6658 / v7.x) and
//! `create_vector_index` (Lance #6666). The dead legacy methods
//! (trait `append_batch` / `merge_insert_batches`, inherent
//! `merge_insert_batch{,es}`, `create_{btree,inverted}_index`) were
//! removed entirely. This guard's scope is unchanged: it catches direct
//! `lance::*` inline-commit misuse outside the storage layer. The
//! file-level allow-list below matches that boundary.
use std::path::{Path, PathBuf};

View file

@ -195,6 +195,14 @@ pub async fn diff_since_branch(
.await
}
/// Advance a Lance dataset HEAD directly from tests without going through
/// OmniGraph's storage residual surface. Used to synthesize uncovered drift.
///
/// Runs `Dataset::delete` with `filter`, replaces `*ds` with the dataset
/// Lance returns, and returns the number of rows Lance reports as deleted.
///
/// # Panics
/// Panics if the delete fails or if the deleted-row count does not fit in
/// `usize`.
pub async fn lance_delete_inline(ds: &mut lance::Dataset, filter: &str) -> usize {
    let result = ds
        .delete(filter)
        .await
        .expect("lance_delete_inline: Lance delete failed");
    *ds = (*result.new_dataset).clone();
    // Checked conversion: a bare `as usize` would silently truncate the
    // count on targets where `usize` is narrower than the reported value.
    usize::try_from(result.num_deleted_rows)
        .expect("lance_delete_inline: deleted row count does not fit in usize")
}
/// Build a ParamMap from string key-value pairs.
pub fn params(pairs: &[(&str, &str)]) -> ParamMap {
pairs
@ -258,6 +266,27 @@ pub fn vector_and_string_params(
map
}
/// Test-only helper that appends `batch` straight through Lance's
/// `Dataset::append`, so Lance HEAD moves while the manifest is left
/// untouched. `recovery::*` and `staged_writes::*` tests use it to set up
/// HEAD-ahead-of-manifest drift on purpose.
///
/// The body mirrors the engine's inline-commit `TableStore::append_batch`,
/// which is `pub(crate)` after MR-854; integration tests keep this copy so
/// they can simulate drift without reaching into the crate-internal API.
///
/// Panics if the append fails.
pub async fn lance_append_inline(ds: &mut lance::Dataset, batch: RecordBatch) {
    use lance::dataset::{WriteMode, WriteParams};

    let append_params = WriteParams {
        mode: WriteMode::Append,
        allow_external_blob_outside_bases: true,
        ..Default::default()
    };
    let batch_schema = batch.schema();
    let source = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], batch_schema);
    ds.append(source, Some(append_params)).await.unwrap();
}
pub fn s3_test_graph_uri(suite: &str) -> Option<String> {
let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
let prefix = std::env::var("OMNIGRAPH_S3_TEST_PREFIX")

View file

@ -175,7 +175,6 @@ async fn read_only_open_skips_recovery_sweep() {
#[tokio::test]
async fn recovery_rolls_back_synthetic_drift_on_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -202,13 +201,9 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
// residual the sweep recovers from is the manifest-vs-Lance-HEAD gap;
// it's agnostic to *what* op caused the gap.
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before_drift = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await;
let head_after_drift = ds.version().version;
assert_eq!(
head_after_drift,
@ -290,7 +285,6 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
async fn recovery_rollback_converges_manifest_so_schema_apply_succeeds() {
use omnigraph::db::ReadTarget;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -310,13 +304,9 @@ async fn recovery_rollback_converges_manifest_so_schema_apply_succeeds() {
// Forge a Phase-B residual: advance Person's Lance HEAD without publishing to
// the manifest (the manifest pin stays at the load's committed version).
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let manifest_pin = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await;
drop(ds);
// Roll-back-classified sidecar (post_commit_pin != observed head ⇒
@ -518,7 +508,6 @@ async fn count_recovery_actor_commits(graph_root: &Path) -> usize {
#[tokio::test]
async fn recovery_rolls_forward_after_phase_b_completes() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -535,16 +524,12 @@ async fn recovery_rolls_forward_after_phase_b_completes() {
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before = ds.version().version;
// Synthesize a successful Phase B: advance Lance HEAD by one
// (delete_where with no-match — no fragment changes, but version bumps).
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await;
let head_after = ds.version().version;
assert_eq!(head_after, head_before + 1);
@ -728,7 +713,6 @@ async fn recovery_records_rolled_forward_for_stale_sidecar_after_successful_roll
#[tokio::test]
async fn recovery_rolls_back_records_audit_row_with_recovery_actor() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -742,13 +726,9 @@ async fn recovery_rolls_back_records_audit_row_with_recovery_actor() {
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await;
let head_after = ds.version().version;
let _ = head_after;
@ -795,7 +775,6 @@ async fn recovery_rolls_back_records_audit_row_with_recovery_actor() {
#[tokio::test]
async fn recovery_rolls_forward_with_null_actor() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -809,13 +788,9 @@ async fn recovery_rolls_forward_with_null_actor() {
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await;
let head_after = ds.version().version;
// Sidecar with no actor_id (CLI-driven mutation; common case).
@ -871,7 +846,6 @@ async fn recovery_rolls_forward_with_null_actor() {
#[tokio::test]
async fn recovery_processes_multiple_sidecars_with_fresh_snapshot_per_iter() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -889,21 +863,14 @@ async fn recovery_processes_multiple_sidecars_with_fresh_snapshot_per_iter() {
// Synthesize drift on both tables independently.
let person_uri = node_table_uri(uri, "Person");
let company_uri = node_table_uri(uri, "Company");
let store = TableStore::new(uri);
let mut person_ds = Dataset::open(&person_uri).await.unwrap();
let person_pre = person_ds.version().version;
let _ = store
.delete_where(&person_uri, &mut person_ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut person_ds, "1 = 2").await;
let person_post = person_ds.version().version;
let mut company_ds = Dataset::open(&company_uri).await.unwrap();
let company_pre = company_ds.version().version;
let _ = store
.delete_where(&company_uri, &mut company_ds, "1 = 2")
.await
.unwrap();
let _ = helpers::lance_delete_inline(&mut company_ds, "1 = 2").await;
let company_post = company_ds.version().version;
// Drop two sidecars; ULID prefix ensures sort order is A then B.
@ -1083,7 +1050,6 @@ async fn recovery_ensure_indices_handles_empty_tables() {
#[tokio::test]
async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -1102,7 +1068,6 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let v1 = ds.version().version;
@ -1116,23 +1081,9 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
// Bypassing __manifest is what `delete_where` and `append_batch`
// both do (direct on Lance); using append_batch (instead of no-op
// deletes) is what makes the fragment-set differ across versions.
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("bob-id", "bob", Some(25))]),
)
.await
.unwrap();
helpers::lance_append_inline(&mut ds, person_batch(&[("bob-id", "bob", Some(25))])).await;
let v2 = ds.version().version;
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("carol-id", "carol", Some(40))]),
)
.await
.unwrap();
helpers::lance_append_inline(&mut ds, person_batch(&[("carol-id", "carol", Some(40))])).await;
let v3 = ds.version().version;
assert_eq!(v2, v1 + 1);
assert_eq!(v3, v2 + 1);
@ -1297,14 +1248,7 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() {
.open_dataset_head(&person_uri, feature_branch_name.as_deref())
.await
.unwrap();
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("carol-id", "carol", Some(40))]),
)
.await
.unwrap();
helpers::lance_append_inline(&mut ds, person_batch(&[("carol-id", "carol", Some(40))])).await;
let v_head = ds.version().version;
assert_eq!(v_head, v_pin + 1, "append must advance HEAD by 1");
@ -1419,14 +1363,7 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() {
.open_dataset_head(&person_uri, feature_branch_name.as_deref())
.await
.unwrap();
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("dave-id", "dave", Some(50))]),
)
.await
.unwrap();
helpers::lance_append_inline(&mut ds, person_batch(&[("dave-id", "dave", Some(50))])).await;
let v_head = ds.version().version;
assert_eq!(v_head, v_pin + 1);

View file

@ -23,6 +23,9 @@ use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WhenMatched, WhenNotMatched};
use lance::index::DatasetIndexExt;
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
use lance_table::format::Fragment;
use omnigraph::table_store::{StagedWrite, TableStore};
use std::sync::Arc;
@ -34,6 +37,22 @@ fn person_schema() -> Arc<Schema> {
]))
}
/// Test-only helper: appends `batch` with a raw `Dataset::append`, moving
/// Lance HEAD forward without touching the manifest. It mirrors the body
/// of `TableStore::append_batch` (`pub(crate)` after MR-854) and lives in
/// this file so the drift-simulation tests here do not depend on the
/// demoted crate-internal API.
///
/// Panics if the append fails.
async fn lance_append_inline_local(ds: &mut Dataset, batch: RecordBatch) {
    use lance::dataset::{WriteMode, WriteParams};

    let append_params = WriteParams {
        mode: WriteMode::Append,
        allow_external_blob_outside_bases: true,
        ..Default::default()
    };
    let batch_schema = batch.schema();
    let source = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], batch_schema);
    ds.append(source, Some(append_params)).await.unwrap();
}
fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, age)| *age).collect();
@ -351,7 +370,7 @@ async fn stage_merge_insert_then_commit_persists_merged_view() {
/// `write_fragments_internal` lack per-column statistics. The result
/// contains only matching committed rows; matching staged rows are
/// silently absent. `scanner.use_stats(false)` does not bypass this in
/// lance 4.0.0.
/// lance 6.0.1.
///
/// This test pins the actual behavior so a future change either
/// preserves it (and updates the doc) or fixes it (and rewrites this
@ -616,6 +635,58 @@ async fn stage_overwrite_replaces_all_fragments() {
);
}
/// Staging an overwrite with a zero-row batch yields no new fragments,
/// marks every committed fragment as removed, and leaves HEAD alone until
/// `commit_staged`. After the commit the table is empty and carries the
/// replacement batch's schema (including the added `nickname` column).
#[tokio::test]
async fn stage_overwrite_empty_batch_replaces_all_rows() {
    let tmp = tempfile::tempdir().unwrap();
    let table_uri = format!("{}/people.lance", tmp.path().to_str().unwrap());
    let table_store = TableStore::new(tmp.path().to_str().unwrap());

    // Seed a two-row table and remember its version.
    let seed_rows = person_batch(&[("alice", Some(30)), ("bob", Some(25))]);
    let committed = TableStore::write_dataset(&table_uri, seed_rows)
        .await
        .unwrap();
    let version_before = committed.version().version;

    // Replacement schema: the seed columns plus a nullable `nickname`.
    let replacement_fields = vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("age", DataType::Int32, true),
        Field::new("nickname", DataType::Utf8, true),
    ];
    let replacement_schema = Arc::new(Schema::new(replacement_fields));
    let empty_replacement = RecordBatch::new_empty(replacement_schema.clone());

    let staged_write = table_store
        .stage_overwrite(&committed, empty_replacement)
        .await
        .unwrap();

    assert!(
        staged_write.new_fragments.is_empty(),
        "empty overwrite should produce a zero-fragment Lance Overwrite transaction"
    );
    let committed_fragment_count = committed.manifest.fragments.len();
    assert_eq!(
        staged_write.removed_fragment_ids.len(),
        committed_fragment_count,
        "empty overwrite still removes every committed fragment"
    );
    assert_eq!(
        committed.version().version,
        version_before,
        "staging empty overwrite must not advance HEAD"
    );

    // Committing the staged transaction is what advances HEAD.
    let base = Arc::new(committed.clone());
    let after_commit = table_store
        .commit_staged(base, staged_write.transaction)
        .await
        .unwrap();
    assert_eq!(after_commit.version().version, version_before + 1);
    assert_eq!(after_commit.count_rows(None).await.unwrap(), 0);

    let committed_schema = arrow_schema::Schema::from(after_commit.schema());
    assert!(
        committed_schema.field_with_name("nickname").is_ok(),
        "empty overwrite must commit the replacement batch schema"
    );
}
/// `stage_create_btree_index` writes index segments to object storage
/// but does NOT advance Lance HEAD until `commit_staged`. After commit,
/// the index is queryable.
@ -699,7 +770,7 @@ async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
);
}
/// Pin the inline-commit behavior of `delete_where`. Lance 4.0.0 does
/// Pin the inline-commit behavior of `delete_where`. Lance 6.0.1 does
/// NOT expose a public `DeleteJob::execute_uncommitted`
/// (`pub(crate)` — see lance-format/lance#6658). The trait deliberately
/// does NOT introduce a `stage_delete` wrapper that would secretly
@ -714,7 +785,6 @@ async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
async fn delete_where_advances_head_inline_documents_residual() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let mut ds = TableStore::write_dataset(
&uri,
@ -724,13 +794,11 @@ async fn delete_where_advances_head_inline_documents_residual() {
.unwrap();
let pre_version = ds.version().version;
let result = store
.delete_where(&uri, &mut ds, "id = 'alice'")
.await
.unwrap();
assert_eq!(result.deleted_rows, 1);
let result = ds.delete("id = 'alice'").await.unwrap();
ds = (*result.new_dataset).clone();
assert_eq!(result.num_deleted_rows, 1);
assert!(
result.version > pre_version,
ds.version().version > pre_version,
"delete_where ADVANCES Lance HEAD inline (the residual). When \
lance-format/lance#6658 ships and we migrate to stage_delete + \
commit_staged, flip this assertion to assert that staging does \
@ -739,9 +807,9 @@ async fn delete_where_advances_head_inline_documents_residual() {
}
/// Companion to `delete_where_*`: pin the inline-commit behavior of
/// `create_vector_index`. Lance 4.0.0 vector indices take the
/// `create_vector_index`. Lance 6.0.1 vector indices take the
/// "segment commit path" which calls `build_index_metadata_from_segments`
/// (`pub(crate)` in lance-4.0.0 `src/index.rs:111`). Until upstream
/// (`pub(crate)` in lance-6.0.1 `src/index.rs:111`). Until upstream
/// exposes that helper (companion ticket to lance-format/lance#6658),
/// the trait surface deliberately does NOT include
/// `stage_create_vector_index` — same rationale as `stage_delete`'s
@ -780,8 +848,9 @@ async fn create_vector_index_advances_head_inline_documents_residual() {
let pre_version = ds.version().version;
assert!(!store.has_vector_index(&ds, "embedding").await.unwrap());
store
.create_vector_index(&mut ds, "embedding")
let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
ds.create_index_builder(&["embedding"], IndexType::Vector, &params)
.replace(true)
.await
.unwrap();
assert!(
@ -804,7 +873,7 @@ async fn create_vector_index_advances_head_inline_documents_residual() {
/// The Lance source confirms this — `restore()` (no args) takes the
/// currently-checked-out version's content and applies it via
/// `apply_commit` against the latest manifest, advancing HEAD by one.
/// See lance-4.0.0 `src/dataset.rs:1106` and the transaction-spec
/// See lance-6.0.1 `src/dataset.rs:1106` and the transaction-spec
/// example at https://lance.org/format/table/transaction/.
///
/// If a future lance bump (e.g. 6.0.1 → 6.x) ever changes this delta or the call
@ -815,7 +884,6 @@ async fn create_vector_index_advances_head_inline_documents_residual() {
async fn lance_restore_appends_one_commit_with_checked_out_content() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Build version history: v1 = {alice}, v2 = {alice, bob}, v3 = {alice, bob, carol}.
let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
@ -823,16 +891,10 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() {
.unwrap();
assert_eq!(ds.version().version, 1);
store
.append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
lance_append_inline_local(&mut ds, person_batch(&[("bob", Some(25))])).await;
assert_eq!(ds.version().version, 2);
store
.append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))]))
.await
.unwrap();
lance_append_inline_local(&mut ds, person_batch(&[("carol", Some(40))])).await;
assert_eq!(ds.version().version, 3);
let head_before = ds.version().version;
@ -878,7 +940,7 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() {
/// and any future continuous-recovery reconciler's queue-acquisition
/// requirement.
///
/// `Dataset::restore`'s `check_restore_txn` (lance-4.0.0
/// `Dataset::restore`'s `check_restore_txn` (lance-6.0.1
/// `src/io/commit/conflict_resolver.rs:986`) returns `Ok(())` against
/// almost every other op (Append, Update, Delete, CreateIndex, Merge, …),
/// so a Restore commits successfully even with concurrent commits in
@ -908,7 +970,6 @@ async fn lance_restore_appends_one_commit_with_checked_out_content() {
async fn lance_restore_loses_to_concurrent_append_via_orphaning() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// v1: seed with alice.
let _ = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
@ -925,10 +986,7 @@ async fn lance_restore_loses_to_concurrent_append_via_orphaning() {
// This simulates a per-table-queue model where another tenant wrote
// between recovery's open and recovery's restore call.
let mut writer_handle = Dataset::open(&uri).await.unwrap();
store
.append_batch(&uri, &mut writer_handle, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
lance_append_inline_local(&mut writer_handle, person_batch(&[("bob", Some(25))])).await;
assert_eq!(writer_handle.version().version, 2);
// Recovery now restores. Because restore's `check_restore_txn` returns

View file

@ -778,6 +778,47 @@ async fn load_with_bad_edge_reference_unblocks_next_load() {
);
}
/// Overwrite-mode counterpart of
/// `load_with_bad_edge_reference_unblocks_next_load`.
///
/// After a successful seed load, a second `LoadMode::Overwrite` load whose
/// `Knows` edge points at `Ghost` is expected to fail with a
/// `OmniError::Manifest` whose message contains "not found". The failed
/// load must leave the `node:Person` and `edge:Knows` row counts unchanged,
/// and a subsequent valid overwrite must succeed, ending with two persons
/// and one edge.
#[tokio::test]
async fn load_overwrite_with_bad_edge_reference_unblocks_next_load() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let mut db = Omnigraph::init(root, TEST_SCHEMA).await.unwrap();

    load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
        .await
        .unwrap();
    let persons_before = count_rows(&db, "node:Person").await;
    let edges_before = count_rows(&db, "edge:Knows").await;

    // The edge below references "Ghost", which this input never declares.
    let dangling_edge_input = r#"{"type": "Person", "data": {"name": "Mallory", "age": 5}}
{"edge": "Knows", "from": "Mallory", "to": "Ghost"}
"#;
    let failure = load_jsonl(&mut db, dangling_edge_input, LoadMode::Overwrite)
        .await
        .expect_err("RI violation must fail overwrite before commit_staged");
    let manifest_failure = match failure {
        OmniError::Manifest(inner) => inner,
        other => panic!("expected Manifest error, got {other:?}"),
    };
    assert!(
        manifest_failure.message.contains("not found"),
        "unexpected error: {}",
        manifest_failure.message,
    );

    // The rejected overwrite must not have touched the committed tables.
    assert_eq!(count_rows(&db, "node:Person").await, persons_before);
    assert_eq!(count_rows(&db, "edge:Knows").await, edges_before);

    let valid_input = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}
{"type": "Person", "data": {"name": "Quinn", "age": 56}}
{"edge": "Knows", "from": "Pat", "to": "Quinn"}
"#;
    load_jsonl(&mut db, valid_input, LoadMode::Overwrite)
        .await
        .unwrap();

    assert_eq!(count_rows(&db, "node:Person").await, 2);
    assert_eq!(count_rows(&db, "edge:Knows").await, 1);
}
/// Same shape as the RI test above, but driven by a cardinality
/// violation (`@card(0..1)` on `WorksAt`). The staged loader's pending
/// edge accumulator drives the cardinality scan; a violation aborts
@ -842,6 +883,56 @@ edge WorksAt: Person -> Company @card(0..1)
);
}
/// Overwrite-mode recovery test driven by a cardinality violation.
///
/// Under a schema where `WorksAt` is declared `@card(0..1)`, an overwrite
/// load that gives `Alice` two `WorksAt` edges is expected to fail with a
/// `OmniError::Manifest` whose message contains "@card violation". The
/// failed load must leave the `edge:WorksAt` row count unchanged, and a
/// following overwrite with a single edge must succeed with exactly one row.
#[tokio::test]
async fn load_overwrite_with_cardinality_violation_unblocks_next_load() {
    const CARD_SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let mut db = Omnigraph::init(root, CARD_SCHEMA).await.unwrap();

    let seed_input = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
"#;
    load_jsonl(&mut db, seed_input, LoadMode::Overwrite)
        .await
        .unwrap();
    let works_before = count_rows(&db, "edge:WorksAt").await;

    // Two outgoing WorksAt edges from the same person exceed `@card(0..1)`.
    let over_cardinality_input = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
{"edge": "WorksAt", "from": "Alice", "to": "Bigco"}
"#;
    let failure = load_jsonl(&mut db, over_cardinality_input, LoadMode::Overwrite)
        .await
        .expect_err("cardinality violation must fail overwrite before commit_staged");
    let manifest_failure = match failure {
        OmniError::Manifest(inner) => inner,
        other => panic!("expected Manifest error, got {other:?}"),
    };
    assert!(
        manifest_failure.message.contains("@card violation"),
        "unexpected error: {}",
        manifest_failure.message,
    );

    // The rejected overwrite must not have changed the edge table.
    assert_eq!(count_rows(&db, "edge:WorksAt").await, works_before);

    let valid_input = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}"#;
    load_jsonl(&mut db, valid_input, LoadMode::Overwrite)
        .await
        .unwrap();

    assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}
// ─── Chained-mutation correctness — pinned coverage ─────────────────────────
/// Chained `update` ops in one query must respect each previous op's