MR-854: remove dead inline-commit methods from the storage surface

The loader concurrent fast-path (write_batch_to_dataset) is only reached
for LoadMode::Overwrite — Append/Merge route through MutationStaging — so
its Append/Merge arms were unreachable. Collapse it to overwrite-only and
drop the now-unused mode params, which removes the only callers of:

- TableStorage::append_batch + TableStorage::merge_insert_batches (trait)
- TableStore::merge_insert_batch + merge_insert_batches (inherent)

create_btree_index / create_inverted_index had zero callers anywhere
(scalar index builds use the stage_* primitives). Remove both from the
trait and the inherent impl.

Inherent append_batch stays pub(crate): overwrite_batch and recovery
tests use it. Migrate the one trait-append_batch test caller
(seed_person_row) to stage_append + commit_staged. The merge_insert
FirstSeen-workaround rationale moves from the deleted merge_insert_batch
into stage_merge_insert (now the sole merge path). No behavior change.

Also corrects the inaccurate loader residual comment (the prior text
blamed Lance #6658/#6666, which are the delete and vector-index issues,
for keeping overwrite inline; a stage_overwrite primitive already exists
and schema_apply uses it).
This commit is contained in:
Ragnor Comerford 2026-06-08 14:14:46 +02:00
parent e050214f2f
commit b83233d960
No known key found for this signature in database
4 changed files with 52 additions and 317 deletions

View file

@ -2154,9 +2154,11 @@ edge WorksAt: Person -> Company
})
.collect();
let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap();
let (_new_ds, state) = db
let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap();
let committed = db.storage().commit_staged(ds, staged).await.unwrap();
let state = db
.storage()
.append_batch(&full_path, ds, batch)
.table_state(&full_path, &committed)
.await
.unwrap();
db.commit_updates(&[crate::db::SubTableUpdate {

View file

@ -435,7 +435,7 @@ async fn load_jsonl_reader<R: BufRead>(
}
} else {
let node_write_results =
write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
write_batches_concurrently(db, branch, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
@ -545,7 +545,7 @@ async fn load_jsonl_reader<R: BufRead>(
}
} else {
let edge_write_results =
write_batches_concurrently(db, branch, mode, prepared_edges).await?;
write_batches_concurrently(db, branch, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
@ -1163,7 +1163,6 @@ fn load_write_concurrency() -> usize {
async fn write_batches_concurrently(
db: &Omnigraph,
branch: Option<&str>,
mode: LoadMode,
prepared: Vec<(String, String, RecordBatch, usize)>,
) -> Result<
Vec<(
@ -1185,7 +1184,7 @@ async fn write_batches_concurrently(
futures::stream::iter(prepared.into_iter().map(
|(type_name, table_key, batch, loaded_count)| async move {
let (state, table_branch) =
write_batch_to_dataset(db, branch, &table_key, batch, mode).await?;
write_batch_to_dataset(db, branch, &table_key, batch).await?;
Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch))
},
))
@ -1196,65 +1195,29 @@ async fn write_batches_concurrently(
.collect()
}
/// Bulk-overwrite fast-path: write one table's batch concurrently with the
/// other tables. Only `LoadMode::Overwrite` reaches this path — `Append` and
/// `Merge` route through the `MutationStaging` accumulator (the `use_staging`
/// branch in the loader). `overwrite_batch` inline-commits (advances Lance
/// HEAD); it is kept as the bulk fast-path because a fresh overwrite wipes the
/// prior data and has no partial-drift correctness concern (re-running the
/// overwrite recovers). A `stage_overwrite` primitive exists (schema_apply uses
/// it) and could replace this path with a staged + recovery-sidecar shape; that
/// migration is a tracked follow-up.
async fn write_batch_to_dataset(
db: &Omnigraph,
branch: Option<&str>,
table_key: &str,
batch: RecordBatch,
mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
let op_kind = match mode {
LoadMode::Append => crate::db::MutationOpKind::Insert,
LoadMode::Merge => crate::db::MutationOpKind::Merge,
LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
};
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, table_key, op_kind)
.open_for_mutation_on_branch(branch, table_key, crate::db::MutationOpKind::SchemaRewrite)
.await?;
match mode {
LoadMode::Overwrite => {
// Inline-commit residual: the Overwrite path here is the
// legacy concurrent fast-path used by Phase 2 of the loader
// (Append/Merge route through MutationStaging instead).
// `overwrite_batch` advances Lance HEAD as a side effect;
// there is no public two-phase overwrite that fits this
// shape until Lance issues #6658/#6666 close.
let (_new_ds, state) = db
.storage()
.overwrite_batch(&full_path, ds, batch)
.await?;
Ok((state, table_branch))
}
LoadMode::Append => {
// Same residual class as Overwrite above. The staged-write
// path is the `use_staging` branch in `load_with_actor`;
// this concurrent path is the per-table fast-path retained
// for parallelism. MR-793 Phase 9 will demote
// `append_batch` to `pub(crate)` once this last consumer
// moves to the staged primitive.
let (_new_ds, state) = db
.storage()
.append_batch(&full_path, ds, batch)
.await?;
Ok((state, table_branch))
}
LoadMode::Merge => {
// Same residual class as the other two arms.
let state = db
.storage()
.merge_insert_batches(
&full_path,
ds,
vec![batch],
vec!["id".to_string()],
lance::dataset::WhenMatched::UpdateAll,
lance::dataset::WhenNotMatched::InsertAll,
)
.await?;
Ok((state, table_branch))
}
}
let (_new_ds, state) = db
.storage()
.overwrite_batch(&full_path, ds, batch)
.await?;
Ok((state, table_branch))
}
fn generate_id() -> String {

View file

@ -382,23 +382,6 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
// Phase 9 once all engine sites route through the staged
// primitives.
async fn append_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)>;
async fn merge_insert_batches(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState>;
async fn overwrite_batch(
&self,
dataset_uri: &str,
@ -417,18 +400,6 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn create_btree_index(
&self,
snapshot: SnapshotHandle,
columns: &[&str],
) -> Result<SnapshotHandle>;
async fn create_inverted_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle>;
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
@ -736,39 +707,6 @@ impl TableStorage for TableStore {
.map(StagedHandle::new)
}
async fn append_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn merge_insert_batches(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState> {
let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
TableStore::merge_insert_batches(
self,
dataset_uri,
ds,
batches,
key_columns,
when_matched,
when_not_matched,
)
.await
}
async fn overwrite_batch(
&self,
dataset_uri: &str,
@ -803,26 +741,6 @@ impl TableStorage for TableStore {
TableStore::has_vector_index(self, snapshot.dataset(), column).await
}
async fn create_btree_index(
&self,
snapshot: SnapshotHandle,
columns: &[&str],
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
TableStore::create_btree_index(self, &mut ds, columns).await?;
Ok(SnapshotHandle::new(ds))
}
async fn create_inverted_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
TableStore::create_inverted_index(self, &mut ds, column).await?;
Ok(SnapshotHandle::new(ds))
}
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,

View file

@ -2,7 +2,6 @@ use arrow_array::{
Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
};
use arrow_schema::SchemaRef;
use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use lance::Dataset;
use lance::blob::BlobArrayBuilder;
@ -602,14 +601,14 @@ impl TableStore {
}
/// Legacy inline-commit append: writes fragments AND commits in one
/// call, advancing Lance HEAD as a side effect. Demoted to
/// `pub(crate)` by MR-793 Phase 9 — the staged primitive
/// `stage_append` + `commit_staged` is the public engine surface;
/// this one survives only as a residual called by
/// `loader::write_batch_to_dataset` (LoadMode::Append concurrent
/// fast-path) and the deprecated `merge_insert_batch` chain. Do not
/// add new call sites — they re-introduce the multi-phase commit
/// drift the trait surface was designed to eliminate.
/// call, advancing Lance HEAD as a side effect. Not on the
/// `TableStorage` trait surface — the staged primitive `stage_append`
/// + `commit_staged` is the engine write path. This inherent
/// `pub(crate)` method survives only as the body of `overwrite_batch`
/// (the loader's `LoadMode::Overwrite` bulk fast-path) and recovery
/// test setup. Do not add new call sites — they re-introduce the
/// multi-phase commit drift the trait surface was designed to
/// eliminate.
pub(crate) async fn append_batch(
&self,
dataset_uri: &str,
@ -698,126 +697,6 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Legacy inline-commit merge-insert (single batch). Demoted to
/// `pub(crate)` by MR-793 Phase 9 — the staged primitive
/// `stage_merge_insert` + `commit_staged` is the public engine
/// surface; this one survives only as the body of
/// `merge_insert_batches` (which is itself the loader's
/// LoadMode::Merge concurrent fast-path). Do not add new call
/// sites.
pub(crate) async fn merge_insert_batch(
&self,
dataset_uri: &str,
ds: Dataset,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState> {
if batch.num_rows() == 0 {
return self.table_state(dataset_uri, &ds).await;
}
// Precondition for the FirstSeen workaround below: every caller of
// this primitive must hand in a source batch that is unique by
// `key_columns`. Without this check, `SourceDedupeBehavior::FirstSeen`
// would silently collapse genuine duplicates instead of erroring.
check_batch_unique_by_keys(&batch, &key_columns, "merge_insert_batch")?;
// TODO(lance-upstream): MergeInsertBuilder does not accept WriteParams,
// so allow_external_blob_outside_bases cannot be set here. External URI
// blobs via merge_insert (LoadMode::Merge, mutations) are unsupported
// until Lance exposes WriteParams on MergeInsertBuilder.
let ds = Arc::new(ds);
let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?;
builder.when_matched(when_matched);
builder.when_not_matched(when_not_matched);
// Workaround for a Lance 4.0.x bug class where sequential
// merge_insert calls against rows previously rewritten by
// merge_insert produce a spurious "Ambiguous merge inserts:
// multiple source rows match the same target row on (id = ...)"
// error. Lance's `processed_row_ids: Mutex<HashSet<u64>>`
// (lance-6.0.1 `src/dataset/write/merge_insert.rs:2099`)
// double-processes the same source/target match against
// datasets previously rewritten by merge_insert, and the default
// `SourceDedupeBehavior::Fail` errors on the second insertion.
// `FirstSeen` makes Lance skip the duplicate match instead.
//
// Covers both observed surfaces:
// - PR #98 (sequential `load --mode merge` against same keys).
// - MR-920 (sequential `update T set {f} where x=y` on same row).
//
// Correctness-preserving for OmniGraph because every call path
// that reaches this primitive either pre-dedupes the source batch
// by id, or surfaces a real source dup via the
// `check_batch_unique_by_keys` precondition above (which fires
// before the FirstSeen setter has a chance to silently collapse
// anything):
// - Load path: `enforce_unique_constraints_intra_batch`
// (`loader/mod.rs:1453`) errors on intra-batch `@key` dups.
// - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
// accumulates and dedupes by `id`.
// - Branch-merge path: `compute_source_delta` /
// `compute_three_way_delta` (`exec/merge.rs`) walk via
// `OrderedTableCursor` and `push_row` each id at most once.
// So FirstSeen only suppresses the spurious Lance behavior, never
// user data. Pinned by `loader_rejects_intra_batch_duplicate_keys`
// in `tests/consistency.rs` plus the
// `check_batch_unique_by_keys` precondition.
//
// Retire when upstream Lance fixes the bug class. Tracked at
// MR-957; upstream: lance-format/lance#6877.
builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
let job = builder
.try_build()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let schema = batch.schema();
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
let (new_ds, _stats) = job
.execute(lance_datafusion::utils::reader_to_stream(Box::new(reader)))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
self.table_state(dataset_uri, &new_ds).await
}
/// Legacy inline-commit merge-insert (multiple batches concatenated
/// into one merge). Demoted to `pub(crate)` by MR-793 Phase 9 — the
/// staged primitive `stage_merge_insert` + `commit_staged` is the
/// public engine surface; this one survives only via the
/// `TableStorage::merge_insert_batches` trait method, called by
/// `loader::write_batch_to_dataset` (LoadMode::Merge concurrent
/// fast-path). Do not add new call sites.
pub(crate) async fn merge_insert_batches(
&self,
dataset_uri: &str,
ds: Dataset,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState> {
if batches.is_empty() {
return self.table_state(dataset_uri, &ds).await;
}
let batch = if batches.len() == 1 {
batches.into_iter().next().unwrap()
} else {
let schema = batches[0].schema();
concat_batches(&schema, &batches).map_err(|e| OmniError::Lance(e.to_string()))?
};
self.merge_insert_batch(
dataset_uri,
ds,
batch,
key_columns,
when_matched,
when_not_matched,
)
.await
}
pub async fn delete_where(
&self,
dataset_uri: &str,
@ -988,11 +867,12 @@ impl TableStore {
));
}
// Precondition for FirstSeen below. See the comment on
// `merge_insert_batch` for why this check is here, not on the caller:
// every call path that reaches stage_merge_insert (load,
// MutationStaging::finalize, branch_merge::publish_rewritten_merge_table)
// must hand in a source batch that is unique by `key_columns`.
// Precondition for the FirstSeen workaround below: every call path that
// reaches stage_merge_insert (load, MutationStaging::finalize,
// branch_merge::publish_rewritten_merge_table) must hand in a source
// batch that is unique by `key_columns`. Without this check,
// `SourceDedupeBehavior::FirstSeen` would silently collapse genuine
// duplicates instead of erroring.
check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?;
let ds = Arc::new(ds);
@ -1000,11 +880,21 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))?;
builder.when_matched(when_matched);
builder.when_not_matched(when_not_matched);
// See `merge_insert_batch` for the FirstSeen rationale. Workaround
// for the Lance 4.0.x bug class where sequential merge_insert /
// update against rows previously rewritten by merge_insert trips
// Lance's `processed_row_ids` HashSet and errors under the default
// `SourceDedupeBehavior::Fail`. Retire when upstream Lance is fixed.
// Workaround for a Lance bug class where sequential merge_insert calls
// against rows previously rewritten by merge_insert produce a spurious
// "Ambiguous merge inserts: multiple source rows match the same target
// row on (id = ...)" error. Lance's `processed_row_ids:
// Mutex<HashSet<u64>>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`)
// double-processes the same source/target match against datasets
// previously rewritten by merge_insert, and the default
// `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen
// makes Lance skip the duplicate match instead. Correctness-preserving
// because every call path pre-dedupes the source batch by id or surfaces
// a real source dup via `check_batch_unique_by_keys` above (load:
// `enforce_unique_constraints_intra_batch`; mutate:
// `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor`
// walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug
// class. Tracked at MR-957; upstream: lance-format/lance#6877.
builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
let job = builder
.try_build()
@ -1478,44 +1368,6 @@ impl TableStore {
}))
}
/// Legacy inline-commit BTREE scalar index build. Demoted to
/// `pub(crate)` by MR-793 Phase 9 — the staged primitive
/// `stage_create_btree_index` + `commit_staged` is the public engine
/// surface; this one survives only as the body of the trait's
/// inline-commit method (used by no engine call site today). Do not
/// add new call sites.
pub(crate) async fn create_btree_index(
&self,
ds: &mut Dataset,
columns: &[&str],
) -> Result<()> {
let params = ScalarIndexParams::default();
ds.create_index_builder(columns, IndexType::BTree, &params)
.replace(true)
.await
.map(|_| ())
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Legacy inline-commit INVERTED (FTS) scalar index build. Demoted
/// to `pub(crate)` by MR-793 Phase 9 — the staged primitive
/// `stage_create_inverted_index` + `commit_staged` is the public
/// engine surface; this one survives only as the body of the
/// trait's inline-commit method (used by no engine call site today).
/// Do not add new call sites.
pub(crate) async fn create_inverted_index(
&self,
ds: &mut Dataset,
column: &str,
) -> Result<()> {
let params = InvertedIndexParams::default();
ds.create_index_builder(&[column], IndexType::Inverted, &params)
.replace(true)
.await
.map(|_| ())
.map_err(|e| OmniError::Lance(e.to_string()))
}
pub async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> {
let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2);
ds.create_index_builder(&[column], IndexType::Vector, &params)
@ -1804,7 +1656,7 @@ fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fr
combined
}
/// Precondition guard for `merge_insert_batch` and `stage_merge_insert`.
/// Precondition guard for `stage_merge_insert`.
/// Both opt into `SourceDedupeBehavior::FirstSeen` to suppress the Lance
/// `processed_row_ids` bug (MR-957). FirstSeen would *also* silently
/// collapse genuine duplicate source keys; this check restores fail-fast