MR-793 phases 1-6: TableStorage trait + staged-write surface for engine writers

Hoists Lance's stage+commit two-phase write pattern from "discipline at
each writer" to a sealed trait surface (`TableStorage`). New engine code
that needs to advance Lance HEAD MUST go through `stage_*` + `commit_staged`;
the trait's opaque `SnapshotHandle` / `StagedHandle` types keep
`lance::Dataset` and `lance::Transaction` out of trait signatures.

Phases landed (see .context/mr-793-design.md for the full plan):
* 1a: `crates/omnigraph/src/storage_layer.rs` — `TableStorage` trait,
  sealed (only in-tree types can impl), single impl on `TableStore`
  delegating to existing inherent methods; `Omnigraph::storage()`
  accessor returns `&dyn TableStorage`.
* 2: three new staged primitives — `stage_overwrite`,
  `stage_create_btree_index`, `stage_create_inverted_index` —
  implementing the simple branch of Lance's `CreateIndexBuilder::execute`
  (scalar indices only; vector indices stay inline because
  `build_index_metadata_from_segments` is `pub(crate)` in lance-4.0.0).
  Six new tests in `tests/staged_writes.rs` pin both the new primitives
  and the inline residuals (`delete_where`, `create_vector_index`).
* 3: `tests/forbidden_apis.rs` — defense-in-depth integration test
  walks engine source, fails on direct lance::* inline-commit API use
  outside `table_store.rs` / `db/manifest/`. Skips comment lines and
  honors `// forbidden-api-allow:` sentinels.
* 4: `ensure_indices` migration — scalar index builds now route through
  `stage_create_*_index` + `commit_staged` instead of
  `create_*_index(&mut Dataset)`. Vector indices stay inline (residual,
  named honestly at the call site).
