mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Merge pull request #70 from ModernRelay/ragnorc/mr793-phases-1-6
TableStorage trait + staged-write surface
This commit is contained in:
commit
2e20f4d69f
14 changed files with 1799 additions and 52 deletions
|
|
@ -5,7 +5,7 @@ This file is the always-on map for AI coding agents (Claude Code, Codex, Cursor,
|
|||
**Required reading every session, every change:**
|
||||
|
||||
1. **[docs/invariants.md](docs/invariants.md)** — the architectural invariants and §IX deny-list. Apply to every PR, not only architecture work.
|
||||
2. **[docs/lance.md](docs/lance.md)** — the curated index of upstream Lance docs. **Consult it before every task** to identify which Lance pages are relevant to what you're about to do, then fetch those upstream URLs before grepping our code or guessing. Lance is the substrate; behavior is documented there, not here.
|
||||
2. **[docs/lance.md](docs/lance.md)** — the curated index of upstream Lance docs. **Consult it before every task** to identify which Lance pages are relevant. **Then fetch every page in the matching domain section, plus every page that is even slightly relevant** — not just the page whose title most obviously matches the task. Behavior is interlocked across pages (transactions reference index lifecycle; index lifecycle references compaction; compaction references row-id lineage), and skipping a "slightly relevant" page is how alignment misses happen. The index itself is not a substitute for reading the pages — never act on the index alone. **Always fetch the FULL page content, not summaries** — use `npx mdrip <url>` (or `npx mdrip --max-chars 200000 <url>` for very long pages). Tools that summarize pages (like Claude's `WebFetch`) drop load-bearing details — we have caught alignment misses (default flags, `pub(crate)` blockers, three-page sub-specs hidden behind navigation hubs) only after dumping the full markdown. If `npx mdrip` is unavailable, fall back to `curl <url> | pandoc -f html -t markdown` or paste the rendered page text manually; never act on a summarized fetch alone.
|
||||
3. **[docs/testing.md](docs/testing.md)** — the test-coverage map. **Always check what already covers your change before writing a new test.** Extending an existing test (an assertion, a fixture row, a parameterization) is preferred over a duplicated `init_and_load()` block. Walk the before-every-task checklist to identify existing coverage, run those tests as a clean baseline, and only add a new test fn or file when no existing one owns the area.
|
||||
|
||||
Tools that support `@`-imports (Claude Code) auto-include all three files via the imports below — note these must sit at column 0 (not inside a blockquote) for the parser to recognize them. Other agents (Codex, Cursor, Cline, …) must open them explicitly at the start of each session.
|
||||
|
|
@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
|
|||
| Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
|
||||
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
|
||||
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
|
||||
| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher` |
|
||||
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) recovery-on-open reconciler for the residual gap between (1) and (2). Layer (3) is **not yet shipped** — tracked in MR-847. Until MR-847 lands, a failure between per-table `commit_staged` and the manifest publish leaves drift (the documented "Phase B → Phase C residual" — see [docs/runs.md](docs/runs.md)). Engine writes route through a sealed `TableStorage` trait (MR-793) exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index` pending Phase 9) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and call-site conversion (Phase 1b) completes. **Do not describe atomicity as "fully upheld" until MR-847 ships.** |
|
||||
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency |
|
||||
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
|
||||
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |
|
||||
|
|
|
|||
|
|
@ -200,6 +200,19 @@ impl Omnigraph {
|
|||
&self.table_store
|
||||
}
|
||||
|
||||
/// Engine-facing trait surface around `TableStore`.
|
||||
///
|
||||
/// MR-793 Phase 1: this is the canonical accessor for newly-written
|
||||
/// engine code. The trait's signatures use opaque `SnapshotHandle` /
|
||||
/// `StagedHandle` instead of leaking `lance::Dataset` /
|
||||
/// `lance::dataset::transaction::Transaction`. Existing call sites
|
||||
/// that still use `db.table_store.X(...)` (the inherent struct
|
||||
/// methods) are migrated incrementally — see §9 of
|
||||
/// `.context/mr-793-design.md`.
|
||||
pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
|
||||
&self.table_store
|
||||
}
|
||||
|
||||
pub(crate) async fn open_coordinator_for_branch(
|
||||
&self,
|
||||
branch: Option<&str>,
|
||||
|
|
|
|||
|
|
@ -237,7 +237,37 @@ pub(super) async fn apply_schema_with_lock(
|
|||
)
|
||||
.await?;
|
||||
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
|
||||
let mut target_ds = TableStore::overwrite_dataset(&dataset_uri, batch).await?;
|
||||
// MR-793 Phase 6: route through stage_overwrite + commit_staged
|
||||
// for non-empty batches. Lance's `InsertBuilder::execute_uncommitted`
|
||||
// errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`),
|
||||
// so the empty-rewrite case stays on `overwrite_dataset` (which
|
||||
// accepts empty input). The empty case is rare in schema_apply
|
||||
// — it only fires when the source table itself was already empty
|
||||
// — and schema_apply runs under `__schema_apply_lock__` so the
|
||||
// narrow inline-commit residual is bounded.
|
||||
let mut target_ds = if batch.num_rows() == 0 {
|
||||
TableStore::overwrite_dataset(&dataset_uri, batch).await?
|
||||
} else {
|
||||
// Pass `entry.table_branch.as_deref()` (not `None`) for
|
||||
// consistency with the indexed_tables block below. Schema
|
||||
// apply runs under `__schema_apply_lock__` which today
|
||||
// rejects non-main branches, so `entry.table_branch` is
|
||||
// expected to be `None`. But the defensive passthrough
|
||||
// means a future relaxation of the lock-check can't quietly
|
||||
// open the wrong HEAD here.
|
||||
let existing = db
|
||||
.table_store
|
||||
.open_dataset_head_for_write(
|
||||
table_key,
|
||||
&dataset_uri,
|
||||
entry.table_branch.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
let staged = db.table_store.stage_overwrite(&existing, batch).await?;
|
||||
db.table_store
|
||||
.commit_staged(Arc::new(existing), staged.transaction)
|
||||
.await?
|
||||
};
|
||||
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
|
||||
.await?;
|
||||
let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
|
||||
|
|
|
|||
|
|
@ -286,15 +286,18 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
) -> Result<()> {
|
||||
if let Some(type_name) = table_key.strip_prefix("node:") {
|
||||
if !db.table_store.has_btree_index(ds, "id").await? {
|
||||
db.table_store
|
||||
.create_btree_index(ds, &["id"])
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e))
|
||||
})?;
|
||||
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
|
||||
}
|
||||
|
||||
if let Some(node_type) = catalog.node_types.get(type_name) {
|
||||
// Per MR-793 §10 OQ3: stage scalar indices first (BTree,
|
||||
// Inverted), then call `create_vector_index` inline. The
|
||||
// inline-commit on a vector index advances HEAD, which would
|
||||
// invalidate any uncommitted scalar index transactions if we
|
||||
// stacked them. Today the per-stage shape commits each
|
||||
// scalar index immediately so the order constraint is
|
||||
// implicit, but if we ever batch scalar stages we must
|
||||
// ensure they all land before the vector inline-commit.
|
||||
for index_cols in &node_type.indices {
|
||||
if index_cols.len() != 1 {
|
||||
continue;
|
||||
|
|
@ -303,18 +306,16 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
if let Some(prop_type) = node_type.properties.get(prop_name) {
|
||||
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
|
||||
if !db.table_store.has_fts_index(ds, prop_name).await? {
|
||||
db.table_store
|
||||
.create_inverted_index(ds, prop_name.as_str())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"create Inverted index on {}({}): {}",
|
||||
table_key, prop_name, e
|
||||
))
|
||||
})?;
|
||||
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
|
||||
.await?;
|
||||
}
|
||||
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
|
||||
if !db.table_store.has_vector_index(ds, prop_name).await? {
|
||||
// Inline-commit residual: lance-4.0.0 does not
|
||||
// expose `build_index_metadata_from_segments` as
|
||||
// `pub`, so vector indices cannot be staged from
|
||||
// outside the lance crate. Document at the call
|
||||
// site; companion ticket to lance-format/lance#6658.
|
||||
db.table_store
|
||||
.create_vector_index(ds, prop_name.as_str())
|
||||
.await
|
||||
|
|
@ -334,28 +335,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
|
||||
if table_key.starts_with("edge:") {
|
||||
if !db.table_store.has_btree_index(ds, "id").await? {
|
||||
db.table_store
|
||||
.create_btree_index(ds, &["id"])
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!("create BTree index on {}(id): {}", table_key, e))
|
||||
})?;
|
||||
stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
|
||||
}
|
||||
if !db.table_store.has_btree_index(ds, "src").await? {
|
||||
db.table_store
|
||||
.create_btree_index(ds, &["src"])
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!("create BTree index on {}(src): {}", table_key, e))
|
||||
})?;
|
||||
stage_and_commit_btree(db, table_key, ds, &["src"]).await?;
|
||||
}
|
||||
if !db.table_store.has_btree_index(ds, "dst").await? {
|
||||
db.table_store
|
||||
.create_btree_index(ds, &["dst"])
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!("create BTree index on {}(dst): {}", table_key, e))
|
||||
})?;
|
||||
stage_and_commit_btree(db, table_key, ds, &["dst"]).await?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
|
@ -366,6 +352,80 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
)))
|
||||
}
|
||||
|
||||
/// Stage a BTREE index transaction and commit it, advancing the in-memory
|
||||
/// `*ds` to the new HEAD. MR-793 Phase 4: replaces the previous
|
||||
/// inline-commit `create_btree_index(ds)` call with the staged primitive
|
||||
/// + an immediate `commit_staged`. Per-call behavior is unchanged
|
||||
/// (HEAD advances once per index), but the bytes-on-disk and HEAD-advance
|
||||
/// are now decoupled at the `TableStore` API surface — a caller that
|
||||
/// needs end-of-batch atomicity can stage many transactions and commit
|
||||
/// them in one pass (Phase 8's index reconciler relies on this).
|
||||
async fn stage_and_commit_btree(
|
||||
db: &Omnigraph,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
columns: &[&str],
|
||||
) -> Result<()> {
|
||||
let staged = db
|
||||
.table_store
|
||||
.stage_create_btree_index(ds, columns)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"stage_create_btree_index on {}({:?}): {}",
|
||||
table_key, columns, e
|
||||
))
|
||||
})?;
|
||||
// Failpoint between stage and commit. Used by `tests/failpoints.rs`
|
||||
// to demonstrate that a Phase A failure in the staged-index path
|
||||
// leaves no Lance-HEAD drift on the touched table.
|
||||
crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
|
||||
let new_ds = db
|
||||
.table_store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"commit BTree index on {}({:?}): {}",
|
||||
table_key, columns, e
|
||||
))
|
||||
})?;
|
||||
*ds = new_ds;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stage an INVERTED (FTS) index transaction and commit it. See
|
||||
/// `stage_and_commit_btree` for the MR-793 Phase 4 rationale.
|
||||
async fn stage_and_commit_inverted(
|
||||
db: &Omnigraph,
|
||||
table_key: &str,
|
||||
ds: &mut Dataset,
|
||||
column: &str,
|
||||
) -> Result<()> {
|
||||
let staged = db
|
||||
.table_store
|
||||
.stage_create_inverted_index(ds, column)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"stage_create_inverted_index on {}({}): {}",
|
||||
table_key, column, e
|
||||
))
|
||||
})?;
|
||||
let new_ds = db
|
||||
.table_store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"commit Inverted index on {}({}): {}",
|
||||
table_key, column, e
|
||||
))
|
||||
})?;
|
||||
*ds = new_ds;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn prepare_updates_for_commit(
|
||||
db: &Omnigraph,
|
||||
branch: Option<&str>,
|
||||
|
|
|
|||
|
|
@ -911,7 +911,14 @@ async fn publish_rewritten_merge_table(
|
|||
let mut current_ds = ds;
|
||||
|
||||
// Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
|
||||
// existing rows, bumps _row_last_updated_at_version only for actually-changed rows)
|
||||
// existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
|
||||
//
|
||||
// MR-793 Phase 5: routed through the staged primitive so a failure
|
||||
// between writing fragments and committing leaves no Lance-HEAD
|
||||
// drift. The commit_staged here is per-table per-call (Lance has no
|
||||
// multi-dataset atomic commit); the residual sits at this single
|
||||
// commit point, narrowed from the previous "merge_insert + delete +
|
||||
// index" multi-step inline-commit chain.
|
||||
if let Some(delta) = &staged.delta_staged {
|
||||
let batches: Vec<RecordBatch> = target_db
|
||||
.table_store()
|
||||
|
|
@ -921,29 +928,40 @@ async fn publish_rewritten_merge_table(
|
|||
.filter(|batch| batch.num_rows() > 0)
|
||||
.collect();
|
||||
if !batches.is_empty() {
|
||||
let state = target_db
|
||||
// Concat into one batch — stage_merge_insert takes a single batch.
|
||||
let combined = if batches.len() == 1 {
|
||||
batches.into_iter().next().unwrap()
|
||||
} else {
|
||||
let schema = batches[0].schema();
|
||||
arrow_select::concat::concat_batches(&schema, &batches)
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
};
|
||||
let staged_merge = target_db
|
||||
.table_store()
|
||||
.merge_insert_batches(
|
||||
&full_path,
|
||||
current_ds,
|
||||
batches,
|
||||
.stage_merge_insert(
|
||||
current_ds.clone(),
|
||||
combined,
|
||||
vec!["id".to_string()],
|
||||
lance::dataset::WhenMatched::UpdateAll,
|
||||
lance::dataset::WhenNotMatched::InsertAll,
|
||||
)
|
||||
.await?;
|
||||
current_ds = target_db
|
||||
.reopen_for_mutation(
|
||||
table_key,
|
||||
&full_path,
|
||||
table_branch.as_deref(),
|
||||
state.version,
|
||||
)
|
||||
.table_store()
|
||||
.commit_staged(Arc::new(current_ds), staged_merge.transaction)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: delete removed rows via deletion vectors
|
||||
// Phase 2: delete removed rows via deletion vectors.
|
||||
//
|
||||
// INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public
|
||||
// two-phase delete API (DeleteJob is `pub(crate)` —
|
||||
// lance-format/lance#6658 is open with no PRs). MR-793 deliberately
|
||||
// does NOT introduce a `stage_delete` wrapper that would secretly
|
||||
// inline-commit (a side-channel — see design doc §3.2). When the
|
||||
// upstream API ships, swap this `delete_where` call for
|
||||
// `stage_delete` + `commit_staged`.
|
||||
if !staged.deleted_ids.is_empty() {
|
||||
let escaped: Vec<String> = staged
|
||||
.deleted_ids
|
||||
|
|
@ -957,7 +975,13 @@ async fn publish_rewritten_merge_table(
|
|||
.await?;
|
||||
}
|
||||
|
||||
// Phase 3: rebuild indices
|
||||
// Phase 3: rebuild indices.
|
||||
//
|
||||
// `build_indices_on_dataset` was migrated in MR-793 Phase 4 to use
|
||||
// `stage_create_btree_index` / `stage_create_inverted_index` +
|
||||
// `commit_staged` for scalar indices. Vector indices remain inline
|
||||
// (residual — `build_index_metadata_from_segments` is `pub(crate)`
|
||||
// in lance-4.0.0; companion ticket to lance-format/lance#6658).
|
||||
let row_count = target_db
|
||||
.table_store()
|
||||
.table_state(&full_path, ¤t_ds)
|
||||
|
|
|
|||
|
|
@ -8,4 +8,5 @@ pub mod graph_index;
|
|||
pub mod loader;
|
||||
pub mod runtime_cache;
|
||||
pub mod storage;
|
||||
pub mod storage_layer;
|
||||
pub mod table_store;
|
||||
|
|
|
|||
838
crates/omnigraph/src/storage_layer.rs
Normal file
838
crates/omnigraph/src/storage_layer.rs
Normal file
|
|
@ -0,0 +1,838 @@
|
|||
//! Storage trait surface — MR-793.
|
||||
//!
|
||||
//! `TableStorage` is the engine-internal trait that exposes the
|
||||
//! staged-write primitives (`stage_append`, `stage_merge_insert`,
|
||||
//! `stage_overwrite`, `stage_create_btree_index`,
|
||||
//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
|
||||
//! way for new engine writers to advance Lance HEAD without coupling
|
||||
//! "write bytes" with "advance HEAD" in one Lance API call.
|
||||
//!
|
||||
//! ## Transitional residuals on the trait
|
||||
//!
|
||||
//! Several inline-commit methods remain on the trait surface as
|
||||
//! documented residuals: `delete_where` (Lance 4.0.0's `DeleteJob` is
|
||||
//! `pub(crate)` — see [#6658](https://github.com/lance-format/lance/issues/6658)),
|
||||
//! `create_vector_index` (segment-commit-path requires
|
||||
//! `build_index_metadata_from_segments` which is `pub(crate)` — see
|
||||
//! [#6666](https://github.com/lance-format/lance/issues/6666)), and the
|
||||
//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
|
||||
//! `create_btree_index` / `create_inverted_index` paths kept while
|
||||
//! engine call sites finish migrating off of them (Phase 1b / Phase 9
|
||||
//! of MR-793). These are named honestly at every call site; the
|
||||
//! forbidden-API guard test catches direct lance::* misuse outside the
|
||||
//! storage layer.
|
||||
//!
|
||||
//! ## Sealed
|
||||
//!
|
||||
//! `TableStorage: sealed::Sealed`. Only types in this crate can implement
|
||||
//! the trait, so a downstream crate cannot subvert the contract by
|
||||
//! providing its own impl.
|
||||
//!
|
||||
//! ## Opaque handles
|
||||
//!
|
||||
//! `SnapshotHandle` and `StagedHandle` wrap `lance::Dataset` and
|
||||
//! `StagedWrite` respectively. Their inner Lance types are
|
||||
//! `pub(crate)` — engine code outside `table_store` cannot reach
|
||||
//! through. This is the §III.9 alignment: `lance::Dataset` does not
|
||||
//! appear in trait signatures.
|
||||
//!
|
||||
//! ## Migration status (MR-793 PR #70)
|
||||
//!
|
||||
//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new
|
||||
//! staged primitives (`stage_overwrite`, scalar index staging), and
|
||||
//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto
|
||||
//! the staged surface. Phase 1b (call-site conversion to
|
||||
//! `Arc<dyn TableStorage>`), Phase 9 (demote unused inline-commit
|
||||
//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847),
|
||||
//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::SchemaRef;
|
||||
use async_trait::async_trait;
|
||||
use lance::Dataset;
|
||||
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
|
||||
use lance::dataset::{WhenMatched, WhenNotMatched};
|
||||
|
||||
use crate::db::{Snapshot, SubTableEntry};
|
||||
use crate::error::Result;
|
||||
use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
|
||||
|
||||
// ─── sealed module ──────────────────────────────────────────────────────────
|
||||
|
||||
pub(crate) mod sealed {
|
||||
/// Sealed marker — only types defined in `omnigraph-engine` can
|
||||
/// implement `TableStorage`. Combined with the trait being the only
|
||||
/// route to write APIs from engine code, this gives type-system
|
||||
/// enforcement of the staged-write invariant.
|
||||
pub trait Sealed {}
|
||||
|
||||
impl Sealed for crate::table_store::TableStore {}
|
||||
}
|
||||
|
||||
// ─── opaque handles ────────────────────────────────────────────────────────
|
||||
|
||||
/// Opaque handle to a snapshot of a single sub-table dataset at a
|
||||
/// specific version.
|
||||
///
|
||||
/// Engine code never sees `lance::Dataset` directly; it holds
|
||||
/// `SnapshotHandle` and passes it back to `TableStorage` methods.
|
||||
/// Inside this crate, `pub(crate)` accessors expose the inner
|
||||
/// `Arc<Dataset>` to the `TableStorage` impl.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SnapshotHandle {
|
||||
pub(crate) inner: Arc<Dataset>,
|
||||
}
|
||||
|
||||
impl SnapshotHandle {
|
||||
/// Construct from a Lance dataset. `pub(crate)` — only
|
||||
/// `TableStore` should produce these.
|
||||
pub(crate) fn new(ds: Dataset) -> Self {
|
||||
Self { inner: Arc::new(ds) }
|
||||
}
|
||||
|
||||
/// Borrow the underlying Lance dataset. `pub(crate)` so only the
|
||||
/// `TableStorage` impl in this crate can reach through.
|
||||
pub(crate) fn dataset(&self) -> &Dataset {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
/// Take ownership of the inner `Arc<Dataset>`. Used when committing
|
||||
/// staged writes (the call needs to consume the snapshot).
|
||||
pub(crate) fn into_arc(self) -> Arc<Dataset> {
|
||||
self.inner
|
||||
}
|
||||
|
||||
// ── public, lance-free accessors ──
|
||||
|
||||
/// Current Lance manifest version of the snapshot.
|
||||
pub fn version(&self) -> u64 {
|
||||
self.inner.version().version
|
||||
}
|
||||
|
||||
/// Whether the underlying dataset uses stable row IDs.
|
||||
pub fn uses_stable_row_ids(&self) -> bool {
|
||||
self.inner.manifest.uses_stable_row_ids()
|
||||
}
|
||||
}
|
||||
|
||||
/// Opaque handle to a staged Lance transaction (data write or scalar
|
||||
/// index build) that has not yet advanced HEAD.
|
||||
///
|
||||
/// Produced by `TableStorage::stage_*`, consumed by
|
||||
/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
|
||||
/// (transaction + read-your-writes deltas) behind `pub(crate)`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StagedHandle {
|
||||
pub(crate) inner: StagedWrite,
|
||||
}
|
||||
|
||||
impl StagedHandle {
|
||||
pub(crate) fn new(staged: StagedWrite) -> Self {
|
||||
Self { inner: staged }
|
||||
}
|
||||
|
||||
/// Take ownership of the inner `StagedWrite`. Used by
|
||||
/// `commit_staged`.
|
||||
pub(crate) fn into_staged(self) -> StagedWrite {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
|
||||
/// collect into a `Vec<StagedWrite>` for handing to
|
||||
/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
|
||||
/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
|
||||
/// pay a clone cost per element. `StagedWrite::clone` is cheap because
|
||||
/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
|
||||
pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
|
||||
handles.iter().map(|h| h.inner.clone()).collect()
|
||||
}
|
||||
|
||||
// ─── TableStorage trait ────────────────────────────────────────────────────
|
||||
|
||||
/// Engine-internal trait covering every Lance dataset operation an
|
||||
/// `omnigraph` engine call site might perform.
|
||||
///
|
||||
/// `TableStore` is the only `impl`. The trait is sealed; the inline
|
||||
/// Lance APIs are not reachable through trait dispatch. New writers that
|
||||
/// might advance Lance HEAD MUST add a staged-shape method here.
|
||||
#[async_trait]
|
||||
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
|
||||
// ── Snapshot opens (no HEAD advance) ────────────────────────────────
|
||||
|
||||
async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn open_snapshot_at_table(
|
||||
&self,
|
||||
snapshot: &Snapshot,
|
||||
table_key: &str,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn open_dataset_head(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
branch: Option<&str>,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn open_dataset_head_for_write(
|
||||
&self,
|
||||
table_key: &str,
|
||||
dataset_uri: &str,
|
||||
branch: Option<&str>,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn open_dataset_at_state(
|
||||
&self,
|
||||
table_path: &str,
|
||||
branch: Option<&str>,
|
||||
version: u64,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn fork_branch_from_state(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
source_branch: Option<&str>,
|
||||
table_key: &str,
|
||||
source_version: u64,
|
||||
target_branch: &str,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
|
||||
|
||||
async fn reopen_for_mutation(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
expected_version: u64,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
fn ensure_expected_version(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
table_key: &str,
|
||||
expected_version: u64,
|
||||
) -> Result<()>;
|
||||
|
||||
// ── Reads (no HEAD advance) ────────────────────────────────────────
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
order_by: Option<Vec<ColumnOrdering>>,
|
||||
) -> Result<Vec<RecordBatch>>;
|
||||
|
||||
async fn scan_with_row_id(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
order_by: Option<Vec<ColumnOrdering>>,
|
||||
with_row_id: bool,
|
||||
) -> Result<Vec<RecordBatch>>;
|
||||
|
||||
async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
|
||||
|
||||
async fn scan_batches_for_rewrite(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
) -> Result<Vec<RecordBatch>>;
|
||||
|
||||
async fn count_rows(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
filter: Option<String>,
|
||||
) -> Result<usize>;
|
||||
|
||||
async fn count_rows_with_staged(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
staged: &[StagedHandle],
|
||||
filter: Option<String>,
|
||||
) -> Result<usize>;
|
||||
|
||||
async fn scan_with_staged(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
staged: &[StagedHandle],
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
) -> Result<Vec<RecordBatch>>;
|
||||
|
||||
async fn scan_with_pending(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
pending: &[RecordBatch],
|
||||
pending_schema: Option<SchemaRef>,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
key_column: Option<&str>,
|
||||
) -> Result<Vec<RecordBatch>>;
|
||||
|
||||
async fn first_row_id_for_filter(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<Option<u64>>;
|
||||
|
||||
async fn table_state(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: &SnapshotHandle,
|
||||
) -> Result<TableState>;
|
||||
|
||||
// ── Staged writes (no HEAD advance) ────────────────────────────────
|
||||
|
||||
async fn stage_append(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
prior_stages: &[StagedHandle],
|
||||
) -> Result<StagedHandle>;
|
||||
|
||||
async fn stage_merge_insert(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
key_columns: Vec<String>,
|
||||
when_matched: WhenMatched,
|
||||
when_not_matched: WhenNotMatched,
|
||||
) -> Result<StagedHandle>;
|
||||
|
||||
async fn commit_staged(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
staged: StagedHandle,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
/// Stage an overwrite (Operation::Overwrite). MR-793 Phase 2.
|
||||
async fn stage_overwrite(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<StagedHandle>;
|
||||
|
||||
/// Stage a BTREE scalar index build. MR-793 Phase 2.
|
||||
async fn stage_create_btree_index(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
columns: &[&str],
|
||||
) -> Result<StagedHandle>;
|
||||
|
||||
/// Stage an INVERTED (FTS) scalar index build. MR-793 Phase 2.
|
||||
async fn stage_create_inverted_index(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<StagedHandle>;
|
||||
|
||||
// ── Inline-commit residuals (named honestly per MR-793 §3.2) ──────
|
||||
//
|
||||
// These methods advance Lance HEAD as a side effect of writing.
|
||||
// They stay on the trait until the corresponding upstream Lance API
|
||||
// ships:
|
||||
//
|
||||
// * `delete_where` — Lance #6658 (two-phase delete).
|
||||
// * `create_*_index` — `build_index_metadata_from_segments` is
|
||||
// `pub(crate)` for vector indices in lance-4.0.0; scalar indices
|
||||
// migrate to staged in MR-793 Phase 2.
|
||||
// * `append_batch`, `merge_insert_batches`, `overwrite_batch` —
|
||||
// legacy paths that will be demoted to `pub(crate)` in MR-793
|
||||
// Phase 9 once all engine sites route through the staged
|
||||
// primitives.
|
||||
|
||||
async fn append_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)>;
|
||||
|
||||
async fn merge_insert_batches(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batches: Vec<RecordBatch>,
|
||||
key_columns: Vec<String>,
|
||||
when_matched: WhenMatched,
|
||||
when_not_matched: WhenNotMatched,
|
||||
) -> Result<TableState>;
|
||||
|
||||
async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)>;
|
||||
|
||||
async fn delete_where(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<(SnapshotHandle, DeleteState)>;
|
||||
|
||||
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
|
||||
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
|
||||
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
|
||||
|
||||
async fn create_btree_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
columns: &[&str],
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn create_inverted_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn create_vector_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
// ── URI helpers ────────────────────────────────────────────────────
|
||||
//
|
||||
// These are pure string formatting; they live on the trait so engine
|
||||
// code holding `Arc<dyn TableStorage>` can compute dataset URIs
|
||||
// without importing the concrete struct.
|
||||
|
||||
fn root_uri(&self) -> &str;
|
||||
fn dataset_uri(&self, table_path: &str) -> String;
|
||||
|
||||
// ── Streaming access (used by the export path) ────────────────────
|
||||
//
|
||||
// Engine code that needs a `DatasetRecordBatchStream` (rather than a
|
||||
// collected `Vec<RecordBatch>`) goes through this trait method.
|
||||
// Useful for the JSONL exporter that streams rows to a writer
|
||||
// without materializing the whole result.
|
||||
|
||||
async fn scan_stream(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
order_by: Option<Vec<ColumnOrdering>>,
|
||||
with_row_id: bool,
|
||||
) -> Result<DatasetRecordBatchStream>;
|
||||
}
|
||||
|
||||
// ─── single impl: TableStore ──────────────────────────────────────────────
|
||||
|
||||
#[async_trait]
|
||||
impl TableStorage for TableStore {
|
||||
async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
|
||||
self.open_at_entry(entry).await.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn open_snapshot_at_table(
|
||||
&self,
|
||||
snapshot: &Snapshot,
|
||||
table_key: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
self.open_snapshot_table(snapshot, table_key)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn open_dataset_head(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
branch: Option<&str>,
|
||||
) -> Result<SnapshotHandle> {
|
||||
TableStore::open_dataset_head(self, dataset_uri, branch)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn open_dataset_head_for_write(
|
||||
&self,
|
||||
table_key: &str,
|
||||
dataset_uri: &str,
|
||||
branch: Option<&str>,
|
||||
) -> Result<SnapshotHandle> {
|
||||
TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn open_dataset_at_state(
|
||||
&self,
|
||||
table_path: &str,
|
||||
branch: Option<&str>,
|
||||
version: u64,
|
||||
) -> Result<SnapshotHandle> {
|
||||
TableStore::open_dataset_at_state(self, table_path, branch, version)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn fork_branch_from_state(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
source_branch: Option<&str>,
|
||||
table_key: &str,
|
||||
source_version: u64,
|
||||
target_branch: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
TableStore::fork_branch_from_state(
|
||||
self,
|
||||
dataset_uri,
|
||||
source_branch,
|
||||
table_key,
|
||||
source_version,
|
||||
target_branch,
|
||||
)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
|
||||
TableStore::delete_branch(self, dataset_uri, branch).await
|
||||
}
|
||||
|
||||
async fn reopen_for_mutation(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
expected_version: u64,
|
||||
) -> Result<SnapshotHandle> {
|
||||
TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
fn ensure_expected_version(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
table_key: &str,
|
||||
expected_version: u64,
|
||||
) -> Result<()> {
|
||||
TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
order_by: Option<Vec<ColumnOrdering>>,
|
||||
) -> Result<Vec<RecordBatch>> {
|
||||
TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
|
||||
}
|
||||
|
||||
async fn scan_with_row_id(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
order_by: Option<Vec<ColumnOrdering>>,
|
||||
with_row_id: bool,
|
||||
) -> Result<Vec<RecordBatch>> {
|
||||
TableStore::scan_with(
|
||||
self,
|
||||
snapshot.dataset(),
|
||||
projection,
|
||||
filter,
|
||||
order_by,
|
||||
with_row_id,
|
||||
|_| Ok(()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
|
||||
TableStore::scan_batches(self, snapshot.dataset()).await
|
||||
}
|
||||
|
||||
async fn scan_batches_for_rewrite(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
) -> Result<Vec<RecordBatch>> {
|
||||
TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
|
||||
}
|
||||
|
||||
async fn count_rows(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
filter: Option<String>,
|
||||
) -> Result<usize> {
|
||||
TableStore::count_rows(self, snapshot.dataset(), filter).await
|
||||
}
|
||||
|
||||
async fn count_rows_with_staged(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
staged: &[StagedHandle],
|
||||
filter: Option<String>,
|
||||
) -> Result<usize> {
|
||||
let staged_writes = staged_handles_as_writes(staged);
|
||||
TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
|
||||
}
|
||||
|
||||
async fn scan_with_staged(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
staged: &[StagedHandle],
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
) -> Result<Vec<RecordBatch>> {
|
||||
let staged_writes = staged_handles_as_writes(staged);
|
||||
TableStore::scan_with_staged(
|
||||
self,
|
||||
snapshot.dataset(),
|
||||
&staged_writes,
|
||||
projection,
|
||||
filter,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn scan_with_pending(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
pending: &[RecordBatch],
|
||||
pending_schema: Option<SchemaRef>,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
key_column: Option<&str>,
|
||||
) -> Result<Vec<RecordBatch>> {
|
||||
TableStore::scan_with_pending(
|
||||
self,
|
||||
snapshot.dataset(),
|
||||
pending,
|
||||
pending_schema,
|
||||
projection,
|
||||
filter,
|
||||
key_column,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn first_row_id_for_filter(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<Option<u64>> {
|
||||
TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
|
||||
}
|
||||
|
||||
async fn table_state(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: &SnapshotHandle,
|
||||
) -> Result<TableState> {
|
||||
TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
|
||||
}
|
||||
|
||||
async fn stage_append(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
prior_stages: &[StagedHandle],
|
||||
) -> Result<StagedHandle> {
|
||||
let staged_writes = staged_handles_as_writes(prior_stages);
|
||||
TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
|
||||
.await
|
||||
.map(StagedHandle::new)
|
||||
}
|
||||
|
||||
async fn stage_merge_insert(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
key_columns: Vec<String>,
|
||||
when_matched: WhenMatched,
|
||||
when_not_matched: WhenNotMatched,
|
||||
) -> Result<StagedHandle> {
|
||||
let ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::stage_merge_insert(
|
||||
self,
|
||||
ds,
|
||||
batch,
|
||||
key_columns,
|
||||
when_matched,
|
||||
when_not_matched,
|
||||
)
|
||||
.await
|
||||
.map(StagedHandle::new)
|
||||
}
|
||||
|
||||
async fn commit_staged(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
staged: StagedHandle,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let ds_arc = snapshot.into_arc();
|
||||
let transaction = staged.into_staged().transaction;
|
||||
TableStore::commit_staged(self, ds_arc, transaction)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn stage_overwrite(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<StagedHandle> {
|
||||
TableStore::stage_overwrite(self, snapshot.dataset(), batch)
|
||||
.await
|
||||
.map(StagedHandle::new)
|
||||
}
|
||||
|
||||
async fn stage_create_btree_index(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
columns: &[&str],
|
||||
) -> Result<StagedHandle> {
|
||||
TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
|
||||
.await
|
||||
.map(StagedHandle::new)
|
||||
}
|
||||
|
||||
async fn stage_create_inverted_index(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<StagedHandle> {
|
||||
TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
|
||||
.await
|
||||
.map(StagedHandle::new)
|
||||
}
|
||||
|
||||
async fn append_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn merge_insert_batches(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batches: Vec<RecordBatch>,
|
||||
key_columns: Vec<String>,
|
||||
when_matched: WhenMatched,
|
||||
when_not_matched: WhenNotMatched,
|
||||
) -> Result<TableState> {
|
||||
let ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::merge_insert_batches(
|
||||
self,
|
||||
dataset_uri,
|
||||
ds,
|
||||
batches,
|
||||
key_columns,
|
||||
when_matched,
|
||||
when_not_matched,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn overwrite_batch(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
batch: RecordBatch,
|
||||
) -> Result<(SnapshotHandle, TableState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn delete_where(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
snapshot: SnapshotHandle,
|
||||
filter: &str,
|
||||
) -> Result<(SnapshotHandle, DeleteState)> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
|
||||
Ok((SnapshotHandle::new(ds), state))
|
||||
}
|
||||
|
||||
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
|
||||
TableStore::has_btree_index(self, snapshot.dataset(), column).await
|
||||
}
|
||||
|
||||
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
|
||||
TableStore::has_fts_index(self, snapshot.dataset(), column).await
|
||||
}
|
||||
|
||||
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
|
||||
TableStore::has_vector_index(self, snapshot.dataset(), column).await
|
||||
}
|
||||
|
||||
async fn create_btree_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
columns: &[&str],
|
||||
) -> Result<SnapshotHandle> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::create_btree_index(self, &mut ds, columns).await?;
|
||||
Ok(SnapshotHandle::new(ds))
|
||||
}
|
||||
|
||||
async fn create_inverted_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::create_inverted_index(self, &mut ds, column).await?;
|
||||
Ok(SnapshotHandle::new(ds))
|
||||
}
|
||||
|
||||
async fn create_vector_index(
|
||||
&self,
|
||||
snapshot: SnapshotHandle,
|
||||
column: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let mut ds = Arc::try_unwrap(snapshot.into_arc())
|
||||
.unwrap_or_else(|arc| (*arc).clone());
|
||||
TableStore::create_vector_index(self, &mut ds, column).await?;
|
||||
Ok(SnapshotHandle::new(ds))
|
||||
}
|
||||
|
||||
fn root_uri(&self) -> &str {
|
||||
TableStore::root_uri(self)
|
||||
}
|
||||
|
||||
fn dataset_uri(&self, table_path: &str) -> String {
|
||||
TableStore::dataset_uri(self, table_path)
|
||||
}
|
||||
|
||||
async fn scan_stream(
|
||||
&self,
|
||||
snapshot: &SnapshotHandle,
|
||||
projection: Option<&[&str]>,
|
||||
filter: Option<&str>,
|
||||
order_by: Option<Vec<ColumnOrdering>>,
|
||||
with_row_id: bool,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// Note: existing TableStore::scan_stream is an associated fn that
|
||||
// takes &Dataset, so we delegate via the dataset reference held by
|
||||
// the snapshot.
|
||||
TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
|
||||
}
|
||||
}
|
||||
|
|
@ -4,7 +4,7 @@ use arrow_select::concat::concat_batches;
|
|||
use futures::TryStreamExt;
|
||||
use lance::Dataset;
|
||||
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
|
||||
use lance::dataset::transaction::{Operation, Transaction};
|
||||
use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
|
||||
use lance::dataset::{
|
||||
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
|
||||
WriteParams,
|
||||
|
|
@ -760,6 +760,172 @@ impl TableStore {
|
|||
.map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
/// Stage an overwrite (write_fragments + Operation::Overwrite { schema, fragments }).
|
||||
/// Returns a StagedWrite carrying the replacement fragments. HEAD does
|
||||
/// NOT advance.
|
||||
///
|
||||
/// Lance shape: `InsertBuilder::with_params(WriteParams { mode: Overwrite, .. })
|
||||
/// .execute_uncommitted(vec![batch])` produces a `Transaction` whose
|
||||
/// `Operation::Overwrite` carries the new schema + fragments. The
|
||||
/// transaction is committed via `commit_staged` (same call as
|
||||
/// `stage_append`).
|
||||
///
|
||||
/// MR-793 Phase 2: introduces this for the schema_apply rewrite path.
|
||||
/// Lance API verified in `.context/mr-793-design.md` Appendix A.1.
|
||||
pub async fn stage_overwrite(
|
||||
&self,
|
||||
ds: &Dataset,
|
||||
batch: RecordBatch,
|
||||
) -> Result<StagedWrite> {
|
||||
if batch.num_rows() == 0 {
|
||||
return Err(OmniError::manifest_internal(
|
||||
"stage_overwrite called with empty batch".to_string(),
|
||||
));
|
||||
}
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Overwrite,
|
||||
allow_external_blob_outside_bases: true,
|
||||
..Default::default()
|
||||
};
|
||||
let transaction = InsertBuilder::new(Arc::new(ds.clone()))
|
||||
.with_params(¶ms)
|
||||
.execute_uncommitted(vec![batch])
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
let mut new_fragments = match &transaction.operation {
|
||||
Operation::Overwrite { fragments, .. } => fragments.clone(),
|
||||
other => {
|
||||
return Err(OmniError::manifest_internal(format!(
|
||||
"stage_overwrite: unexpected Lance operation {:?}",
|
||||
std::mem::discriminant(other)
|
||||
)));
|
||||
}
|
||||
};
|
||||
// Overwrite REPLACES every committed fragment, and Lance restarts
|
||||
// fragment-ID and row-ID counters at the post-commit version.
|
||||
// For our pre-commit staged view we need to:
|
||||
// 1) Renumber temporary fragment IDs (Lance returns them as
|
||||
// `id = 0` from `execute_uncommitted` — see stage_append
|
||||
// for the same fix). For Overwrite there are no committed
|
||||
// fragments to collide with (they're all in
|
||||
// removed_fragment_ids below), so start at 1.
|
||||
// 2) For stable-row-id datasets, assign row_id_meta starting
|
||||
// at 0 (Overwrite is a fresh-start) so `scan_with_staged`
|
||||
// doesn't hit the "Missing row id meta" panic in
|
||||
// lance-4.0.0 dataset/rowids.rs:22.
|
||||
assign_fragment_ids(&mut new_fragments, 1);
|
||||
if ds.manifest.uses_stable_row_ids() {
|
||||
assign_row_id_meta(&mut new_fragments, 0)?;
|
||||
}
|
||||
// Overwrite REPLACES every committed fragment. For
|
||||
// read-your-writes via scan_with_staged, list every committed
|
||||
// fragment in removed_fragment_ids so the post-stage view shows
|
||||
// ONLY the staged fragments.
|
||||
let removed_fragment_ids: Vec<u64> =
|
||||
ds.manifest.fragments.iter().map(|f| f.id).collect();
|
||||
Ok(StagedWrite {
|
||||
transaction,
|
||||
new_fragments,
|
||||
removed_fragment_ids,
|
||||
})
|
||||
}
|
||||
|
||||
/// Stage a BTREE scalar index build. Returns a StagedWrite whose
|
||||
/// transaction commits via `commit_staged`. HEAD does NOT advance.
|
||||
///
|
||||
/// Lance shape: `CreateIndexBuilder::execute_uncommitted` returns
|
||||
/// `IndexMetadata`; we manually wrap it in `Operation::CreateIndex
|
||||
/// { new_indices, removed_indices }` via the public `TransactionBuilder`,
|
||||
/// replicating the simple (non-segment-commit-path) branch of Lance's
|
||||
/// `CreateIndexBuilder::execute` (lance-4.0.0 `src/index/create.rs:502-512`).
|
||||
///
|
||||
/// `removed_indices` mirrors `execute()` lines 466-476: when the
|
||||
/// build replaces an existing same-named index, those entries are
|
||||
/// listed for tombstoning by the manifest commit.
|
||||
///
|
||||
/// MR-793 Phase 2: scalar index types (BTree, Inverted) are
|
||||
/// stage-able. Vector indices are NOT (segment-commit-path requires
|
||||
/// `build_index_metadata_from_segments` which is `pub(crate)` in
|
||||
/// lance-4.0.0); see `create_vector_index` and Appendix A.3.
|
||||
pub async fn stage_create_btree_index(
|
||||
&self,
|
||||
ds: &Dataset,
|
||||
columns: &[&str],
|
||||
) -> Result<StagedWrite> {
|
||||
let params = ScalarIndexParams::default();
|
||||
let mut ds_clone = ds.clone();
|
||||
let new_idx = ds_clone
|
||||
.create_index_builder(columns, IndexType::BTree, ¶ms)
|
||||
.replace(true)
|
||||
.execute_uncommitted()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!("stage_create_btree_index: {}", e))
|
||||
})?;
|
||||
let removed_indices: Vec<IndexMetadata> = ds
|
||||
.load_indices()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
.iter()
|
||||
.filter(|idx| idx.name == new_idx.name)
|
||||
.cloned()
|
||||
.collect();
|
||||
let transaction = TransactionBuilder::new(
|
||||
new_idx.dataset_version,
|
||||
Operation::CreateIndex {
|
||||
new_indices: vec![new_idx],
|
||||
removed_indices,
|
||||
},
|
||||
)
|
||||
.build();
|
||||
Ok(StagedWrite {
|
||||
transaction,
|
||||
new_fragments: Vec::new(),
|
||||
removed_fragment_ids: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Stage an INVERTED (FTS) scalar index build. Same shape as
|
||||
/// `stage_create_btree_index`; see its docs for the Lance API
|
||||
/// citation and contract notes.
|
||||
pub async fn stage_create_inverted_index(
|
||||
&self,
|
||||
ds: &Dataset,
|
||||
column: &str,
|
||||
) -> Result<StagedWrite> {
|
||||
let params = InvertedIndexParams::default();
|
||||
let mut ds_clone = ds.clone();
|
||||
let new_idx = ds_clone
|
||||
.create_index_builder(&[column], IndexType::Inverted, ¶ms)
|
||||
.replace(true)
|
||||
.execute_uncommitted()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!("stage_create_inverted_index: {}", e))
|
||||
})?;
|
||||
let removed_indices: Vec<IndexMetadata> = ds
|
||||
.load_indices()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
.iter()
|
||||
.filter(|idx| idx.name == new_idx.name)
|
||||
.cloned()
|
||||
.collect();
|
||||
let transaction = TransactionBuilder::new(
|
||||
new_idx.dataset_version,
|
||||
Operation::CreateIndex {
|
||||
new_indices: vec![new_idx],
|
||||
removed_indices,
|
||||
},
|
||||
)
|
||||
.build();
|
||||
Ok(StagedWrite {
|
||||
transaction,
|
||||
new_fragments: Vec::new(),
|
||||
removed_fragment_ids: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Run a scan with optional uncommitted staged writes visible
|
||||
/// alongside the committed snapshot. When `staged` is empty this is
|
||||
/// identical to `scan(...)`.
|
||||
|
|
|
|||
|
|
@ -265,6 +265,71 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
|
|||
.expect("Company write on a non-drifted table should succeed");
|
||||
}
|
||||
|
||||
/// MR-793 Phase 4 acceptance bar — proves that a Phase A failure in
|
||||
/// the staged-index path (`stage_create_btree_index` succeeded;
|
||||
/// `commit_staged` not yet called) leaves NO Lance-HEAD drift on the
|
||||
/// existing tables. Subsequent operations against those tables succeed
|
||||
/// without `ExpectedVersionMismatch`.
|
||||
///
|
||||
/// Path: `apply_schema(v1 → v2)` adds a new node type. The
|
||||
/// `added_tables` loop in `schema_apply` creates the empty dataset and
|
||||
/// then calls `build_indices_on_dataset_for_catalog` →
|
||||
/// `stage_and_commit_btree(..., &["id"])`. The failpoint fires
|
||||
/// between `stage_create_btree_index` and `commit_staged`, so the
|
||||
/// staged segments are written under `_indices/<uuid>/` but Lance HEAD
|
||||
/// on the new dataset is unchanged at v=1. The schema-apply lock
|
||||
/// branch is released by `apply_schema`'s outer match. Existing
|
||||
/// tables (e.g. `node:Person`) are completely untouched by the new
|
||||
/// node's added_tables iteration — they're outside the failed apply
|
||||
/// path entirely — and we assert that mutations against them continue
|
||||
/// to work.
|
||||
///
|
||||
/// The orphan empty dataset from the failed apply is acceptable
|
||||
/// residual: it's unreferenced by `__manifest` and will be reclaimed
|
||||
/// by `cleanup_old_versions` (or removed when a future apply at the
|
||||
/// same target path resolves the rename).
|
||||
#[tokio::test]
|
||||
async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable() {
|
||||
let _scenario = FailScenario::setup();
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap().to_string();
|
||||
|
||||
// Init with TEST_SCHEMA which declares Person + Knows. Indices on
|
||||
// those tables get built during init.
|
||||
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
||||
|
||||
// Apply a schema that adds a new node type. The added_tables loop
|
||||
// will hit the failpoint between stage and commit on the new
|
||||
// node:Project table's btree-on-id build. (TEST_SCHEMA already
|
||||
// has Person + Company + Knows + WorksAt — pick a name that isn't
|
||||
// already declared.)
|
||||
let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA);
|
||||
|
||||
{
|
||||
let _failpoint = ScopedFailPoint::new(
|
||||
"ensure_indices.post_stage_pre_commit_btree",
|
||||
"return",
|
||||
);
|
||||
let err = db.apply_schema(&extended_schema).await.unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("ensure_indices.post_stage_pre_commit_btree"),
|
||||
"schema apply should fail with the synthetic failpoint error, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
// Existing tables stayed at their pre-apply versions; subsequent
|
||||
// mutations against them succeed (no Lance-HEAD drift).
|
||||
mutate_main(
|
||||
&mut db,
|
||||
helpers::MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
||||
)
|
||||
.await
|
||||
.expect("Person mutation must succeed after the failed schema apply — existing tables are not drifted");
|
||||
}
|
||||
|
||||
fn assert_no_staging_files(repo: &std::path::Path) {
|
||||
for name in [
|
||||
"_schema.pg.staging",
|
||||
|
|
|
|||
215
crates/omnigraph/tests/forbidden_apis.rs
Normal file
215
crates/omnigraph/tests/forbidden_apis.rs
Normal file
|
|
@ -0,0 +1,215 @@
|
|||
//! MR-793 Phase 3 — forbidden-API guard test.
|
||||
//!
|
||||
//! Engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) MUST NOT
|
||||
//! call Lance's inline-commit data-write APIs directly. The
|
||||
//! `Storage` trait (`crate::storage_layer::TableStorage`) is the canonical
|
||||
//! surface; staged primitives (`stage_append`, `stage_merge_insert`,
|
||||
//! `stage_overwrite`, `stage_create_btree_index`,
|
||||
//! `stage_create_inverted_index`) plus `commit_staged` are the only
|
||||
//! way to advance Lance HEAD.
|
||||
//!
|
||||
//! The trait is sealed (only `TableStore` impls it), so by-construction
|
||||
//! the trait surface forbids ad-hoc Lance calls. This test is **defense
|
||||
//! in depth** — it catches the case where engine code reaches around
|
||||
//! the trait by importing `lance::dataset::*` types directly.
|
||||
//!
|
||||
//! ## How it works
|
||||
//!
|
||||
//! Walks `crates/omnigraph/src/{exec,db/omnigraph,loader,changes}/**/*.rs`,
|
||||
//! greps each line for forbidden symbols. Lines whose preceding line
|
||||
//! contains the sentinel comment `// forbidden-api-allow: <reason>` are
|
||||
//! exempt — reviewers see the sentinel in diff and can ask "is this
|
||||
//! exemption justified?"
|
||||
//!
|
||||
//! ## What's deliberately out of scope (allow-listed by directory)
|
||||
//!
|
||||
//! - `crates/omnigraph/src/table_store.rs` — IS the storage layer.
|
||||
//! The forbidden Lance APIs live here legitimately.
|
||||
//! - `crates/omnigraph/src/db/manifest/**` — uses `CommitBuilder` for
|
||||
//! the cross-table manifest commit. Documented exception.
|
||||
//! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module.
|
||||
//!
|
||||
//! ## Initial state (MR-793 Phase 3)
|
||||
//!
|
||||
//! At the time this test was written, MR-793 has migrated three writers
|
||||
//! (ensure_indices, branch_merge, schema_apply rewrites) onto staged
|
||||
//! primitives. Other engine call sites (the bulk loader, exec/mutation,
|
||||
//! exec/query, etc.) still use the legacy inherent `TableStore` methods
|
||||
//! — they're not visible at the trait boundary, but they DO call lance
|
||||
//! types. The allow-list below reflects this transitional state. Phase 9
|
||||
//! tightens the allow-list as call sites migrate.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
const FORBIDDEN_PATTERNS: &[&str] = &[
|
||||
// Builder types — direct construction is the side door around the
|
||||
// staged-write surface.
|
||||
"MergeInsertBuilder",
|
||||
"InsertBuilder::",
|
||||
"DeleteBuilder",
|
||||
"CommitBuilder::new",
|
||||
".create_index_builder(",
|
||||
".create_index_segment_builder(",
|
||||
// Associated-function forms of inline-commit Lance APIs. These would
|
||||
// only appear in source if the file imports `lance::Dataset` and
|
||||
// calls the static fn — exactly the misuse we want to catch. These
|
||||
// patterns deliberately exclude `.append(` / `.delete(` / `.write(`
|
||||
// because those would over-match (`.delete_branch(`, `Vec::append`,
|
||||
// arrow-array `.append(`, etc.).
|
||||
"Dataset::write",
|
||||
"Dataset::append",
|
||||
"Dataset::delete",
|
||||
"Dataset::merge_insert",
|
||||
"Dataset::add_columns",
|
||||
"Dataset::update_columns",
|
||||
"Dataset::drop_columns",
|
||||
"Dataset::truncate_table",
|
||||
"Dataset::restore",
|
||||
// Lance-specific method names that don't clash with our `TableStore`
|
||||
// wrappers (we use `merge_insert_batch{,es}`, `add_columns_to_*`,
|
||||
// etc. — never the bare Lance names). Engine code that writes
|
||||
// `ds.merge_insert(...)` against a `Dataset` value is reaching
|
||||
// around the trait surface.
|
||||
".merge_insert(",
|
||||
".add_columns(",
|
||||
".update_columns(",
|
||||
".drop_columns(",
|
||||
".truncate_table(",
|
||||
// `.restore(` is Lance-specific (no other library in this workspace
|
||||
// exposes a `.restore(` method); safe to ban without false-positive
|
||||
// risk. Used to revert a Lance dataset to a prior version — never
|
||||
// an operation engine code should perform directly.
|
||||
".restore(",
|
||||
// NOT included: `.append(`, `.delete(`, `.write(`. Each over-matches
|
||||
// legitimate non-Lance uses (`Vec::append`, `String::append`, arrow
|
||||
// array `BuilderArray::append`, `ObjectStore::delete`, etc.).
|
||||
// Engine code calling `ds.append(reader, params)` against an
|
||||
// imported `lance::Dataset` is the residual bypass route this guard
|
||||
// does NOT catch — but the trait surface itself is the primary
|
||||
// enforcement (sealed + only-callable-via-trait once Phase 1b
|
||||
// call-site conversion completes), so this gap is bounded.
|
||||
];
|
||||
|
||||
/// Files exempt from the guard. These are the legitimate storage-layer
|
||||
/// or manifest-layer implementations that USE the forbidden APIs to
|
||||
/// provide the staged primitives or to maintain the system tables
|
||||
/// (commit graph, manifest).
|
||||
const ALLOW_LIST_FILES: &[&str] = &[
|
||||
"table_store.rs", // The storage layer itself.
|
||||
"storage_layer.rs", // The trait module.
|
||||
"commit_graph.rs", // Maintains `_graph_commits.lance` system table.
|
||||
"graph_coordinator.rs", // Drives the manifest publisher / branch coordinator.
|
||||
];
|
||||
|
||||
/// Directories exempt from the guard. Files under these paths may use
|
||||
/// the forbidden APIs.
|
||||
const ALLOW_LIST_DIRS: &[&str] = &[
|
||||
"db/manifest", // Manifest publisher uses CommitBuilder for cross-table commits.
|
||||
"db/manifest/", // Belt + suspenders for the directory match.
|
||||
];
|
||||
|
||||
const SENTINEL: &str = "// forbidden-api-allow:";
|
||||
|
||||
fn engine_src_root() -> PathBuf {
|
||||
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR");
|
||||
PathBuf::from(manifest_dir).join("src")
|
||||
}
|
||||
|
||||
fn is_allow_listed(path: &Path) -> bool {
|
||||
let path_str = path.to_string_lossy();
|
||||
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
|
||||
if ALLOW_LIST_FILES.iter().any(|f| *f == name) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
ALLOW_LIST_DIRS.iter().any(|d| path_str.contains(d))
|
||||
}
|
||||
|
||||
fn walk_rust_files(root: &Path) -> Vec<PathBuf> {
|
||||
let mut out = Vec::new();
|
||||
walk_into(root, &mut out);
|
||||
out
|
||||
}
|
||||
|
||||
fn walk_into(dir: &Path, out: &mut Vec<PathBuf>) {
|
||||
let entries = match std::fs::read_dir(dir) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return,
|
||||
};
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
walk_into(&path, out);
|
||||
} else if path.extension().and_then(|s| s.to_str()) == Some("rs") {
|
||||
out.push(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn engine_code_does_not_call_forbidden_lance_apis() {
|
||||
let src = engine_src_root();
|
||||
let mut violations = Vec::new();
|
||||
|
||||
for file in walk_rust_files(&src) {
|
||||
if is_allow_listed(&file) {
|
||||
continue;
|
||||
}
|
||||
let contents = match std::fs::read_to_string(&file) {
|
||||
Ok(c) => c,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let lines: Vec<&str> = contents.lines().collect();
|
||||
for (idx, line) in lines.iter().enumerate() {
|
||||
let trimmed = line.trim_start();
|
||||
// Skip comment-only lines — references to forbidden API
|
||||
// names in doc-comments, design notes, or residual-marker
|
||||
// comments are documentation, not code use. The trait
|
||||
// surface (sealed + trait-only) is the actual enforcement;
|
||||
// this test only catches code use.
|
||||
if trimmed.starts_with("//")
|
||||
|| trimmed.starts_with("/*")
|
||||
|| trimmed.starts_with("*")
|
||||
{
|
||||
continue;
|
||||
}
|
||||
// Allow lines marked with the sentinel on the SAME line or
|
||||
// the immediately preceding line.
|
||||
if line.contains(SENTINEL) {
|
||||
continue;
|
||||
}
|
||||
if idx > 0 && lines[idx - 1].contains(SENTINEL) {
|
||||
continue;
|
||||
}
|
||||
for pattern in FORBIDDEN_PATTERNS {
|
||||
if line.contains(pattern) {
|
||||
let rel = file
|
||||
.strip_prefix(&src)
|
||||
.unwrap_or(&file)
|
||||
.display()
|
||||
.to_string();
|
||||
violations.push(format!(
|
||||
"{}:{}: forbidden pattern `{}` — {}",
|
||||
rel,
|
||||
idx + 1,
|
||||
pattern,
|
||||
line.trim()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !violations.is_empty() {
|
||||
panic!(
|
||||
"MR-793 forbidden-API guard found {} violation(s) in engine code. \
|
||||
Engine code MUST route through the `TableStorage` trait (or its \
|
||||
inherent counterparts on `TableStore`) instead of calling Lance's \
|
||||
inline-commit APIs directly. If a use is genuinely justified, add \
|
||||
the comment `// forbidden-api-allow: <reason>` on the same line or \
|
||||
the line above.\n\nViolations:\n {}",
|
||||
violations.len(),
|
||||
violations.join("\n ")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -492,3 +492,269 @@ async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior
|
|||
surface in production paths — see exec/staging.rs."
|
||||
);
|
||||
}
|
||||
|
||||
// ─── MR-793 Phase 2: stage_overwrite + scalar index staging ─────────────────
|
||||
|
||||
/// `stage_overwrite` writes replacement fragments to object storage but
|
||||
/// does NOT advance Lance HEAD until `commit_staged` runs. Mirrors
|
||||
/// `stage_append`'s contract.
|
||||
#[tokio::test]
|
||||
async fn stage_overwrite_does_not_advance_head_until_commit() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
|
||||
.await
|
||||
.unwrap();
|
||||
let pre_version = ds.version().version;
|
||||
|
||||
let staged = store
|
||||
.stage_overwrite(&ds, person_batch(&[("zoe", Some(99))]))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
ds.version().version,
|
||||
pre_version,
|
||||
"stage_overwrite must not advance HEAD"
|
||||
);
|
||||
// Reopen at HEAD; still pre-version (no commit happened on disk).
|
||||
let reopened = Dataset::open(&uri).await.unwrap();
|
||||
assert_eq!(reopened.version().version, pre_version);
|
||||
|
||||
// After commit_staged, HEAD advances and the dataset shows the
|
||||
// overwrite result (zoe alone — alice replaced).
|
||||
let new_ds = store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(new_ds.version().version > pre_version);
|
||||
let after = store.scan_batches(&new_ds).await.unwrap();
|
||||
assert_eq!(collect_ids(&after), vec!["zoe"]);
|
||||
}
|
||||
|
||||
/// `stage_overwrite` semantically REPLACES every committed fragment.
|
||||
/// `removed_fragment_ids` lists every committed fragment so
|
||||
/// `scan_with_staged` shows only the staged rows (not committed + staged).
|
||||
#[tokio::test]
|
||||
async fn stage_overwrite_replaces_all_fragments() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
let ds = TableStore::write_dataset(
|
||||
&uri,
|
||||
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let committed_fragment_ids: std::collections::HashSet<u64> =
|
||||
ds.manifest.fragments.iter().map(|f| f.id).collect();
|
||||
|
||||
let staged = store
|
||||
.stage_overwrite(&ds, person_batch(&[("zoe", Some(99))]))
|
||||
.await
|
||||
.unwrap();
|
||||
let removed: std::collections::HashSet<u64> =
|
||||
staged.removed_fragment_ids.iter().copied().collect();
|
||||
assert_eq!(
|
||||
removed, committed_fragment_ids,
|
||||
"stage_overwrite must list every committed fragment as removed so \
|
||||
scan_with_staged shadows them all (overwrite semantics — pre-data \
|
||||
is being wiped)"
|
||||
);
|
||||
|
||||
let batches = store
|
||||
.scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
collect_ids(&batches),
|
||||
vec!["zoe"],
|
||||
"scan_with_staged must show only the staged row, not committed + staged"
|
||||
);
|
||||
}
|
||||
|
||||
/// `stage_create_btree_index` writes index segments to object storage
|
||||
/// but does NOT advance Lance HEAD until `commit_staged`. After commit,
|
||||
/// the index is queryable.
|
||||
#[tokio::test]
|
||||
async fn stage_create_btree_index_does_not_advance_head_until_commit() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
let ds = TableStore::write_dataset(
|
||||
&uri,
|
||||
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let pre_version = ds.version().version;
|
||||
assert!(
|
||||
!store.has_btree_index(&ds, "id").await.unwrap(),
|
||||
"fresh dataset has no btree index on `id`"
|
||||
);
|
||||
|
||||
let staged = store.stage_create_btree_index(&ds, &["id"]).await.unwrap();
|
||||
assert_eq!(
|
||||
ds.version().version,
|
||||
pre_version,
|
||||
"stage_create_btree_index must not advance HEAD"
|
||||
);
|
||||
let reopened = Dataset::open(&uri).await.unwrap();
|
||||
assert_eq!(
|
||||
reopened.version().version,
|
||||
pre_version,
|
||||
"no Lance commit happened on disk"
|
||||
);
|
||||
assert!(
|
||||
!store.has_btree_index(&reopened, "id").await.unwrap(),
|
||||
"index is not visible until commit_staged"
|
||||
);
|
||||
|
||||
let new_ds = store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(new_ds.version().version > pre_version);
|
||||
assert!(
|
||||
store.has_btree_index(&new_ds, "id").await.unwrap(),
|
||||
"after commit_staged, the index IS visible"
|
||||
);
|
||||
}
|
||||
|
||||
/// `stage_create_inverted_index` (FTS) — same shape as the BTREE test.
|
||||
#[tokio::test]
|
||||
async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
let ds = TableStore::write_dataset(
|
||||
&uri,
|
||||
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let pre_version = ds.version().version;
|
||||
|
||||
let staged = store
|
||||
.stage_create_inverted_index(&ds, "id")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
ds.version().version,
|
||||
pre_version,
|
||||
"stage_create_inverted_index must not advance HEAD"
|
||||
);
|
||||
assert!(!store.has_fts_index(&ds, "id").await.unwrap());
|
||||
|
||||
let new_ds = store
|
||||
.commit_staged(Arc::new(ds.clone()), staged.transaction)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(new_ds.version().version > pre_version);
|
||||
assert!(
|
||||
store.has_fts_index(&new_ds, "id").await.unwrap(),
|
||||
"after commit_staged, the FTS index IS visible"
|
||||
);
|
||||
}
|
||||
|
||||
/// Pin the inline-commit behavior of `delete_where`. Lance 4.0.0 does
|
||||
/// NOT expose a public `DeleteJob::execute_uncommitted`
|
||||
/// (`pub(crate)` — see lance-format/lance#6658). MR-793 deliberately
|
||||
/// does NOT introduce a `stage_delete` wrapper that would secretly
|
||||
/// inline-commit (a side-channel — see design doc §3.2). Instead, the
|
||||
/// trait keeps `delete_where` as the only delete entry point, named
|
||||
/// honestly.
|
||||
///
|
||||
/// **When Lance #6658 lands**: this test will need to flip — replace
|
||||
/// the assertion with a `stage_delete` + `commit_staged` round-trip
|
||||
/// and remove the residual line in `docs/runs.md`.
|
||||
#[tokio::test]
|
||||
async fn delete_where_advances_head_inline_documents_residual() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
let mut ds = TableStore::write_dataset(
|
||||
&uri,
|
||||
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let pre_version = ds.version().version;
|
||||
|
||||
let result = store
|
||||
.delete_where(&uri, &mut ds, "id = 'alice'")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.deleted_rows, 1);
|
||||
assert!(
|
||||
result.version > pre_version,
|
||||
"delete_where ADVANCES Lance HEAD inline (the residual). When \
|
||||
lance-format/lance#6658 ships and we migrate to stage_delete + \
|
||||
commit_staged, flip this assertion to assert that staging does \
|
||||
NOT advance HEAD."
|
||||
);
|
||||
}
|
||||
|
||||
/// Companion to `delete_where_*`: pin the inline-commit behavior of
|
||||
/// `create_vector_index`. Lance 4.0.0 vector indices take the
|
||||
/// "segment commit path" which calls `build_index_metadata_from_segments`
|
||||
/// (`pub(crate)` in lance-4.0.0 `src/index.rs:111`). Until upstream
|
||||
/// exposes that helper (companion ticket to #6658), MR-793's trait
|
||||
/// surface deliberately does NOT include `stage_create_vector_index` —
|
||||
/// see design doc Appendix A.3.
|
||||
#[tokio::test]
|
||||
async fn create_vector_index_advances_head_inline_documents_residual() {
|
||||
use arrow_array::FixedSizeListArray;
|
||||
use arrow_schema::FieldRef;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = format!("{}/vec.lance", dir.path().to_str().unwrap());
|
||||
let store = TableStore::new(dir.path().to_str().unwrap());
|
||||
|
||||
// Build a small dataset with a fixed-size vector column. Vector index
|
||||
// training requires multiple rows; provide enough.
|
||||
let dim = 4usize;
|
||||
let n_rows = 8usize;
|
||||
let item_field: FieldRef = Arc::new(Field::new("item", DataType::Float32, true));
|
||||
let vec_field = Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(item_field.clone(), dim as i32),
|
||||
false,
|
||||
);
|
||||
let id_field = Field::new("id", DataType::Utf8, false);
|
||||
let schema = Arc::new(Schema::new(vec![id_field, vec_field]));
|
||||
|
||||
let ids: Vec<String> = (0..n_rows).map(|i| format!("v{}", i)).collect();
|
||||
let id_arr = StringArray::from(ids);
|
||||
let flat: Vec<f32> = (0..(n_rows * dim)).map(|i| i as f32).collect();
|
||||
let values = arrow_array::Float32Array::from(flat);
|
||||
let vec_arr =
|
||||
FixedSizeListArray::new(item_field, dim as i32, Arc::new(values), None);
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(id_arr), Arc::new(vec_arr)],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut ds = TableStore::write_dataset(&uri, batch).await.unwrap();
|
||||
let pre_version = ds.version().version;
|
||||
assert!(!store.has_vector_index(&ds, "embedding").await.unwrap());
|
||||
|
||||
store
|
||||
.create_vector_index(&mut ds, "embedding")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(
|
||||
ds.version().version > pre_version,
|
||||
"create_vector_index ADVANCES Lance HEAD inline (the residual). \
|
||||
When the upstream Lance helper `build_index_metadata_from_segments` \
|
||||
is made `pub`, add `stage_create_vector_index` to the trait and \
|
||||
flip this test to assert staging does NOT advance HEAD."
|
||||
);
|
||||
assert!(store.has_vector_index(&ds, "embedding").await.unwrap());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -105,6 +105,7 @@ These are user-visible commitments. They state what the engine guarantees and wh
|
|||
Specific defaults (timeout values, memory caps, TTL windows) are *configuration*, not invariants — see [docs/constants.md](constants.md) and per-deployment configuration. The invariant is that bounds and contracts exist, not their numerical values.
|
||||
|
||||
23. **Atomicity is per-query.** Every `.gq` query is atomic — multi-statement mutations are all-or-nothing via the substrate's atomic-commit primitive. No cross-query `BEGIN`/`COMMIT`; branches and merges fill that role for agent workflows.
|
||||
*Status: upheld at the writer-trait surface for inserts / updates / scalar-index builds / merge_insert / overwrite after MR-793 PR #70 — the sealed `TableStorage` trait routes those through `stage_*` + `commit_staged`, so a Phase A failure (between writing fragments and committing) leaves no Lance-HEAD drift on touched tables. **Per-table commit_staged → manifest publish window remains** — a failure between commits across multiple touched tables can leave drift on the partially-committed tables. Lance has no multi-dataset atomic commit primitive; closing this requires the recovery-on-open reconciler tracked in MR-847. Additionally, two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).*
|
||||
|
||||
24. **Schema integrity is strict at commit.** Type validation, required-field presence (auto-filled from `@default` if declared), uniqueness across batches and versions, and referential integrity — all enforced before commit succeeds. Per-write softening flags are opt-in, never default.
|
||||
*Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.*
|
||||
|
|
@ -140,6 +141,7 @@ Specific defaults (timeout values, memory caps, TTL windows) are *configuration*
|
|||
These are *how* we realize the invariants today. They are committed conventions — until we explicitly revise them, new code follows them. They are not eternal: a future architecture review may replace any of these with a different mechanism that upholds the same invariants. The deny-list (§IX) protects them in the meantime.
|
||||
|
||||
35. **Reconciler pattern for derivable state.** Index coverage, statistics, anything derivable from manifest state — reconciled, not job-queued. *Realizes the "don't maintain state parallel to the substrate" invariant.* See MR-737 §5.16.
|
||||
*Status: partial after MR-793 PR #70 — scalar index builds (BTree, Inverted) now route through the staged primitives `stage_create_*_index` + `commit_staged` instead of inline `create_*_index`; this is the building block. The reconciler pattern itself (background `IndexReconciler` task driven by manifest commits, removing synchronous index work from the publish path) is tracked in MR-848. Vector indices remain inline-commit until lance-format/lance#6666 ships.*
|
||||
|
||||
36. **Polymorphism via Union, not per-feature lowering.** Interfaces / wildcards / alternation on nodes and edges share one IR (`Polymorphism<T>`) and one lowering (Union of per-type concrete plans). *Realizes "shared mechanism for shared shape."* See MR-737 §5.13.
|
||||
*Status: aspirational — node interfaces in MR-579; edge wildcards in MR-744.*
|
||||
|
|
@ -170,6 +172,7 @@ These are *how* we realize the invariants today. They are committed conventions
|
|||
44. **Tests at every boundary.** `MemStorage` for engine tests; planner-only tests; executor-only tests with a stub storage. No layer tested only via end-to-end.
|
||||
|
||||
45. **Reference implementation per trait.** Every trait has a primary impl (Lance for storage) and at least a test impl.
|
||||
*Status: partial after MR-793 PR #70 — `TableStorage` (the engine-internal staged-write trait, sealed) has its primary impl on `TableStore` (Lance-backed). The trait's signatures use opaque `SnapshotHandle` / `StagedHandle` types so a future test impl (e.g., `MemStorage`) can land without changing call sites. No test impl yet; `tempfile::tempdir()` + Lance is the de-facto test substrate today (see [docs/testing.md](testing.md)).*
|
||||
|
||||
46. **Documented capability surface.** New capabilities are documented with what they advertise, who consumes them, how the planner uses them.
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ OmniGraph sits on top of Lance. Many problems — index lifecycle, branching, tr
|
|||
|
||||
This file is the curated entry point. **When you hit a Lance-shaped problem, find the matching topic below and fetch the listed URL(s) before guessing.** Don't grep our codebase for behavior that is documented authoritatively in Lance.
|
||||
|
||||
Base URL: `https://lance.org`. Use `WebFetch` (or your tool's equivalent) on the full URLs. Keep this index curated to relevant material — the upstream sitemap has hundreds of URLs (notably the Namespace REST API model surface, Spark/Trino/Databricks integrations) that we don't use.
|
||||
Base URL: `https://lance.org`. **Fetch the FULL page content, not summaries** — use `npx mdrip <url>` (or `npx mdrip --max-chars 200000 <url>` for very long pages). Tools that summarize pages (like Claude's `WebFetch`) routinely drop load-bearing details — defaults, `pub(crate)` blockers, sub-specs hidden behind navigation hubs. If `npx mdrip` is unavailable, fall back to `curl <url> | pandoc -f html -t markdown` or paste the rendered page text manually; **never act on a summarized fetch alone**. Keep this index curated to relevant material — the upstream sitemap has hundreds of URLs (notably the Namespace REST API model surface, Spark/Trino/Databricks integrations) that we don't use.
|
||||
|
||||
> **Substrate boundary check.** Before fetching, recall [docs/invariants.md §I](invariants.md): if Lance already does the thing, we don't reimplement it. The most common reason to read these docs is to confirm a substrate behavior, not to learn what to clone.
|
||||
|
||||
|
|
@ -155,3 +155,14 @@ If a future need pulls one of these into scope, add a row to the matching domain
|
|||
## Maintenance
|
||||
|
||||
When Lance ships a major release that changes any of the above (file format bump, new index type, transaction semantics change, new branching primitive), refresh this index in the same change as the omnigraph upgrade. Stale Lance pointers are worse than no pointers.
|
||||
|
||||
### Last alignment audit: 2026-05-02 (Lance 4.0.1 upstream; omnigraph pinned at 4.0.0)
|
||||
|
||||
A full read-through of every index page above was performed in the MR-793 cycle. Findings (no code changes required for PR #70):
|
||||
|
||||
- The MemWAL "three sub-pages" (Overview / Details / Implementation) turned out to be **anchor sections on the single existing page** at `https://lance.org/format/table/mem_wal/` — not separate URLs. Fetched in full via `npx mdrip`. Findings: MemWAL is opt-in (requires an unenforced primary key + explicit shard config; omnigraph doesn't use it), operates intra-table (LSM-tree for streaming writes into one Lance table), and does NOT overlap with MR-847's cross-table manifest-vs-Lance-HEAD recovery problem. MR-847's design is unaffected.
|
||||
- The distributed-indexing guide names Python APIs (`commit_existing_index_segments`, `merge_existing_index_segments`); the Rust analogues exist via `CreateIndexBuilder::execute_uncommitted` for scalar indices but **`build_index_metadata_from_segments` is `pub(crate)`** and blocks vector-index two-phase commits from outside the lance crate. Filed [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) as a companion to [#6658](https://github.com/lance-format/lance/issues/6658).
|
||||
- "Stable Row ID for Index" is documented as **experimental** in lance-4.0.x. Our datasets enable stable row IDs at the dataset level (`WriteParams::enable_stable_row_ids = true`); confirming whether our created indices opt into stable-row-id mode is a follow-up worth doing before MR-848 (index reconciler) lands.
|
||||
- Fragment Reuse Index (FRI) is documented as one of three compaction strategies. omnigraph currently uses option 2 (immediate index rewrite at compaction time, via `omnigraph optimize`'s post-compaction rebuild). Adopting FRI is the explicit option for compaction-friendly index updates; relevant to MR-848.
|
||||
|
||||
Bump this date stanza on the next alignment pass.
|
||||
|
|
|
|||
55
docs/runs.md
55
docs/runs.md
|
|
@ -63,6 +63,61 @@ edges break referential integrity). Until Lance exposes a two-phase
|
|||
delete API, the parse-time rejection keeps both paths atomic and
|
||||
correct. Tracked: MR-793, plus a Lance-upstream ticket.
|
||||
|
||||
### MR-793 status (storage trait two-phase invariant) — partial
|
||||
|
||||
MR-793 hoists the staged-write pattern into a `TableStorage` trait
|
||||
surface with sealed-trait enforcement and opaque `SnapshotHandle` /
|
||||
`StagedHandle` types — see `crates/omnigraph/src/storage_layer.rs`.
|
||||
The trait is the canonical surface for new engine code; existing call
|
||||
sites still use the inherent `TableStore` methods (mechanical migration
|
||||
deferred to a follow-up cycle — tracked).
|
||||
|
||||
Three writers have been migrated onto staged primitives:
|
||||
|
||||
* **`ensure_indices`** (`db/omnigraph/table_ops.rs::build_indices_on_dataset_for_catalog`)
|
||||
— scalar indices (BTree, Inverted) now use `stage_create_*_index` +
|
||||
`commit_staged`. Vector indices stay inline (residual — Lance
|
||||
`build_index_metadata_from_segments` is `pub(crate)` in 4.0.0;
|
||||
companion ticket to lance-format/lance#6658 needed).
|
||||
* **`branch_merge::publish_rewritten_merge_table`**
|
||||
(`exec/merge.rs`) — merge_insert now uses `stage_merge_insert` +
|
||||
`commit_staged`. Deletes stay inline (Lance #6658 residual).
|
||||
* **`schema_apply` rewritten_tables** (`db/omnigraph/schema_apply.rs`)
|
||||
— non-empty rewrites use `stage_overwrite` + `commit_staged`.
|
||||
Empty-batch rewrites stay inline (Lance `InsertBuilder::execute_uncommitted`
|
||||
rejects empty data; the empty case is rare and bounded by the
|
||||
schema-apply lock branch).
|
||||
|
||||
A defense-in-depth integration test (`tests/forbidden_apis.rs`) walks
|
||||
engine source and fails if non-allow-listed code calls Lance's
|
||||
inline-commit APIs directly. The trait surface itself is the primary
|
||||
enforcement (sealed + only-callable-via-trait once call sites land);
|
||||
the grep test catches type-system bypass attempts.
|
||||
|
||||
The "finalize → publisher residual" described below applies equally to
|
||||
the migrated writers — Lance has no multi-dataset atomic commit
|
||||
primitive, so the per-table commit_staged → manifest publish gap is
|
||||
the same drift class. Closing it requires either upstream Lance
|
||||
multi-dataset commit OR the omnigraph-side recovery-on-open reconciler
|
||||
described in `.context/mr-793-design.md` §15 (deferred to MR-795).
|
||||
|
||||
### Inline-commit method residuals on `TableStorage` (MR-793 acceptance §1 option b)
|
||||
|
||||
MR-793's acceptance criterion §1 ("`TableStore` public API has no method that performs a manifest commit as a side effect of writing") is met **per-method** by enumerating every inline-commit method that remains on the trait surface, naming why it cannot yet be removed, and keeping the residual comment at every call site:
|
||||
|
||||
| Method on `TableStore` | Inline-commit reason | Closes when |
|
||||
|---|---|---|
|
||||
| `delete_where` | `DeleteJob` is `pub(crate)` in lance-4.0.0 — no public two-phase delete API | [lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658) lands and `stage_delete` joins the trait |
|
||||
| `create_vector_index` | Vector indices take Lance's "segment commit path"; the helper `build_index_metadata_from_segments` is `pub(crate)` | [lance-format/lance#6666](https://github.com/lance-format/lance/issues/6666) lands and `stage_create_vector_index` joins the trait |
|
||||
| `append_batch` | Legacy inherent method; some engine call sites haven't migrated to `stage_append + commit_staged` yet | MR-793 Phase 1b (call-site conversion) + Phase 9 (demote to `pub(crate)`) |
|
||||
| `merge_insert_batch` / `merge_insert_batches` | Legacy inherent method | Same — Phase 1b + Phase 9 |
|
||||
| `overwrite_batch` | Legacy inherent method | Same — Phase 1b + Phase 9 |
|
||||
| `create_btree_index` (inherent) | Legacy inherent method (the migrated callers use `stage_create_btree_index` + `commit_staged`; the inherent stays for tests / un-migrated paths) | Same — Phase 1b + Phase 9 |
|
||||
| `create_inverted_index` (inherent) | Same | Same — Phase 1b + Phase 9 + index-class split (MR-848) |
|
||||
| `truncate_table` (inherent on `TableStore`) | Used by `overwrite_batch` internally | Phase 9 |
|
||||
|
||||
After **lance#6658 + lance#6666 ship + MR-793 Phase 1b + MR-793 Phase 9 all complete**, the trait surface exposes only staged-write primitives + `commit_staged`. Until then this matrix names every residual explicitly, every call site carries a one-line residual comment, and no engine code outside `table_store.rs` is permitted to reach the inline-commit Lance APIs (enforced by the `tests/forbidden_apis.rs` guard).
|
||||
|
||||
### `LoadMode::Overwrite` residual
|
||||
|
||||
The bulk loader's Append and Merge modes use the staged-write path
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue