diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index ab7df62..d7513a8 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -544,12 +544,15 @@ fn apply_assignments( /// with `Lance HEAD > manifest_version`. The next `mutate_as` against the /// same table will surface `ExpectedVersionMismatch` (Lance HEAD ahead of /// the manifest snapshot). Lance's `restore()` is *not* a rewind — it -/// creates a new commit, monotonically advancing the version. A proper fix -/// requires per-table Lance-internal branches (write to a transient branch, -/// fast-forward main on success, drop branch on failure); tracked as a -/// follow-up to MR-771. In practice this path is narrow: most validation -/// runs before any Lance write, so single-statement mutations are -/// unaffected. See `docs/runs.md`. +/// creates a new commit, monotonically advancing the version. The proper +/// fix uses Lance's distributed-write API (`write_fragments` / +/// `Scanner::with_fragments` / `Operation::Append { fragments }`) — see +/// [MR-794](https://linear.app/modernrelay/issue/MR-794). The staging +/// primitives `TableStore::stage_append` / `stage_merge_insert` / +/// `commit_staged` / `scan_with_staged` are in place; full integration +/// with this struct is the MR-794 follow-up. In practice this path is +/// narrow today: most validation runs before any Lance write, so +/// single-statement mutations are unaffected. See `docs/runs.md`. 
#[derive(Default)] struct MutationStaging { expected_versions: HashMap, diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index f9d641f..1404d6c 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -4,14 +4,18 @@ use arrow_select::concat::concat_batches; use futures::TryStreamExt; use lance::Dataset; use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}; -use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams}; +use lance::dataset::transaction::{Operation, Transaction}; +use lance::dataset::{ + CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, + WriteParams, +}; use lance::datatypes::BlobHandling; use lance::index::scalar::IndexDetails; use lance_file::version::LanceFileVersion; use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams}; use lance_index::{DatasetIndexExt, IndexType, is_system_index}; use lance_linalg::distance::MetricType; -use lance_table::format::IndexMetadata; +use lance_table::format::{Fragment, IndexMetadata}; use std::sync::Arc; use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write}; @@ -33,6 +37,24 @@ pub struct DeleteState { pub(crate) version_metadata: TableVersionMetadata, } +/// A Lance write that has produced fragment files on object storage but is +/// not yet committed to the dataset's manifest. Used by `MutationStaging` +/// (`exec/mutation.rs`) and the loader to defer Lance commits to +/// end-of-query, so a mid-query failure leaves the touched table at the +/// pre-mutation HEAD instead of drifting ahead. +/// +/// `transaction` is opaque from our side — Lance owns its semantics. We +/// commit it via `CommitBuilder::execute(transaction)` (see +/// `TableStore::commit_staged`). 
`new_fragments` is extracted for +/// read-your-writes within the same query: pass it to +/// `Scanner::with_fragments(...)` so subsequent ops in the same mutation +/// see the staged data alongside the committed snapshot. +#[derive(Debug, Clone)] +pub struct StagedWrite { + pub transaction: Transaction, + pub new_fragments: Vec<Fragment>, +} + #[derive(Debug, Clone)] pub struct TableStore { root_uri: String, @@ -500,6 +522,202 @@ impl TableStore { }) } + // ─── Staged-write API (MR-794) ─────────────────────────────────────────── + // + // These primitives wrap Lance's distributed-write API: each call writes + // fragment files to object storage but does NOT advance the dataset's + // HEAD or commit a manifest entry. The returned `Transaction` is held by + // the caller (typically `MutationStaging` or the loader's accumulator) + // and committed at end-of-query via `commit_staged`. On failure the + // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`. + // + // The extracted `Vec<Fragment>` is for read-your-writes within the same + // query: subsequent ops construct a `Scanner` and call + // `scanner.with_fragments(staged.clone())` to see staged data alongside + // the committed snapshot. Lance's filter pushdown, vector search, and + // FTS all respect the supplied fragment list. + + /// Stage an append: write fragment files for `batch`, return the + /// uncommitted Lance transaction plus the new fragments for + /// read-your-writes. 
+ pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result { + if batch.num_rows() == 0 { + return Err(OmniError::manifest_internal( + "stage_append called with empty batch".to_string(), + )); + } + let params = WriteParams { + mode: WriteMode::Append, + allow_external_blob_outside_bases: true, + ..Default::default() + }; + let transaction = InsertBuilder::new(Arc::new(ds.clone())) + .with_params(¶ms) + .execute_uncommitted(vec![batch]) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let new_fragments = match &transaction.operation { + Operation::Append { fragments } => fragments.clone(), + Operation::Overwrite { fragments, .. } => fragments.clone(), + other => { + return Err(OmniError::manifest_internal(format!( + "stage_append: unexpected Lance operation {:?}", + std::mem::discriminant(other) + ))); + } + }; + Ok(StagedWrite { + transaction, + new_fragments, + }) + } + + /// Stage a merge_insert (upsert): write fragment files describing the + /// merge result, return the uncommitted transaction plus the new + /// fragments. The transaction's `Operation::Update` carries the + /// fragments-to-remove and fragments-to-add; for read-your-writes we + /// expose `new_fragments` (rows that will be visible after commit). + pub async fn stage_merge_insert( + &self, + ds: Dataset, + batch: RecordBatch, + key_columns: Vec, + when_matched: WhenMatched, + when_not_matched: WhenNotMatched, + ) -> Result { + if batch.num_rows() == 0 { + return Err(OmniError::manifest_internal( + "stage_merge_insert called with empty batch".to_string(), + )); + } + let ds = Arc::new(ds); + let job = MergeInsertBuilder::try_new(ds, key_columns) + .map_err(|e| OmniError::Lance(e.to_string()))? 
+ .when_matched(when_matched) + .when_not_matched(when_not_matched) + .try_build() + .map_err(|e| OmniError::Lance(e.to_string()))?; + let schema = batch.schema(); + let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema); + let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader)); + let uncommitted = job + .execute_uncommitted(stream) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + // Operation::Update { new_fragments, updated_fragments, .. } — + // `new_fragments` are the freshly inserted rows; `updated_fragments` + // are rewrites of existing fragments that include both retained and + // updated rows. For read-your-writes we surface BOTH so subsequent + // reads see updated rows even before the manifest commits. + let new_fragments = match &uncommitted.transaction.operation { + Operation::Update { + new_fragments, + updated_fragments, + .. + } => { + let mut all = updated_fragments.clone(); + all.extend(new_fragments.iter().cloned()); + all + } + Operation::Append { fragments } => fragments.clone(), + other => { + return Err(OmniError::manifest_internal(format!( + "stage_merge_insert: unexpected Lance operation {:?}", + std::mem::discriminant(other) + ))); + } + }; + Ok(StagedWrite { + transaction: uncommitted.transaction, + new_fragments, + }) + } + + /// Commit a previously-staged transaction onto `ds`, returning the new + /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by + /// the publisher at end-of-query to materialize all staged writes before + /// the meta-manifest commit. + pub async fn commit_staged( + &self, + ds: Arc, + transaction: Transaction, + ) -> Result { + CommitBuilder::new(ds) + .execute(transaction) + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + + /// Run a scan with optional uncommitted staged fragments visible + /// alongside the committed snapshot. When `staged_fragments` is empty + /// this is identical to `scan(...)`. 
+ pub async fn scan_with_staged( + &self, + ds: &Dataset, + staged_fragments: &[Fragment], + projection: Option<&[&str]>, + filter: Option<&str>, + ) -> Result> { + if staged_fragments.is_empty() { + return self.scan(ds, projection, filter, None).await; + } + let mut scanner = ds.scan(); + if let Some(cols) = projection { + let owned: Vec = cols.iter().map(|s| s.to_string()).collect(); + scanner + .project(&owned) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } + if let Some(f) = filter { + scanner + .filter(f) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } + // Combine committed fragments (from the dataset's manifest at + // current version) with the supplied staged fragments. Lance's + // Scanner::with_fragments scopes the scan to exactly this list — + // pull committed fragments out of the dataset's manifest. + let mut combined: Vec = ds.manifest.fragments.iter().cloned().collect(); + combined.extend(staged_fragments.iter().cloned()); + scanner.with_fragments(combined); + let stream = scanner + .try_into_stream() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + stream + .try_collect() + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + + /// `count_rows` variant that respects staged fragments. Used for + /// edge-cardinality validation that needs to see staged edges before + /// commit. 
///
/// When `staged_fragments` is empty this defers to `count_rows(...)`.
///
/// # Errors
///
/// Lance failures are surfaced as `OmniError::Lance`. A row count that does
/// not fit in `usize` is reported as `OmniError::manifest_internal` rather
/// than being silently truncated.
pub async fn count_rows_with_staged(
    &self,
    ds: &Dataset,
    staged_fragments: &[Fragment],
    filter: Option<String>,
) -> Result<usize> {
    if staged_fragments.is_empty() {
        return self.count_rows(ds, filter).await;
    }
    let mut scanner = ds.scan();
    if let Some(f) = filter {
        scanner
            .filter(&f)
            .map_err(|e| OmniError::Lance(e.to_string()))?;
    }
    // `with_fragments` restricts the scan to the given list, so the
    // committed fragments from the manifest are included explicitly.
    let mut combined: Vec<Fragment> = ds.manifest.fragments.iter().cloned().collect();
    combined.extend(staged_fragments.iter().cloned());
    scanner.with_fragments(combined);
    let count = scanner
        .count_rows()
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    // Checked conversion: a bare `as usize` would wrap on targets where
    // `usize` is narrower than the count type.
    usize::try_from(count).map_err(|_| {
        OmniError::manifest_internal(format!(
            "count_rows_with_staged: row count {} does not fit in usize",
            count
        ))
    })
}

async fn user_indices_for_column(
    &self,
    ds: &Dataset,