MR-794 step 1: add staged-write primitives to TableStore

Lance's distributed-write API splits "write fragment files" from "advance
HEAD": write_fragments returns a Transaction with FragmentMetadata; a
later CommitBuilder::execute(transaction) commits via the manifest CAS.
The same shape exists for merge_insert via
MergeInsertBuilder::execute_uncommitted. Scanner::with_fragments(staged)
lets in-flight reads see uncommitted staged data.

Adds wrappers for these primitives:

- StagedWrite carries the uncommitted Transaction plus the new Fragments
  (extracted for read-your-writes via Scanner::with_fragments).
- TableStore::stage_append wraps InsertBuilder::execute_uncommitted.
- TableStore::stage_merge_insert wraps MergeInsertBuilder::execute_uncommitted.
- TableStore::commit_staged wraps CommitBuilder::execute.
- TableStore::scan_with_staged / count_rows_with_staged thread the staged
  fragments into a Scanner alongside the dataset's committed fragments.

The MutationStaging integration that uses these primitives is the next
step in MR-794 — it requires a coordinated rewrite of execute_insert /
execute_update / execute_delete plus the load_jsonl_reader path, plus
end-of-query commit logic. Doc comment on MutationStaging is updated to
reference MR-794 and these primitives so the followup is well-anchored.

The current MR-771 limitation in docs/runs.md ("mid-query partial failure
leaves Lance HEAD ahead of __manifest") still applies until the
follow-up lands; the primitives are the building blocks but not yet the
fix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 11:43:56 +02:00
parent b73813e525
commit 3601002440
No known key found for this signature in database
2 changed files with 229 additions and 8 deletions

View file

@ -544,12 +544,15 @@ fn apply_assignments(
/// with `Lance HEAD > manifest_version`. The next `mutate_as` against the
/// same table will surface `ExpectedVersionMismatch` (Lance HEAD ahead of
/// the manifest snapshot). Lance's `restore()` is *not* a rewind — it
/// creates a new commit, monotonically advancing the version. A proper fix
/// requires per-table Lance-internal branches (write to a transient branch,
/// fast-forward main on success, drop branch on failure); tracked as a
/// follow-up to MR-771. In practice this path is narrow: most validation
/// runs before any Lance write, so single-statement mutations are
/// unaffected. See `docs/runs.md`.
/// creates a new commit, monotonically advancing the version. The proper
/// fix uses Lance's distributed-write API (`write_fragments` /
/// `Scanner::with_fragments` / `Operation::Append { fragments }`) — see
/// [MR-794](https://linear.app/modernrelay/issue/MR-794). The staging
/// primitives `TableStore::stage_append` / `stage_merge_insert` /
/// `commit_staged` / `scan_with_staged` are in place; full integration
/// with this struct is the MR-794 follow-up. In practice this path is
/// narrow today: most validation runs before any Lance write, so
/// single-statement mutations are unaffected. See `docs/runs.md`.
#[derive(Default)]
struct MutationStaging {
expected_versions: HashMap<String, u64>,

View file

@ -4,14 +4,18 @@ use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams};
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
};
use lance::datatypes::BlobHandling;
use lance::index::scalar::IndexDetails;
use lance_file::version::LanceFileVersion;
use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
use lance_index::{DatasetIndexExt, IndexType, is_system_index};
use lance_linalg::distance::MetricType;
use lance_table::format::IndexMetadata;
use lance_table::format::{Fragment, IndexMetadata};
use std::sync::Arc;
use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
@ -33,6 +37,24 @@ pub struct DeleteState {
pub(crate) version_metadata: TableVersionMetadata,
}
/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. Used by `MutationStaging`
/// (`exec/mutation.rs`) and the loader to defer Lance commits to
/// end-of-query, so a mid-query failure leaves the touched table at the
/// pre-mutation HEAD instead of drifting ahead.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
/// `TableStore::commit_staged`). `new_fragments` is extracted for
/// read-your-writes within the same query: pass it to
/// `Scanner::with_fragments(...)` so subsequent ops in the same mutation
/// see the staged data alongside the committed snapshot.
#[derive(Debug, Clone)]
pub struct StagedWrite {
pub transaction: Transaction,
pub new_fragments: Vec<Fragment>,
}
#[derive(Debug, Clone)]
pub struct TableStore {
root_uri: String,
@ -500,6 +522,202 @@ impl TableStore {
})
}
// ─── Staged-write API (MR-794) ───────────────────────────────────────────
//
// These primitives wrap Lance's distributed-write API: each call writes
// fragment files to object storage but does NOT advance the dataset's
// HEAD or commit a manifest entry. The returned `Transaction` is held by
// the caller (typically `MutationStaging` or the loader's accumulator)
// and committed at end-of-query via `commit_staged`. On failure the
// fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
//
// The extracted `Vec<Fragment>` is for read-your-writes within the same
// query: subsequent ops construct a `Scanner` and call
// `scanner.with_fragments(staged.clone())` to see staged data alongside
// the committed snapshot. Lance's filter pushdown, vector search, and
// FTS all respect the supplied fragment list.
/// Stage an append: write fragment files for `batch`, return the
/// uncommitted Lance transaction plus the new fragments for
/// read-your-writes.
pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
"stage_append called with empty batch".to_string(),
));
}
let params = WriteParams {
mode: WriteMode::Append,
allow_external_blob_outside_bases: true,
..Default::default()
};
let transaction = InsertBuilder::new(Arc::new(ds.clone()))
.with_params(&params)
.execute_uncommitted(vec![batch])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let new_fragments = match &transaction.operation {
Operation::Append { fragments } => fragments.clone(),
Operation::Overwrite { fragments, .. } => fragments.clone(),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_append: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
};
Ok(StagedWrite {
transaction,
new_fragments,
})
}
/// Stage a merge_insert (upsert): write fragment files describing the
/// merge result, return the uncommitted transaction plus the new
/// fragments. The transaction's `Operation::Update` carries the
/// fragments-to-remove and fragments-to-add; for read-your-writes we
/// expose `new_fragments` (rows that will be visible after commit).
pub async fn stage_merge_insert(
&self,
ds: Dataset,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
"stage_merge_insert called with empty batch".to_string(),
));
}
let ds = Arc::new(ds);
let job = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?
.when_matched(when_matched)
.when_not_matched(when_not_matched)
.try_build()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let schema = batch.schema();
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
let uncommitted = job
.execute_uncommitted(stream)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Operation::Update { new_fragments, updated_fragments, .. } —
// `new_fragments` are the freshly inserted rows; `updated_fragments`
// are rewrites of existing fragments that include both retained and
// updated rows. For read-your-writes we surface BOTH so subsequent
// reads see updated rows even before the manifest commits.
let new_fragments = match &uncommitted.transaction.operation {
Operation::Update {
new_fragments,
updated_fragments,
..
} => {
let mut all = updated_fragments.clone();
all.extend(new_fragments.iter().cloned());
all
}
Operation::Append { fragments } => fragments.clone(),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_merge_insert: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
};
Ok(StagedWrite {
transaction: uncommitted.transaction,
new_fragments,
})
}
/// Commit a previously-staged transaction onto `ds`, returning the new
/// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
/// the publisher at end-of-query to materialize all staged writes before
/// the meta-manifest commit.
pub async fn commit_staged(
&self,
ds: Arc<Dataset>,
transaction: Transaction,
) -> Result<Dataset> {
CommitBuilder::new(ds)
.execute(transaction)
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Run a scan with optional uncommitted staged fragments visible
/// alongside the committed snapshot. When `staged_fragments` is empty
/// this is identical to `scan(...)`.
pub async fn scan_with_staged(
&self,
ds: &Dataset,
staged_fragments: &[Fragment],
projection: Option<&[&str]>,
filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
if staged_fragments.is_empty() {
return self.scan(ds, projection, filter, None).await;
}
let mut scanner = ds.scan();
if let Some(cols) = projection {
let owned: Vec<String> = cols.iter().map(|s| s.to_string()).collect();
scanner
.project(&owned)
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
if let Some(f) = filter {
scanner
.filter(f)
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
// Combine committed fragments (from the dataset's manifest at
// current version) with the supplied staged fragments. Lance's
// Scanner::with_fragments scopes the scan to exactly this list —
// pull committed fragments out of the dataset's manifest.
let mut combined: Vec<Fragment> = ds.manifest.fragments.iter().cloned().collect();
combined.extend(staged_fragments.iter().cloned());
scanner.with_fragments(combined);
let stream = scanner
.try_into_stream()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
stream
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// `count_rows` variant that respects staged fragments. Used for
/// edge-cardinality validation that needs to see staged edges before
/// commit.
pub async fn count_rows_with_staged(
&self,
ds: &Dataset,
staged_fragments: &[Fragment],
filter: Option<String>,
) -> Result<usize> {
if staged_fragments.is_empty() {
return self.count_rows(ds, filter).await;
}
let mut scanner = ds.scan();
if let Some(f) = filter {
scanner
.filter(&f)
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
let mut combined: Vec<Fragment> = ds.manifest.fragments.iter().cloned().collect();
combined.extend(staged_fragments.iter().cloned());
scanner.with_fragments(combined);
let count = scanner
.count_rows()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(count as usize)
}
async fn user_indices_for_column(
&self,
ds: &Dataset,