mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
MR-854: seal db.storage() to staged-only; move residuals to InlineCommitResidual
Split the three remaining inline-commit writes (overwrite_batch,
delete_where, create_vector_index) off the TableStorage trait onto a new
sealed InlineCommitResidual trait, reachable only via the explicit
Omnigraph::storage_inline_residual() accessor. db.storage() now exposes
only staged primitives + reads, so engine code cannot couple a write
with a Lance HEAD advance through the default surface — MR-793 acceptance
§1 ("no public method commits as a side effect of writing") now holds by
construction, not by review + naming.
Call sites moved to storage_inline_residual(): loader overwrite
fast-path, the three mutation delete_where paths, the branch-merge
delete, and the vector-index build. Impl bodies are unchanged (same
delegation to the pub(crate) inherent methods); this is a pure surface
reshape with no behavior change.
The residual trait holds two genuinely upstream-blocked methods
(delete_where -> Lance #6658/v7.x, create_vector_index -> Lance #6666)
plus overwrite_batch, kept for the loader's cross-table bulk-overwrite
concurrency until its staged migration lands (tracked follow-up).
This commit is contained in:
parent
b83233d960
commit
39a72b76bf
6 changed files with 139 additions and 99 deletions
|
|
@ -593,6 +593,20 @@ impl Omnigraph {
|
|||
&self.table_store
|
||||
}
|
||||
|
||||
/// Inline-commit residual surface (`overwrite_batch`, `delete_where`,
|
||||
/// `create_vector_index`) — the writes Lance cannot yet express as a
|
||||
/// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
|
||||
/// the default storage surface is staged-only and a new writer cannot couple
|
||||
/// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
|
||||
/// the handful of documented residual call sites (loader overwrite fast-path,
|
||||
/// mutation/merge deletes, vector-index build) use this accessor. See
|
||||
/// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
|
||||
pub(crate) fn storage_inline_residual(
|
||||
&self,
|
||||
) -> &dyn crate::storage_layer::InlineCommitResidual {
|
||||
&self.table_store
|
||||
}
|
||||
|
||||
/// Engine-level access to the object-store adapter (S3 / local fs).
|
||||
/// Used by the recovery sidecar protocol — writers in the engine
|
||||
/// call this to write/delete sidecars at `__recovery/{ulid}.json`.
|
||||
|
|
|
|||
|
|
@ -628,7 +628,7 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
// outside the lance crate. Document at the call
|
||||
// site; companion ticket to lance-format/lance#6658.
|
||||
let new_snap = db
|
||||
.storage()
|
||||
.storage_inline_residual()
|
||||
.create_vector_index(ds.clone(), prop_name.as_str())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
|
|
|
|||
|
|
@ -1019,7 +1019,7 @@ async fn publish_rewritten_merge_table(
|
|||
.collect();
|
||||
let filter = format!("id IN ({})", escaped.join(", "));
|
||||
let (new_ds, _) = target_db
|
||||
.storage()
|
||||
.storage_inline_residual()
|
||||
.delete_where(&full_path, current_ds, &filter)
|
||||
.await?;
|
||||
current_ds = new_ds;
|
||||
|
|
|
|||
|
|
@ -1201,7 +1201,7 @@ impl Omnigraph {
|
|||
.await?;
|
||||
crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?;
|
||||
let (_new_ds, delete_state) = self
|
||||
.storage()
|
||||
.storage_inline_residual()
|
||||
.delete_where(&full_path, ds, &pred_sql)
|
||||
.await?;
|
||||
|
||||
|
|
@ -1251,7 +1251,7 @@ impl Omnigraph {
|
|||
.await?;
|
||||
|
||||
let (_new_edge_ds, edge_delete) = self
|
||||
.storage()
|
||||
.storage_inline_residual()
|
||||
.delete_where(&edge_full_path, edge_ds, &cascade_filter)
|
||||
.await?;
|
||||
|
||||
|
|
@ -1299,7 +1299,7 @@ impl Omnigraph {
|
|||
.await?;
|
||||
|
||||
let (_new_ds, delete_state) = self
|
||||
.storage()
|
||||
.storage_inline_residual()
|
||||
.delete_where(&full_path, ds, &pred_sql)
|
||||
.await?;
|
||||
let affected = delete_state.deleted_rows;
|
||||
|
|
|
|||
|
|
@ -1214,7 +1214,7 @@ async fn write_batch_to_dataset(
|
|||
.open_for_mutation_on_branch(branch, table_key, crate::db::MutationOpKind::SchemaRewrite)
|
||||
.await?;
|
||||
let (_new_ds, state) = db
|
||||
.storage()
|
||||
.storage_inline_residual()
|
||||
.overwrite_batch(&full_path, ds, batch)
|
||||
.await?;
|
||||
Ok((state, table_branch))
|
||||
|
|
|
|||
|
|
@ -7,30 +7,36 @@
|
|||
//! way for new engine writers to advance Lance HEAD without coupling
|
||||
//! "write bytes" with "advance HEAD" in one Lance API call.
|
||||
//!
|
||||
//! ## Transitional residuals on the trait
|
||||
//! ## Inline-commit residuals live on a separate trait
|
||||
//!
|
||||
//! Several inline-commit methods remain on the trait surface as
|
||||
//! documented residuals: `delete_where`
|
||||
//! ([#6658](https://github.com/lance-format/lance/issues/6658) closed
|
||||
//! 2026-05-14, but the public `DeleteBuilder::execute_uncommitted` API
|
||||
//! did not backport to the 6.x release line — it first ships in
|
||||
//! `v7.0.0-beta.10`. Migration to staged two-phase delete is tracked as
|
||||
//! MR-A and is gated on the Lance v7.x bump, not the current v6.0.1 pin),
|
||||
//! `create_vector_index` (segment-commit-path requires
|
||||
//! `build_index_metadata_from_segments` which is `pub(crate)` — see
|
||||
//! [#6666](https://github.com/lance-format/lance/issues/6666), still open), and the
|
||||
//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
|
||||
//! `create_btree_index` / `create_inverted_index` paths kept while
|
||||
//! engine call sites finish migrating off of them (Phase 1b / Phase 9
|
||||
//! of MR-793). These are named honestly at every call site; the
|
||||
//! forbidden-API guard test catches direct lance::* misuse outside the
|
||||
//! storage layer.
|
||||
//! The inline-commit writes that Lance cannot yet express as
|
||||
//! stage-then-commit are NOT on `TableStorage`. They sit on
|
||||
//! [`InlineCommitResidual`], reachable only via
|
||||
//! `Omnigraph::storage_inline_residual()`, so the default `db.storage()`
|
||||
//! surface is staged-only and cannot couple "write bytes" with "advance
|
||||
//! HEAD" — MR-793 acceptance §1 closes by construction. The residuals:
|
||||
//!
|
||||
//! * `delete_where` — Lance #6658 (`DeleteBuilder::execute_uncommitted`)
|
||||
//! did not backport to the 6.x line; it first ships in `v7.0.0-beta.10`.
|
||||
//! Migration to staged two-phase delete is tracked as MR-A, gated on the
|
||||
//! Lance v7.x bump.
|
||||
//! * `create_vector_index` — segment-commit-path needs
|
||||
//! `build_index_metadata_from_segments`, still `pub(crate)` in Lance
|
||||
//! 6.0.1 ([#6666](https://github.com/lance-format/lance/issues/6666),
|
||||
//! open). Scalar indices already stage.
|
||||
//! * `overwrite_batch` — removable legacy: `stage_overwrite` exists
|
||||
//! (schema_apply uses it), but the loader's bulk `LoadMode::Overwrite`
|
||||
//! fast-path still uses this inline path for cross-table write
|
||||
//! concurrency. Staged migration is a tracked follow-up.
|
||||
//!
|
||||
//! Each is named honestly at its call site; the forbidden-API guard test
|
||||
//! catches direct lance::* misuse outside the storage layer.
|
||||
//!
|
||||
//! ## Sealed
|
||||
//!
|
||||
//! `TableStorage: sealed::Sealed`. Only types in this crate can implement
|
||||
//! the trait, so a downstream crate cannot subvert the contract by
|
||||
//! providing its own impl.
|
||||
//! Both `TableStorage` and `InlineCommitResidual` are `: sealed::Sealed`.
|
||||
//! Only types in this crate can implement them, so a downstream crate
|
||||
//! cannot subvert the contract by providing its own impl.
|
||||
//!
|
||||
//! ## Opaque handles
|
||||
//!
|
||||
|
|
@ -40,15 +46,15 @@
|
|||
//! through. This aligns with the storage-boundary invariant:
|
||||
//! `lance::Dataset` does not appear in trait signatures.
|
||||
//!
|
||||
//! ## Migration status (MR-793 PR #70)
|
||||
//! ## Migration status
|
||||
//!
|
||||
//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new
|
||||
//! staged primitives (`stage_overwrite`, scalar index staging), and
|
||||
//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto
|
||||
//! the staged surface. Phase 1b (call-site conversion to
|
||||
//! `Arc<dyn TableStorage>`), Phase 9 (demote unused inline-commit
|
||||
//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847),
|
||||
//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups.
|
||||
//! Phases 1a / 2 / 4 / 5 / 6 landed in MR-793 PR #70 (trait scaffolding,
|
||||
//! staged primitives, migration of `ensure_indices` / `branch_merge` /
|
||||
//! `schema_apply` onto the staged surface). Phase 1b (call-site
|
||||
//! conversion) and Phase 9 landed in MR-854, which also split the
|
||||
//! inline-commit residuals onto `InlineCommitResidual` so `db.storage()`
|
||||
//! is staged-only. Phase 7 (recovery reconciler) shipped as MR-847;
|
||||
//! Phase 8 (index reconciler) is tracked as MR-848.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
|
@ -367,45 +373,19 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
|
|||
column: &str,
|
||||
) -> Result<StagedHandle>;
|
||||
|
||||
// ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
|
||||
// ── Index presence (reads, no HEAD advance) ──────────────────────
|
||||
//
|
||||
// These methods advance Lance HEAD as a side effect of writing.
|
||||
// They stay on the trait until the corresponding upstream Lance API
|
||||
// ships:
|
||||
//
|
||||
// * `delete_where` — Lance #6658 (two-phase delete).
|
||||
// * `create_*_index` — `build_index_metadata_from_segments` is
|
||||
// `pub(crate)` for vector indices in lance-6.0.1; scalar indices
|
||||
// migrate to staged in MR-793 Phase 2.
|
||||
// * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
|
||||
// legacy paths that will be demoted to `pub(crate)` in MR-793
|
||||
// Phase 9 once all engine sites route through the staged
|
||||
// primitives.
|
||||
|
||||
async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)>;
|
||||
|
||||
async fn delete_where(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<(SnapshotHandle, DeleteState)>;
|
||||
// The inline-commit writes (`overwrite_batch`, `delete_where`,
|
||||
// `create_vector_index`) are deliberately NOT on this trait. They live on
|
||||
// the separate `InlineCommitResidual` trait, reachable only through
|
||||
// `Omnigraph::storage_inline_residual()`. As a result the default
|
||||
// `db.storage()` surface cannot couple "write bytes" with "advance HEAD"
|
||||
// — closing MR-793 acceptance §1 by construction rather than by review.
|
||||
|
||||
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
|
||||
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
|
||||
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
|
||||
|
||||
async fn create_vector_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
// ── URI helpers ────────────────────────────────────────────────────
|
||||
//
|
||||
// These are pure string formatting; they live on the trait so engine
|
||||
|
|
@ -432,6 +412,49 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
|
|||
) -> Result<DatasetRecordBatchStream>;
|
||||
}
|
||||
|
||||
// ─── InlineCommitResidual trait ────────────────────────────────────────────
|
||||
|
||||
/// Inline-commit residual surface: the writes Lance cannot yet express as a
|
||||
/// stage-then-commit pair, so they advance Lance HEAD as a side effect of
|
||||
/// writing. Kept OFF `TableStorage` and reachable only through
|
||||
/// `Omnigraph::storage_inline_residual()`, so the default `db.storage()` path
|
||||
/// is staged-only and a new writer cannot reintroduce the write+commit coupling
|
||||
/// by accident (MR-793 acceptance §1, by construction).
|
||||
///
|
||||
/// Residual reasons (each is named honestly at its call site):
|
||||
/// * `delete_where` — Lance has no public two-phase delete on the 6.x line
|
||||
/// (`DeleteBuilder::execute_uncommitted` first ships in v7.x; MR-A / Lance
|
||||
/// #6658). The D2 parse-time rule + recovery sidecars cover the gap meanwhile.
|
||||
/// * `create_vector_index` — vector-index segment-commit needs
|
||||
/// `build_index_metadata_from_segments`, still `pub(crate)` in Lance 6.0.1
|
||||
/// (Lance #6666). Scalar indices already stage.
|
||||
/// * `overwrite_batch` — removable legacy: `stage_overwrite` exists (schema_apply
|
||||
/// uses it), but the loader's bulk `LoadMode::Overwrite` fast-path still uses
|
||||
/// this inline path for cross-table write concurrency. Migrating it to the
|
||||
/// staged shape is a tracked follow-up.
|
||||
#[async_trait]
|
||||
pub trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
|
||||
async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)>;
|
||||
|
||||
async fn delete_where(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<(SnapshotHandle, DeleteState)>;
|
||||
|
||||
async fn create_vector_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle>;
|
||||
}
|
||||
|
||||
// ─── single impl: TableStore ──────────────────────────────────────────────
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -707,28 +730,6 @@ impl TableStorage for TableStore {
|
|||
.map(StagedHandle::new)
|
||||
}
|
||||
|
||||
async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn delete_where(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<(SnapshotHandle, DeleteState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
|
||||
TableStore::has_btree_index(self, snapshot.dataset(), column).await
|
||||
}
|
||||
|
|
@ -741,16 +742,6 @@ impl TableStorage for TableStore {
|
|||
TableStore::has_vector_index(self, snapshot.dataset(), column).await
|
||||
}
|
||||
|
||||
async fn create_vector_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::create_vector_index(self, &mut ds, column).await?;
|
||||
Ok(SnapshotHandle::new(ds))
|
||||
}
|
||||
|
||||
fn root_uri(&self) -> &str {
|
||||
TableStore::root_uri(self)
|
||||
}
|
||||
|
|
@ -780,3 +771,38 @@ impl TableStorage for TableStore {
|
|||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl InlineCommitResidual for TableStore {
|
||||
async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn delete_where(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<(SnapshotHandle, DeleteState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn create_vector_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::create_vector_index(self, &mut ds, column).await?;
|
||||
Ok(SnapshotHandle::new(ds))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue