diff --git a/AGENTS.md b/AGENTS.md
index 21c9e2a..ad1211a 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
 | Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
 | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
 | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
-| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. Engine-internal write APIs sit behind a sealed `TableStorage` trait (MR-793) — `stage_*` + `commit_staged` are the only paths to advance Lance HEAD; inline-commit Lance APIs are not reachable through the trait surface |
+| Atomic single-dataset commits | ✅ | **Atomic multi-dataset publish** via `__manifest` + `ManifestBatchPublisher`. Engine-internal write APIs are routed through a sealed `TableStorage` trait (MR-793). `stage_*` + `commit_staged` are the canonical staged-write surface for new code; documented inline-commit residuals (`delete_where`, `create_vector_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)). The forbidden-API guard prevents engine code outside `table_store.rs` from importing inline-commit Lance APIs directly |
 | Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency |
 | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
 | BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |
diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs
index 7550f20..36f4b01 100644
--- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs
+++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs
@@ -248,9 +248,20 @@ pub(super) async fn apply_schema_with_lock(
         let mut target_ds = if batch.num_rows() == 0 {
             TableStore::overwrite_dataset(&dataset_uri, batch).await?
         } else {
+            // Pass `entry.table_branch.as_deref()` (not `None`) for
+            // consistency with the indexed_tables block below. Schema
+            // apply runs under `__schema_apply_lock__` which today
+            // rejects non-main branches, so `entry.table_branch` is
+            // expected to be `None`. But the defensive passthrough
+            // means a future relaxation of the lock-check can't quietly
+            // open the wrong HEAD here.
             let existing = db
                 .table_store
-                .open_dataset_head_for_write(table_key, &dataset_uri, None)
+                .open_dataset_head_for_write(
+                    table_key,
+                    &dataset_uri,
+                    entry.table_branch.as_deref(),
+                )
                 .await?;
             let staged = db.table_store.stage_overwrite(&existing, batch).await?;
             db.table_store
diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs
index fcde941..b343b4e 100644
--- a/crates/omnigraph/src/db/omnigraph/table_ops.rs
+++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs
@@ -376,6 +376,10 @@ async fn stage_and_commit_btree(
                 table_key, columns, e
             ))
         })?;
+    // Failpoint between stage and commit. Used by `tests/failpoints.rs`
+    // to demonstrate that a Phase A failure in the staged-index path
+    // leaves no Lance-HEAD drift on the touched table.
+    crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
     let new_ds = db
         .table_store
         .commit_staged(Arc::new(ds.clone()), staged.transaction)
diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs
index c3599e6..f4fc657 100644
--- a/crates/omnigraph/src/storage_layer.rs
+++ b/crates/omnigraph/src/storage_layer.rs
@@ -1,17 +1,32 @@
 //! Storage trait surface — MR-793.
 //!
-//! `TableStorage` is the engine-internal trait that funnels every Lance
-//! data write through staged primitives. Engine code (in `exec/`,
-//! `db/omnigraph/`, `loader/`) holds `Arc` instead of
-//! a concrete `TableStore`; the inline-commit Lance APIs
-//! (`Dataset::append`, `MergeInsertBuilder::execute`, etc.) are not
-//! reachable through the trait surface.
+//! `TableStorage` is the engine-internal trait that exposes the
+//! staged-write primitives (`stage_append`, `stage_merge_insert`,
+//! `stage_overwrite`, `stage_create_btree_index`,
+//! `stage_create_inverted_index`) plus `commit_staged` as the canonical
+//! way for new engine writers to advance Lance HEAD without coupling
+//! "write bytes" with "advance HEAD" in one Lance API call.
+//!
+//! ## Transitional residuals on the trait
+//!
+//! Several inline-commit methods remain on the trait surface as
+//! documented residuals: `delete_where` (Lance 4.0.0's `DeleteJob` is
+//! `pub(crate)` — see [#6658](https://github.com/lance-format/lance/issues/6658)),
+//! `create_vector_index` (segment-commit-path requires
+//! `build_index_metadata_from_segments` which is `pub(crate)` — see
+//! [#6666](https://github.com/lance-format/lance/issues/6666)), and the
+//! legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` /
+//! `create_btree_index` / `create_inverted_index` paths kept while
+//! engine call sites finish migrating off of them (Phase 1b / Phase 9
+//! of MR-793). These are named honestly at every call site; the
+//! forbidden-API guard test catches direct lance::* misuse outside the
+//! storage layer.
 //!
 //! ## Sealed
 //!
 //! `TableStorage: sealed::Sealed`. Only types in this crate can implement
-//! the trait, so the staged-write invariant cannot be subverted by a
-//! downstream impl.
+//! the trait, so a downstream crate cannot subvert the contract by
+//! providing its own impl.
 //!
 //! ## Opaque handles
 //!
@@ -21,15 +36,15 @@
 //! through. This is the §III.9 alignment: `lance::Dataset` does not
 //! appear in trait signatures.
 //!
-//! ## Scope (MR-793 Phase 1)
+//! ## Migration status (MR-793 PR #70)
 //!
-//! The trait surface mirrors the methods engine code currently calls on
-//! `TableStore`. Subsequent MR-793 phases:
-//! * Phase 2 — add `stage_overwrite`, `stage_create_btree_index`,
-//!   `stage_create_inverted_index` to the trait.
-//! * Phase 4–6 — migrate writers (`ensure_indices`, `branch_merge`,
-//!   `schema_apply`) onto the staged primitives.
-//! * Phase 9 — demote unused inline-commit methods to `pub(crate)`.
+//! Phases 1a / 2 / 4 / 5 / 6 are landed: trait scaffolding, three new
+//! staged primitives (`stage_overwrite`, scalar index staging), and
+//! migration of `ensure_indices`, `branch_merge`, `schema_apply` onto
+//! the staged surface. Phase 1b (call-site conversion to
+//! `Arc`), Phase 9 (demote unused inline-commit
+//! methods to `pub(crate)`), Phase 7 (recovery reconciler — MR-847),
+//! and Phase 8 (index reconciler — MR-848) are deferred to follow-ups.
 
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -38,7 +53,7 @@ use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use lance::Dataset;
-use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
+use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
 use lance::dataset::{WhenMatched, WhenNotMatched};
 
 use crate::db::{Snapshot, SubTableEntry};
@@ -126,9 +141,12 @@ impl StagedHandle {
     }
 }
 
-/// Helper: convert a slice of `StagedHandle` references to a Vec of
-/// `&StagedWrite` for handing to `TableStore::stage_append`'s
-/// `prior_stages` parameter. The lifetime is tied to the input slice.
+/// Helper: clone the inner `StagedWrite` out of each `StagedHandle` and
+/// collect into a `Vec<StagedWrite>` for handing to
+/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
+/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
+/// pay a clone cost per element. `StagedWrite::clone` is cheap because
+/// `Transaction` and `Vec` are shallow-clone friendly.
 pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
     handles.iter().map(|h| h.inner.clone()).collect()
 }
@@ -357,7 +375,7 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
         dataset_uri: &str,
         snapshot: SnapshotHandle,
         filter: &str,
-    ) -> Result<DeleteState>;
+    ) -> Result<(SnapshotHandle, DeleteState)>;
 
     async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result;
     async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result;
@@ -744,10 +762,11 @@ impl TableStorage for TableStore {
         dataset_uri: &str,
         snapshot: SnapshotHandle,
         filter: &str,
-    ) -> Result<DeleteState> {
+    ) -> Result<(SnapshotHandle, DeleteState)> {
         let mut ds = Arc::try_unwrap(snapshot.into_arc())
             .unwrap_or_else(|arc| (*arc).clone());
-        TableStore::delete_where(self, dataset_uri, &mut ds, filter).await
+        let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
+        Ok((SnapshotHandle::new(ds), state))
     }
 
     async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result {
@@ -817,8 +836,3 @@ impl TableStorage for TableStore {
         TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
     }
 }
-
-// Suppress unused-import warning when the module is built without the
-// `Scanner` type being referenced from the trait.
-#[allow(dead_code)]
-fn _scanner_type_marker(_: &Scanner) {}
diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs
index f16ed59..8f10b17 100644
--- a/crates/omnigraph/tests/failpoints.rs
+++ b/crates/omnigraph/tests/failpoints.rs
@@ -265,6 +265,71 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
         .expect("Company write on a non-drifted table should succeed");
 }
 
+/// MR-793 Phase 4 acceptance bar — proves that a Phase A failure in
+/// the staged-index path (`stage_create_btree_index` succeeded;
+/// `commit_staged` not yet called) leaves NO Lance-HEAD drift on the
+/// existing tables. Subsequent operations against those tables succeed
+/// without `ExpectedVersionMismatch`.
+///
+/// Path: `apply_schema(v1 → v2)` adds a new node type. The
+/// `added_tables` loop in `schema_apply` creates the empty dataset and
+/// then calls `build_indices_on_dataset_for_catalog` →
+/// `stage_and_commit_btree(..., &["id"])`. The failpoint fires
+/// between `stage_create_btree_index` and `commit_staged`, so the
+/// staged segments are written under `_indices//` but Lance HEAD
+/// on the new dataset is unchanged at v=1. The schema-apply lock
+/// branch is released by `apply_schema`'s outer match. Existing
+/// tables (e.g. `node:Person`) are completely untouched by the new
+/// node's added_tables iteration — they're outside the failed apply
+/// path entirely — and we assert that mutations against them continue
+/// to work.
+///
+/// The orphan empty dataset from the failed apply is acceptable
+/// residual: it's unreferenced by `__manifest` and will be reclaimed
+/// by `cleanup_old_versions` (or removed when a future apply at the
+/// same target path resolves the rename).
+#[tokio::test]
+async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable() {
+    let _scenario = FailScenario::setup();
+    let dir = tempfile::tempdir().unwrap();
+    let uri = dir.path().to_str().unwrap().to_string();
+
+    // Init with TEST_SCHEMA which declares Person + Knows. Indices on
+    // those tables get built during init.
+    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
+
+    // Apply a schema that adds a new node type. The added_tables loop
+    // will hit the failpoint between stage and commit on the new
+    // node:Project table's btree-on-id build. (TEST_SCHEMA already
+    // has Person + Company + Knows + WorksAt — pick a name that isn't
+    // already declared.)
+    let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA);
+
+    {
+        let _failpoint = ScopedFailPoint::new(
+            "ensure_indices.post_stage_pre_commit_btree",
+            "return",
+        );
+        let err = db.apply_schema(&extended_schema).await.unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("ensure_indices.post_stage_pre_commit_btree"),
+            "schema apply should fail with the synthetic failpoint error, got: {err}"
+        );
+    }
+
+    // Existing tables stayed at their pre-apply versions; subsequent
+    // mutations against them succeed (no Lance-HEAD drift).
+    mutate_main(
+        &mut db,
+        helpers::MUTATION_QUERIES,
+        "insert_person",
+        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
+    )
+    .await
+    .expect("Person mutation must succeed after the failed schema apply — existing tables are not drifted");
+}
+
 fn assert_no_staging_files(repo: &std::path::Path) {
     for name in [
         "_schema.pg.staging",
diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs
index d3df340..9cbe20b 100644
--- a/crates/omnigraph/tests/forbidden_apis.rs
+++ b/crates/omnigraph/tests/forbidden_apis.rs
@@ -42,20 +42,50 @@
 use std::path::{Path, PathBuf};
 
 const FORBIDDEN_PATTERNS: &[&str] = &[
+    // Builder types — direct construction is the side door around the
+    // staged-write surface.
     "MergeInsertBuilder",
     "InsertBuilder::",
     "DeleteBuilder",
     "CommitBuilder::new",
     ".create_index_builder(",
     ".create_index_segment_builder(",
+    // Associated-function forms of inline-commit Lance APIs. These would
+    // only appear in source if the file imports `lance::Dataset` and
+    // calls the static fn — exactly the misuse we want to catch. These
+    // patterns deliberately exclude `.append(` / `.delete(` / `.write(`
+    // because those would over-match (`.delete_branch(`, `Vec::append`,
+    // arrow-array `.append(`, etc.).
+    "Dataset::write",
+    "Dataset::append",
+    "Dataset::delete",
+    "Dataset::merge_insert",
+    "Dataset::add_columns",
+    "Dataset::update_columns",
+    "Dataset::drop_columns",
+    "Dataset::truncate_table",
+    "Dataset::restore",
+    // Lance-specific method names that don't clash with our `TableStore`
+    // wrappers (we use `merge_insert_batch{,es}`, `add_columns_to_*`,
+    // etc. — never the bare Lance names). Engine code that writes
+    // `ds.merge_insert(...)` against a `Dataset` value is reaching
+    // around the trait surface.
+    ".merge_insert(",
+    ".add_columns(",
+    ".update_columns(",
+    ".drop_columns(",
+    ".truncate_table(",
 ];
 
 /// Files exempt from the guard. These are the legitimate storage-layer
-/// implementations that USE the forbidden APIs to provide the staged
-/// primitives.
+/// or manifest-layer implementations that USE the forbidden APIs to
+/// provide the staged primitives or to maintain the system tables
+/// (commit graph, manifest).
 const ALLOW_LIST_FILES: &[&str] = &[
-    "table_store.rs",   // The storage layer itself.
-    "storage_layer.rs", // The trait module.
+    "table_store.rs",       // The storage layer itself.
+    "storage_layer.rs",     // The trait module.
+    "commit_graph.rs",      // Maintains `_graph_commits.lance` system table.
+    "graph_coordinator.rs", // Drives the manifest publisher / branch coordinator.
 ];
 
 /// Directories exempt from the guard. Files under these paths may use
diff --git a/docs/invariants.md b/docs/invariants.md
index 996b5c0..73056e7 100644
--- a/docs/invariants.md
+++ b/docs/invariants.md
@@ -105,6 +105,7 @@ These are user-visible commitments. They state what the engine guarantees and wh
 Specific defaults (timeout values, memory caps, TTL windows) are *configuration*, not invariants — see [docs/constants.md](constants.md) and per-deployment configuration. The invariant is that bounds and contracts exist, not their numerical values.
 
 23. **Atomicity is per-query.** Every `.gq` query is atomic — multi-statement mutations are all-or-nothing via the substrate's atomic-commit primitive. No cross-query `BEGIN`/`COMMIT`; branches and merges fill that role for agent workflows.
+    *Status: upheld at the writer-trait surface for inserts / updates / scalar-index builds / merge_insert / overwrite after MR-793 PR #70 — the sealed `TableStorage` trait routes those through `stage_*` + `commit_staged`, so a Phase A failure (between writing fragments and committing) leaves no Lance-HEAD drift on touched tables. **Per-table commit_staged → manifest publish window remains** — a failure between commits across multiple touched tables can leave drift on the partially-committed tables. Lance has no multi-dataset atomic commit primitive; closing this requires the recovery-on-open reconciler tracked in MR-847. Additionally, two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).*
 
 24. **Schema integrity is strict at commit.** Type validation, required-field presence (auto-filled from `@default` if declared), uniqueness across batches and versions, and referential integrity — all enforced before commit succeeds. Per-write softening flags are opt-in, never default.
     *Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.*
@@ -140,6 +141,7 @@ Specific defaults (timeout values, memory caps, TTL windows) are *configuration*
 These are *how* we realize the invariants today. They are committed conventions — until we explicitly revise them, new code follows them. They are not eternal: a future architecture review may replace any of these with a different mechanism that upholds the same invariants. The deny-list (§IX) protects them in the meantime.
 
 35. **Reconciler pattern for derivable state.** Index coverage, statistics, anything derivable from manifest state — reconciled, not job-queued. *Realizes the "don't maintain state parallel to the substrate" invariant.* See MR-737 §5.16.
+    *Status: partial after MR-793 PR #70 — scalar index builds (BTree, Inverted) now route through the staged primitives `stage_create_*_index` + `commit_staged` instead of inline `create_*_index`; this is the building block. The reconciler pattern itself (background `IndexReconciler` task driven by manifest commits, removing synchronous index work from the publish path) is tracked in MR-848. Vector indices remain inline-commit until lance-format/lance#6666 ships.*
 
 36. **Polymorphism via Union, not per-feature lowering.** Interfaces / wildcards / alternation on nodes and edges share one IR (`Polymorphism`) and one lowering (Union of per-type concrete plans). *Realizes "shared mechanism for shared shape."* See MR-737 §5.13.
     *Status: aspirational — node interfaces in MR-579; edge wildcards in MR-744.*
@@ -170,6 +172,7 @@ These are *how* we realize the invariants today. They are committed conventions
 44. **Tests at every boundary.** `MemStorage` for engine tests; planner-only tests; executor-only tests with a stub storage. No layer tested only via end-to-end.
 
 45. **Reference implementation per trait.** Every trait has a primary impl (Lance for storage) and at least a test impl.
+    *Status: partial after MR-793 PR #70 — `TableStorage` (the engine-internal staged-write trait, sealed) has its primary impl on `TableStore` (Lance-backed). The trait's signatures use opaque `SnapshotHandle` / `StagedHandle` types so a future test impl (e.g., `MemStorage`) can land without changing call sites. No test impl yet; `tempfile::tempdir()` + Lance is the de-facto test substrate today (see [docs/testing.md](testing.md)).*
 
 46. **Documented capability surface.** New capabilities are documented with what they advertise, who consumes them, how the planner uses them.
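
Reviewer note: the diff relies throughout on the idea that staging a write and advancing HEAD are separate steps, so a failure between them leaves HEAD where it was. The toy model below is a minimal sketch of that idea only. `ToyTable`, `Staged`, `ToyError`, and `append` are hypothetical names invented for illustration; none of them is the omnigraph or Lance API, and the real `stage_*` / `commit_staged` signatures differ.

```rust
// Toy model of the stage-then-commit pattern. Every type and function here
// is a hypothetical illustration, not the omnigraph or Lance API.

#[derive(Debug, PartialEq)]
enum ToyError {
    /// Synthetic failure injected between stage and commit.
    Failpoint(&'static str),
    /// HEAD moved between staging and commit.
    VersionMismatch { expected: u64, actual: u64 },
}

/// In-memory stand-in for a versioned dataset.
#[derive(Debug, Default)]
struct ToyTable {
    /// Current committed version ("HEAD").
    head: u64,
    /// Rows visible at HEAD.
    rows: Vec<String>,
    /// Staged writes that were never committed (orphaned residue).
    orphan_files: usize,
}

/// A write whose bytes exist but which HEAD does not reference yet.
struct Staged {
    read_version: u64,
    rows: Vec<String>,
}

impl ToyTable {
    /// Phase A: record the staged data without touching HEAD.
    fn stage_append(&mut self, rows: Vec<String>) -> Staged {
        self.orphan_files += 1;
        Staged { read_version: self.head, rows }
    }

    /// Phase B: advance HEAD only if nobody committed since staging.
    fn commit_staged(&mut self, staged: Staged) -> Result<u64, ToyError> {
        if staged.read_version != self.head {
            return Err(ToyError::VersionMismatch {
                expected: staged.read_version,
                actual: self.head,
            });
        }
        self.rows.extend(staged.rows);
        self.orphan_files -= 1; // the staged data is now referenced by HEAD
        self.head += 1;
        Ok(self.head)
    }
}

/// Stage, optionally fail at the injected failpoint, then commit.
fn append(
    table: &mut ToyTable,
    rows: Vec<String>,
    fail_before_commit: bool,
) -> Result<u64, ToyError> {
    let staged = table.stage_append(rows);
    if fail_before_commit {
        return Err(ToyError::Failpoint("post_stage_pre_commit"));
    }
    table.commit_staged(staged)
}
```

Calling `append(&mut table, rows, true)` on a fresh table returns the failpoint error and leaves `head` at 0 with one orphaned staged write, which mirrors the property the new failpoint test asserts: later writes against the same table still commit cleanly. An inline-commit API would fold both phases into one call, so the same failure could not be placed between them.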