* 5: `branch_merge::publish_rewritten_merge_table` migration — the
  merge_insert phase now uses `stage_merge_insert` + `commit_staged`;
  delete phase stays inline (Lance #6658 residual, named honestly).
* 6: `schema_apply` rewritten_tables migration — non-empty rewrites
  use `stage_overwrite` + `commit_staged`; empty-batch rewrites stay
  inline because `InsertBuilder::execute_uncommitted` rejects empty
  data. The narrow inline window is bounded by `__schema_apply_lock__`.

Verified-green test surface:
* `cargo test -p omnigraph-engine` — 68 lib + ~120 integration tests
  (incl. 6 new staged_writes tests + the new forbidden_apis test).
* `cargo test -p omnigraph-engine --features failpoints --test failpoints`
  — 5 tests, all green.
* `cargo test --workspace` — green.

Deferred to follow-up sessions (see design doc §17 split):
* Phase 1b — convert remaining engine call sites to `&dyn TableStorage`
  (mostly READS that don't touch the staged-write invariant).
* Phase 7 — recovery-on-open reconciler (closes Phase B → Phase C
  residual across process restarts; new subsystem).
* Phase 8 — index-coverage reconciler (full §VII.35 compliance —
  removes synchronous index work from the publish path).
* Phase 9 — demote unused `TableStore` inherent methods to `pub(crate)`
  (depends on Phase 1b).

Lance upstream blockers documented:
* lance-format/lance#6658 — two-phase delete API (open, no PRs).
* Companion: `build_index_metadata_from_segments` should be `pub` so
  vector-index builds can be staged outside the lance crate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-02 11:03:15 +02:00
parent 6f60c0cbcf
commit 3135ff5d19
No known key found for this signature in database
11 changed files with 1629 additions and 50 deletions

View file

@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
| Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher` |
| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. Engine-internal write APIs sit behind a sealed `TableStorage` trait (MR-793) — `stage_*` + `commit_staged` are the only paths to advance Lance HEAD; inline-commit Lance APIs are not reachable through the trait surface |
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency |
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |

View file

@ -200,6 +200,19 @@ impl Omnigraph {
&self.table_store
}
/// Engine-facing trait surface around `TableStore`.
///
/// MR-793 Phase 1: this is the canonical accessor for newly-written
/// engine code. The trait's signatures use opaque `SnapshotHandle` /
/// `StagedHandle` instead of leaking `lance::Dataset` /
/// `lance::dataset::transaction::Transaction`. Existing call sites
/// that still use `db.table_store.X(...)` (the inherent struct
/// methods) are migrated incrementally — see §9 of
/// `.context/mr-793-design.md`.
pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
&self.table_store
}
pub(crate) async fn open_coordinator_for_branch(
&self,
branch: Option<&str>,

View file

@ -237,7 +237,26 @@ pub(super) async fn apply_schema_with_lock(
)
.await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
// MR-793 Phase 6: route through stage_overwrite + commit_staged
// for non-empty batches. Lance's `InsertBuilder::execute_uncommitted`
// errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`),
// so the empty-rewrite case stays on `overwrite_dataset` (which
// accepts empty input). The empty case is rare in schema_apply
// — it only fires when the source table itself was already empty
// — and schema_apply runs under `__schema_apply_lock__` so the
// narrow inline-commit residual is bounded.
let mut target_ds = if batch.num_rows() == 0 {
TableStore::overwrite_dataset(&dataset_uri, batch).await?
} else {
let existing = db
.table_store
.open_dataset_head_for_write(table_key, &dataset_uri, None)
.await?;
let staged = db.table_store.stage_overwrite(&existing, batch).await?;
db.table_store
.commit_staged(Arc::new(existing), staged.transaction)
.await?
};
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;

View file

@ -286,15 +286,18 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
) -> Result<()> {
if let Some(type_name) = table_key.strip_prefix("node:") {
if !db.table_store.has_btree_index(ds, "id").await? {
db.table_store
.create_btree_index(ds, &["id"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e))
})?;
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
}
if let Some(node_type) = catalog.node_types.get(type_name) {
// Per MR-793 §10 OQ3: stage scalar indices first (BTree,
// Inverted), then call `create_vector_index` inline. The
// inline-commit on a vector index advances HEAD, which would
// invalidate any uncommitted scalar index transactions if we
// stacked them. Today the per-stage shape commits each
// scalar index immediately so the order constraint is
// implicit, but if we ever batch scalar stages we must
// ensure they all land before the vector inline-commit.
for index_cols in &node_type.indices {
if index_cols.len() != 1 {
continue;
@ -303,18 +306,16 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
if let Some(prop_type) = node_type.properties.get(prop_name) {
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.table_store.has_fts_index(ds, prop_name).await? {
db.table_store
.create_inverted_index(ds, prop_name.as_str())
.await
.map_err(|e| {
OmniError::Lance(format!(
"create Inverted index on {}({}): {}",
table_key, prop_name, e
))
})?;
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
.await?;
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.table_store.has_vector_index(ds, prop_name).await? {
// Inline-commit residual: lance-4.0.0 does not
// expose `build_index_metadata_from_segments` as
// `pub`, so vector indices cannot be staged from
// outside the lance crate. Document at the call
// site; companion ticket to lance-format/lance#6658.
db.table_store
.create_vector_index(ds, prop_name.as_str())
.await
@ -334,28 +335,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
if table_key.starts_with("edge:") {
if !db.table_store.has_btree_index(ds, "id").await? {
db.table_store
.create_btree_index(ds, &["id"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e))
})?;
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
}
if !db.table_store.has_btree_index(ds, "src").await? {
db.table_store
.create_btree_index(ds, &["src"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(src): {}", table_key, e))
})?;
stage_and_commit_btree(db, table_key, ds, &["src"]).await?;
}
if !db.table_store.has_btree_index(ds, "dst").await? {
db.table_store
.create_btree_index(ds, &["dst"])
.await
.map_err(|e| {
OmniError::Lance(format!("create BTree index on {}(dst): {}", table_key, e))
})?;
stage_and_commit_btree(db, table_key, ds, &["dst"]).await?;
}
return Ok(());
}
@ -366,6 +352,76 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
)))
}
/// Stage a BTREE index transaction and commit it, advancing the in-memory
/// `*ds` to the new HEAD. MR-793 Phase 4: replaces the previous
/// inline-commit `create_btree_index(ds)` call with the staged primitive
/// + an immediate `commit_staged`. Per-call behavior is unchanged
/// (HEAD advances once per index), but the bytes-on-disk and HEAD-advance
/// are now decoupled at the `TableStore` API surface — a caller that
/// needs end-of-batch atomicity can stage many transactions and commit
/// them in one pass (Phase 8's index reconciler relies on this).
async fn stage_and_commit_btree(
db: &Omnigraph,
table_key: &str,
ds: &mut Dataset,
columns: &[&str],
) -> Result<()> {
let staged = db
.table_store
.stage_create_btree_index(ds, columns)
.await
.map_err(|e| {
OmniError::Lance(format!(
"stage_create_btree_index on {}({:?}): {}",
table_key, columns, e
))
})?;
let new_ds = db
.table_store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.await
.map_err(|e| {
OmniError::Lance(format!(
"commit BTree index on {}({:?}): {}",
table_key, columns, e
))
})?;
*ds = new_ds;
Ok(())
}
/// Stage an INVERTED (FTS) index transaction and commit it. See
/// `stage_and_commit_btree` for the MR-793 Phase 4 rationale.
async fn stage_and_commit_inverted(
db: &Omnigraph,
table_key: &str,
ds: &mut Dataset,
column: &str,
) -> Result<()> {
let staged = db
.table_store
.stage_create_inverted_index(ds, column)
.await
.map_err(|e| {
OmniError::Lance(format!(
"stage_create_inverted_index on {}({}): {}",
table_key, column, e
))
})?;
let new_ds = db
.table_store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.await
.map_err(|e| {
OmniError::Lance(format!(
"commit Inverted index on {}({}): {}",
table_key, column, e
))
})?;
*ds = new_ds;
Ok(())
}
async fn prepare_updates_for_commit(
db: &Omnigraph,
branch: Option<&str>,

View file

@ -911,7 +911,14 @@ async fn publish_rewritten_merge_table(
let mut current_ds = ds;
// Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
// existing rows, bumps _row_last_updated_at_version only for actually-changed rows)
// existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
//
// MR-793 Phase 5: routed through the staged primitive so a failure
// between writing fragments and committing leaves no Lance-HEAD
// drift. The commit_staged here is per-table per-call (Lance has no
// multi-dataset atomic commit); the residual sits at this single
// commit point, narrowed from the previous "merge_insert + delete +
// index" multi-step inline-commit chain.
if let Some(delta) = &staged.delta_staged {
let batches: Vec<RecordBatch> = target_db
.table_store()
@ -921,29 +928,40 @@ async fn publish_rewritten_merge_table(
.filter(|batch| batch.num_rows() > 0)
.collect();
if !batches.is_empty() {
let state = target_db
// Concat into one batch — stage_merge_insert takes a single batch.
let combined = if batches.len() == 1 {
batches.into_iter().next().unwrap()
} else {
let schema = batches[0].schema();
arrow_select::concat::concat_batches(&schema, &batches)
.map_err(|e| OmniError::Lance(e.to_string()))?
};
let staged_merge = target_db
.table_store()
.merge_insert_batches(
&full_path,
current_ds,
batches,
.stage_merge_insert(
current_ds.clone(),
combined,
vec!["id".to_string()],
lance::dataset::WhenMatched::UpdateAll,
lance::dataset::WhenNotMatched::InsertAll,
)
.await?;
current_ds = target_db
.reopen_for_mutation(
table_key,
&full_path,
table_branch.as_deref(),
state.version,
)
.table_store()
.commit_staged(Arc::new(current_ds), staged_merge.transaction)
.await?;
}
}
// Phase 2: delete removed rows via deletion vectors
// Phase 2: delete removed rows via deletion vectors.
//
// INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public
// two-phase delete API (DeleteJob is `pub(crate)` —
// lance-format/lance#6658 is open with no PRs). MR-793 deliberately
// does NOT introduce a `stage_delete` wrapper that would secretly
// inline-commit (a side-channel — see design doc §3.2). When the
// upstream API ships, swap this `delete_where` call for
// `stage_delete` + `commit_staged`.
if !staged.deleted_ids.is_empty() {
let escaped: Vec<String> = staged
.deleted_ids
@ -957,7 +975,13 @@ async fn publish_rewritten_merge_table(
.await?;
}
// Phase 3: rebuild indices
// Phase 3: rebuild indices.
//
// `build_indices_on_dataset` was migrated in MR-793 Phase 4 to use
// `stage_create_btree_index` / `stage_create_inverted_index` +
// `commit_staged` for scalar indices. Vector indices remain inline
// (residual — `build_index_metadata_from_segments` is `pub(crate)`
// in lance-4.0.0; companion ticket to lance-format/lance#6658).
let row_count = target_db
.table_store()
.table_state(&full_path, &current_ds)

View file

@ -8,4 +8,5 @@ pub mod graph_index;
pub mod loader;
pub mod runtime_cache;
pub mod storage;
pub mod storage_layer;
pub mod table_store;

View file

@ -0,0 +1,824 @@
//! Storage trait surface — MR-793.
//!
//! `TableStorage` is the engine-internal trait that funnels every Lance
//! data write through staged primitives. Engine code (in `exec/`,
//! `db/omnigraph/`, `loader/`) holds `Arc<dyn TableStorage>` instead of
//! a concrete `TableStore`; the inline-commit Lance APIs
//! (`Dataset::append`, `MergeInsertBuilder::execute`, etc.) are not
//! reachable through the trait surface.
//!
//! ## Sealed
//!
//! `TableStorage: sealed::Sealed`. Only types in this crate can implement
//! the trait, so the staged-write invariant cannot be subverted by a
//! downstream impl.
//!
//! ## Opaque handles
//!
//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` and
//! `StagedWrite` respectively. Their inner Lance types are
//! `pub(crate)` — engine code outside `table_store` cannot reach
//! through. This is the §III.9 alignment: `lance::Dataset` does not
//! appear in trait signatures.
//!
//! ## Scope (MR-793 Phase 1)
//!
//! The trait surface mirrors the methods engine code currently calls on
//! `TableStore`. Subsequent MR-793 phases:
//! * Phase 2 — add `stage_overwrite`, `stage_create_btree_index`,
//! `stage_create_inverted_index` to the trait.
//! * Phase 46 — migrate writers (`ensure_indices`, `branch_merge`,
//! `schema_apply`) onto the staged primitives.
//! * Phase 9 — demote unused inline-commit methods to `pub(crate)`.
use std::fmt::Debug;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use lance::Dataset;
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance::dataset::{WhenMatched, WhenNotMatched};
use crate::db::{Snapshot, SubTableEntry};
use crate::error::Result;
use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
// ─── sealed module ──────────────────────────────────────────────────────────
pub(crate) mod sealed {
/// Sealed marker — only types defined in `omnigraph-engine` can
/// implement `TableStorage`. Combined with the trait being the only
/// route to write APIs from engine code, this gives type-system
/// enforcement of the staged-write invariant.
pub trait Sealed {}
impl Sealed for crate::table_store::TableStore {}
}
// ─── opaque handles ────────────────────────────────────────────────────────
/// Opaque handle to a snapshot of a single sub-table dataset at a
/// specific version.
///
/// Engine code never sees `lance::Dataset` directly; it holds
/// `SnapshotHandle` and passes it back to `TableStorage` methods.
/// Inside this crate, `pub(crate)` accessors expose the inner
/// `Arc<Dataset>` to the `TableStorage` impl.
#[derive(Debug, Clone)]
pub struct SnapshotHandle {
pub(crate) inner: Arc<Dataset>,
}
impl SnapshotHandle {
/// Construct from a Lance dataset. `pub(crate)` — only
/// `TableStore` should produce these.
pub(crate) fn new(ds: Dataset) -> Self {
Self { inner: Arc::new(ds) }
}
/// Borrow the underlying Lance dataset. `pub(crate)` so only the
/// `TableStorage` impl in this crate can reach through.
pub(crate) fn dataset(&self) -> &Dataset {
&self.inner
}
/// Take ownership of the inner `Arc<Dataset>`. Used when committing
/// staged writes (the call needs to consume the snapshot).
pub(crate) fn into_arc(self) -> Arc<Dataset> {
self.inner
}
// ── public, lance-free accessors ──
/// Current Lance manifest version of the snapshot.
pub fn version(&self) -> u64 {
self.inner.version().version
}
/// Whether the underlying dataset uses stable row IDs.
pub fn uses_stable_row_ids(&self) -> bool {
self.inner.manifest.uses_stable_row_ids()
}
}
/// Opaque handle to a staged Lance transaction (data write or scalar
/// index build) that has not yet advanced HEAD.
///
/// Produced by `TableStorage::stage_*`, consumed by
/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
/// (transaction + read-your-writes deltas) behind `pub(crate)`.
#[derive(Debug, Clone)]
pub struct StagedHandle {
pub(crate) inner: StagedWrite,
}
impl StagedHandle {
pub(crate) fn new(staged: StagedWrite) -> Self {
Self { inner: staged }
}
/// Take ownership of the inner `StagedWrite`. Used by
/// `commit_staged`.
pub(crate) fn into_staged(self) -> StagedWrite {
self.inner
}
}
/// Helper: convert a slice of `StagedHandle` references to a Vec of
/// `&StagedWrite` for handing to `TableStore::stage_append`'s
/// `prior_stages` parameter. The lifetime is tied to the input slice.
pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
handles.iter().map(|h| h.inner.clone()).collect()
}
// ─── TableStorage trait ────────────────────────────────────────────────────
/// Engine-internal trait covering every Lance dataset operation an
/// `omnigraph` engine call site might perform.
///
/// `TableStore` is the only `impl`. The trait is sealed; the inline
/// Lance APIs are not reachable through trait dispatch. New writers that
/// might advance Lance HEAD MUST add a staged-shape method here.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
// ── Snapshot opens (no HEAD advance) ────────────────────────────────
async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
async fn open_snapshot_at_table(
&self,
snapshot: &Snapshot,
table_key: &str,
) -> Result<SnapshotHandle>;
async fn open_dataset_head(
&self,
dataset_uri: &str,
branch: Option<&str>,
) -> Result<SnapshotHandle>;
async fn open_dataset_head_for_write(
&self,
table_key: &str,
dataset_uri: &str,
branch: Option<&str>,
) -> Result<SnapshotHandle>;
async fn open_dataset_at_state(
&self,
table_path: &str,
branch: Option<&str>,
version: u64,
) -> Result<SnapshotHandle>;
async fn fork_branch_from_state(
&self,
dataset_uri: &str,
source_branch: Option<&str>,
table_key: &str,
source_version: u64,
target_branch: &str,
) -> Result<SnapshotHandle>;
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
async fn reopen_for_mutation(
&self,
dataset_uri: &str,
branch: Option<&str>,
table_key: &str,
expected_version: u64,
) -> Result<SnapshotHandle>;
fn ensure_expected_version(
&self,
snapshot: &SnapshotHandle,
table_key: &str,
expected_version: u64,
) -> Result<()>;
// ── Reads (no HEAD advance) ────────────────────────────────────────
async fn scan(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
) -> Result<Vec<RecordBatch>>;
async fn scan_with_row_id(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
with_row_id: bool,
) -> Result<Vec<RecordBatch>>;
async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
async fn scan_batches_for_rewrite(
&self,
snapshot: &SnapshotHandle,
) -> Result<Vec<RecordBatch>>;
async fn count_rows(
&self,
snapshot: &SnapshotHandle,
filter: Option<String>,
) -> Result<usize>;
async fn count_rows_with_staged(
&self,
snapshot: &SnapshotHandle,
staged: &[StagedHandle],
filter: Option<String>,
) -> Result<usize>;
async fn scan_with_staged(
&self,
snapshot: &SnapshotHandle,
staged: &[StagedHandle],
projection: Option<&[&str]>,
filter: Option<&str>,
) -> Result<Vec<RecordBatch>>;
async fn scan_with_pending(
&self,
snapshot: &SnapshotHandle,
pending: &[RecordBatch],
pending_schema: Option<SchemaRef>,
projection: Option<&[&str]>,
filter: Option<&str>,
key_column: Option<&str>,
) -> Result<Vec<RecordBatch>>;
async fn first_row_id_for_filter(
&self,
snapshot: &SnapshotHandle,
filter: &str,
) -> Result<Option<u64>>;
async fn table_state(
&self,
dataset_uri: &str,
snapshot: &SnapshotHandle,
) -> Result<TableState>;
// ── Staged writes (no HEAD advance) ────────────────────────────────
async fn stage_append(
&self,
snapshot: &SnapshotHandle,
batch: RecordBatch,
prior_stages: &[StagedHandle],
) -> Result<StagedHandle>;
async fn stage_merge_insert(
&self,
snapshot: SnapshotHandle,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<StagedHandle>;
async fn commit_staged(
&self,
snapshot: SnapshotHandle,
staged: StagedHandle,
) -> Result<SnapshotHandle>;
/// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
async fn stage_overwrite(
&self,
snapshot: &SnapshotHandle,
batch: RecordBatch,
) -> Result<StagedHandle>;
/// Stage a BTREE scalar index build. MR-793 Phase 2.
async fn stage_create_btree_index(
&self,
snapshot: &SnapshotHandle,
columns: &[&str],
) -> Result<StagedHandle>;
/// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
async fn stage_create_inverted_index(
&self,
snapshot: &SnapshotHandle,
column: &str,
) -> Result<StagedHandle>;
// ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
//
// These methods advance Lance HEAD as a side effect of writing.
// They stay on the trait until the corresponding upstream Lance API
// ships:
//
// * `delete_where` — Lance #6658 (two-phase delete).
// * `create_*_index` — `build_index_metadata_from_segments` is
// `pub(crate)` for vector indices in lance-4.0.0; scalar indices
// migrate to staged in MR-793 Phase 2.
// * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
// legacy paths that will be demoted to `pub(crate)` in MR-793
// Phase 9 once all engine sites route through the staged
// primitives.
async fn append_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)>;
async fn merge_insert_batches(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState>;
async fn overwrite_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)>;
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<DeleteState>;
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
async fn create_btree_index(
&self,
snapshot: SnapshotHandle,
columns: &[&str],
) -> Result<SnapshotHandle>;
async fn create_inverted_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle>;
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle>;
// ── URI helpers ────────────────────────────────────────────────────
//
// These are pure string formatting; they live on the trait so engine
// code holding `Arc<dyn TableStorage>` can compute dataset URIs
// without importing the concrete struct.
fn root_uri(&self) -> &str;
fn dataset_uri(&self, table_path: &str) -> String;
// ── Streaming access (used by the export path) ────────────────────
//
// Engine code that needs a `DatasetRecordBatchStream` (rather than a
// collected `Vec<RecordBatch>`) goes through this trait method.
// Useful for the JSONL exporter that streams rows to a writer
// without materializing the whole result.
async fn scan_stream(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
with_row_id: bool,
) -> Result<DatasetRecordBatchStream>;
}
// ─── single impl: TableStore ──────────────────────────────────────────────
#[async_trait]
impl TableStorage for TableStore {
async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
self.open_at_entry(entry).await.map(SnapshotHandle::new)
}
async fn open_snapshot_at_table(
&self,
snapshot: &Snapshot,
table_key: &str,
) -> Result<SnapshotHandle> {
self.open_snapshot_table(snapshot, table_key)
.await
.map(SnapshotHandle::new)
}
async fn open_dataset_head(
&self,
dataset_uri: &str,
branch: Option<&str>,
) -> Result<SnapshotHandle> {
TableStore::open_dataset_head(self, dataset_uri, branch)
.await
.map(SnapshotHandle::new)
}
async fn open_dataset_head_for_write(
&self,
table_key: &str,
dataset_uri: &str,
branch: Option<&str>,
) -> Result<SnapshotHandle> {
TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
.await
.map(SnapshotHandle::new)
}
async fn open_dataset_at_state(
&self,
table_path: &str,
branch: Option<&str>,
version: u64,
) -> Result<SnapshotHandle> {
TableStore::open_dataset_at_state(self, table_path, branch, version)
.await
.map(SnapshotHandle::new)
}
async fn fork_branch_from_state(
&self,
dataset_uri: &str,
source_branch: Option<&str>,
table_key: &str,
source_version: u64,
target_branch: &str,
) -> Result<SnapshotHandle> {
TableStore::fork_branch_from_state(
self,
dataset_uri,
source_branch,
table_key,
source_version,
target_branch,
)
.await
.map(SnapshotHandle::new)
}
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
TableStore::delete_branch(self, dataset_uri, branch).await
}
async fn reopen_for_mutation(
&self,
dataset_uri: &str,
branch: Option<&str>,
table_key: &str,
expected_version: u64,
) -> Result<SnapshotHandle> {
TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
.await
.map(SnapshotHandle::new)
}
fn ensure_expected_version(
&self,
snapshot: &SnapshotHandle,
table_key: &str,
expected_version: u64,
) -> Result<()> {
TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
}
async fn scan(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
) -> Result<Vec<RecordBatch>> {
TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
}
async fn scan_with_row_id(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
with_row_id: bool,
) -> Result<Vec<RecordBatch>> {
TableStore::scan_with(
self,
snapshot.dataset(),
projection,
filter,
order_by,
with_row_id,
|_| Ok(()),
)
.await
}
async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
TableStore::scan_batches(self, snapshot.dataset()).await
}
async fn scan_batches_for_rewrite(
&self,
snapshot: &SnapshotHandle,
) -> Result<Vec<RecordBatch>> {
TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
}
async fn count_rows(
&self,
snapshot: &SnapshotHandle,
filter: Option<String>,
) -> Result<usize> {
TableStore::count_rows(self, snapshot.dataset(), filter).await
}
async fn count_rows_with_staged(
&self,
snapshot: &SnapshotHandle,
staged: &[StagedHandle],
filter: Option<String>,
) -> Result<usize> {
let staged_writes = staged_handles_as_writes(staged);
TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
}
async fn scan_with_staged(
&self,
snapshot: &SnapshotHandle,
staged: &[StagedHandle],
projection: Option<&[&str]>,
filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
let staged_writes = staged_handles_as_writes(staged);
TableStore::scan_with_staged(
self,
snapshot.dataset(),
&staged_writes,
projection,
filter,
)
.await
}
async fn scan_with_pending(
&self,
snapshot: &SnapshotHandle,
pending: &[RecordBatch],
pending_schema: Option<SchemaRef>,
projection: Option<&[&str]>,
filter: Option<&str>,
key_column: Option<&str>,
) -> Result<Vec<RecordBatch>> {
TableStore::scan_with_pending(
self,
snapshot.dataset(),
pending,
pending_schema,
projection,
filter,
key_column,
)
.await
}
async fn first_row_id_for_filter(
&self,
snapshot: &SnapshotHandle,
filter: &str,
) -> Result<Option<u64>> {
TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
}
async fn table_state(
&self,
dataset_uri: &str,
snapshot: &SnapshotHandle,
) -> Result<TableState> {
TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
}
async fn stage_append(
&self,
snapshot: &SnapshotHandle,
batch: RecordBatch,
prior_stages: &[StagedHandle],
) -> Result<StagedHandle> {
let staged_writes = staged_handles_as_writes(prior_stages);
TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
.await
.map(StagedHandle::new)
}
async fn stage_merge_insert(
&self,
snapshot: SnapshotHandle,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<StagedHandle> {
let ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::stage_merge_insert(
self,
ds,
batch,
key_columns,
when_matched,
when_not_matched,
)
.await
.map(StagedHandle::new)
}
async fn commit_staged(
&self,
snapshot: SnapshotHandle,
staged: StagedHandle,
) -> Result<SnapshotHandle> {
let ds_arc = snapshot.into_arc();
let transaction = staged.into_staged().transaction;
TableStore::commit_staged(self, ds_arc, transaction)
.await
.map(SnapshotHandle::new)
}
async fn stage_overwrite(
&self,
snapshot: &SnapshotHandle,
batch: RecordBatch,
) -> Result<StagedHandle> {
TableStore::stage_overwrite(self, snapshot.dataset(), batch)
.await
.map(StagedHandle::new)
}
async fn stage_create_btree_index(
&self,
snapshot: &SnapshotHandle,
columns: &[&str],
) -> Result<StagedHandle> {
TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
.await
.map(StagedHandle::new)
}
async fn stage_create_inverted_index(
&self,
snapshot: &SnapshotHandle,
column: &str,
) -> Result<StagedHandle> {
TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
.await
.map(StagedHandle::new)
}
async fn append_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn merge_insert_batches(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState> {
let ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::merge_insert_batches(
self,
dataset_uri,
ds,
batches,
key_columns,
when_matched,
when_not_matched,
)
.await
}
async fn overwrite_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<DeleteState> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::delete_where(self, dataset_uri, &mut ds, filter).await
}
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_btree_index(self, snapshot.dataset(), column).await
}
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_fts_index(self, snapshot.dataset(), column).await
}
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_vector_index(self, snapshot.dataset(), column).await
}
async fn create_btree_index(
&self,
snapshot: SnapshotHandle,
columns: &[&str],
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::create_btree_index(self, &mut ds, columns).await?;
Ok(SnapshotHandle::new(ds))
}
async fn create_inverted_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::create_inverted_index(self, &mut ds, column).await?;
Ok(SnapshotHandle::new(ds))
}
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::create_vector_index(self, &mut ds, column).await?;
Ok(SnapshotHandle::new(ds))
}
fn root_uri(&self) -> &str {
TableStore::root_uri(self)
}
fn dataset_uri(&self, table_path: &str) -> String {
TableStore::dataset_uri(self, table_path)
}
async fn scan_stream(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
with_row_id: bool,
) -> Result<DatasetRecordBatchStream> {
// Note: existing TableStore::scan_stream is an associated fn that
// takes &Dataset, so we delegate via the dataset reference held by
// the snapshot.
TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
}
}
// Suppress unused-import warning when the module is built without the
// `Scanner` type being referenced from the trait.
#[allow(dead_code)]
fn _scanner_type_marker(_: &Scanner) {}

