diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 8d23ebb..4051b4d 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -2154,9 +2154,11 @@ edge WorksAt: Person -> Company }) .collect(); let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); - let (_new_ds, state) = db + let staged = db.storage().stage_append(&ds, batch, &[]).await.unwrap(); + let committed = db.storage().commit_staged(ds, staged).await.unwrap(); + let state = db .storage() - .append_batch(&full_path, ds, batch) + .table_state(&full_path, &committed) .await .unwrap(); db.commit_updates(&[crate::db::SubTableUpdate { diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 9d7329d..7463ac9 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -435,7 +435,7 @@ async fn load_jsonl_reader( } } else { let node_write_results = - write_batches_concurrently(db, branch, mode, prepared_nodes).await?; + write_batches_concurrently(db, branch, prepared_nodes).await?; for (type_name, table_key, loaded_count, state, table_branch) in node_write_results { overwrite_updates.push(crate::db::SubTableUpdate { table_key, @@ -545,7 +545,7 @@ async fn load_jsonl_reader( } } else { let edge_write_results = - write_batches_concurrently(db, branch, mode, prepared_edges).await?; + write_batches_concurrently(db, branch, prepared_edges).await?; for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results { overwrite_updates.push(crate::db::SubTableUpdate { table_key, @@ -1163,7 +1163,6 @@ fn load_write_concurrency() -> usize { async fn write_batches_concurrently( db: &Omnigraph, branch: Option<&str>, - mode: LoadMode, prepared: Vec<(String, String, RecordBatch, usize)>, ) -> Result< Vec<( @@ -1185,7 +1184,7 @@ async fn write_batches_concurrently( futures::stream::iter(prepared.into_iter().map( |(type_name, table_key, batch, loaded_count)| async move { let (state, table_branch) = - write_batch_to_dataset(db, branch, &table_key, batch, mode).await?; + write_batch_to_dataset(db, branch, &table_key, batch).await?; Ok::<_, OmniError>((type_name, table_key, loaded_count, state, table_branch)) }, )) @@ -1196,65 +1195,29 @@ async fn write_batches_concurrently( .collect() } +/// Bulk-overwrite fast-path: write one table's batch concurrently with the +/// other tables. Only `LoadMode::Overwrite` reaches this path — `Append` and +/// `Merge` route through the `MutationStaging` accumulator (the `use_staging` +/// branch in the loader). `overwrite_batch` inline-commits (advances Lance +/// HEAD); it is kept as the bulk fast-path because a fresh overwrite wipes the +/// prior data and has no partial-drift correctness concern (re-running the +/// overwrite recovers). A `stage_overwrite` primitive exists (schema_apply uses +/// it) and could replace this path with a staged + recovery-sidecar shape; that +/// migration is a tracked follow-up. async fn write_batch_to_dataset( db: &Omnigraph, branch: Option<&str>, table_key: &str, batch: RecordBatch, - mode: LoadMode, ) -> Result<(crate::table_store::TableState, Option)> { - let op_kind = match mode { - LoadMode::Append => crate::db::MutationOpKind::Insert, - LoadMode::Merge => crate::db::MutationOpKind::Merge, - LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite, - }; let (ds, full_path, table_branch) = db - .open_for_mutation_on_branch(branch, table_key, op_kind) + .open_for_mutation_on_branch(branch, table_key, crate::db::MutationOpKind::SchemaRewrite) .await?; - - match mode { - LoadMode::Overwrite => { - // Inline-commit residual: the Overwrite path here is the - // legacy concurrent fast-path used by Phase 2 of the loader - // (Append/Merge route through MutationStaging instead). - // `overwrite_batch` advances Lance HEAD as a side effect; - // there is no public two-phase overwrite that fits this - // shape until Lance issues #6658/#6666 close. - let (_new_ds, state) = db - .storage() - .overwrite_batch(&full_path, ds, batch) - .await?; - Ok((state, table_branch)) - } - LoadMode::Append => { - // Same residual class as Overwrite above. The staged-write - // path is the `use_staging` branch in `load_with_actor`; - // this concurrent path is the per-table fast-path retained - // for parallelism. MR-793 Phase 9 will demote - // `append_batch` to `pub(crate)` once this last consumer - // moves to the staged primitive. - let (_new_ds, state) = db - .storage() - .append_batch(&full_path, ds, batch) - .await?; - Ok((state, table_branch)) - } - LoadMode::Merge => { - // Same residual class as the other two arms. - let state = db - .storage() - .merge_insert_batches( - &full_path, - ds, - vec![batch], - vec!["id".to_string()], - lance::dataset::WhenMatched::UpdateAll, - lance::dataset::WhenNotMatched::InsertAll, - ) - .await?; - Ok((state, table_branch)) - } - } + let (_new_ds, state) = db + .storage() + .overwrite_batch(&full_path, ds, batch) + .await?; + Ok((state, table_branch)) } fn generate_id() -> String { diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs index 484d9df..625771d 100644 --- a/crates/omnigraph/src/storage_layer.rs +++ b/crates/omnigraph/src/storage_layer.rs @@ -382,23 +382,6 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { // Phase 9 once all engine sites route through the staged // primitives. - async fn append_batch( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - batch: RecordBatch, - ) -> Result<(SnapshotHandle, TableState)>; - - async fn merge_insert_batches( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - batches: Vec, - key_columns: Vec, - when_matched: WhenMatched, - when_not_matched: WhenNotMatched, - ) -> Result; - async fn overwrite_batch( &self, dataset_uri: &str, @@ -417,18 +400,6 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; - async fn create_btree_index( - &self, - snapshot: SnapshotHandle, - columns: &[&str], - ) -> Result; - - async fn create_inverted_index( - &self, - snapshot: SnapshotHandle, - column: &str, - ) -> Result; - async fn create_vector_index( &self, snapshot: SnapshotHandle, @@ -736,39 +707,6 @@ impl TableStorage for TableStore { .map(StagedHandle::new) } - async fn append_batch( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - batch: RecordBatch, - ) -> Result<(SnapshotHandle, TableState)> { - let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?; - Ok((SnapshotHandle::new(ds), state)) - } - - async fn merge_insert_batches( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - batches: Vec, - key_columns: Vec, - when_matched: WhenMatched, - when_not_matched: WhenNotMatched, - ) -> Result { - let ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - TableStore::merge_insert_batches( - self, - dataset_uri, - ds, - batches, - key_columns, - when_matched, - when_not_matched, - ) - .await - } - async fn overwrite_batch( &self, dataset_uri: &str, @@ -803,26 +741,6 @@ impl TableStorage for TableStore { TableStore::has_vector_index(self, snapshot.dataset(), column).await } - async fn create_btree_index( - &self, - snapshot: SnapshotHandle, - columns: &[&str], - ) -> Result { - let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - TableStore::create_btree_index(self, &mut ds, columns).await?; - Ok(SnapshotHandle::new(ds)) - } - - async fn create_inverted_index( - &self, - snapshot: SnapshotHandle, - column: &str, - ) -> Result { - let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - TableStore::create_inverted_index(self, &mut ds, column).await?; - Ok(SnapshotHandle::new(ds)) - } - async fn create_vector_index( &self, snapshot: SnapshotHandle, diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 3957c3f..0b79766 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -2,7 +2,6 @@ use arrow_array::{ Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array, }; use arrow_schema::SchemaRef; -use arrow_select::concat::concat_batches; use futures::TryStreamExt; use lance::Dataset; use lance::blob::BlobArrayBuilder; @@ -602,14 +601,14 @@ impl TableStore { } /// Legacy inline-commit append: writes fragments AND commits in one - /// call, advancing Lance HEAD as a side effect. Demoted to - /// `pub(crate)` by MR-793 Phase 9 — the staged primitive - /// `stage_append` + `commit_staged` is the public engine surface; - /// this one survives only as a residual called by - /// `loader::write_batch_to_dataset` (LoadMode::Append concurrent - /// fast-path) and the deprecated `merge_insert_batch` chain. Do not - /// add new call sites — they re-introduce the multi-phase commit - /// drift the trait surface was designed to eliminate. + /// call, advancing Lance HEAD as a side effect. Not on the + /// `TableStorage` trait surface — the staged primitive `stage_append` + /// + `commit_staged` is the engine write path. This inherent + /// `pub(crate)` method survives only as the body of `overwrite_batch` + /// (the loader's `LoadMode::Overwrite` bulk fast-path) and recovery + /// test setup. Do not add new call sites — they re-introduce the + /// multi-phase commit drift the trait surface was designed to + /// eliminate. pub(crate) async fn append_batch( &self, dataset_uri: &str, @@ -698,126 +697,6 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } - /// Legacy inline-commit merge-insert (single batch). Demoted to - /// `pub(crate)` by MR-793 Phase 9 — the staged primitive - /// `stage_merge_insert` + `commit_staged` is the public engine - /// surface; this one survives only as the body of - /// `merge_insert_batches` (which is itself the loader's - /// LoadMode::Merge concurrent fast-path). Do not add new call - /// sites. - pub(crate) async fn merge_insert_batch( - &self, - dataset_uri: &str, - ds: Dataset, - batch: RecordBatch, - key_columns: Vec, - when_matched: WhenMatched, - when_not_matched: WhenNotMatched, - ) -> Result { - if batch.num_rows() == 0 { - return self.table_state(dataset_uri, &ds).await; - } - - // Precondition for the FirstSeen workaround below: every caller of - // this primitive must hand in a source batch that is unique by - // `key_columns`. Without this check, `SourceDedupeBehavior::FirstSeen` - // would silently collapse genuine duplicates instead of erroring. - check_batch_unique_by_keys(&batch, &key_columns, "merge_insert_batch")?; - - // TODO(lance-upstream): MergeInsertBuilder does not accept WriteParams, - // so allow_external_blob_outside_bases cannot be set here. External URI - // blobs via merge_insert (LoadMode::Merge, mutations) are unsupported - // until Lance exposes WriteParams on MergeInsertBuilder. - let ds = Arc::new(ds); - let mut builder = MergeInsertBuilder::try_new(ds, key_columns) - .map_err(|e| OmniError::Lance(e.to_string()))?; - builder.when_matched(when_matched); - builder.when_not_matched(when_not_matched); - // Workaround for a Lance 4.0.x bug class where sequential - // merge_insert calls against rows previously rewritten by - // merge_insert produce a spurious "Ambiguous merge inserts: - // multiple source rows match the same target row on (id = ...)" - // error. Lance's `processed_row_ids: Mutex>` - // (lance-6.0.1 `src/dataset/write/merge_insert.rs:2099`) - // double-processes the same source/target match against - // datasets previously rewritten by merge_insert, and the default - // `SourceDedupeBehavior::Fail` errors on the second insertion. - // `FirstSeen` makes Lance skip the duplicate match instead. - // - // Covers both observed surfaces: - // - PR #98 (sequential `load --mode merge` against same keys). - // - MR-920 (sequential `update T set {f} where x=y` on same row). - // - // Correctness-preserving for OmniGraph because every call path - // that reaches this primitive either pre-dedupes the source batch - // by id, or surfaces a real source dup via the - // `check_batch_unique_by_keys` precondition above (which fires - // before the FirstSeen setter has a chance to silently collapse - // anything): - // - Load path: `enforce_unique_constraints_intra_batch` - // (`loader/mod.rs:1453`) errors on intra-batch `@key` dups. - // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`) - // accumulates and dedupes by `id`. - // - Branch-merge path: `compute_source_delta` / - // `compute_three_way_delta` (`exec/merge.rs`) walk via - // `OrderedTableCursor` and `push_row` each id at most once. - // So FirstSeen only suppresses the spurious Lance behavior, never - // user data. Pinned by `loader_rejects_intra_batch_duplicate_keys` - // in `tests/consistency.rs` plus the - // `check_batch_unique_by_keys` precondition. - // - // Retire when upstream Lance fixes the bug class. Tracked at - // MR-957; upstream: lance-format/lance#6877. - builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen); - let job = builder - .try_build() - .map_err(|e| OmniError::Lance(e.to_string()))?; - - let schema = batch.schema(); - let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema); - let (new_ds, _stats) = job - .execute(lance_datafusion::utils::reader_to_stream(Box::new(reader))) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - self.table_state(dataset_uri, &new_ds).await - } - - /// Legacy inline-commit merge-insert (multiple batches concatenated - /// into one merge). Demoted to `pub(crate)` by MR-793 Phase 9 — the - /// staged primitive `stage_merge_insert` + `commit_staged` is the - /// public engine surface; this one survives only via the - /// `TableStorage::merge_insert_batches` trait method, called by - /// `loader::write_batch_to_dataset` (LoadMode::Merge concurrent - /// fast-path). Do not add new call sites. - pub(crate) async fn merge_insert_batches( - &self, - dataset_uri: &str, - ds: Dataset, - batches: Vec, - key_columns: Vec, - when_matched: WhenMatched, - when_not_matched: WhenNotMatched, - ) -> Result { - if batches.is_empty() { - return self.table_state(dataset_uri, &ds).await; - } - let batch = if batches.len() == 1 { - batches.into_iter().next().unwrap() - } else { - let schema = batches[0].schema(); - concat_batches(&schema, &batches).map_err(|e| OmniError::Lance(e.to_string()))? - }; - self.merge_insert_batch( - dataset_uri, - ds, - batch, - key_columns, - when_matched, - when_not_matched, - ) - .await - } - pub async fn delete_where( &self, dataset_uri: &str, @@ -988,11 +867,12 @@ impl TableStore { )); } - // Precondition for FirstSeen below. See the comment on - // `merge_insert_batch` for why this check is here, not on the caller: - // every call path that reaches stage_merge_insert (load, - // MutationStaging::finalize, branch_merge::publish_rewritten_merge_table) - // must hand in a source batch that is unique by `key_columns`. + // Precondition for the FirstSeen workaround below: every call path that + // reaches stage_merge_insert (load, MutationStaging::finalize, + // branch_merge::publish_rewritten_merge_table) must hand in a source + // batch that is unique by `key_columns`. Without this check, + // `SourceDedupeBehavior::FirstSeen` would silently collapse genuine + // duplicates instead of erroring. check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?; let ds = Arc::new(ds); @@ -1000,11 +880,21 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string()))?; builder.when_matched(when_matched); builder.when_not_matched(when_not_matched); - // See `merge_insert_batch` for the FirstSeen rationale. Workaround - // for the Lance 4.0.x bug class where sequential merge_insert / - // update against rows previously rewritten by merge_insert trips - // Lance's `processed_row_ids` HashSet and errors under the default - // `SourceDedupeBehavior::Fail`. Retire when upstream Lance is fixed. + // Workaround for a Lance bug class where sequential merge_insert calls + // against rows previously rewritten by merge_insert produce a spurious + // "Ambiguous merge inserts: multiple source rows match the same target + // row on (id = ...)" error. Lance's `processed_row_ids: + // Mutex>` (lance-6.0.1 `src/dataset/write/merge_insert.rs`) + // double-processes the same source/target match against datasets + // previously rewritten by merge_insert, and the default + // `SourceDedupeBehavior::Fail` errors on the second insertion; FirstSeen + // makes Lance skip the duplicate match instead. Correctness-preserving + // because every call path pre-dedupes the source batch by id or surfaces + // a real source dup via `check_batch_unique_by_keys` above (load: + // `enforce_unique_constraints_intra_batch`; mutate: + // `MutationStaging::finalize`; branch-merge: the `OrderedTableCursor` + // walk in `exec/merge.rs`). Retire when upstream Lance fixes the bug + // class. Tracked at MR-957; upstream: lance-format/lance#6877. builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen); let job = builder .try_build() @@ -1478,44 +1368,6 @@ impl TableStore { })) } - /// Legacy inline-commit BTREE scalar index build. Demoted to - /// `pub(crate)` by MR-793 Phase 9 — the staged primitive - /// `stage_create_btree_index` + `commit_staged` is the public engine - /// surface; this one survives only as the body of the trait's - /// inline-commit method (used by no engine call site today). Do not - /// add new call sites. - pub(crate) async fn create_btree_index( - &self, - ds: &mut Dataset, - columns: &[&str], - ) -> Result<()> { - let params = ScalarIndexParams::default(); - ds.create_index_builder(columns, IndexType::BTree, ¶ms) - .replace(true) - .await - .map(|_| ()) - .map_err(|e| OmniError::Lance(e.to_string())) - } - - /// Legacy inline-commit INVERTED (FTS) scalar index build. Demoted - /// to `pub(crate)` by MR-793 Phase 9 — the staged primitive - /// `stage_create_inverted_index` + `commit_staged` is the public - /// engine surface; this one survives only as the body of the - /// trait's inline-commit method (used by no engine call site today). - /// Do not add new call sites. - pub(crate) async fn create_inverted_index( - &self, - ds: &mut Dataset, - column: &str, - ) -> Result<()> { - let params = InvertedIndexParams::default(); - ds.create_index_builder(&[column], IndexType::Inverted, ¶ms) - .replace(true) - .await - .map(|_| ()) - .map_err(|e| OmniError::Lance(e.to_string())) - } - pub async fn create_vector_index(&self, ds: &mut Dataset, column: &str) -> Result<()> { let params = lance::index::vector::VectorIndexParams::ivf_flat(1, MetricType::L2); ds.create_index_builder(&[column], IndexType::Vector, ¶ms) @@ -1804,7 +1656,7 @@ fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec