diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 4051b4d..3ed0486 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -593,6 +593,20 @@ impl Omnigraph { &self.table_store } + /// Inline-commit residual surface (`overwrite_batch`, `delete_where`, + /// `create_vector_index`) — the writes Lance cannot yet express as a + /// stage-then-commit pair. Deliberately separate from [`Self::storage`] so + /// the default storage surface is staged-only and a new writer cannot couple + /// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only + /// the handful of documented residual call sites (loader overwrite fast-path, + /// mutation/merge deletes, vector-index build) use this accessor. See + /// `crate::storage_layer::InlineCommitResidual` for the per-method blocker. + pub(crate) fn storage_inline_residual( + &self, + ) -> &dyn crate::storage_layer::InlineCommitResidual { + &self.table_store + } + /// Engine-level access to the object-store adapter (S3 / local fs). /// Used by the recovery sidecar protocol — writers in the engine /// call this to write/delete sidecars at `__recovery/{ulid}.json`. diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index f035d6d..f7a365a 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -628,7 +628,7 @@ pub(super) async fn build_indices_on_dataset_for_catalog( // outside the lance crate. Document at the call // site; companion ticket to lance-format/lance#6658. let new_snap = db - .storage() + .storage_inline_residual() .create_vector_index(ds.clone(), prop_name.as_str()) .await .map_err(|e| { diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 88ec3cb..138e8bd 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1019,7 +1019,7 @@ async fn publish_rewritten_merge_table( .collect(); let filter = format!("id IN ({})", escaped.join(", ")); let (new_ds, _) = target_db - .storage() + .storage_inline_residual() .delete_where(&full_path, current_ds, &filter) .await?; current_ds = new_ds; diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 47283f7..4dc2820 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -1201,7 +1201,7 @@ impl Omnigraph { .await?; crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?; let (_new_ds, delete_state) = self - .storage() + .storage_inline_residual() .delete_where(&full_path, ds, &pred_sql) .await?; @@ -1251,7 +1251,7 @@ impl Omnigraph { .await?; let (_new_edge_ds, edge_delete) = self - .storage() + .storage_inline_residual() .delete_where(&edge_full_path, edge_ds, &cascade_filter) .await?; @@ -1299,7 +1299,7 @@ impl Omnigraph { .await?; let (_new_ds, delete_state) = self - .storage() + .storage_inline_residual() .delete_where(&full_path, ds, &pred_sql) .await?; let affected = delete_state.deleted_rows; diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 7463ac9..d5dba5d 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -1214,7 +1214,7 @@ async fn write_batch_to_dataset( .open_for_mutation_on_branch(branch, table_key, crate::db::MutationOpKind::SchemaRewrite) .await?; let (_new_ds, state) = db - .storage() + .storage_inline_residual() .overwrite_batch(&full_path, ds, batch) .await?; Ok((state, table_branch)) diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs index 625771d..dd2c319 100644 --- a/crates/omnigraph/src/storage_layer.rs +++ b/crates/omnigraph/src/storage_layer.rs @@ -7,30 +7,36 @@ //! way for new engine writers to advance Lance HEAD without coupling //! "write bytes" with "advance HEAD" in one Lance API call. //! -//! ## Transitional residuals on the trait +//! ## Inline-commit residuals live on a separate trait //! -//! Several inline-commit methods remain on the trait surface as -//! documented residuals: `delete_where` -//! ([#6658](https://github.com/lance-format/lance/issues/6658) closed -//! 2026-05-14, but the public `DeleteBuilder::execute_uncommitted` API -//! did not backport to the 6.x release line — it first ships in -//! `v7.0.0-beta.10`. Migration to staged two-phase delete is tracked as -//! MR-A and is gated on the Lance v7.x bump, not the current v6.0.1 pin), -//! `create_vector_index` (segment-commit-path requires -//! `build_index_metadata_from_segments` which is `pub(crate)` — see -//! [#6666](https://github.com/lance-format/lance/issues/6666), still open), and the -//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / -//! `create_btree_index` / `create_inverted_index` paths kept while -//! engine call sites finish migrating off of them (Phase 1b / Phase 9 -//! of MR-793). These are named honestly at every call site; the -//! forbidden-API guard test catches direct lance::* misuse outside the -//! storage layer. +//! The inline-commit writes that Lance cannot yet express as +//! stage-then-commit are NOT on `TableStorage`. They sit on +//! [`InlineCommitResidual`], reachable only via +//! `Omnigraph::storage_inline_residual()`, so the default `db.storage()` +//! surface is staged-only and cannot couple "write bytes" with "advance +//! HEAD" — MR-793 acceptance §1 closes by construction. The residuals: +//! +//! * `delete_where` — Lance #6658 (`DeleteBuilder::execute_uncommitted`) +//! did not backport to the 6.x line; it first ships in `v7.0.0-beta.10`. +//! Migration to staged two-phase delete is tracked as MR-A, gated on the +//! Lance v7.x bump. +//! * `create_vector_index` — segment-commit-path needs +//! `build_index_metadata_from_segments`, still `pub(crate)` in Lance +//! 6.0.1 ([#6666](https://github.com/lance-format/lance/issues/6666), +//! open). Scalar indices already stage. +//! * `overwrite_batch` — removable legacy: `stage_overwrite` exists +//! (schema_apply uses it), but the loader's bulk `LoadMode::Overwrite` +//! fast-path still uses this inline path for cross-table write +//! concurrency. Staged migration is a tracked follow-up. +//! +//! Each is named honestly at its call site; the forbidden-API guard test +//! catches direct lance::* misuse outside the storage layer. //! //! ## Sealed //! -//! `TableStorage: sealed::Sealed`. Only types in this crate can implement -//! the trait, so a downstream crate cannot subvert the contract by -//! providing its own impl. +//! Both `TableStorage` and `InlineCommitResidual` are `: sealed::Sealed`. +//! Only types in this crate can implement them, so a downstream crate +//! cannot subvert the contract by providing its own impl. //! //! ## Opaque handles //! @@ -40,15 +46,15 @@ //! through. This aligns with the storage-boundary invariant: //! `lance::Dataset` does not appear in trait signatures. //! -//! ## Migration status (MR-793 PR #70) +//! ## Migration status //! -//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new -//! staged primitives (`stage_overwrite`, scalar index staging), and -//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto -//! the staged surface. Phase 1b (call-site conversion to -//! `Arc`), Phase 9 (demote unused inline-commit -//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847), -//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups. +//! Phases 1a / 2 / 4 / 5 / 6 landed in MR-793 PR #70 (trait scaffolding, +//! staged primitives, migration of `ensure_indices` / `branch_merge` / +//! `schema_apply` onto the staged surface). Phase 1b (call-site +//! conversion) and Phase 9 landed in MR-854, which also split the +//! inline-commit residuals onto `InlineCommitResidual` so `db.storage()` +//! is staged-only. Phase 7 (recovery reconciler) shipped as MR-847; +//! Phase 8 (index reconciler) is tracked as MR-848. use std::fmt::Debug; use std::sync::Arc; @@ -367,45 +373,19 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { column: &str, ) -> Result; - // ── Inline-commit residuals (named honestly per MR-793 §3.2) ────── + // ── Index presence (reads, no HEAD advance) ────────────────────── // - // These methods advance Lance HEAD as a side effect of writing. - // They stay on the trait until the corresponding upstream Lance API - // ships: - // - // * `delete_where` — Lance #6658 (two-phase delete). - // * `create_*_index` — `build_index_metadata_from_segments` is - // `pub(crate)` for vector indices in lance-6.0.1; scalar indices - // migrate to staged in MR-793 Phase 2. - // * `append_batch`, `merge_insert_batches`, `overwrite_batch` — - // legacy paths that will be demoted to `pub(crate)` in MR-793 - // Phase 9 once all engine sites route through the staged - // primitives. - - async fn overwrite_batch( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - batch: RecordBatch, - ) -> Result<(SnapshotHandle, TableState)>; - - async fn delete_where( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - filter: &str, - ) -> Result<(SnapshotHandle, DeleteState)>; + // The inline-commit writes (`overwrite_batch`, `delete_where`, + // `create_vector_index`) are deliberately NOT on this trait. They live on + // the separate `InlineCommitResidual` trait, reachable only through + // `Omnigraph::storage_inline_residual()`. As a result the default + // `db.storage()` surface cannot couple "write bytes" with "advance HEAD" + // — closing MR-793 acceptance §1 by construction rather than by review. async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; - async fn create_vector_index( - &self, - snapshot: SnapshotHandle, - column: &str, - ) -> Result; - // ── URI helpers ──────────────────────────────────────────────────── // // These are pure string formatting; they live on the trait so engine @@ -432,6 +412,49 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { ) -> Result; } +// ─── InlineCommitResidual trait ──────────────────────────────────────────── + +/// Inline-commit residual surface: the writes Lance cannot yet express as a +/// stage-then-commit pair, so they advance Lance HEAD as a side effect of +/// writing. Kept OFF `TableStorage` and reachable only through +/// `Omnigraph::storage_inline_residual()`, so the default `db.storage()` path +/// is staged-only and a new writer cannot reintroduce the write+commit coupling +/// by accident (MR-793 acceptance §1, by construction). +/// +/// Residual reasons (each is named honestly at its call site): +/// * `delete_where` — Lance has no public two-phase delete on the 6.x line +/// (`DeleteBuilder::execute_uncommitted` first ships in v7.x; MR-A / Lance +/// #6658). The D2 parse-time rule + recovery sidecars cover the gap meanwhile. +/// * `create_vector_index` — vector-index segment-commit needs +/// `build_index_metadata_from_segments`, still `pub(crate)` in Lance 6.0.1 +/// (Lance #6666). Scalar indices already stage. +/// * `overwrite_batch` — removable legacy: `stage_overwrite` exists (schema_apply +/// uses it), but the loader's bulk `LoadMode::Overwrite` fast-path still uses +/// this inline path for cross-table write concurrency. Migrating it to the +/// staged shape is a tracked follow-up. +#[async_trait] +pub trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug { + async fn overwrite_batch( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batch: RecordBatch, + ) -> Result<(SnapshotHandle, TableState)>; + + async fn delete_where( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + filter: &str, + ) -> Result<(SnapshotHandle, DeleteState)>; + + async fn create_vector_index( + &self, + snapshot: SnapshotHandle, + column: &str, + ) -> Result; +} + // ─── single impl: TableStore ────────────────────────────────────────────── #[async_trait] @@ -707,28 +730,6 @@ impl TableStorage for TableStore { .map(StagedHandle::new) } - async fn overwrite_batch( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - batch: RecordBatch, - ) -> Result<(SnapshotHandle, TableState)> { - let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?; - Ok((SnapshotHandle::new(ds), state)) - } - - async fn delete_where( - &self, - dataset_uri: &str, - snapshot: SnapshotHandle, - filter: &str, - ) -> Result<(SnapshotHandle, DeleteState)> { - let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?; - Ok((SnapshotHandle::new(ds), state)) - } - async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result { TableStore::has_btree_index(self, snapshot.dataset(), column).await } @@ -741,16 +742,6 @@ impl TableStorage for TableStore { TableStore::has_vector_index(self, snapshot.dataset(), column).await } - async fn create_vector_index( - &self, - snapshot: SnapshotHandle, - column: &str, - ) -> Result { - let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); - TableStore::create_vector_index(self, &mut ds, column).await?; - Ok(SnapshotHandle::new(ds)) - } - fn root_uri(&self) -> &str { TableStore::root_uri(self) } @@ -780,3 +771,38 @@ impl TableStorage for TableStore { .await } } + +#[async_trait] +impl InlineCommitResidual for TableStore { + async fn overwrite_batch( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batch: RecordBatch, + ) -> Result<(SnapshotHandle, TableState)> { + let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); + let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?; + Ok((SnapshotHandle::new(ds), state)) + } + + async fn delete_where( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + filter: &str, + ) -> Result<(SnapshotHandle, DeleteState)> { + let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); + let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?; + Ok((SnapshotHandle::new(ds), state)) + } + + async fn create_vector_index( + &self, + snapshot: SnapshotHandle, + column: &str, + ) -> Result { + let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone()); + TableStore::create_vector_index(self, &mut ds, column).await?; + Ok(SnapshotHandle::new(ds)) + } +}