View file

@ -4,7 +4,7 @@ use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
@ -760,6 +760,172 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
/// Returns a StagedWrite carrying the replacement fragments. HEAD does
/// NOT advance.
///
/// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
/// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
/// `Operation::Overwrite` carries the new schema + fragments. The
/// transaction is committed via `commit_staged` (same call as
/// `stage_append`).
///
/// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
/// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
pub async fn stage_overwrite(
&self,
ds: &Dataset,
batch: RecordBatch,
) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
"stage_overwrite called with empty batch".to_string(),
));
}
let params = WriteParams {
mode: WriteMode::Overwrite,
allow_external_blob_outside_bases: true,
..Default::default()
};
let transaction = InsertBuilder::new(Arc::new(ds.clone()))
.with_params(&params)
.execute_uncommitted(vec![batch])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let mut new_fragments = match &transaction.operation {
Operation::Overwrite { fragments, .. } => fragments.clone(),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_overwrite: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
};
// Overwrite REPLACES every committed fragment, and Lance restarts
// fragment-ID and row-ID counters at the post-commit version.
// For our pre-commit staged view we need to:
// 1) Renumber temporary fragment IDs (Lance returns them as
// `id = 0` from `execute_uncommitted` — see stage_append
// for the same fix). For Overwrite there are no committed
// fragments to collide with (they're all in
// removed_fragment_ids below), so start at 1.
// 2) For stable-row-id datasets, assign row_id_meta starting
// at 0 (Overwrite is a fresh-start) so `scan_with_staged`
// doesn't hit the "Missing row id meta" panic in
// lance-4.0.0 dataset/rowids.rs:22.
assign_fragment_ids(&mut new_fragments, 1);
if ds.manifest.uses_stable_row_ids() {
assign_row_id_meta(&mut new_fragments, 0)?;
}
// Overwrite REPLACES every committed fragment. For
// read-your-writes via scan_with_staged, list every committed
// fragment in removed_fragment_ids so the post-stage view shows
// ONLY the staged fragments.
let removed_fragment_ids: Vec<u64> =
ds.manifest.fragments.iter().map(|f| f.id).collect();
Ok(StagedWrite {
transaction,
new_fragments,
removed_fragment_ids,
})
}
/// Stage a BTREE scalar index build. Returns a StagedWrite whose
/// transaction commits via `commit_staged`. HEAD does NOT advance.
///
/// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
/// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
/// { new_indices, removed_indices }` via the public `TransactionBuilder`,
/// replicating the simple (non-segment-commit-path) branch of Lance's
/// `CreateIndexBuilder::execute` (lance-4.0.0 `src/index/create.rs:502-512`).
///
/// `removed_indices` mirrors `execute()` lines 466-476: when the
/// build replaces an existing same-named index, those entries are
/// listed for tombstoning by the manifest commit.
///
/// MR-793 Phase 2: scalar index types (BTree, Inverted) are
/// stage-able. Vector indices are NOT (segment-commit-path requires
/// `build_index_metadata_from_segments` which is `pub(crate)` in
/// lance-4.0.0); see `create_vector_index` and Appendix A.3.
pub async fn stage_create_btree_index(
&self,
ds: &Dataset,
columns: &[&str],
) -> Result<StagedWrite> {
let params = ScalarIndexParams::default();
let mut ds_clone = ds.clone();
let new_idx = ds_clone
.create_index_builder(columns, IndexType::BTree, &params)
.replace(true)
.execute_uncommitted()
.await
.map_err(|e| {
OmniError::Lance(format!("stage_create_btree_index: {}", e))
})?;
let removed_indices: Vec<IndexMetadata> = ds
.load_indices()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
.iter()
.filter(|idx| idx.name == new_idx.name)
.cloned()
.collect();
let transaction = TransactionBuilder::new(
new_idx.dataset_version,
Operation::CreateIndex {
new_indices: vec![new_idx],
removed_indices,
},
)
.build();
Ok(StagedWrite {
transaction,
new_fragments: Vec::new(),
removed_fragment_ids: Vec::new(),
})
}
/// Stage an INVERTED (FTS) scalar index build. Same shape as
/// `stage_create_btree_index`; see its docs for the Lance API
/// citation and contract notes.
pub async fn stage_create_inverted_index(
&self,
ds: &Dataset,
column: &str,
) -> Result<StagedWrite> {
let params = InvertedIndexParams::default();
let mut ds_clone = ds.clone();
let new_idx = ds_clone
.create_index_builder(&[column], IndexType::Inverted, &params)
.replace(true)
.execute_uncommitted()
.await
.map_err(|e| {
OmniError::Lance(format!("stage_create_inverted_index: {}", e))
})?;
let removed_indices: Vec<IndexMetadata> = ds
.load_indices()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
.iter()
.filter(|idx| idx.name == new_idx.name)
.cloned()
.collect();
let transaction = TransactionBuilder::new(
new_idx.dataset_version,
Operation::CreateIndex {
new_indices: vec![new_idx],
removed_indices,
},
)
.build();
Ok(StagedWrite {
transaction,
new_fragments: Vec::new(),
removed_fragment_ids: Vec::new(),
})
}
/// Run a scan with optional uncommitted staged writes visible
/// alongside the committed snapshot. When `staged` is empty this is
/// identical to `scan(...)`.

