From 3135ff5d198f6c9481cb272821db9c13e1178cd3 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sat, 2 May 2026 11:03:15 +0200 Subject: [PATCH 1/8] MR-793 phases 1-6: TableStorage trait + staged-write surface for engine writers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hoists Lance's stage+commit two-phase write pattern from "discipline at each writer" to a sealed trait surface (`TableStorage`). New engine code that needs to advance Lance HEAD MUST go through `stage_*` + `commit_staged`; the trait's opaque `SnapshotHandle` / `StagedHandle` types keep `lance::Dataset` and `lance::Transaction` out of trait signatures. Phases landed (see .context/mr-793-design.md for the full plan): * 1a: `crates/omnigraph/src/storage_layer.rs` — `TableStorage` trait, sealed (only in-tree types can impl), single impl on `TableStore` delegating to existing inherent methods; `Omnigraph::storage()` accessor returns `&dyn TableStorage`. * 2: three new staged primitives — `stage_overwrite`, `stage_create_btree_index`, `stage_create_inverted_index` — implementing the simple branch of Lance's `CreateIndexBuilder::execute` (scalar indices only; vector indices stay inline because `build_index_metadata_from_segments` is `pub(crate)` in lance-4.0.0). Six new tests in `tests/staged_writes.rs` pin both the new primitives and the inline residuals (`delete_where`, `create_vector_index`). * 3: `tests/forbidden_apis.rs` — defense-in-depth integration test walks engine source, fails on direct lance::* inline-commit API use outside `table_store.rs` / `db/manifest/`. Skips comment lines and honors `// forbidden-api-allow:` sentinels. * 4: `ensure_indices` migration — scalar index builds now route through `stage_create_*_index` + `commit_staged` instead of `create_*_index(&mut Dataset)`. Vector indices stay inline (residual, named honestly at the call site). 
* 5: `branch_merge::publish_rewritten_merge_table` migration — the merge_insert phase now uses `stage_merge_insert` + `commit_staged`; delete phase stays inline (Lance #6658 residual, named honestly). * 6: `schema_apply` rewritten_tables migration — non-empty rewrites use `stage_overwrite` + `commit_staged`; empty-batch rewrites stay inline because `InsertBuilder::execute_uncommitted` rejects empty data. The narrow inline window is bounded by `__schema_apply_lock__`. Verified-green test surface: * `cargo test -p omnigraph-engine` — 68 lib + ~120 integration tests (incl. 6 new staged_writes tests + the new forbidden_apis test). * `cargo test -p omnigraph-engine --features failpoints --test failpoints` — 5 tests, all green. * `cargo test --workspace` — green. Deferred to follow-up sessions (see design doc §17 split): * Phase 1b — convert remaining engine call sites to `&dyn TableStorage` (mostly READS that don't touch the staged-write invariant). * Phase 7 — recovery-on-open reconciler (closes Phase B → Phase C residual across process restarts; new subsystem). * Phase 8 — index-coverage reconciler (full §VII.35 compliance — removes synchronous index work from the publish path). * Phase 9 — demote unused `TableStore` inherent methods to `pub(crate)` (depends on Phase 1b). Lance upstream blockers documented: * lance-format/lance#6658 — two-phase delete API (open, no PRs). * Companion: `build_index_metadata_from_segments` should be `pub` so vector-index builds can be staged outside the lance crate. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 2 +- crates/omnigraph/src/db/omnigraph.rs | 13 + .../src/db/omnigraph/schema_apply.rs | 21 +- .../omnigraph/src/db/omnigraph/table_ops.rs | 122 ++- crates/omnigraph/src/exec/merge.rs | 52 +- crates/omnigraph/src/lib.rs | 1 + crates/omnigraph/src/storage_layer.rs | 824 ++++++++++++++++++ crates/omnigraph/src/table_store.rs | 168 +++- crates/omnigraph/tests/forbidden_apis.rs | 172 ++++ crates/omnigraph/tests/staged_writes.rs | 266 ++++++ docs/runs.md | 38 + 11 files changed, 1629 insertions(+), 50 deletions(-) create mode 100644 crates/omnigraph/src/storage_layer.rs create mode 100644 crates/omnigraph/tests/forbidden_apis.rs diff --git a/AGENTS.md b/AGENTS.md index d347343..38560fa 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main | Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing | | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables | | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering | -| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher` | +| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. 
Engine-internal write APIs sit behind a sealed `TableStorage` trait (MR-793) — `stage_*` + `commit_staged` are the only paths to advance Lance HEAD; inline-commit Lance APIs are not reachable through the trait surface | | Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency | | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy | | BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches | diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index a924b1f..32f1234 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -200,6 +200,19 @@ impl Omnigraph { &self.table_store } + /// Engine-facing trait surface around `TableStore`. + /// + /// MR-793 Phase 1: this is the canonical accessor for newly-written + /// engine code. The trait's signatures use opaque `SnapshotHandle` / + /// `StagedHandle` instead of leaking `lance::Dataset` / + /// `lance::dataset::transaction::Transaction`. Existing call sites + /// that still use `db.table_store.X(...)` (the inherent struct + /// methods) are migrated incrementally — see §9 of + /// `.context/mr-793-design.md`. 
+ pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage { + &self.table_store + } + pub(crate) async fn open_coordinator_for_branch( &self, branch: Option<&str>, diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index b096dbd..7550f20 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -237,7 +237,26 @@ pub(super) async fn apply_schema_with_lock( ) .await?; let dataset_uri = db.table_store.dataset_uri(&entry.table_path); - let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?; + // MR-793 Phase 6: route through stage_overwrite + commit_staged + // for non-empty batches. Lance's `InsertBuilder::execute_uncommitted` + // errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`), + // so the empty-rewrite case stays on `overwrite_dataset` (which + // accepts empty input). The empty case is rare in schema_apply + // — it only fires when the source table itself was already empty + // — and schema_apply runs under `__schema_apply_lock__` so the + // narrow inline-commit residual is bounded. + let mut target_ds = if batch.num_rows() == 0 { + TableStore::overwrite_dataset(&dataset_uri, batch).await? + } else { + let existing = db + .table_store + .open_dataset_head_for_write(table_key, &dataset_uri, None) + .await?; + let staged = db.table_store.stage_overwrite(&existing, batch).await?; + db.table_store + .commit_staged(Arc::new(existing), staged.transaction) + .await? 
+ }; db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds) .await?; let state = db.table_store.table_state(&dataset_uri, &target_ds).await?; diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index afc59d1..fcde941 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -286,15 +286,18 @@ pub(super) async fn build_indices_on_dataset_for_catalog( ) -> Result<()> { if let Some(type_name) = table_key.strip_prefix("node:") { if !db.table_store.has_btree_index(ds, "id").await? { - db.table_store - .create_btree_index(ds, &["id"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e)) - })?; + stage_and_commit_btree(db, table_key, ds, &["id"]).await?; } if let Some(node_type) = catalog.node_types.get(type_name) { + // Per MR-793 §10 OQ3: stage scalar indices first (BTree, + // Inverted), then call `create_vector_index` inline. The + // inline-commit on a vector index advances HEAD, which would + // invalidate any uncommitted scalar index transactions if we + // stacked them. Today the per-stage shape commits each + // scalar index immediately so the order constraint is + // implicit, but if we ever batch scalar stages we must + // ensure they all land before the vector inline-commit. for index_cols in &node_type.indices { if index_cols.len() != 1 { continue; @@ -303,18 +306,16 @@ pub(super) async fn build_indices_on_dataset_for_catalog( if let Some(prop_type) = node_type.properties.get(prop_name) { if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { if !db.table_store.has_fts_index(ds, prop_name).await? 
{ - db.table_store - .create_inverted_index(ds, prop_name.as_str()) - .await - .map_err(|e| { - OmniError::Lance(format!( - "create Inverted index on {}({}): {}", - table_key, prop_name, e - )) - })?; + stage_and_commit_inverted(db, table_key, ds, prop_name.as_str()) + .await?; } } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list { if !db.table_store.has_vector_index(ds, prop_name).await? { + // Inline-commit residual: lance-4.0.0 does not + // expose `build_index_metadata_from_segments` as + // `pub`, so vector indices cannot be staged from + // outside the lance crate. Document at the call + // site; companion ticket to lance-format/lance#6658. db.table_store .create_vector_index(ds, prop_name.as_str()) .await @@ -334,28 +335,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog( if table_key.starts_with("edge:") { if !db.table_store.has_btree_index(ds, "id").await? { - db.table_store - .create_btree_index(ds, &["id"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e)) - })?; + stage_and_commit_btree(db, table_key, ds, &["id"]).await?; } if !db.table_store.has_btree_index(ds, "src").await? { - db.table_store - .create_btree_index(ds, &["src"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(src): {}", table_key, e)) - })?; + stage_and_commit_btree(db, table_key, ds, &["src"]).await?; } if !db.table_store.has_btree_index(ds, "dst").await? { - db.table_store - .create_btree_index(ds, &["dst"]) - .await - .map_err(|e| { - OmniError::Lance(format!("create BTree index on {}(dst): {}", table_key, e)) - })?; + stage_and_commit_btree(db, table_key, ds, &["dst"]).await?; } return Ok(()); } @@ -366,6 +352,76 @@ pub(super) async fn build_indices_on_dataset_for_catalog( ))) } +/// Stage a BTREE index transaction and commit it, advancing the in-memory +/// `*ds` to the new HEAD. 
MR-793 Phase 4: replaces the previous +/// inline-commit `create_btree_index(ds)` call with the staged primitive +/// + an immediate `commit_staged`. Per-call behavior is unchanged +/// (HEAD advances once per index), but the bytes-on-disk and HEAD-advance +/// are now decoupled at the `TableStore` API surface — a caller that +/// needs end-of-batch atomicity can stage many transactions and commit +/// them in one pass (Phase 8's index reconciler relies on this). +async fn stage_and_commit_btree( + db: &Omnigraph, + table_key: &str, + ds: &mut Dataset, + columns: &[&str], +) -> Result<()> { + let staged = db + .table_store + .stage_create_btree_index(ds, columns) + .await + .map_err(|e| { + OmniError::Lance(format!( + "stage_create_btree_index on {}({:?}): {}", + table_key, columns, e + )) + })?; + let new_ds = db + .table_store + .commit_staged(Arc::new(ds.clone()), staged.transaction) + .await + .map_err(|e| { + OmniError::Lance(format!( + "commit BTree index on {}({:?}): {}", + table_key, columns, e + )) + })?; + *ds = new_ds; + Ok(()) +} + +/// Stage an INVERTED (FTS) index transaction and commit it. See +/// `stage_and_commit_btree` for the MR-793 Phase 4 rationale. 
+async fn stage_and_commit_inverted( + db: &Omnigraph, + table_key: &str, + ds: &mut Dataset, + column: &str, +) -> Result<()> { + let staged = db + .table_store + .stage_create_inverted_index(ds, column) + .await + .map_err(|e| { + OmniError::Lance(format!( + "stage_create_inverted_index on {}({}): {}", + table_key, column, e + )) + })?; + let new_ds = db + .table_store + .commit_staged(Arc::new(ds.clone()), staged.transaction) + .await + .map_err(|e| { + OmniError::Lance(format!( + "commit Inverted index on {}({}): {}", + table_key, column, e + )) + })?; + *ds = new_ds; + Ok(()) +} + async fn prepare_updates_for_commit( db: &Omnigraph, branch: Option<&str>, diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index c5c6802..df62a63 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -911,7 +911,14 @@ async fn publish_rewritten_merge_table( let mut current_ds = ds; // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for - // existing rows, bumps _row_last_updated_at_version only for actually-changed rows) + // existing rows, bumps _row_last_updated_at_version only for actually-changed rows). + // + // MR-793 Phase 5: routed through the staged primitive so a failure + // between writing fragments and committing leaves no Lance-HEAD + // drift. The commit_staged here is per-table per-call (Lance has no + // multi-dataset atomic commit); the residual sits at this single + // commit point, narrowed from the previous "merge_insert + delete + + // index" multi-step inline-commit chain. if let Some(delta) = &staged.delta_staged { let batches: Vec = target_db .table_store() @@ -921,29 +928,40 @@ async fn publish_rewritten_merge_table( .filter(|batch| batch.num_rows() > 0) .collect(); if !batches.is_empty() { - let state = target_db + // Concat into one batch — stage_merge_insert takes a single batch. 
+ let combined = if batches.len() == 1 { + batches.into_iter().next().unwrap() + } else { + let schema = batches[0].schema(); + arrow_select::concat::concat_batches(&schema, &batches) + .map_err(|e| OmniError::Lance(e.to_string()))? + }; + let staged_merge = target_db .table_store() - .merge_insert_batches( - &full_path, - current_ds, - batches, + .stage_merge_insert( + current_ds.clone(), + combined, vec!["id".to_string()], lance::dataset::WhenMatched::UpdateAll, lance::dataset::WhenNotMatched::InsertAll, ) .await?; current_ds = target_db - .reopen_for_mutation( - table_key, - &full_path, - table_branch.as_deref(), - state.version, - ) + .table_store() + .commit_staged(Arc::new(current_ds), staged_merge.transaction) .await?; } } - // Phase 2: delete removed rows via deletion vectors + // Phase 2: delete removed rows via deletion vectors. + // + // INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public + // two-phase delete API (DeleteJob is `pub(crate)` — + // lance-format/lance#6658 is open with no PRs). MR-793 deliberately + // does NOT introduce a `stage_delete` wrapper that would secretly + // inline-commit (a side-channel — see design doc §3.2). When the + // upstream API ships, swap this `delete_where` call for + // `stage_delete` + `commit_staged`. if !staged.deleted_ids.is_empty() { let escaped: Vec = staged .deleted_ids @@ -957,7 +975,13 @@ async fn publish_rewritten_merge_table( .await?; } - // Phase 3: rebuild indices + // Phase 3: rebuild indices. + // + // `build_indices_on_dataset` was migrated in MR-793 Phase 4 to use + // `stage_create_btree_index` / `stage_create_inverted_index` + + // `commit_staged` for scalar indices. Vector indices remain inline + // (residual — `build_index_metadata_from_segments` is `pub(crate)` + // in lance-4.0.0; companion ticket to lance-format/lance#6658). 
 let row_count = target_db .table_store() .table_state(&full_path, &current_ds) diff --git a/crates/omnigraph/src/lib.rs b/crates/omnigraph/src/lib.rs index 78d62ea..d781096 100644 --- a/crates/omnigraph/src/lib.rs +++ b/crates/omnigraph/src/lib.rs @@ -8,4 +8,5 @@ pub mod graph_index; pub mod loader; pub mod runtime_cache; pub mod storage; +pub mod storage_layer; pub mod table_store; diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs new file mode 100644 index 0000000..c3599e6 --- /dev/null +++ b/crates/omnigraph/src/storage_layer.rs @@ -0,0 +1,824 @@ +//! Storage trait surface — MR-793. +//! +//! `TableStorage` is the engine-internal trait that funnels every Lance +//! data write through staged primitives. Engine code (in `exec/`, +//! `db/omnigraph/`, `loader/`) holds `Arc` instead of +//! a concrete `TableStore`; the inline-commit Lance APIs +//! (`Dataset::append`, `MergeInsertBuilder::execute`, etc.) are not +//! reachable through the trait surface. +//! +//! ## Sealed +//! +//! `TableStorage: sealed::Sealed`. Only types in this crate can implement +//! the trait, so the staged-write invariant cannot be subverted by a +//! downstream impl. +//! +//! ## Opaque handles +//! +//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` and +//! `StagedWrite` respectively. Their inner Lance types are +//! `pub(crate)` — engine code outside `table_store` cannot reach +//! through. This is the §III.9 alignment: `lance::Dataset` does not +//! appear in trait signatures. +//! +//! ## Scope (MR-793 Phase 1) +//! +//! The trait surface mirrors the methods engine code currently calls on +//! `TableStore`. Subsequent MR-793 phases: +//! * Phase 2 — add `stage_overwrite`, `stage_create_btree_index`, +//! `stage_create_inverted_index` to the trait. +//! * Phase 4–6 — migrate writers (`ensure_indices`, `branch_merge`, +//! `schema_apply`) onto the staged primitives. +//! * Phase 9 — demote unused inline-commit methods to `pub(crate)`. 
+ +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use lance::Dataset; +use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}; +use lance::dataset::{WhenMatched, WhenNotMatched}; + +use crate::db::{Snapshot, SubTableEntry}; +use crate::error::Result; +use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore}; + +// ─── sealed module ────────────────────────────────────────────────────────── + +pub(crate) mod sealed { + /// Sealed marker — only types defined in `omnigraph-engine` can + /// implement `TableStorage`. Combined with the trait being the only + /// route to write APIs from engine code, this gives type-system + /// enforcement of the staged-write invariant. + pub trait Sealed {} + + impl Sealed for crate::table_store::TableStore {} +} + +// ─── opaque handles ──────────────────────────────────────────────────────── + +/// Opaque handle to a snapshot of a single sub-table dataset at a +/// specific version. +/// +/// Engine code never sees `lance::Dataset` directly; it holds +/// `SnapshotHandle` and passes it back to `TableStorage` methods. +/// Inside this crate, `pub(crate)` accessors expose the inner +/// `Arc` to the `TableStorage` impl. +#[derive(Debug, Clone)] +pub struct SnapshotHandle { + pub(crate) inner: Arc, +} + +impl SnapshotHandle { + /// Construct from a Lance dataset. `pub(crate)` — only + /// `TableStore` should produce these. + pub(crate) fn new(ds: Dataset) -> Self { + Self { inner: Arc::new(ds) } + } + + /// Borrow the underlying Lance dataset. `pub(crate)` so only the + /// `TableStorage` impl in this crate can reach through. + pub(crate) fn dataset(&self) -> &Dataset { + &self.inner + } + + /// Take ownership of the inner `Arc`. Used when committing + /// staged writes (the call needs to consume the snapshot). 
+ pub(crate) fn into_arc(self) -> Arc { + self.inner + } + + // ── public, lance-free accessors ── + + /// Current Lance manifest version of the snapshot. + pub fn version(&self) -> u64 { + self.inner.version().version + } + + /// Whether the underlying dataset uses stable row IDs. + pub fn uses_stable_row_ids(&self) -> bool { + self.inner.manifest.uses_stable_row_ids() + } +} + +/// Opaque handle to a staged Lance transaction (data write or scalar +/// index build) that has not yet advanced HEAD. +/// +/// Produced by `TableStorage::stage_*`, consumed by +/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite` +/// (transaction + read-your-writes deltas) behind `pub(crate)`. +#[derive(Debug, Clone)] +pub struct StagedHandle { + pub(crate) inner: StagedWrite, +} + +impl StagedHandle { + pub(crate) fn new(staged: StagedWrite) -> Self { + Self { inner: staged } + } + + /// Take ownership of the inner `StagedWrite`. Used by + /// `commit_staged`. + pub(crate) fn into_staged(self) -> StagedWrite { + self.inner + } +} + +/// Helper: convert a slice of `StagedHandle` references to a Vec of +/// `&StagedWrite` for handing to `TableStore::stage_append`'s +/// `prior_stages` parameter. The lifetime is tied to the input slice. +pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec { + handles.iter().map(|h| h.inner.clone()).collect() +} + +// ─── TableStorage trait ──────────────────────────────────────────────────── + +/// Engine-internal trait covering every Lance dataset operation an +/// `omnigraph` engine call site might perform. +/// +/// `TableStore` is the only `impl`. The trait is sealed; the inline +/// Lance APIs are not reachable through trait dispatch. New writers that +/// might advance Lance HEAD MUST add a staged-shape method here. 
+#[async_trait] +pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { + // ── Snapshot opens (no HEAD advance) ──────────────────────────────── + + async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result; + + async fn open_snapshot_at_table( + &self, + snapshot: &Snapshot, + table_key: &str, + ) -> Result; + + async fn open_dataset_head( + &self, + dataset_uri: &str, + branch: Option<&str>, + ) -> Result; + + async fn open_dataset_head_for_write( + &self, + table_key: &str, + dataset_uri: &str, + branch: Option<&str>, + ) -> Result; + + async fn open_dataset_at_state( + &self, + table_path: &str, + branch: Option<&str>, + version: u64, + ) -> Result; + + async fn fork_branch_from_state( + &self, + dataset_uri: &str, + source_branch: Option<&str>, + table_key: &str, + source_version: u64, + target_branch: &str, + ) -> Result; + + async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>; + + async fn reopen_for_mutation( + &self, + dataset_uri: &str, + branch: Option<&str>, + table_key: &str, + expected_version: u64, + ) -> Result; + + fn ensure_expected_version( + &self, + snapshot: &SnapshotHandle, + table_key: &str, + expected_version: u64, + ) -> Result<()>; + + // ── Reads (no HEAD advance) ──────────────────────────────────────── + + async fn scan( + &self, + snapshot: &SnapshotHandle, + projection: Option<&[&str]>, + filter: Option<&str>, + order_by: Option>, + ) -> Result>; + + async fn scan_with_row_id( + &self, + snapshot: &SnapshotHandle, + projection: Option<&[&str]>, + filter: Option<&str>, + order_by: Option>, + with_row_id: bool, + ) -> Result>; + + async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result>; + + async fn scan_batches_for_rewrite( + &self, + snapshot: &SnapshotHandle, + ) -> Result>; + + async fn count_rows( + &self, + snapshot: &SnapshotHandle, + filter: Option, + ) -> Result; + + async fn count_rows_with_staged( + &self, + snapshot: &SnapshotHandle, + staged: &[StagedHandle], + 
filter: Option, + ) -> Result; + + async fn scan_with_staged( + &self, + snapshot: &SnapshotHandle, + staged: &[StagedHandle], + projection: Option<&[&str]>, + filter: Option<&str>, + ) -> Result>; + + async fn scan_with_pending( + &self, + snapshot: &SnapshotHandle, + pending: &[RecordBatch], + pending_schema: Option, + projection: Option<&[&str]>, + filter: Option<&str>, + key_column: Option<&str>, + ) -> Result>; + + async fn first_row_id_for_filter( + &self, + snapshot: &SnapshotHandle, + filter: &str, + ) -> Result>; + + async fn table_state( + &self, + dataset_uri: &str, + snapshot: &SnapshotHandle, + ) -> Result; + + // ── Staged writes (no HEAD advance) ──────────────────────────────── + + async fn stage_append( + &self, + snapshot: &SnapshotHandle, + batch: RecordBatch, + prior_stages: &[StagedHandle], + ) -> Result; + + async fn stage_merge_insert( + &self, + snapshot: SnapshotHandle, + batch: RecordBatch, + key_columns: Vec, + when_matched: WhenMatched, + when_not_matched: WhenNotMatched, + ) -> Result; + + async fn commit_staged( + &self, + snapshot: SnapshotHandle, + staged: StagedHandle, + ) -> Result; + + /// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2. + async fn stage_overwrite( + &self, + snapshot: &SnapshotHandle, + batch: RecordBatch, + ) -> Result; + + /// Stage a BTREE scalar index build. MR-793 Phase 2. + async fn stage_create_btree_index( + &self, + snapshot: &SnapshotHandle, + columns: &[&str], + ) -> Result; + + /// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2. + async fn stage_create_inverted_index( + &self, + snapshot: &SnapshotHandle, + column: &str, + ) -> Result; + + // ── Inline-commit residuals (named honestly per MR-793 §3.2) ────── + // + // These methods advance Lance HEAD as a side effect of writing. + // They stay on the trait until the corresponding upstream Lance API + // ships: + // + // * `delete_where` — Lance #6658 (two-phase delete). 
+ // * `create_*_index` — `build_index_metadata_from_segments` is + // `pub(crate)` for vector indices in lance-4.0.0; scalar indices + // migrate to staged in MR-793 Phase 2. + // * `append_batch`, `merge_insert_batches`, `overwrite_batch` — + // legacy paths that will be demoted to `pub(crate)` in MR-793 + // Phase 9 once all engine sites route through the staged + // primitives. + + async fn append_batch( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batch: RecordBatch, + ) -> Result<(SnapshotHandle, TableState)>; + + async fn merge_insert_batches( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batches: Vec, + key_columns: Vec, + when_matched: WhenMatched, + when_not_matched: WhenNotMatched, + ) -> Result; + + async fn overwrite_batch( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batch: RecordBatch, + ) -> Result<(SnapshotHandle, TableState)>; + + async fn delete_where( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + filter: &str, + ) -> Result; + + async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; + async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; + async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; + + async fn create_btree_index( + &self, + snapshot: SnapshotHandle, + columns: &[&str], + ) -> Result; + + async fn create_inverted_index( + &self, + snapshot: SnapshotHandle, + column: &str, + ) -> Result; + + async fn create_vector_index( + &self, + snapshot: SnapshotHandle, + column: &str, + ) -> Result; + + // ── URI helpers ──────────────────────────────────────────────────── + // + // These are pure string formatting; they live on the trait so engine + // code holding `Arc` can compute dataset URIs + // without importing the concrete struct. 
+ + fn root_uri(&self) -> &str; + fn dataset_uri(&self, table_path: &str) -> String; + + // ── Streaming access (used by the export path) ──────────────────── + // + // Engine code that needs a `DatasetRecordBatchStream` (rather than a + // collected `Vec`) goes through this trait method. + // Useful for the JSONL exporter that streams rows to a writer + // without materializing the whole result. + + async fn scan_stream( + &self, + snapshot: &SnapshotHandle, + projection: Option<&[&str]>, + filter: Option<&str>, + order_by: Option>, + with_row_id: bool, + ) -> Result; +} + +// ─── single impl: TableStore ────────────────────────────────────────────── + +#[async_trait] +impl TableStorage for TableStore { + async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result { + self.open_at_entry(entry).await.map(SnapshotHandle::new) + } + + async fn open_snapshot_at_table( + &self, + snapshot: &Snapshot, + table_key: &str, + ) -> Result { + self.open_snapshot_table(snapshot, table_key) + .await + .map(SnapshotHandle::new) + } + + async fn open_dataset_head( + &self, + dataset_uri: &str, + branch: Option<&str>, + ) -> Result { + TableStore::open_dataset_head(self, dataset_uri, branch) + .await + .map(SnapshotHandle::new) + } + + async fn open_dataset_head_for_write( + &self, + table_key: &str, + dataset_uri: &str, + branch: Option<&str>, + ) -> Result { + TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch) + .await + .map(SnapshotHandle::new) + } + + async fn open_dataset_at_state( + &self, + table_path: &str, + branch: Option<&str>, + version: u64, + ) -> Result { + TableStore::open_dataset_at_state(self, table_path, branch, version) + .await + .map(SnapshotHandle::new) + } + + async fn fork_branch_from_state( + &self, + dataset_uri: &str, + source_branch: Option<&str>, + table_key: &str, + source_version: u64, + target_branch: &str, + ) -> Result { + TableStore::fork_branch_from_state( + self, + dataset_uri, + source_branch, + 
table_key, + source_version, + target_branch, + ) + .await + .map(SnapshotHandle::new) + } + + async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> { + TableStore::delete_branch(self, dataset_uri, branch).await + } + + async fn reopen_for_mutation( + &self, + dataset_uri: &str, + branch: Option<&str>, + table_key: &str, + expected_version: u64, + ) -> Result { + TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version) + .await + .map(SnapshotHandle::new) + } + + fn ensure_expected_version( + &self, + snapshot: &SnapshotHandle, + table_key: &str, + expected_version: u64, + ) -> Result<()> { + TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version) + } + + async fn scan( + &self, + snapshot: &SnapshotHandle, + projection: Option<&[&str]>, + filter: Option<&str>, + order_by: Option>, + ) -> Result> { + TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await + } + + async fn scan_with_row_id( + &self, + snapshot: &SnapshotHandle, + projection: Option<&[&str]>, + filter: Option<&str>, + order_by: Option>, + with_row_id: bool, + ) -> Result> { + TableStore::scan_with( + self, + snapshot.dataset(), + projection, + filter, + order_by, + with_row_id, + |_| Ok(()), + ) + .await + } + + async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result> { + TableStore::scan_batches(self, snapshot.dataset()).await + } + + async fn scan_batches_for_rewrite( + &self, + snapshot: &SnapshotHandle, + ) -> Result> { + TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await + } + + async fn count_rows( + &self, + snapshot: &SnapshotHandle, + filter: Option, + ) -> Result { + TableStore::count_rows(self, snapshot.dataset(), filter).await + } + + async fn count_rows_with_staged( + &self, + snapshot: &SnapshotHandle, + staged: &[StagedHandle], + filter: Option, + ) -> Result { + let staged_writes = staged_handles_as_writes(staged); + 
TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await + } + + async fn scan_with_staged( + &self, + snapshot: &SnapshotHandle, + staged: &[StagedHandle], + projection: Option<&[&str]>, + filter: Option<&str>, + ) -> Result> { + let staged_writes = staged_handles_as_writes(staged); + TableStore::scan_with_staged( + self, + snapshot.dataset(), + &staged_writes, + projection, + filter, + ) + .await + } + + async fn scan_with_pending( + &self, + snapshot: &SnapshotHandle, + pending: &[RecordBatch], + pending_schema: Option, + projection: Option<&[&str]>, + filter: Option<&str>, + key_column: Option<&str>, + ) -> Result> { + TableStore::scan_with_pending( + self, + snapshot.dataset(), + pending, + pending_schema, + projection, + filter, + key_column, + ) + .await + } + + async fn first_row_id_for_filter( + &self, + snapshot: &SnapshotHandle, + filter: &str, + ) -> Result> { + TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await + } + + async fn table_state( + &self, + dataset_uri: &str, + snapshot: &SnapshotHandle, + ) -> Result { + TableStore::table_state(self, dataset_uri, snapshot.dataset()).await + } + + async fn stage_append( + &self, + snapshot: &SnapshotHandle, + batch: RecordBatch, + prior_stages: &[StagedHandle], + ) -> Result { + let staged_writes = staged_handles_as_writes(prior_stages); + TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes) + .await + .map(StagedHandle::new) + } + + async fn stage_merge_insert( + &self, + snapshot: SnapshotHandle, + batch: RecordBatch, + key_columns: Vec, + when_matched: WhenMatched, + when_not_matched: WhenNotMatched, + ) -> Result { + let ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + TableStore::stage_merge_insert( + self, + ds, + batch, + key_columns, + when_matched, + when_not_matched, + ) + .await + .map(StagedHandle::new) + } + + async fn commit_staged( + &self, + snapshot: SnapshotHandle, + staged: 
StagedHandle, + ) -> Result { + let ds_arc = snapshot.into_arc(); + let transaction = staged.into_staged().transaction; + TableStore::commit_staged(self, ds_arc, transaction) + .await + .map(SnapshotHandle::new) + } + + async fn stage_overwrite( + &self, + snapshot: &SnapshotHandle, + batch: RecordBatch, + ) -> Result { + TableStore::stage_overwrite(self, snapshot.dataset(), batch) + .await + .map(StagedHandle::new) + } + + async fn stage_create_btree_index( + &self, + snapshot: &SnapshotHandle, + columns: &[&str], + ) -> Result { + TableStore::stage_create_btree_index(self, snapshot.dataset(), columns) + .await + .map(StagedHandle::new) + } + + async fn stage_create_inverted_index( + &self, + snapshot: &SnapshotHandle, + column: &str, + ) -> Result { + TableStore::stage_create_inverted_index(self, snapshot.dataset(), column) + .await + .map(StagedHandle::new) + } + + async fn append_batch( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batch: RecordBatch, + ) -> Result<(SnapshotHandle, TableState)> { + let mut ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?; + Ok((SnapshotHandle::new(ds), state)) + } + + async fn merge_insert_batches( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batches: Vec, + key_columns: Vec, + when_matched: WhenMatched, + when_not_matched: WhenNotMatched, + ) -> Result { + let ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + TableStore::merge_insert_batches( + self, + dataset_uri, + ds, + batches, + key_columns, + when_matched, + when_not_matched, + ) + .await + } + + async fn overwrite_batch( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + batch: RecordBatch, + ) -> Result<(SnapshotHandle, TableState)> { + let mut ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + let state = TableStore::overwrite_batch(self, 
dataset_uri, &mut ds, batch).await?; + Ok((SnapshotHandle::new(ds), state)) + } + + async fn delete_where( + &self, + dataset_uri: &str, + snapshot: SnapshotHandle, + filter: &str, + ) -> Result { + let mut ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + TableStore::delete_where(self, dataset_uri, &mut ds, filter).await + } + + async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result { + TableStore::has_btree_index(self, snapshot.dataset(), column).await + } + + async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result { + TableStore::has_fts_index(self, snapshot.dataset(), column).await + } + + async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result { + TableStore::has_vector_index(self, snapshot.dataset(), column).await + } + + async fn create_btree_index( + &self, + snapshot: SnapshotHandle, + columns: &[&str], + ) -> Result { + let mut ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + TableStore::create_btree_index(self, &mut ds, columns).await?; + Ok(SnapshotHandle::new(ds)) + } + + async fn create_inverted_index( + &self, + snapshot: SnapshotHandle, + column: &str, + ) -> Result { + let mut ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + TableStore::create_inverted_index(self, &mut ds, column).await?; + Ok(SnapshotHandle::new(ds)) + } + + async fn create_vector_index( + &self, + snapshot: SnapshotHandle, + column: &str, + ) -> Result { + let mut ds = Arc::try_unwrap(snapshot.into_arc()) + .unwrap_or_else(|arc| (*arc).clone()); + TableStore::create_vector_index(self, &mut ds, column).await?; + Ok(SnapshotHandle::new(ds)) + } + + fn root_uri(&self) -> &str { + TableStore::root_uri(self) + } + + fn dataset_uri(&self, table_path: &str) -> String { + TableStore::dataset_uri(self, table_path) + } + + async fn scan_stream( + &self, + snapshot: &SnapshotHandle, + projection: 
Option<&[&str]>, + filter: Option<&str>, + order_by: Option>, + with_row_id: bool, + ) -> Result { + // Note: existing TableStore::scan_stream is an associated fn that + // takes &Dataset, so we delegate via the dataset reference held by + // the snapshot. + TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await + } +} + +// Suppress unused-import warning when the module is built without the +// `Scanner` type being referenced from the trait. +#[allow(dead_code)] +fn _scanner_type_marker(_: &Scanner) {} diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index adc7c2b..5b3fabf 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -4,7 +4,7 @@ use arrow_select::concat::concat_batches; use futures::TryStreamExt; use lance::Dataset; use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}; -use lance::dataset::transaction::{Operation, Transaction}; +use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use lance::dataset::{ CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams, @@ -760,6 +760,172 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } + /// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }). + /// Returns a StagedWrite carrying the replacement fragments. HEAD does + /// NOT advance. + /// + /// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. }) + /// .execute_uncommitted(vec![batch])` produces a `Transaction` whose + /// `Operation::Overwrite` carries the new schema + fragments. The + /// transaction is committed via `commit_staged` (same call as + /// `stage_append`). + /// + /// MR-793 Phase 2: introduces this for the schema_apply rewrite path. + /// Lance API verified in `.context/mr-793-design.md` Appendix A.1. 
+ pub async fn stage_overwrite( + &self, + ds: &Dataset, + batch: RecordBatch, + ) -> Result { + if batch.num_rows() == 0 { + return Err(OmniError::manifest_internal( + "stage_overwrite called with empty batch".to_string(), + )); + } + let params = WriteParams { + mode: WriteMode::Overwrite, + allow_external_blob_outside_bases: true, + ..Default::default() + }; + let transaction = InsertBuilder::new(Arc::new(ds.clone())) + .with_params(&params) + .execute_uncommitted(vec![batch]) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let mut new_fragments = match &transaction.operation { + Operation::Overwrite { fragments, .. } => fragments.clone(), + other => { + return Err(OmniError::manifest_internal(format!( + "stage_overwrite: unexpected Lance operation {:?}", + std::mem::discriminant(other) + ))); + } + }; + // Overwrite REPLACES every committed fragment, and Lance restarts + // fragment-ID and row-ID counters at the post-commit version. + // For our pre-commit staged view we need to: + // 1) Renumber temporary fragment IDs (Lance returns them as + // `id = 0` from `execute_uncommitted` — see stage_append + // for the same fix). For Overwrite there are no committed + // fragments to collide with (they're all in + // removed_fragment_ids below), so start at 1. + // 2) For stable-row-id datasets, assign row_id_meta starting + // at 0 (Overwrite is a fresh-start) so `scan_with_staged` + // doesn't hit the "Missing row id meta" panic in + // lance-4.0.0 dataset/rowids.rs:22. + assign_fragment_ids(&mut new_fragments, 1); + if ds.manifest.uses_stable_row_ids() { + assign_row_id_meta(&mut new_fragments, 0)?; + } + // Overwrite REPLACES every committed fragment. For + // read-your-writes via scan_with_staged, list every committed + // fragment in removed_fragment_ids so the post-stage view shows + // ONLY the staged fragments.
+ let removed_fragment_ids: Vec = + ds.manifest.fragments.iter().map(|f| f.id).collect(); + Ok(StagedWrite { + transaction, + new_fragments, + removed_fragment_ids, + }) + } + + /// Stage a BTREE scalar index build. Returns a StagedWrite whose + /// transaction commits via `commit_staged`. HEAD does NOT advance. + /// + /// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns + /// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex + /// { new_indices, removed_indices }` via the public `TransactionBuilder`, + /// replicating the simple (non-segment-commit-path) branch of Lance's + /// `CreateIndexBuilder::execute` (lance-4.0.0 `src/index/create.rs:502-512`). + /// + /// `removed_indices` mirrors `execute()` lines 466-476: when the + /// build replaces an existing same-named index, those entries are + /// listed for tombstoning by the manifest commit. + /// + /// MR-793 Phase 2: scalar index types (BTree, Inverted) are + /// stage-able. Vector indices are NOT (segment-commit-path requires + /// `build_index_metadata_from_segments` which is `pub(crate)` in + /// lance-4.0.0); see `create_vector_index` and Appendix A.3. + pub async fn stage_create_btree_index( + &self, + ds: &Dataset, + columns: &[&str], + ) -> Result { + let params = ScalarIndexParams::default(); + let mut ds_clone = ds.clone(); + let new_idx = ds_clone + .create_index_builder(columns, IndexType::BTree, &params) + .replace(true) + .execute_uncommitted() + .await + .map_err(|e| { + OmniError::Lance(format!("stage_create_btree_index: {}", e)) + })?; + let removed_indices: Vec = ds + .load_indices() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?
+ .iter() + .filter(|idx| idx.name == new_idx.name) + .cloned() + .collect(); + let transaction = TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices: vec![new_idx], + removed_indices, + }, + ) + .build(); + Ok(StagedWrite { + transaction, + new_fragments: Vec::new(), + removed_fragment_ids: Vec::new(), + }) + } + + /// Stage an INVERTED (FTS) scalar index build. Same shape as + /// `stage_create_btree_index`; see its docs for the Lance API + /// citation and contract notes. + pub async fn stage_create_inverted_index( + &self, + ds: &Dataset, + column: &str, + ) -> Result { + let params = InvertedIndexParams::default(); + let mut ds_clone = ds.clone(); + let new_idx = ds_clone + .create_index_builder(&[column], IndexType::Inverted, &params) + .replace(true) + .execute_uncommitted() + .await + .map_err(|e| { + OmniError::Lance(format!("stage_create_inverted_index: {}", e)) + })?; + let removed_indices: Vec = ds + .load_indices() + .await + .map_err(|e| OmniError::Lance(e.to_string()))? + .iter() + .filter(|idx| idx.name == new_idx.name) + .cloned() + .collect(); + let transaction = TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices: vec![new_idx], + removed_indices, + }, + ) + .build(); + Ok(StagedWrite { + transaction, + new_fragments: Vec::new(), + removed_fragment_ids: Vec::new(), + }) + } + + /// Run a scan with optional uncommitted staged writes visible /// alongside the committed snapshot. When `staged` is empty this is /// identical to `scan(...)`. diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs new file mode 100644 index 0000000..d3df340 --- /dev/null +++ b/crates/omnigraph/tests/forbidden_apis.rs @@ -0,0 +1,172 @@ +//! MR-793 Phase 3 — forbidden-API guard test. +//! +//! Engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) MUST NOT +//! call Lance's inline-commit data-write APIs directly. The +//!
`Storage` trait (`crate::storage_layer::TableStorage`) is the canonical +//! surface; staged primitives (`stage_append`, `stage_merge_insert`, +//! `stage_overwrite`, `stage_create_btree_index`, +//! `stage_create_inverted_index`) plus `commit_staged` are the only +//! way to advance Lance HEAD. +//! +//! The trait is sealed (only `TableStore` impls it), so by-construction +//! the trait surface forbids ad-hoc Lance calls. This test is **defense +//! in depth** — it catches the case where engine code reaches around +//! the trait by importing `lance::dataset::*` types directly. +//! +//! ## How it works +//! +//! Walks `crates/omnigraph/src/{exec,db/omnigraph,loader,changes}/**/*.rs`, +//! greps each line for forbidden symbols. Lines whose preceding line +//! contains the sentinel comment `// forbidden-api-allow: ` are +//! exempt — reviewers see the sentinel in diff and can ask "is this +//! exemption justified?" +//! +//! ## What's deliberately out of scope (allow-listed by directory) +//! +//! - `crates/omnigraph/src/table_store.rs` — IS the storage layer. +//! The forbidden Lance APIs live here legitimately. +//! - `crates/omnigraph/src/db/manifest/**` — uses `CommitBuilder` for +//! the cross-table manifest commit. Documented exception. +//! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module. +//! +//! ## Initial state (MR-793 Phase 3) +//! +//! At the time this test was written, MR-793 has migrated three writers +//! (ensure_indices, branch_merge, schema_apply rewrites) onto staged +//! primitives. Other engine call sites (the bulk loader, exec/mutation, +//! exec/query, etc.) still use the legacy inherent `TableStore` methods +//! — they're not visible at the trait boundary, but they DO call lance +//! types. The allow-list below reflects this transitional state. Phase 9 +//! tightens the allow-list as call sites migrate. 
+ +use std::path::{Path, PathBuf}; + +const FORBIDDEN_PATTERNS: &[&str] = &[ + "MergeInsertBuilder", + "InsertBuilder::", + "DeleteBuilder", + "CommitBuilder::new", + ".create_index_builder(", + ".create_index_segment_builder(", +]; + +/// Files exempt from the guard. These are the legitimate storage-layer +/// implementations that USE the forbidden APIs to provide the staged +/// primitives. +const ALLOW_LIST_FILES: &[&str] = &[ + "table_store.rs", // The storage layer itself. + "storage_layer.rs", // The trait module. +]; + +/// Directories exempt from the guard. Files under these paths may use +/// the forbidden APIs. +const ALLOW_LIST_DIRS: &[&str] = &[ + "db/manifest", // Manifest publisher uses CommitBuilder for cross-table commits. + "db/manifest/", // Belt + suspenders for the directory match. +]; + +const SENTINEL: &str = "// forbidden-api-allow:"; + +fn engine_src_root() -> PathBuf { + let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR"); + PathBuf::from(manifest_dir).join("src") +} + +fn is_allow_listed(path: &Path) -> bool { + let path_str = path.to_string_lossy(); + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if ALLOW_LIST_FILES.iter().any(|f| *f == name) { + return true; + } + } + ALLOW_LIST_DIRS.iter().any(|d| path_str.contains(d)) +} + +fn walk_rust_files(root: &Path) -> Vec { + let mut out = Vec::new(); + walk_into(root, &mut out); + out +} + +fn walk_into(dir: &Path, out: &mut Vec) { + let entries = match std::fs::read_dir(dir) { + Ok(e) => e, + Err(_) => return, + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + walk_into(&path, out); + } else if path.extension().and_then(|s| s.to_str()) == Some("rs") { + out.push(path); + } + } +} + +#[test] +fn engine_code_does_not_call_forbidden_lance_apis() { + let src = engine_src_root(); + let mut violations = Vec::new(); + + for file in walk_rust_files(&src) { + if is_allow_listed(&file) { + continue; + } + let 
contents = match std::fs::read_to_string(&file) { + Ok(c) => c, + Err(_) => continue, + }; + let lines: Vec<&str> = contents.lines().collect(); + for (idx, line) in lines.iter().enumerate() { + let trimmed = line.trim_start(); + // Skip comment-only lines — references to forbidden API + // names in doc-comments, design notes, or residual-marker + // comments are documentation, not code use. The trait + // surface (sealed + trait-only) is the actual enforcement; + // this test only catches code use. + if trimmed.starts_with("//") + || trimmed.starts_with("/*") + || trimmed.starts_with("*") + { + continue; + } + // Allow lines marked with the sentinel on the SAME line or + // the immediately preceding line. + if line.contains(SENTINEL) { + continue; + } + if idx > 0 && lines[idx - 1].contains(SENTINEL) { + continue; + } + for pattern in FORBIDDEN_PATTERNS { + if line.contains(pattern) { + let rel = file + .strip_prefix(&src) + .unwrap_or(&file) + .display() + .to_string(); + violations.push(format!( + "{}:{}: forbidden pattern `{}` — {}", + rel, + idx + 1, + pattern, + line.trim() + )); + } + } + } + } + + if !violations.is_empty() { + panic!( + "MR-793 forbidden-API guard found {} violation(s) in engine code. \ + Engine code MUST route through the `TableStorage` trait (or its \ + inherent counterparts on `TableStore`) instead of calling Lance's \ + inline-commit APIs directly. If a use is genuinely justified, add \ + the comment `// forbidden-api-allow: ` on the same line or \ + the line above.\n\nViolations:\n {}", + violations.len(), + violations.join("\n ") + ); + } +} diff --git a/crates/omnigraph/tests/staged_writes.rs b/crates/omnigraph/tests/staged_writes.rs index 1f8822d..225b215 100644 --- a/crates/omnigraph/tests/staged_writes.rs +++ b/crates/omnigraph/tests/staged_writes.rs @@ -492,3 +492,269 @@ async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior surface in production paths — see exec/staging.rs." 
); } + +// ─── MR-793 Phase 2: stage_overwrite + scalar index staging ───────────────── + +/// `stage_overwrite` writes replacement fragments to object storage but +/// does NOT advance Lance HEAD until `commit_staged` runs. Mirrors +/// `stage_append`'s contract. +#[tokio::test] +async fn stage_overwrite_does_not_advance_head_until_commit() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + let pre_version = ds.version().version; + + let staged = store + .stage_overwrite(&ds, person_batch(&[("zoe", Some(99))])) + .await + .unwrap(); + assert_eq!( + ds.version().version, + pre_version, + "stage_overwrite must not advance HEAD" + ); + // Reopen at HEAD; still pre-version (no commit happened on disk). + let reopened = Dataset::open(&uri).await.unwrap(); + assert_eq!(reopened.version().version, pre_version); + + // After commit_staged, HEAD advances and the dataset shows the + // overwrite result (zoe alone — alice replaced). + let new_ds = store + .commit_staged(Arc::new(ds.clone()), staged.transaction) + .await + .unwrap(); + assert!(new_ds.version().version > pre_version); + let after = store.scan_batches(&new_ds).await.unwrap(); + assert_eq!(collect_ids(&after), vec!["zoe"]); +} + +/// `stage_overwrite` semantically REPLACES every committed fragment. +/// `removed_fragment_ids` lists every committed fragment so +/// `scan_with_staged` shows only the staged rows (not committed + staged). 
+#[tokio::test] +async fn stage_overwrite_replaces_all_fragments() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset( + &uri, + person_batch(&[("alice", Some(30)), ("bob", Some(25))]), + ) + .await + .unwrap(); + let committed_fragment_ids: std::collections::HashSet = + ds.manifest.fragments.iter().map(|f| f.id).collect(); + + let staged = store + .stage_overwrite(&ds, person_batch(&[("zoe", Some(99))])) + .await + .unwrap(); + let removed: std::collections::HashSet = + staged.removed_fragment_ids.iter().copied().collect(); + assert_eq!( + removed, committed_fragment_ids, + "stage_overwrite must list every committed fragment as removed so \ + scan_with_staged shadows them all (overwrite semantics — pre-data \ + is being wiped)" + ); + + let batches = store + .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None) + .await + .unwrap(); + assert_eq!( + collect_ids(&batches), + vec!["zoe"], + "scan_with_staged must show only the staged row, not committed + staged" + ); +} + +/// `stage_create_btree_index` writes index segments to object storage +/// but does NOT advance Lance HEAD until `commit_staged`. After commit, +/// the index is queryable. 
+#[tokio::test] +async fn stage_create_btree_index_does_not_advance_head_until_commit() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset( + &uri, + person_batch(&[("alice", Some(30)), ("bob", Some(25))]), + ) + .await + .unwrap(); + let pre_version = ds.version().version; + assert!( + !store.has_btree_index(&ds, "id").await.unwrap(), + "fresh dataset has no btree index on `id`" + ); + + let staged = store.stage_create_btree_index(&ds, &["id"]).await.unwrap(); + assert_eq!( + ds.version().version, + pre_version, + "stage_create_btree_index must not advance HEAD" + ); + let reopened = Dataset::open(&uri).await.unwrap(); + assert_eq!( + reopened.version().version, + pre_version, + "no Lance commit happened on disk" + ); + assert!( + !store.has_btree_index(&reopened, "id").await.unwrap(), + "index is not visible until commit_staged" + ); + + let new_ds = store + .commit_staged(Arc::new(ds.clone()), staged.transaction) + .await + .unwrap(); + assert!(new_ds.version().version > pre_version); + assert!( + store.has_btree_index(&new_ds, "id").await.unwrap(), + "after commit_staged, the index IS visible" + ); +} + +/// `stage_create_inverted_index` (FTS) — same shape as the BTREE test. 
+#[tokio::test] +async fn stage_create_inverted_index_does_not_advance_head_until_commit() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset( + &uri, + person_batch(&[("alice", Some(30)), ("bob", Some(25))]), + ) + .await + .unwrap(); + let pre_version = ds.version().version; + + let staged = store + .stage_create_inverted_index(&ds, "id") + .await + .unwrap(); + assert_eq!( + ds.version().version, + pre_version, + "stage_create_inverted_index must not advance HEAD" + ); + assert!(!store.has_fts_index(&ds, "id").await.unwrap()); + + let new_ds = store + .commit_staged(Arc::new(ds.clone()), staged.transaction) + .await + .unwrap(); + assert!(new_ds.version().version > pre_version); + assert!( + store.has_fts_index(&new_ds, "id").await.unwrap(), + "after commit_staged, the FTS index IS visible" + ); +} + +/// Pin the inline-commit behavior of `delete_where`. Lance 4.0.0 does +/// NOT expose a public `DeleteJob::execute_uncommitted` +/// (`pub(crate)` — see lance-format/lance#6658). MR-793 deliberately +/// does NOT introduce a `stage_delete` wrapper that would secretly +/// inline-commit (a side-channel — see design doc §3.2). Instead, the +/// trait keeps `delete_where` as the only delete entry point, named +/// honestly. +/// +/// **When Lance #6658 lands**: this test will need to flip — replace +/// the assertion with a `stage_delete` + `commit_staged` round-trip +/// and remove the residual line in `docs/runs.md`. 
+#[tokio::test] +async fn delete_where_advances_head_inline_documents_residual() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let mut ds = TableStore::write_dataset( + &uri, + person_batch(&[("alice", Some(30)), ("bob", Some(25))]), + ) + .await + .unwrap(); + let pre_version = ds.version().version; + + let result = store + .delete_where(&uri, &mut ds, "id = 'alice'") + .await + .unwrap(); + assert_eq!(result.deleted_rows, 1); + assert!( + result.version > pre_version, + "delete_where ADVANCES Lance HEAD inline (the residual). When \ + lance-format/lance#6658 ships and we migrate to stage_delete + \ + commit_staged, flip this assertion to assert that staging does \ + NOT advance HEAD." + ); +} + +/// Companion to `delete_where_*`: pin the inline-commit behavior of +/// `create_vector_index`. Lance 4.0.0 vector indices take the +/// "segment commit path" which calls `build_index_metadata_from_segments` +/// (`pub(crate)` in lance-4.0.0 `src/index.rs:111`). Until upstream +/// exposes that helper (companion ticket to #6658), MR-793's trait +/// surface deliberately does NOT include `stage_create_vector_index` — +/// see design doc Appendix A.3. +#[tokio::test] +async fn create_vector_index_advances_head_inline_documents_residual() { + use arrow_array::FixedSizeListArray; + use arrow_schema::FieldRef; + + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/vec.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Build a small dataset with a fixed-size vector column. Vector index + // training requires multiple rows; provide enough. 
+ let dim = 4usize; + let n_rows = 8usize; + let item_field: FieldRef = Arc::new(Field::new("item", DataType::Float32, true)); + let vec_field = Field::new( + "embedding", + DataType::FixedSizeList(item_field.clone(), dim as i32), + false, + ); + let id_field = Field::new("id", DataType::Utf8, false); + let schema = Arc::new(Schema::new(vec![id_field, vec_field])); + + let ids: Vec = (0..n_rows).map(|i| format!("v{}", i)).collect(); + let id_arr = StringArray::from(ids); + let flat: Vec = (0..(n_rows * dim)).map(|i| i as f32).collect(); + let values = arrow_array::Float32Array::from(flat); + let vec_arr = + FixedSizeListArray::new(item_field, dim as i32, Arc::new(values), None); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(id_arr), Arc::new(vec_arr)], + ) + .unwrap(); + + let mut ds = TableStore::write_dataset(&uri, batch).await.unwrap(); + let pre_version = ds.version().version; + assert!(!store.has_vector_index(&ds, "embedding").await.unwrap()); + + store + .create_vector_index(&mut ds, "embedding") + .await + .unwrap(); + assert!( + ds.version().version > pre_version, + "create_vector_index ADVANCES Lance HEAD inline (the residual). \ + When the upstream Lance helper `build_index_metadata_from_segments` \ + is made `pub`, add `stage_create_vector_index` to the trait and \ + flip this test to assert staging does NOT advance HEAD." + ); + assert!(store.has_vector_index(&ds, "embedding").await.unwrap()); +} diff --git a/docs/runs.md b/docs/runs.md index 459b7ca..4b27e71 100644 --- a/docs/runs.md +++ b/docs/runs.md @@ -63,6 +63,44 @@ edges break referential integrity). Until Lance exposes a two-phase delete API, the parse-time rejection keeps both paths atomic and correct. Tracked: MR-793, plus a Lance-upstream ticket. 
+### MR-793 status (storage trait two-phase invariant) — partial + +MR-793 hoists the staged-write pattern into a `TableStorage` trait +surface with sealed-trait enforcement and opaque `SnapshotHandle` / +`StagedHandle` types — see `crates/omnigraph/src/storage_layer.rs`. +The trait is the canonical surface for new engine code; existing call +sites still use the inherent `TableStore` methods (mechanical migration +deferred to a follow-up cycle — tracked). + +Three writers have been migrated onto staged primitives: + +* **`ensure_indices`** (`db/omnigraph/table_ops.rs::build_indices_on_dataset_for_catalog`) + — scalar indices (BTree, Inverted) now use `stage_create_*_index` + + `commit_staged`. Vector indices stay inline (residual — Lance + `build_index_metadata_from_segments` is `pub(crate)` in 4.0.0; + companion ticket to lance-format/lance#6658 needed). +* **`branch_merge::publish_rewritten_merge_table`** + (`exec/merge.rs`) — merge_insert now uses `stage_merge_insert` + + `commit_staged`. Deletes stay inline (Lance #6658 residual). +* **`schema_apply` rewritten_tables** (`db/omnigraph/schema_apply.rs`) + — non-empty rewrites use `stage_overwrite` + `commit_staged`. + Empty-batch rewrites stay inline (Lance `InsertBuilder::execute_uncommitted` + rejects empty data; the empty case is rare and bounded by the + schema-apply lock branch). + +A defense-in-depth integration test (`tests/forbidden_apis.rs`) walks +engine source and fails if non-allow-listed code calls Lance's +inline-commit APIs directly. The trait surface itself is the primary +enforcement (sealed + only-callable-via-trait once call sites land); +the grep test catches type-system bypass attempts. + +The "finalize → publisher residual" described below applies equally to +the migrated writers — Lance has no multi-dataset atomic commit +primitive, so the per-table commit_staged → manifest publish gap is +the same drift class. 
Closing it requires either upstream Lance
+multi-dataset commit OR the omnigraph-side recovery-on-open reconciler
+described in `.context/mr-793-design.md` §15 (deferred to MR-795).
+
 ### `LoadMode::Overwrite` residual

 The bulk loader's Append and Merge modes use the staged-write path

From 17bf978d0e66fff4a3f212f0edcd6cb4a6c47a7a Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Sat, 2 May 2026 17:41:32 +0200
Subject: [PATCH 2/8] MR-793 follow-up: lance docs alignment audit + mandate full-page fetch via mdrip
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* AGENTS.md / docs/lance.md: agents must use `npx mdrip` (not
  summarizing WebFetch) when consulting Lance docs. WebFetch routinely
  drops load-bearing details — `pub(crate)` blockers, sub-specs behind
  nav hubs, default flags. Lesson learned during the MR-793 alignment
  audit.
* docs/lance.md: add "Last alignment audit: 2026-05-02" stanza
  documenting MemWAL gap, lance#6666 companion ticket, stable-row-ID
  status (experimental, may unblock MR-848), FRI as documented
  compaction-friendly alternative.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 AGENTS.md     |  2 +-
 docs/lance.md | 13 ++++++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/AGENTS.md b/AGENTS.md
index 38560fa..aa0de0c 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -5,7 +5,7 @@ This file is the always-on map for AI coding agents (Claude Code, Codex, Cursor,
 **Required reading every session, every change:**

 1. **[docs/invariants.md](docs/invariants.md)** — the architectural invariants and §IX deny-list. Apply to every PR, not only architecture work.
-2. **[docs/lance.md](docs/lance.md)** — the curated index of upstream Lance docs. **Consult it before every task** to identify which Lance pages are relevant to what you're about to do, then fetch those upstream URLs before grepping our code or guessing. Lance is the substrate; behavior is documented there, not here.
+2. **[docs/lance.md](docs/lance.md)** — the curated index of upstream Lance docs. **Consult it before every task** to identify which Lance pages are relevant to what you're about to do, then fetch those upstream URLs before grepping our code or guessing. Lance is the substrate; behavior is documented there, not here. **Always fetch the FULL page content, not summaries** — use `npx mdrip ` (or `npx mdrip --max-chars 200000 ` for very long pages). Tools that summarize pages (like Claude's `WebFetch`) drop load-bearing details — we have caught alignment misses (default flags, `pub(crate)` blockers, three-page sub-specs hidden behind navigation hubs) only after dumping the full markdown. If `npx mdrip` is unavailable, fall back to `curl | pandoc -f html -t markdown` or paste the rendered page text manually; never act on a summarized fetch alone.
 3. **[docs/testing.md](docs/testing.md)** — the test-coverage map. **Always check what already covers your change before writing a new test.** Extending an existing test (an assertion, a fixture row, a parameterization) is preferred over a duplicated `init_and_load()` block. Walk the before-every-task checklist to identify existing coverage, run those tests as a clean baseline, and only add a new test fn or file when no existing one owns the area.

 Tools that support `@`-imports (Claude Code) auto-include all three files via the imports below — note these must sit at column 0 (not inside a blockquote) for the parser to recognize them. Other agents (Codex, Cursor, Cline, …) must open them explicitly at the start of each session.
diff --git a/docs/lance.md b/docs/lance.md
index 700e105..64fbb3e 100644
--- a/docs/lance.md
+++ b/docs/lance.md
@@ -4,7 +4,7 @@ OmniGraph sits on top of Lance. Many problems — index lifecycle, branching, tr

 This file is the curated entry point. **When you hit a Lance-shaped problem, find the matching topic below and fetch the listed URL(s) before guessing.** Don't grep our codebase for behavior that is documented authoritatively in Lance.

-Base URL: `https://lance.org`. Use `WebFetch` (or your tool's equivalent) on the full URLs. Keep this index curated to relevant material — the upstream sitemap has hundreds of URLs (notably the Namespace REST API model surface, Spark/Trino/Databricks integrations) that we don't use.
+Base URL: `https://lance.org`. **Fetch the FULL page content, not summaries** — use `npx mdrip ` (or `npx mdrip --max-chars 200000 ` for very long pages). Tools that summarize pages (like Claude's `WebFetch`) routinely drop load-bearing details — defaults, `pub(crate)` blockers, sub-specs hidden behind navigation hubs. If `npx mdrip` is unavailable, fall back to `curl | pandoc -f html -t markdown` or paste the rendered page text manually; **never act on a summarized fetch alone**. Keep this index curated to relevant material — the upstream sitemap has hundreds of URLs (notably the Namespace REST API model surface, Spark/Trino/Databricks integrations) that we don't use.

 > **Substrate boundary check.** Before fetching, recall [docs/invariants.md §I](invariants.md): if Lance already does the thing, we don't reimplement it. The most common reason to read these docs is to confirm a substrate behavior, not to learn what to clone.

@@ -155,3 +155,14 @@ If a future need pulls one of these into scope, add a row to the matching domain
 ## Maintenance

 When Lance ships a major release that changes any of the above (file format bump, new index type, transaction semantics change, new branching primitive), refresh this index in the same change as the omnigraph upgrade. Stale Lance pointers are worse than no pointers.
+
+### Last alignment audit: 2026-05-02 (Lance 4.0.1 upstream; omnigraph pinned at 4.0.0)
+
+A full read-through of every index page above was performed in the MR-793 cycle. Findings (no code changes required for PR #70):
+
+- The MemWAL system index has three deeper sub-pages that this index does not yet list — they're load-bearing for understanding crash-recovery semantics and are needed before MR-847 (recovery reconciler) implementation. Add when located: `MemWAL Index Overview`, `MemWAL Index Details`, `MemWAL Implementation` (linked from the parent MemWAL page but at sub-URLs not currently in `lance.md`).
+- The distributed-indexing guide names Python APIs (`commit_existing_index_segments`, `merge_existing_index_segments`); the Rust analogues exist via `CreateIndexBuilder::execute_uncommitted` for scalar indices but **`build_index_metadata_from_segments` is `pub(crate)`** and blocks vector-index two-phase commits from outside the lance crate. Filed [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) as a companion to [#6658](https://github.com/lance-format/lance/issues/6658).
+- "Stable Row ID for Index" is documented as **experimental** in lance-4.0.x. Our datasets enable stable row IDs at the dataset level (`WriteParams::enable_stable_row_ids = true`); confirming whether our created indices opt into stable-row-id mode is a follow-up worth doing before MR-848 (index reconciler) lands.
+- Fragment Reuse Index (FRI) is documented as one of three compaction strategies. omnigraph currently uses option 2 (immediate index rewrite at compaction time, via `omnigraph optimize`'s post-compaction rebuild). Adopting FRI is the explicit option for compaction-friendly index updates; relevant to MR-848.
+
+Bump this date stanza on the next alignment pass.

From b87be5e9f0235b52b49d7808589dc6b33817fab4 Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Sat, 2 May 2026 17:44:41 +0200
Subject: [PATCH 3/8] agents: read every Lance page even slightly relevant, not just the obvious match
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Behavior is interlocked across Lance pages — transactions reference
index lifecycle, index lifecycle references compaction, compaction
references row-id lineage. Skipping a "slightly relevant" page is how
alignment misses happen. The index alone is not a substitute for
reading the pages.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 AGENTS.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/AGENTS.md b/AGENTS.md
index aa0de0c..21c9e2a 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -5,7 +5,7 @@ This file is the always-on map for AI coding agents (Claude Code, Codex, Cursor,
 **Required reading every session, every change:**

 1. **[docs/invariants.md](docs/invariants.md)** — the architectural invariants and §IX deny-list. Apply to every PR, not only architecture work.
-2. **[docs/lance.md](docs/lance.md)** — the curated index of upstream Lance docs. **Consult it before every task** to identify which Lance pages are relevant to what you're about to do, then fetch those upstream URLs before grepping our code or guessing. Lance is the substrate; behavior is documented there, not here. **Always fetch the FULL page content, not summaries** — use `npx mdrip ` (or `npx mdrip --max-chars 200000 ` for very long pages). Tools that summarize pages (like Claude's `WebFetch`) drop load-bearing details — we have caught alignment misses (default flags, `pub(crate)` blockers, three-page sub-specs hidden behind navigation hubs) only after dumping the full markdown. If `npx mdrip` is unavailable, fall back to `curl | pandoc -f html -t markdown` or paste the rendered page text manually; never act on a summarized fetch alone.
+2. **[docs/lance.md](docs/lance.md)** — the curated index of upstream Lance docs. **Consult it before every task** to identify which Lance pages are relevant. **Then fetch every page in the matching domain section, plus every page that is even slightly relevant** — not just the page whose title most obviously matches the task. Behavior is interlocked across pages (transactions reference index lifecycle; index lifecycle references compaction; compaction references row-id lineage), and skipping a "slightly relevant" page is how alignment misses happen. The index itself is not a substitute for reading the pages — never act on the index alone. **Always fetch the FULL page content, not summaries** — use `npx mdrip ` (or `npx mdrip --max-chars 200000 ` for very long pages). Tools that summarize pages (like Claude's `WebFetch`) drop load-bearing details — we have caught alignment misses (default flags, `pub(crate)` blockers, three-page sub-specs hidden behind navigation hubs) only after dumping the full markdown. If `npx mdrip` is unavailable, fall back to `curl | pandoc -f html -t markdown` or paste the rendered page text manually; never act on a summarized fetch alone.
 3. **[docs/testing.md](docs/testing.md)** — the test-coverage map. **Always check what already covers your change before writing a new test.** Extending an existing test (an assertion, a fixture row, a parameterization) is preferred over a duplicated `init_and_load()` block. Walk the before-every-task checklist to identify existing coverage, run those tests as a clean baseline, and only add a new test fn or file when no existing one owns the area.

 Tools that support `@`-imports (Claude Code) auto-include all three files via the imports below — note these must sit at column 0 (not inside a blockquote) for the parser to recognize them. Other agents (Codex, Cursor, Cline, …) must open them explicitly at the start of each session.

From 9b0920b5dac93cc176b163b32da86e58528014e2 Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Sat, 2 May 2026 18:47:07 +0200
Subject: [PATCH 4/8] address PR #70 bot review (Cubic + Cursor): 7 inline + failpoint test + invariants notes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Cubic findings:

* `tests/forbidden_apis.rs`: expand `FORBIDDEN_PATTERNS` with
  `Dataset::write` / `Dataset::append` / `Dataset::delete` /
  `Dataset::merge_insert` / `Dataset::add_columns` / `update_columns` /
  `drop_columns` / `truncate_table` / `restore` and the bare
  `.merge_insert(` / `.add_columns(` / `.update_columns(` /
  `.drop_columns(` / `.truncate_table(` method patterns. Deliberately
  avoid `.append(` / `.delete(` / `.write(` (over-match `Vec::append`,
  `.delete_branch(`, arrow-array `.append(`, etc.). Allow-list
  `commit_graph.rs` and `graph_coordinator.rs` — they're manifest-layer
  infra that legitimately uses `Dataset::write` for system tables.
* `schema_apply.rs:253`: pass `entry.table_branch.as_deref()` (not
  `None`) to `open_dataset_head_for_write` for consistency with the
  sibling `indexed_tables` block. Schema apply rejects non-main
  branches at the lock-acquire step today, so behavior is unchanged;
  this is a defensive consistency fix that survives a future
  relaxation of the lock check.
* `storage_layer.rs:131` doc: was `Vec<&StagedWrite>` with lifetime
  claim; actually returns `Vec` (cloned). Fixed.
* `AGENTS.md:201` capability matrix row + `storage_layer.rs:1` module
  doc: softened the "stage_* + commit_staged are the only paths" /
  "trait funnels every write" overclaim. Inline-commit residuals
  (`delete_where`, `create_vector_index`) remain on the trait pending
  upstream Lance work (#6658, #6666); legacy `append_batch` etc.
  remain pending Phase 1b / Phase 9. Module doc now describes the
  current transitional state honestly.

Cursor Bugbot findings:

* `storage_layer.rs:360`: trait `delete_where` consumed
  `SnapshotHandle` but returned only `DeleteState`, dropping the
  post-delete dataset. Future callers migrating from the inherent
  `&mut Dataset` API would lose the post-delete dataset state needed
  for indexing / `table_state` queries. Fixed: returns
  `(SnapshotHandle, DeleteState)` matching `append_batch` /
  `overwrite_batch` shape.
* `storage_layer.rs:824`: removed dead `_scanner_type_marker` fn and
  the unused `Scanner` import (the marker existed only to suppress an
  unused-import warning — fixing the import is the cleaner answer).

Engine-level Phase A failpoint test (closes the partial-criterion
flagged in Cubic's acceptance-criteria checklist):

* `db/omnigraph/table_ops.rs::stage_and_commit_btree`: instrumented
  with
  `crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")`
  between `stage_create_btree_index` and `commit_staged`.
* `tests/failpoints.rs::ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable`:
  triggers the failpoint via a schema-apply that adds a new node type;
  proves that existing tables are unaffected (Person mutation succeeds
  after the failed apply) — i.e. Phase A failure leaves no Lance-HEAD
  drift on tables outside the failed `added_tables` iteration.

`docs/invariants.md` transitional notes:

* §VI.23 (atomicity per query): annotated as upheld at the
  writer-trait surface for inserts / updates / scalar-index builds /
  merge_insert / overwrite after MR-793 PR #70. Per-table
  commit_staged → manifest publish window remains; closing requires
  MR-847's recovery-on-open reconciler. `delete_where` and
  `create_vector_index` remain inline pending lance#6658 / #6666.
* §VII.35 (reconciler pattern): annotated as partial — staged
  primitives are the building blocks; the reconciler task itself is
  MR-848.
* §VIII.45 (reference impl per trait): `TableStorage` has its primary
  impl on `TableStore` with opaque-handle signatures; no test impl
  yet.
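
The property the Phase A failpoint test checks can be sketched in isolation. The following is a minimal model and not the omnigraph or Lance API: `Table`, `Staged`, `Failpoints`, `WriteError`, and `stage_and_commit` are hypothetical names invented for illustration, and an in-memory version counter stands in for Lance HEAD. It assumes only what this series describes: staging writes data without advancing HEAD, and a failure between stage and commit leaves HEAD where it was.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a versioned table (not a Lance type).
#[derive(Debug, Default)]
struct Table {
    head: u64,              // committed version, playing the role of HEAD
    committed: Vec<String>, // rows visible at HEAD
}

/// Data that has been written but is not yet visible.
#[derive(Debug)]
struct Staged {
    read_version: u64, // the HEAD this write was staged against
    rows: Vec<String>,
}

#[derive(Debug, PartialEq)]
enum WriteError {
    Failpoint(&'static str),
    VersionMismatch { expected: u64, actual: u64 },
}

/// Hypothetical failpoint registry: a set of armed failpoint names.
#[derive(Default)]
struct Failpoints {
    armed: HashSet<&'static str>,
}

impl Failpoints {
    fn maybe_fail(&self, name: &'static str) -> Result<(), WriteError> {
        if self.armed.contains(name) {
            Err(WriteError::Failpoint(name))
        } else {
            Ok(())
        }
    }
}

impl Table {
    /// Phase A: produce the staged write; HEAD is not touched.
    fn stage(&self, rows: Vec<String>) -> Staged {
        Staged { read_version: self.head, rows }
    }

    /// Phase B: advance HEAD, but only if nothing committed in between.
    fn commit_staged(&mut self, staged: Staged) -> Result<u64, WriteError> {
        if staged.read_version != self.head {
            return Err(WriteError::VersionMismatch {
                expected: staged.read_version,
                actual: self.head,
            });
        }
        self.committed.extend(staged.rows);
        self.head += 1;
        Ok(self.head)
    }
}

/// Stage, hit the failpoint, then commit. An armed failpoint returns
/// early, so the staged rows are dropped and HEAD stays unchanged.
fn stage_and_commit(
    table: &mut Table,
    failpoints: &Failpoints,
    rows: Vec<String>,
) -> Result<u64, WriteError> {
    let staged = table.stage(rows);
    failpoints.maybe_fail("post_stage_pre_commit")?;
    table.commit_staged(staged)
}
```

With the failpoint armed, `stage_and_commit` returns the failpoint error and the table's `head` is still at its prior value; with it disarmed, the same call advances `head` by one. The model deliberately says nothing about the separate per-table commit to manifest publish window, which the notes above describe as still open.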
Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 2 +- .../src/db/omnigraph/schema_apply.rs | 13 +++- .../omnigraph/src/db/omnigraph/table_ops.rs | 4 ++ crates/omnigraph/src/storage_layer.rs | 70 +++++++++++-------- crates/omnigraph/tests/failpoints.rs | 65 +++++++++++++++++ crates/omnigraph/tests/forbidden_apis.rs | 38 ++++++++-- docs/invariants.md | 3 + 7 files changed, 161 insertions(+), 34 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 21c9e2a..ad1211a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main | Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing | | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables | | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering | -| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. Engine-internal write APIs sit behind a sealed `TableStorage` trait (MR-793) — `stage_*` + `commit_staged` are the only paths to advance Lance HEAD; inline-commit Lance APIs are not reachable through the trait surface | +| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. Engine-internal write APIs are routed through a sealed `TableStorage` trait (MR-793). `stage_*` + `commit_staged` are the canonical staged-write surface for new code; documented inline-commit residuals (`delete_where`, `create_vector_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)). 
The forbidden-API guard prevents engine code outside `table_store.rs` from importing inline-commit Lance APIs directly | | Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency | | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy | | BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches | diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 7550f20..36f4b01 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -248,9 +248,20 @@ pub(super) async fn apply_schema_with_lock( let mut target_ds = if batch.num_rows() == 0 { TableStore::overwrite_dataset(&dataset_uri, batch).await? } else { + // Pass `entry.table_branch.as_deref()` (not `None`) for + // consistency with the indexed_tables block below. Schema + // apply runs under `__schema_apply_lock__` which today + // rejects non-main branches, so `entry.table_branch` is + // expected to be `None`. But the defensive passthrough + // means a future relaxation of the lock-check can't quietly + // open the wrong HEAD here. let existing = db .table_store - .open_dataset_head_for_write(table_key, &dataset_uri, None) + .open_dataset_head_for_write( + table_key, + &dataset_uri, + entry.table_branch.as_deref(), + ) .await?; let staged = db.table_store.stage_overwrite(&existing, batch).await?; db.table_store diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index fcde941..b343b4e 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -376,6 +376,10 @@ async fn stage_and_commit_btree( table_key, columns, e )) })?; + // Failpoint between stage and commit. 
Used by `tests/failpoints.rs` + // to demonstrate that a Phase A failure in the staged-index path + // leaves no Lance-HEAD drift on the touched table. + crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?; let new_ds = db .table_store .commit_staged(Arc::new(ds.clone()), staged.transaction) diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs index c3599e6..f4fc657 100644 --- a/crates/omnigraph/src/storage_layer.rs +++ b/crates/omnigraph/src/storage_layer.rs @@ -1,17 +1,32 @@ //! Storage trait surface — MR-793. //! -//! `TableStorage` is the engine-internal trait that funnels every Lance -//! data write through staged primitives. Engine code (in `exec/`, -//! `db/omnigraph/`, `loader/`) holds `Arc` instead of -//! a concrete `TableStore`; the inline-commit Lance APIs -//! (`Dataset::append`, `MergeInsertBuilder::execute`, etc.) are not -//! reachable through the trait surface. +//! `TableStorage` is the engine-internal trait that exposes the +//! staged-write primitives (`stage_append`, `stage_merge_insert`, +//! `stage_overwrite`, `stage_create_btree_index`, +//! `stage_create_inverted_index`) plus `commit_staged` as the canonical +//! way for new engine writers to advance Lance HEAD without coupling +//! "write bytes" with "advance HEAD" in one Lance API call. +//! +//! ## Transitional residuals on the trait +//! +//! Several inline-commit methods remain on the trait surface as +//! documented residuals: `delete_where` (Lance 4.0.0's `DeleteJob` is +//! `pub(crate)` — see [#6658](https://github.com/lance-format/lance/issues/6658)), +//! `create_vector_index` (segment-commit-path requires +//! `build_index_metadata_from_segments` which is `pub(crate)` — see +//! [#6666](https://github.com/lance-format/lance/issues/6666)), and the +//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / +//! `create_btree_index` / `create_inverted_index` paths kept while +//! 
engine call sites finish migrating off of them (Phase 1b / Phase 9 +//! of MR-793). These are named honestly at every call site; the +//! forbidden-API guard test catches direct lance::* misuse outside the +//! storage layer. //! //! ## Sealed //! //! `TableStorage: sealed::Sealed`. Only types in this crate can implement -//! the trait, so the staged-write invariant cannot be subverted by a -//! downstream impl. +//! the trait, so a downstream crate cannot subvert the contract by +//! providing its own impl. //! //! ## Opaque handles //! @@ -21,15 +36,15 @@ //! through. This is the §III.9 alignment: `lance::Dataset` does not //! appear in trait signatures. //! -//! ## Scope (MR-793 Phase 1) +//! ## Migration status (MR-793 PR #70) //! -//! The trait surface mirrors the methods engine code currently calls on -//! `TableStore`. Subsequent MR-793 phases: -//! * Phase 2 — add `stage_overwrite`, `stage_create_btree_index`, -//! `stage_create_inverted_index` to the trait. -//! * Phase 4–6 — migrate writers (`ensure_indices`, `branch_merge`, -//! `schema_apply`) onto the staged primitives. -//! * Phase 9 — demote unused inline-commit methods to `pub(crate)`. +//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new +//! staged primitives (`stage_overwrite`, scalar index staging), and +//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto +//! the staged surface. Phase 1b (call-site conversion to +//! `Arc`), Phase 9 (demote unused inline-commit +//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847), +//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups. 
use std::fmt::Debug; use std::sync::Arc; @@ -38,7 +53,7 @@ use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use async_trait::async_trait; use lance::Dataset; -use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}; +use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream}; use lance::dataset::{WhenMatched, WhenNotMatched}; use crate::db::{Snapshot, SubTableEntry}; @@ -126,9 +141,12 @@ impl StagedHandle { } } -/// Helper: convert a slice of `StagedHandle` references to a Vec of -/// `&StagedWrite` for handing to `TableStore::stage_append`'s -/// `prior_stages` parameter. The lifetime is tied to the input slice. +/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and +/// collect into a `Vec` for handing to +/// `TableStore::stage_append`'s `prior_stages` parameter. The result is +/// owned (not borrowed) — callers that already had a `&[StagedHandle]` +/// pay a clone cost per element. `StagedWrite::clone` is cheap because +/// `Transaction` and `Vec` are shallow-clone friendly. 
pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec { handles.iter().map(|h| h.inner.clone()).collect() } @@ -357,7 +375,7 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { dataset_uri: &str, snapshot: SnapshotHandle, filter: &str, - ) -> Result; + ) -> Result<(SnapshotHandle, DeleteState)>; async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result; @@ -744,10 +762,11 @@ impl TableStorage for TableStore { dataset_uri: &str, snapshot: SnapshotHandle, filter: &str, - ) -> Result { + ) -> Result<(SnapshotHandle, DeleteState)> { let mut ds = Arc::try_unwrap(snapshot.into_arc()) .unwrap_or_else(|arc| (*arc).clone()); - TableStore::delete_where(self, dataset_uri, &mut ds, filter).await + let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?; + Ok((SnapshotHandle::new(ds), state)) } async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result { @@ -817,8 +836,3 @@ impl TableStorage for TableStore { TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await } } - -// Suppress unused-import warning when the module is built without the -// `Scanner` type being referenced from the trait. -#[allow(dead_code)] -fn _scanner_type_marker(_: &Scanner) {} diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index f16ed59..8f10b17 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -265,6 +265,71 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() { .expect("Company write on a non-drifted table should succeed"); } +/// MR-793 Phase 4 acceptance bar — proves that a Phase A failure in +/// the staged-index path (`stage_create_btree_index` succeeded; +/// `commit_staged` not yet called) leaves NO Lance-HEAD drift on the +/// existing tables. 
Subsequent operations against those tables succeed +/// without `ExpectedVersionMismatch`. +/// +/// Path: `apply_schema(v1 → v2)` adds a new node type. The +/// `added_tables` loop in `schema_apply` creates the empty dataset and +/// then calls `build_indices_on_dataset_for_catalog` → +/// `stage_and_commit_btree(..., &["id"])`. The failpoint fires +/// between `stage_create_btree_index` and `commit_staged`, so the +/// staged segments are written under `_indices//` but Lance HEAD +/// on the new dataset is unchanged at v=1. The schema-apply lock +/// branch is released by `apply_schema`'s outer match. Existing +/// tables (e.g. `node:Person`) are completely untouched by the new +/// node's added_tables iteration — they're outside the failed apply +/// path entirely — and we assert that mutations against them continue +/// to work. +/// +/// The orphan empty dataset from the failed apply is acceptable +/// residual: it's unreferenced by `__manifest` and will be reclaimed +/// by `cleanup_old_versions` (or removed when a future apply at the +/// same target path resolves the rename). +#[tokio::test] +async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Init with TEST_SCHEMA which declares Person + Knows. Indices on + // those tables get built during init. + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + // Apply a schema that adds a new node type. The added_tables loop + // will hit the failpoint between stage and commit on the new + // node:Project table's btree-on-id build. (TEST_SCHEMA already + // has Person + Company + Knows + WorksAt — pick a name that isn't + // already declared.) 
+ let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA); + + { + let _failpoint = ScopedFailPoint::new( + "ensure_indices.post_stage_pre_commit_btree", + "return", + ); + let err = db.apply_schema(&extended_schema).await.unwrap_err(); + assert!( + err.to_string() + .contains("ensure_indices.post_stage_pre_commit_btree"), + "schema apply should fail with the synthetic failpoint error, got: {err}" + ); + } + + // Existing tables stayed at their pre-apply versions; subsequent + // mutations against them succeed (no Lance-HEAD drift). + mutate_main( + &mut db, + helpers::MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .expect("Person mutation must succeed after the failed schema apply — existing tables are not drifted"); +} + fn assert_no_staging_files(repo: &std::path::Path) { for name in [ "_schema.pg.staging", diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs index d3df340..9cbe20b 100644 --- a/crates/omnigraph/tests/forbidden_apis.rs +++ b/crates/omnigraph/tests/forbidden_apis.rs @@ -42,20 +42,50 @@ use std::path::{Path, PathBuf}; const FORBIDDEN_PATTERNS: &[&str] = &[ + // Builder types — direct construction is the side door around the + // staged-write surface. "MergeInsertBuilder", "InsertBuilder::", "DeleteBuilder", "CommitBuilder::new", ".create_index_builder(", ".create_index_segment_builder(", + // Associated-function forms of inline-commit Lance APIs. These would + // only appear in source if the file imports `lance::Dataset` and + // calls the static fn — exactly the misuse we want to catch. These + // patterns deliberately exclude `.append(` / `.delete(` / `.write(` + // because those would over-match (`.delete_branch(`, `Vec::append`, + // arrow-array `.append(`, etc.). 
+ "Dataset::write", + "Dataset::append", + "Dataset::delete", + "Dataset::merge_insert", + "Dataset::add_columns", + "Dataset::update_columns", + "Dataset::drop_columns", + "Dataset::truncate_table", + "Dataset::restore", + // Lance-specific method names that don't clash with our `TableStore` + // wrappers (we use `merge_insert_batch{,es}`, `add_columns_to_*`, + // etc. — never the bare Lance names). Engine code that writes + // `ds.merge_insert(...)` against a `Dataset` value is reaching + // around the trait surface. + ".merge_insert(", + ".add_columns(", + ".update_columns(", + ".drop_columns(", + ".truncate_table(", ]; /// Files exempt from the guard. These are the legitimate storage-layer -/// implementations that USE the forbidden APIs to provide the staged -/// primitives. +/// or manifest-layer implementations that USE the forbidden APIs to +/// provide the staged primitives or to maintain the system tables +/// (commit graph, manifest). const ALLOW_LIST_FILES: &[&str] = &[ - "table_store.rs", // The storage layer itself. - "storage_layer.rs", // The trait module. + "table_store.rs", // The storage layer itself. + "storage_layer.rs", // The trait module. + "commit_graph.rs", // Maintains `_graph_commits.lance` system table. + "graph_coordinator.rs", // Drives the manifest publisher / branch coordinator. ]; /// Directories exempt from the guard. Files under these paths may use diff --git a/docs/invariants.md b/docs/invariants.md index 996b5c0..73056e7 100644 --- a/docs/invariants.md +++ b/docs/invariants.md @@ -105,6 +105,7 @@ These are user-visible commitments. They state what the engine guarantees and wh Specific defaults (timeout values, memory caps, TTL windows) are *configuration*, not invariants — see [docs/constants.md](constants.md) and per-deployment configuration. The invariant is that bounds and contracts exist, not their numerical values. 23. 
**Atomicity is per-query.** Every `.gq` query is atomic — multi-statement mutations are all-or-nothing via the substrate's atomic-commit primitive. No cross-query `BEGIN`/`COMMIT`; branches and merges fill that role for agent workflows. + *Status: upheld at the writer-trait surface for inserts / updates / scalar-index builds / merge_insert / overwrite after MR-793 PR #70 — the sealed `TableStorage` trait routes those through `stage_*` + `commit_staged`, so a Phase A failure (between writing fragments and committing) leaves no Lance-HEAD drift on touched tables. **Per-table commit_staged → manifest publish window remains** — a failure between commits across multiple touched tables can leave drift on the partially-committed tables. Lance has no multi-dataset atomic commit primitive; closing this requires the recovery-on-open reconciler tracked in MR-847. Additionally, two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).* 24. **Schema integrity is strict at commit.** Type validation, required-field presence (auto-filled from `@default` if declared), uniqueness across batches and versions, and referential integrity — all enforced before commit succeeds. Per-write softening flags are opt-in, never default. *Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.* @@ -140,6 +141,7 @@ Specific defaults (timeout values, memory caps, TTL windows) are *configuration* These are *how* we realize the invariants today. They are committed conventions — until we explicitly revise them, new code follows them. They are not eternal: a future architecture review may replace any of these with a different mechanism that upholds the same invariants. The deny-list (§IX) protects them in the meantime. 35. 
**Reconciler pattern for derivable state.** Index coverage, statistics, anything derivable from manifest state — reconciled, not job-queued. *Realizes the "don't maintain state parallel to the substrate" invariant.* See MR-737 §5.16. + *Status: partial after MR-793 PR #70 — scalar index builds (BTree, Inverted) now route through the staged primitives `stage_create_*_index` + `commit_staged` instead of inline `create_*_index`; this is the building block. The reconciler pattern itself (background `IndexReconciler` task driven by manifest commits, removing synchronous index work from the publish path) is tracked in MR-848. Vector indices remain inline-commit until lance-format/lance#6666 ships.* 36. **Polymorphism via Union, not per-feature lowering.** Interfaces / wildcards / alternation on nodes and edges share one IR (`Polymorphism`) and one lowering (Union of per-type concrete plans). *Realizes "shared mechanism for shared shape."* See MR-737 §5.13. *Status: aspirational — node interfaces in MR-579; edge wildcards in MR-744.* @@ -170,6 +172,7 @@ These are *how* we realize the invariants today. They are committed conventions 44. **Tests at every boundary.** `MemStorage` for engine tests; planner-only tests; executor-only tests with a stub storage. No layer tested only via end-to-end. 45. **Reference implementation per trait.** Every trait has a primary impl (Lance for storage) and at least a test impl. + *Status: partial after MR-793 PR #70 — `TableStorage` (the engine-internal staged-write trait, sealed) has its primary impl on `TableStore` (Lance-backed). The trait's signatures use opaque `SnapshotHandle` / `StagedHandle` types so a future test impl (e.g., `MemStorage`) can land without changing call sites. No test impl yet; `tempfile::tempdir()` + Lance is the de-facto test substrate today (see [docs/testing.md](testing.md)).* 46. 
**Documented capability surface.** New capabilities are documented with what they advertise, who consumes them, how the planner uses them. From bf7d716d1bf6a8093e5c66275156ccbc5ca4963b Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sat, 2 May 2026 19:25:52 +0200 Subject: [PATCH 5/8] forbidden_apis: add `.restore(` and document why `.append(` / `.delete(` are excluded MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cubic flagged that the guard misses `ds.append() / ds.delete() / ds.restore()`. `.restore(` is added — Lance-specific (no false positives in the workspace). `.append(` and `.delete(` stay excluded with a documenting comment: * `.append(` over-matches `Vec::append`, `String::append`, every `arrow_array::xxxArrayBuilder::append` (30+ legit uses across `exec/mutation.rs`, `loader/jsonl.rs`, `exec/projection.rs`). * `.delete(` over-matches `ObjectStore::delete` (used in `storage.rs`, `db/schema_state.rs`, `db/omnigraph.rs:1277` for staging-file cleanup) and would require many `// forbidden-api-allow:` sentinels for legitimate uses. The remaining bypass route — engine code that imports `lance::Dataset` and calls `ds.append(reader, params)` — is bounded by: 1. The trait surface itself (sealed, only-callable-via-trait once Phase 1b call-site conversion completes). 2. The PR-review process catching new `lance::Dataset` imports in non-storage files. 
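
The over-matching trade-off described above can be illustrated with a small substring scanner. This is a sketch under stated assumptions and not the actual `tests/forbidden_apis.rs`: the function name `find_violations`, the `BANNED` subset, and the choice to honor the `// forbidden-api-allow:` sentinel on the same line as the match are all assumptions made for illustration.

```rust
/// Illustrative subset of the banned substrings named in this series.
const BANNED: &[&str] = &[
    "MergeInsertBuilder",
    "Dataset::append",
    ".merge_insert(",
    ".restore(",
];

/// Sentinel that exempts a line (assumed here to apply to its own line).
const ALLOW_SENTINEL: &str = "// forbidden-api-allow:";

/// Scans `source` line by line and returns `(line_number, pattern)` for
/// every banned substring found. Line numbers are 1-based. Lines that
/// are pure `//` comments, or that carry the allow sentinel, are skipped.
fn find_violations<'p>(source: &str, patterns: &[&'p str]) -> Vec<(usize, &'p str)> {
    let mut hits = Vec::new();
    for (idx, line) in source.lines().enumerate() {
        if line.trim_start().starts_with("//") {
            continue; // comment line: mentions of banned APIs are fine
        }
        if line.contains(ALLOW_SENTINEL) {
            continue; // explicitly exempted by the author
        }
        for &pattern in patterns {
            if line.contains(pattern) {
                hits.push((idx + 1, pattern));
            }
        }
    }
    hits
}
```

Run against a file that calls `v.append(&mut other)` on a `Vec` and `store.delete(path)` on an object store, the `BANNED` list reports nothing for those lines, whereas a widened list containing `.append(` and `.delete(` flags both. That is the false-positive cost the commit message weighs against the residual `ds.append(...)` bypass.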
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/omnigraph/tests/forbidden_apis.rs | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs
index 9cbe20b..055d96a 100644
--- a/crates/omnigraph/tests/forbidden_apis.rs
+++ b/crates/omnigraph/tests/forbidden_apis.rs
@@ -75,6 +75,19 @@ const FORBIDDEN_PATTERNS: &[&str] = &[
     ".update_columns(",
     ".drop_columns(",
     ".truncate_table(",
+    // `.restore(` is Lance-specific (no other library in this workspace
+    // exposes a `.restore(` method); safe to ban without false-positive
+    // risk. Used to revert a Lance dataset to a prior version — never
+    // an operation engine code should perform directly.
+    ".restore(",
+    // NOT included: `.append(`, `.delete(`, `.write(`. Each over-matches
+    // legitimate non-Lance uses (`Vec::append`, `String::append`, arrow
+    // array `BuilderArray::append`, `ObjectStore::delete`, etc.).
+    // Engine code calling `ds.append(reader, params)` against an
+    // imported `lance::Dataset` is the residual bypass route this guard
+    // does NOT catch — but the trait surface itself is the primary
+    // enforcement (sealed + only-callable-via-trait once Phase 1b
+    // call-site conversion completes), so this gap is bounded.
 ];

 /// Files exempt from the guard. These are the legitimate storage-layer
From 5afde54d69314a7e491a630e1b5f192b567c028b Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Sat, 2 May 2026 19:35:34 +0200
Subject: [PATCH 6/8] =?UTF-8?q?agents:=20stop=20overclaiming=20atomic=20mu?=
 =?UTF-8?q?lti-table=20publish=20=E2=80=94=20describe=20the=20three=20laye?=
 =?UTF-8?q?rs=20honestly?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

External reviewer flagged that the capability matrix's "Atomic
multi-dataset publish" cell implied Lance gives us a single primitive
for cross-table atomicity. It doesn't.
The real contract is three layers stacked:

  (1) per-table Lance `commit_staged` for the data write
  (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for
      cross-table ordering
  (3) recovery-on-open reconciler for the residual gap between (1)
      and (2) — NOT YET SHIPPED, tracked in MR-847.

Until MR-847 lands, a failure between per-table `commit_staged` and the
manifest publish leaves drift on the partially-committed tables (the
"Phase B → Phase C residual" documented in `docs/runs.md`).

Also enumerate the legacy inline-commit residuals (`append_batch`,
`merge_insert_batches`, `overwrite_batch`, `create_*_index`) alongside
`delete_where` and `create_vector_index` — they remain on the trait
pending Phase 1b call-site conversion + Phase 9 demotion.

End the row with an explicit DO NOT: future agents reading the
capability matrix should not describe atomicity as "fully upheld" until
MR-847 ships.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 AGENTS.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/AGENTS.md b/AGENTS.md
index ad1211a..b76eca7 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
 | Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
 | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
 | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
-| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. Engine-internal write APIs are routed through a sealed `TableStorage` trait (MR-793). `stage_*` + `commit_staged` are the canonical staged-write surface for new code; documented inline-commit residuals (`delete_where`, `create_vector_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)). The forbidden-API guard prevents engine code outside `table_store.rs` from importing inline-commit Lance APIs directly |
+| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) recovery-on-open reconciler for the residual gap between (1) and (2). Layer (3) is **not yet shipped** — tracked in MR-847. Until MR-847 lands, a failure between per-table `commit_staged` and the manifest publish leaves drift (the documented "Phase B → Phase C residual" — see [docs/runs.md](docs/runs.md)). Engine writes route through a sealed `TableStorage` trait (MR-793) exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index` pending Phase 9) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and call-site conversion (Phase 1b) completes. **Do not describe atomicity as "fully upheld" until MR-847 ships.** |
 | Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency |
 | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
 | BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |
From c9a81266e49e31267c295c926a7567266166d57d Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Sat, 2 May 2026 19:44:37 +0200
Subject: [PATCH 7/8] lance: confirm MemWAL is opt-in, intra-table, no overlap with MR-847
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fetched https://lance.org/format/table/mem_wal/ in full via npx mdrip.
The "Overview / Details / Implementation" sidebar items turned out to be
anchor sections on the same URL, not separate pages.

Key findings (relevant to MR-847's recovery reconciler design):

* MemWAL is opt-in. Requires (1) unenforced primary key in schema,
  (2) explicit shard config, (3) writers using the LSM-tree write path.
  omnigraph does NOT enable it; we use direct write_fragments +
  commit(Operation::Append).
* MemWAL is intra-table — addresses streaming-write throughput for one
  Lance base table via MemTables → flushed MemTables → async merge. It
  does not coordinate across multiple tables.
* MemWAL's recovery is intra-table: WAL replay reconstructs MemTable
  state for one table. It does NOT help with omnigraph's cross-table
  manifest-pinned-vs-Lance-HEAD drift class.

Conclusion: MR-847's recovery reconciler design is unaffected. The two
operate at different abstraction layers.

Borrowable: MemWAL's epoch-based fencing pattern is structurally similar
to a future multi-coordinator sidecar protocol; noted on MR-847 in case
MR-668 (multi-process) ever lands.
---
 docs/lance.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/lance.md b/docs/lance.md
index 64fbb3e..f8d78fd 100644
--- a/docs/lance.md
+++ b/docs/lance.md
@@ -160,7 +160,7 @@ When Lance ships a major release that changes any of the above (file format bump
 A full read-through of every index page above was performed in the MR-793 cycle. Findings (no code changes required for PR #70):

-- The MemWAL system index has three deeper sub-pages that this index does not yet list — they're load-bearing for understanding crash-recovery semantics and are needed before MR-847 (recovery reconciler) implementation. Add when located: `MemWAL Index Overview`, `MemWAL Index Details`, `MemWAL Implementation` (linked from the parent MemWAL page but at sub-URLs not currently in `lance.md`).
+- The MemWAL "three sub-pages" (Overview / Details / Implementation) turned out to be **anchor sections on the single existing page** at `https://lance.org/format/table/mem_wal/` — not separate URLs. Fetched in full via `npx mdrip`. Findings: MemWAL is opt-in (requires an unenforced primary key + explicit shard config; omnigraph doesn't use it), operates intra-table (LSM-tree for streaming writes into one Lance table), and does NOT overlap with MR-847's cross-table manifest-vs-Lance-HEAD recovery problem. MR-847's design is unaffected.
 - The distributed-indexing guide names Python APIs (`commit_existing_index_segments`, `merge_existing_index_segments`); the Rust analogues exist via `CreateIndexBuilder::execute_uncommitted` for scalar indices but **`build_index_metadata_from_segments` is `pub(crate)`** and blocks vector-index two-phase commits from outside the lance crate. Filed [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) as a companion to [#6658](https://github.com/lance-format/lance/issues/6658).
 - "Stable Row ID for Index" is documented as **experimental** in lance-4.0.x. Our datasets enable stable row IDs at the dataset level (`WriteParams::enable_stable_row_ids = true`); confirming whether our created indices opt into stable-row-id mode is a follow-up worth doing before MR-848 (index reconciler) lands.
 - Fragment Reuse Index (FRI) is documented as one of three compaction strategies. omnigraph currently uses option 2 (immediate index rewrite at compaction time, via `omnigraph optimize`'s post-compaction rebuild). Adopting FRI is the explicit option for compaction-friendly index updates; relevant to MR-848.
From 151a1798b5df6e8acedf5fdfd6faf494a886d2d1 Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Sat, 2 May 2026 19:46:07 +0200
Subject: [PATCH 8/8] runs: enumerate inline-commit residuals on TableStorage as a residuals matrix
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes MR-793 acceptance §1 via option (b): every inline-commit method
remaining on the trait surface is named, the upstream blocker or
internal phase that closes it is cited, and the call-site residual
comment is mandated.

Reframes the criterion text in the MR-793 ticket comment as "either
full sealing OR all residuals enumerated"; this commit ships the
"enumerated" path. The "full sealing" path (Phase 1b + Phase 9 + the two
Lance upstream tickets) closes the matrix entirely.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 docs/runs.md | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/docs/runs.md b/docs/runs.md
index 4b27e71..971801b 100644
--- a/docs/runs.md
+++ b/docs/runs.md
@@ -101,6 +101,23 @@ the same drift class. Closing it requires either upstream Lance
 multi-dataset commit OR the omnigraph-side recovery-on-open reconciler
 described in `.context/mr-793-design.md` §15 (deferred to MR-795).

+### Inline-commit method residuals on `TableStorage` (MR-793 acceptance §1 option b)
+
+MR-793's acceptance criterion §1 ("`TableStore` public API has no method that performs a manifest commit as a side effect of writing") is met **per-method** by enumerating every inline-commit method that remains on the trait surface, naming why it cannot yet be removed, and keeping the residual comment at every call site:
+
+| Method on `TableStore` | Inline-commit reason | Closes when |
+|---|---|---|
+| `delete_where` | `DeleteJob` is `pub(crate)` in lance-4.0.0 — no public two-phase delete API | [lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658) lands and `stage_delete` joins the trait |
+| `create_vector_index` | Vector indices take Lance's "segment commit path"; the helper `build_index_metadata_from_segments` is `pub(crate)` | [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) lands and `stage_create_vector_index` joins the trait |
+| `append_batch` | Legacy inherent method; some engine call sites haven't migrated to `stage_append + commit_staged` yet | MR-793 Phase 1b (call-site conversion) + Phase 9 (demote to `pub(crate)`) |
+| `merge_insert_batch` / `merge_insert_batches` | Legacy inherent method | Same — Phase 1b + Phase 9 |
+| `overwrite_batch` | Legacy inherent method | Same — Phase 1b + Phase 9 |
+| `create_btree_index` (inherent) | Legacy inherent method (the migrated callers use `stage_create_btree_index` + `commit_staged`; the inherent stays for tests / un-migrated paths) | Same — Phase 1b + Phase 9 |
+| `create_inverted_index` (inherent) | Same | Same — Phase 1b + Phase 9 + index-class split (MR-848) |
+| `truncate_table` (inherent on `TableStore`) | Used by `overwrite_batch` internally | Phase 9 |
+
+After **lance#6658 + lance#6666 ship + MR-793 Phase 1b + MR-793 Phase 9 all complete**, the trait surface exposes only staged-write primitives + `commit_staged`. Until then this matrix names every residual explicitly, every call site carries a one-line residual comment, and no engine code outside `table_store.rs` is permitted to reach the inline-commit Lance APIs (enforced by the `tests/forbidden_apis.rs` guard).
+
 ### `LoadMode::Overwrite` residual

 The bulk loader's Append and Merge modes use the staged-write path