View file

@ -0,0 +1,172 @@
//! MR-793 Phase 3 — forbidden-API guard test.
//!
//! Engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) MUST NOT
//! call Lance's inline-commit data-write APIs directly. The
//! `Storage` trait (`crate::storage_layer::TableStorage`) is the canonical
//! surface; staged primitives (`stage_append`, `stage_merge_insert`,
//! `stage_overwrite`, `stage_create_btree_index`,
//! `stage_create_inverted_index`) plus `commit_staged` are the only
//! way to advance Lance HEAD.
//!
//! The trait is sealed (only `TableStore` impls it), so by-construction
//! the trait surface forbids ad-hoc Lance calls. This test is **defense
//! in depth** — it catches the case where engine code reaches around
//! the trait by importing `lance::dataset::*` types directly.
//!
//! ## How it works
//!
//! Walks `crates/omnigraph/src/{exec,db/omnigraph,loader,changes}/**/*.rs`,
//! greps each line for forbidden symbols. Lines whose preceding line
//! contains the sentinel comment `// forbidden-api-allow: <reason>` are
//! exempt — reviewers see the sentinel in diff and can ask "is this
//! exemption justified?"
//!
//! ## What's deliberately out of scope (allow-listed by directory)
//!
//! - `crates/omnigraph/src/table_store.rs` — IS the storage layer.
//! The forbidden Lance APIs live here legitimately.
//! - `crates/omnigraph/src/db/manifest/**` — uses `CommitBuilder` for
//! the cross-table manifest commit. Documented exception.
//! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module.
//!
//! ## Initial state (MR-793 Phase 3)
//!
//! At the time this test was written, MR-793 has migrated three writers
//! (ensure_indices, branch_merge, schema_apply rewrites) onto staged
//! primitives. Other engine call sites (the bulk loader, exec/mutation,
//! exec/query, etc.) still use the legacy inherent `TableStore` methods
//! — they're not visible at the trait boundary, but they DO call lance
//! types. The allow-list below reflects this transitional state. Phase 9
//! tightens the allow-list as call sites migrate.
use std::path::{Path, PathBuf};
const FORBIDDEN_PATTERNS: &[&str] = &[
"MergeInsertBuilder",
"InsertBuilder::",
"DeleteBuilder",
"CommitBuilder::new",
".create_index_builder(",
".create_index_segment_builder(",
];
/// Files exempt from the guard. These are the legitimate storage-layer
/// implementations that USE the forbidden APIs to provide the staged
/// primitives.
const ALLOW_LIST_FILES: &[&str] = &[
"table_store.rs", // The storage layer itself.
"storage_layer.rs", // The trait module.
];
/// Directories exempt from the guard. Files under these paths may use
/// the forbidden APIs.
const ALLOW_LIST_DIRS: &[&str] = &[
"db/manifest", // Manifest publisher uses CommitBuilder for cross-table commits.
"db/manifest/", // Belt + suspenders for the directory match.
];
const SENTINEL: &str = "// forbidden-api-allow:";
fn engine_src_root() -> PathBuf {
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR");
PathBuf::from(manifest_dir).join("src")
}
fn is_allow_listed(path: &Path) -> bool {
let path_str = path.to_string_lossy();
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if ALLOW_LIST_FILES.iter().any(|f| *f == name) {
return true;
}
}
ALLOW_LIST_DIRS.iter().any(|d| path_str.contains(d))
}
fn walk_rust_files(root: &Path) -> Vec<PathBuf> {
let mut out = Vec::new();
walk_into(root, &mut out);
out
}
fn walk_into(dir: &Path, out: &mut Vec<PathBuf>) {
let entries = match std::fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return,
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
walk_into(&path, out);
} else if path.extension().and_then(|s| s.to_str()) == Some("rs") {
out.push(path);
}
}
}
#[test]
fn engine_code_does_not_call_forbidden_lance_apis() {
let src = engine_src_root();
let mut violations = Vec::new();
for file in walk_rust_files(&src) {
if is_allow_listed(&file) {
continue;
}
let contents = match std::fs::read_to_string(&file) {
Ok(c) => c,
Err(_) => continue,
};
let lines: Vec<&str> = contents.lines().collect();
for (idx, line) in lines.iter().enumerate() {
let trimmed = line.trim_start();
// Skip comment-only lines — references to forbidden API
// names in doc-comments, design notes, or residual-marker
// comments are documentation, not code use. The trait
// surface (sealed + trait-only) is the actual enforcement;
// this test only catches code use.
if trimmed.starts_with("//")
|| trimmed.starts_with("/*")
|| trimmed.starts_with("*")
{
continue;
}
// Allow lines marked with the sentinel on the SAME line or
// the immediately preceding line.
if line.contains(SENTINEL) {
continue;
}
if idx > 0 && lines[idx - 1].contains(SENTINEL) {
continue;
}
for pattern in FORBIDDEN_PATTERNS {
if line.contains(pattern) {
let rel = file
.strip_prefix(&src)
.unwrap_or(&file)
.display()
.to_string();
violations.push(format!(
"{}:{}: forbidden pattern `{}` — {}",
rel,
idx + 1,
pattern,
line.trim()
));
}
}
}
}
if !violations.is_empty() {
panic!(
"MR-793 forbidden-API guard found {} violation(s) in engine code. \
Engine code MUST route through the `TableStorage` trait (or its \
inherent counterparts on `TableStore`) instead of calling Lance's \
inline-commit APIs directly. If a use is genuinely justified, add \
the comment `// forbidden-api-allow: <reason>` on the same line or \
the line above.\n\nViolations:\n {}",
violations.len(),
violations.join("\n ")
);
}
}

View file

@ -492,3 +492,269 @@ async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior
surface in production paths see exec/staging.rs."
);
}
// ─── MR-793 Phase 2: stage_overwrite + scalar index staging ─────────────────
/// `stage_overwrite` writes replacement fragments to object storage but
/// does NOT advance Lance HEAD until `commit_staged` runs. Mirrors
/// `stage_append`'s contract.
#[tokio::test]
async fn stage_overwrite_does_not_advance_head_until_commit() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
let pre_version = ds.version().version;
let staged = store
.stage_overwrite(&ds, person_batch(&[("zoe", Some(99))]))
.await
.unwrap();
assert_eq!(
ds.version().version,
pre_version,
"stage_overwrite must not advance HEAD"
);
// Reopen at HEAD; still pre-version (no commit happened on disk).
let reopened = Dataset::open(&uri).await.unwrap();
assert_eq!(reopened.version().version, pre_version);
// After commit_staged, HEAD advances and the dataset shows the
// overwrite result (zoe alone — alice replaced).
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.await
.unwrap();
assert!(new_ds.version().version > pre_version);
let after = store.scan_batches(&new_ds).await.unwrap();
assert_eq!(collect_ids(&after), vec!["zoe"]);
}
/// `stage_overwrite` semantically REPLACES every committed fragment.
/// `removed_fragment_ids` lists every committed fragment so
/// `scan_with_staged` shows only the staged rows (not committed + staged).
#[tokio::test]
async fn stage_overwrite_replaces_all_fragments() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(
&uri,
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
)
.await
.unwrap();
let committed_fragment_ids: std::collections::HashSet<u64> =
ds.manifest.fragments.iter().map(|f| f.id).collect();
let staged = store
.stage_overwrite(&ds, person_batch(&[("zoe", Some(99))]))
.await
.unwrap();
let removed: std::collections::HashSet<u64> =
staged.removed_fragment_ids.iter().copied().collect();
assert_eq!(
removed, committed_fragment_ids,
"stage_overwrite must list every committed fragment as removed so \
scan_with_staged shadows them all (overwrite semantics pre-data \
is being wiped)"
);
let batches = store
.scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
.await
.unwrap();
assert_eq!(
collect_ids(&batches),
vec!["zoe"],
"scan_with_staged must show only the staged row, not committed + staged"
);
}
/// `stage_create_btree_index` writes index segments to object storage
/// but does NOT advance Lance HEAD until `commit_staged`. After commit,
/// the index is queryable.
#[tokio::test]
async fn stage_create_btree_index_does_not_advance_head_until_commit() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(
&uri,
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
)
.await
.unwrap();
let pre_version = ds.version().version;
assert!(
!store.has_btree_index(&ds, "id").await.unwrap(),
"fresh dataset has no btree index on `id`"
);
let staged = store.stage_create_btree_index(&ds, &["id"]).await.unwrap();
assert_eq!(
ds.version().version,
pre_version,
"stage_create_btree_index must not advance HEAD"
);
let reopened = Dataset::open(&uri).await.unwrap();
assert_eq!(
reopened.version().version,
pre_version,
"no Lance commit happened on disk"
);
assert!(
!store.has_btree_index(&reopened, "id").await.unwrap(),
"index is not visible until commit_staged"
);
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.await
.unwrap();
assert!(new_ds.version().version > pre_version);
assert!(
store.has_btree_index(&new_ds, "id").await.unwrap(),
"after commit_staged, the index IS visible"
);
}
/// `stage_create_inverted_index` (FTS) — same shape as the BTREE test.
#[tokio::test]
async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(
&uri,
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
)
.await
.unwrap();
let pre_version = ds.version().version;
let staged = store
.stage_create_inverted_index(&ds, "id")
.await
.unwrap();
assert_eq!(
ds.version().version,
pre_version,
"stage_create_inverted_index must not advance HEAD"
);
assert!(!store.has_fts_index(&ds, "id").await.unwrap());
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.await
.unwrap();
assert!(new_ds.version().version > pre_version);
assert!(
store.has_fts_index(&new_ds, "id").await.unwrap(),
"after commit_staged, the FTS index IS visible"
);
}
/// Pin the inline-commit behavior of `delete_where`. Lance 4.0.0 does
/// NOT expose a public `DeleteJob::execute_uncommitted`
/// (`pub(crate)` — see lance-format/lance#6658). MR-793 deliberately
/// does NOT introduce a `stage_delete` wrapper that would secretly
/// inline-commit (a side-channel — see design doc §3.2). Instead, the
/// trait keeps `delete_where` as the only delete entry point, named
/// honestly.
///
/// **When Lance #6658 lands**: this test will need to flip — replace
/// the assertion with a `stage_delete` + `commit_staged` round-trip
/// and remove the residual line in `docs/runs.md`.
#[tokio::test]
async fn delete_where_advances_head_inline_documents_residual() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let mut ds = TableStore::write_dataset(
&uri,
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
)
.await
.unwrap();
let pre_version = ds.version().version;
let result = store
.delete_where(&uri, &mut ds, "id = 'alice'")
.await
.unwrap();
assert_eq!(result.deleted_rows, 1);
assert!(
result.version > pre_version,
"delete_where ADVANCES Lance HEAD inline (the residual). When \
lance-format/lance#6658 ships and we migrate to stage_delete + \
commit_staged, flip this assertion to assert that staging does \
NOT advance HEAD."
);
}
/// Companion to `delete_where_*`: pin the inline-commit behavior of
/// `create_vector_index`. Lance 4.0.0 vector indices take the
/// "segment commit path" which calls `build_index_metadata_from_segments`
/// (`pub(crate)` in lance-4.0.0 `src/index.rs:111`). Until upstream
/// exposes that helper (companion ticket to #6658), MR-793's trait
/// surface deliberately does NOT include `stage_create_vector_index` —
/// see design doc Appendix A.3.
#[tokio::test]
async fn create_vector_index_advances_head_inline_documents_residual() {
use arrow_array::FixedSizeListArray;
use arrow_schema::FieldRef;
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/vec.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Build a small dataset with a fixed-size vector column. Vector index
// training requires multiple rows; provide enough.
let dim = 4usize;
let n_rows = 8usize;
let item_field: FieldRef = Arc::new(Field::new("item", DataType::Float32, true));
let vec_field = Field::new(
"embedding",
DataType::FixedSizeList(item_field.clone(), dim as i32),
false,
);
let id_field = Field::new("id", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![id_field, vec_field]));
let ids: Vec<String> = (0..n_rows).map(|i| format!("v{}", i)).collect();
let id_arr = StringArray::from(ids);
let flat: Vec<f32> = (0..(n_rows * dim)).map(|i| i as f32).collect();
let values = arrow_array::Float32Array::from(flat);
let vec_arr =
FixedSizeListArray::new(item_field, dim as i32, Arc::new(values), None);
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(id_arr), Arc::new(vec_arr)],
)
.unwrap();
let mut ds = TableStore::write_dataset(&uri, batch).await.unwrap();
let pre_version = ds.version().version;
assert!(!store.has_vector_index(&ds, "embedding").await.unwrap());
store
.create_vector_index(&mut ds, "embedding")
.await
.unwrap();
assert!(
ds.version().version > pre_version,
"create_vector_index ADVANCES Lance HEAD inline (the residual). \
When the upstream Lance helper `build_index_metadata_from_segments` \
is made `pub`, add `stage_create_vector_index` to the trait and \
flip this test to assert staging does NOT advance HEAD."
);
assert!(store.has_vector_index(&ds, "embedding").await.unwrap());
}

View file

@ -63,6 +63,44 @@ edges break referential integrity). Until Lance exposes a two-phase
delete API, the parse-time rejection keeps both paths atomic and
correct. Tracked: MR-793, plus a Lance-upstream ticket.
### MR-793 status (storage trait two-phase invariant) — partial
MR-793 hoists the staged-write pattern into a `TableStorage` trait
surface with sealed-trait enforcement and opaque `SnapshotHandle` /
`StagedHandle` types — see `crates/omnigraph/src/storage_layer.rs`.
The trait is the canonical surface for new engine code; existing call
sites still use the inherent `TableStore` methods (mechanical migration
deferred to a follow-up cycle — tracked).
Three writers have been migrated onto staged primitives:
* **`ensure_indices`** (`db/omnigraph/table_ops.rs::build_indices_on_dataset_for_catalog`)
— scalar indices (BTree, Inverted) now use `stage_create_*_index` +
`commit_staged`. Vector indices stay inline (residual — Lance
`build_index_metadata_from_segments` is `pub(crate)` in 4.0.0;
companion ticket to lance-format/lance#6658 needed).
* **`branch_merge::publish_rewritten_merge_table`**
(`exec/merge.rs`) — merge_insert now uses `stage_merge_insert` +
`commit_staged`. Deletes stay inline (Lance #6658 residual).
* **`schema_apply` rewritten_tables** (`db/omnigraph/schema_apply.rs`)
— non-empty rewrites use `stage_overwrite` + `commit_staged`.
Empty-batch rewrites stay inline (Lance `InsertBuilder::execute_uncommitted`
rejects empty data; the empty case is rare and bounded by the
schema-apply lock branch).
A defense-in-depth integration test (`tests/forbidden_apis.rs`) walks
engine source and fails if non-allow-listed code calls Lance's
inline-commit APIs directly. The trait surface itself is the primary
enforcement (sealed + only-callable-via-trait once call sites land);
the grep test catches type-system bypass attempts.
The "finalize → publisher residual" described below applies equally to
the migrated writers — Lance has no multi-dataset atomic commit
primitive, so the per-table commit_staged → manifest publish gap is
the same drift class. Closing it requires either upstream Lance
multi-dataset commit OR the omnigraph-side recovery-on-open reconciler
described in `.context/mr-793-design.md` §15 (deferred to MR-795).
### `LoadMode::Overwrite` residual
The bulk loader's Append and Merge modes use the staged-write path