MR-794 step 2: address PR #68 review — merge semantics, cardinality, residual

Fixes from PR #68 review (Cursor Bugbot + Codex + Cubic), plus a failpoint test pinning the residual:

* **scan_with_pending gains merge-shadow semantics** (Codex P1, Cubic P1#1):
  new `key_column: Option<&str>` parameter. When set, committed rows
  whose key value appears in any pending batch are excluded from the
  scan, so `scan_with_pending` applies merge semantics to chained
  updates instead of naively unioning. `execute_update` passes
  `Some("id")`. Without this, a chained `update where age > 30` could
  match a row whose pending value already moved out of range.

* **Multi-delete on same table no longer trips ExpectedVersionMismatch**
  (Cursor Bugbot HIGH): open_table_for_mutation routes through
  reopen_for_mutation when staging.inline_committed has the table,
  using the post-inline-commit Lance version captured at record_inline
  time. The legacy open_for_mutation_on_branch fence (Lance HEAD ==
  manifest pinned) is correct cross-writer but wrong intra-query when
  deletes have already advanced HEAD on this table. This reopen branch
  can be removed once Lance ships two-phase delete
  (lance-format/lance#6658).

* **Cardinality validation consolidated** (Cursor LOW + Codex P2 +
  Cubic P1#2 + Cubic P2): new exec/staging::count_src_per_edge +
  enforce_cardinality_bounds shared by mutation and loader paths.
  Restores the missing min-cardinality check on the engine path.
  Loader Merge mode passes Some("id") so an edge updated by id is
  counted once rather than once in committed and again in pending.
  Loader Append mode and the engine path pass None (ULID-generated ids
  never collide).

* **Dead count_rows_with_pending removed** (Cursor LOW): never called.

* **Misleading concat-helper comment fixed** (Cubic P3): claimed
  schema normalization the helper doesn't implement. Updated to match
  reality.

* **Documentation honesty** (Cubic P1#3): MR-794 narrows but doesn't
  eliminate the "Lance HEAD ahead of __manifest" drift class. Drift is
  unreachable for op-execution failures (the partial_failure test pins
  this), but a residual remains at the finalize→publisher boundary
  because Lance has no multi-dataset commit primitive: per-table
  commit_staged calls run sequentially before manifest commit. Updated
  docs/runs.md, docs/invariants.md §VI.25, docs/releases/v0.4.1.md to
  scope the claim precisely.

* **Failpoint test pinning the residual**: new
  mutation.post_finalize_pre_publisher failpoint + two tests in
  tests/failpoints.rs that confirm the documented residual behavior.
  Catches future regressions that widen the residual.
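
The merge-shadow behavior from the first bullet can be illustrated with a minimal, std-only sketch. It is not the engine's implementation: the real `scan_with_pending` operates on Arrow record batches, while `Row`, `scan_with_pending_sketch`, and the `shadow_by_id` flag are hypothetical names used only here.

```rust
use std::collections::HashSet;

/// Hypothetical row shape for illustration; the real scan works on Arrow batches.
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub id: String,
    pub age: i32,
}

/// Sketch of the two ways committed and pending rows can be combined.
/// `shadow_by_id = false` models the naive union (like `key_column = None`).
/// `shadow_by_id = true` drops committed rows whose id also appears in
/// pending (like `key_column = Some("id")`).
pub fn scan_with_pending_sketch(
    committed: &[Row],
    pending: &[Row],
    predicate: impl Fn(&Row) -> bool,
    shadow_by_id: bool,
) -> Vec<Row> {
    // Ids that an earlier op in the same query has already rewritten.
    let pending_ids: HashSet<&str> = pending.iter().map(|r| r.id.as_str()).collect();

    // Committed side: apply the predicate, then optionally shadow.
    let mut out: Vec<Row> = committed
        .iter()
        .filter(|r| predicate(*r))
        .filter(|r| !(shadow_by_id && pending_ids.contains(r.id.as_str())))
        .cloned()
        .collect();

    // Pending side: the predicate runs against the current (pending) values.
    out.extend(pending.iter().filter(|r| predicate(*r)).cloned());
    out
}
```

With a committed row `(id = "alice", age = 40)`, a pending row `(id = "alice", age = 20)`, and the predicate `age > 30`, the naive union still returns the stale committed row, while the shadowed variant returns nothing.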

Test additions in tests/runs.rs:

* chained_updates_with_overlapping_predicate_respects_intermediate_value
* multi_statement_delete_on_same_node_table
* cascade_delete_node_then_explicit_delete_edge_on_same_table
* mutation_insert_edge_enforces_min_cardinality
* load_merge_mode_dedupes_edge_for_cardinality_count

113/113 engine integration tests pass (runs + end_to_end + consistency
+ staged_writes + validators). Failpoints feature build runs in CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ragnor Comerford 2026-05-01 13:47:55 +02:00
parent a61e82f47a
commit 3223b51cf1
9 changed files with 828 additions and 199 deletions

@ -565,19 +565,59 @@ use super::staging::{MutationStaging, PendingMode};
/// first touch. The captured version is the publisher's CAS fence at
/// end-of-query (per-table OCC).
///
/// The dataset is always opened at HEAD on the requested branch. Under
/// the staged-write rewire (MR-794 step 2+), no per-op commit advances
/// HEAD, so subsequent opens within the same query observe the same
/// version. The exception is the inline-commit delete path
/// (`execute_delete_*`), which still advances HEAD per-op — but D₂ at
/// parse time prevents inserts/updates and deletes from coexisting in
/// one query, so this can't conflict with a pending-batch read.
/// On first touch, opens the dataset at HEAD on the requested branch
/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
/// the manifest's pinned version — that fence is the engine's
/// publisher-style OCC catching cross-writer drift before we make any
/// changes.
///
/// On subsequent touches *within the same query*, behavior depends on
/// whether the table has already been inline-committed by a delete op:
///
/// - **Insert / update path (no inline commit between touches).** Lance
/// HEAD has not moved since first touch, so a fresh
/// `open_for_mutation_on_branch` would still match the manifest
/// pinned version. We just go through it again; `ensure_path` is a
/// no-op (idempotent on the captured `expected_version`).
/// - **Delete cascade or multi-delete on the same table.** A prior
/// `delete_where` on this table has already advanced Lance HEAD past
/// the manifest's pinned version (the manifest doesn't move until
/// end-of-query). Going through `open_for_mutation_on_branch` again
/// would trip its `ensure_expected_version` equality check
/// (`actual = pinned + 1` vs `expected = pinned`). Instead we route
/// through `reopen_for_mutation` at the post-inline-commit Lance
/// version captured in `staging.inline_committed[table_key]`, which
/// is the source of truth for "where is Lance HEAD right now on
/// this table within this query."
///
/// The `inline_committed` reopen branch closes the Cursor Bugbot HIGH
/// "multi-delete fails on same table" finding. The branch goes away
/// once Lance exposes a two-phase delete API
/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
/// and we can stage deletes on the same path as inserts/updates.
async fn open_table_for_mutation(
db: &Omnigraph,
staging: &mut MutationStaging,
branch: Option<&str>,
table_key: &str,
) -> Result<(Dataset, String, Option<String>)> {
if let Some(prior) = staging.inline_committed.get(table_key) {
let path = staging.paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"open_table_for_mutation: inline_committed[{}] without paths entry",
table_key
))
})?;
let ds = db
.reopen_for_mutation(
table_key,
&path.full_path,
path.table_branch.as_deref(),
prior.table_version,
)
.await?;
return Ok((ds, path.full_path.clone(), path.table_branch.clone()));
}
let (ds, full_path, table_branch) =
db.open_for_mutation_on_branch(branch, table_key).await?;
let expected_version = ds.version().version;
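
The routing described in the doc comment above can be reduced to a small decision function. The following is a simplified sketch under stated assumptions, not the engine's code: `StagingSketch` and `version_to_open` are hypothetical names, versions are plain integers, and the error is a string rather than the engine's error type.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the staging state: for each table that a
/// delete op has already inline-committed in this query, the Lance version
/// recorded right after that commit.
#[derive(Default)]
pub struct StagingSketch {
    pub inline_committed: HashMap<String, u64>,
}

/// Decide which version to open for the next op on `table_key`.
/// - Table already inline-committed in this query: reopen at the recorded
///   version and skip the equality fence, since HEAD legitimately moved.
/// - Otherwise: apply the cross-writer fence (HEAD must equal the
///   manifest's pinned version).
pub fn version_to_open(
    staging: &StagingSketch,
    table_key: &str,
    manifest_pinned: u64,
    lance_head: u64,
) -> Result<u64, String> {
    if let Some(&recorded) = staging.inline_committed.get(table_key) {
        return Ok(recorded);
    }
    if lance_head != manifest_pinned {
        return Err(format!(
            "ExpectedVersionMismatch on {table_key}: expected={manifest_pinned} actual={lance_head}"
        ));
    }
    Ok(lance_head)
}
```

The point of the design is that the equality fence stays strict for other writers, while a second delete on the same table within one query is matched against the version the query itself produced.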
@ -701,6 +741,16 @@ impl Omnigraph {
let (updates, expected_versions) = staging
.finalize(self, requested.as_deref())
.await?;
// Failpoint that wedges the documented finalize→publisher
// residual: per-table `commit_staged` calls already
// advanced Lance HEAD on every touched table; a failure
// injected here mirrors the production-rare case where
// the publisher's CAS pre-check rejects (or the manifest
// write throws) after staged commits succeeded. Used by
// `tests/failpoints.rs::finalize_publisher_residual_*`
// to pin the documented residual behavior. See
// `docs/runs.md` "Finalize → publisher residual".
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
self.commit_updates_on_branch_with_expected(
requested.as_deref(),
&updates,
@ -947,6 +997,12 @@ impl Omnigraph {
(!blob_props.is_empty()).then_some(non_blob_cols.as_slice());
let pending_batches = staging.pending_batches(&table_key);
let pending_schema = staging.pending_schema(&table_key);
// Use merge semantics on the union: a committed row whose `id`
// also appears in pending has been logically updated by an
// earlier op in this query and is shadowed from the scan,
// otherwise the predicate runs against stale committed values
// and a chained `update where <pred>` can match a row whose
// pending value no longer satisfies <pred>.
let batches = self
.table_store()
.scan_with_pending(
@ -955,6 +1011,7 @@ impl Omnigraph {
pending_schema,
projection,
Some(&pred_sql),
Some("id"),
)
.await?;
@ -965,11 +1022,14 @@ impl Omnigraph {
});
}
// Concat the matched batches into one. The pending side may have
// a slightly different schema (e.g. Lance's `_rowid` column on
// the committed side, missing on the pending side). Normalize by
// dropping any `_rowid` / `_rowaddr` columns and reordering to
// the table's canonical schema.
// Concat the matched batches (committed + pending) into one. The
// helper trusts that both sides share a schema — Lance returns
// dataset-schema-ordered columns and DataFusion returns
// MemTable-schema-ordered columns; both should match the catalog's
// arrow_schema when the projection is consistent. If they
// diverge (typically a blob-table mid-schema-shift), the helper
// surfaces a clear error directing the caller to split the
// mutation.
let matched = concat_match_batches_to_schema(&schema, &blob_props, batches)?;
let affected_count = matched.num_rows();
@ -1218,10 +1278,10 @@ fn concat_match_batches_to_schema(
})
}
/// Validate that adding `pending` edges plus the committed edges does not
/// exceed the per-source cardinality bound on `edge_type`. Reads the `src`
/// column from both committed (Lance) and pending (in-memory) and counts
/// per src.
/// Validate `@card` bounds against committed (Lance) + pending (in-memory)
/// edges for one edge table. Engine path: each insert produces a fresh
/// ULID id, so committed and pending cannot share a primary key — no
/// dedup needed (`dedupe_key_column = None`).
async fn validate_edge_cardinality_with_pending(
db: &Omnigraph,
committed_ds: &Dataset,
@ -1229,66 +1289,18 @@ async fn validate_edge_cardinality_with_pending(
table_key: &str,
edge_type: &omnigraph_compiler::catalog::EdgeType,
) -> Result<()> {
use std::collections::HashMap as StdHashMap;
if edge_type.cardinality.is_default() {
return Ok(());
}
let mut counts: StdHashMap<String, u32> = StdHashMap::new();
// Committed side: scan `src` column (cheap, no filter).
let committed = db
.table_store()
.scan(committed_ds, Some(&["src"]), None, None)
.await?;
for batch in &committed {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
// Pending side: walk in-memory pending batches for `src`.
for batch in staging.pending_batches(table_key) {
let Some(col) = batch.column_by_name("src") else {
continue;
};
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
let card = &edge_type.cardinality;
for (src, count) in &counts {
if let Some(max) = card.max {
if *count > max {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (max {})",
edge_type.name, src, count, max
)));
}
}
// Note: per-source minimum cardinality cannot be checked
// mid-query (a bound of `2..` requires both edges to be inserted
// before validation). The publisher path could re-validate at
// commit time; for now, defer to the loader's end-of-query check.
let _ = card.min;
}
Ok(())
let counts = super::staging::count_src_per_edge(
db,
committed_ds,
table_key,
staging,
None,
)
.await?;
super::staging::enforce_cardinality_bounds(edge_type, &counts)
}
fn enrich_mutation_params(params: &ParamMap) -> Result<ParamMap> {

@ -23,6 +23,8 @@ use std::sync::Arc;
use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
use arrow_schema::SchemaRef;
use lance::Dataset;
use omnigraph_compiler::catalog::EdgeType;
use crate::db::SubTableUpdate;
use crate::error::{OmniError, Result};
@ -389,3 +391,148 @@ fn dedupe_merge_batches_by_id(
arrow_select::concat::concat_batches(schema, &sliced)
.map_err(|e| OmniError::Lance(e.to_string()))
}
// ─── Cardinality helpers (shared by mutation + loader paths) ────────────────
/// Count edges per `src` value across committed (Lance scan) + pending
/// (in-memory). Caller supplies an opened committed dataset so the
/// mutation path (which already has one) and the loader path (which
/// opens via snapshot) share the same body.
///
/// `dedupe_key_column` controls whether committed rows are shadowed by
/// pending:
/// - `None` — every committed row counts, every pending row counts.
/// Correct when committed and pending cannot share a primary key
/// (engine inserts always use fresh ULID edge ids; loader Append
/// mode uses fresh ids too).
/// - `Some(col)` — committed rows whose `col` value also appears in any
/// pending batch are EXCLUDED from the committed count, so a Merge-mode
/// load that *updates* an existing edge (potentially changing its
/// `src`) counts the post-update row exactly once. Without this,
/// `LoadMode::Merge` double-counts.
pub(crate) async fn count_src_per_edge(
db: &crate::db::Omnigraph,
committed_ds: &Dataset,
table_key: &str,
staging: &MutationStaging,
dedupe_key_column: Option<&str>,
) -> Result<HashMap<String, u32>> {
let mut counts: HashMap<String, u32> = HashMap::new();
let pending_batches = staging.pending_batches(table_key);
// Collect pending key values (for shadow-on-merge dedupe). Only when
// dedupe is requested AND there's anything pending.
let pending_keys: Option<HashSet<String>> = match dedupe_key_column {
Some(col) if !pending_batches.is_empty() => {
let mut set = HashSet::new();
for batch in pending_batches {
if let Some(arr) = batch
.column_by_name(col)
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
{
for i in 0..arr.len() {
if arr.is_valid(i) {
set.insert(arr.value(i).to_string());
}
}
}
}
Some(set)
}
_ => None,
};
// Committed side: scan `src` plus the dedupe key column when set, so
// we can both count and shadow in one pass.
let projection: Vec<&str> = match dedupe_key_column {
Some(col) if pending_keys.as_ref().is_some_and(|s| !s.is_empty()) => vec!["src", col],
_ => vec!["src"],
};
let committed = db
.table_store()
.scan(committed_ds, Some(&projection), None, None)
.await?;
for batch in &committed {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
// Optional shadow-key column (only present when dedupe is on).
let key_arr = match (&pending_keys, dedupe_key_column) {
(Some(set), Some(col)) if !set.is_empty() => batch
.column_by_name(col)
.and_then(|c| c.as_any().downcast_ref::<StringArray>()),
_ => None,
};
for i in 0..srcs.len() {
if !srcs.is_valid(i) {
continue;
}
// Shadow this committed row if its key is in pending.
if let (Some(arr), Some(set)) = (key_arr, pending_keys.as_ref()) {
if arr.is_valid(i) && set.contains(arr.value(i)) {
continue;
}
}
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
// Pending side: walk in-memory batches for `src`. No within-pending
// dedupe is applied: `dedupe_merge_batches_by_id` collapses same-id
// duplicates only at finalize time, and cardinality validation runs
// before finalize. The engine's per-op edge insert produces one row
// per op with a fresh ULID, so within-pending duplicates are not a
// concern here.
for batch in pending_batches {
let Some(col) = batch.column_by_name("src") else {
continue;
};
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
Ok(counts)
}
/// Apply `@card(min..max)` bounds to a per-source count map.
///
/// Both bounds are checked. The `min` check can produce a misleading
/// error during a per-op insert mid-query (a bound of `2..` requires
/// both edges to be inserted before validation passes), but the
/// historical behavior was to enforce min per-op anyway, which keeps
/// users from accidentally publishing a graph that violates the schema.
/// Consumers that need end-of-query semantics call this after all edge
/// ops are accumulated (the loader does, via Phase 3).
pub(crate) fn enforce_cardinality_bounds(
edge_type: &EdgeType,
counts: &HashMap<String, u32>,
) -> Result<()> {
let card = &edge_type.cardinality;
for (src, count) in counts {
if let Some(max) = card.max {
if *count > max {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (max {})",
edge_type.name, src, count, max
)));
}
}
if *count < card.min {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (min {})",
edge_type.name, src, count, card.min
)));
}
}
Ok(())
}
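
To make the double-counting problem concrete, here is a minimal std-only sketch of the count-then-enforce flow in the two helpers above. It assumes plain structs instead of Arrow batches; `EdgeRow`, `count_src_sketch`, and `enforce_bounds_sketch` are hypothetical names, and errors are strings rather than `OmniError`.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical edge row carrying only the fields the count needs.
pub struct EdgeRow {
    pub id: String,
    pub src: String,
}

/// Count edges per `src` across committed and pending rows. When
/// `dedupe_by_id` is true, a committed edge whose id also appears in
/// pending is skipped, so an updated edge is counted once.
pub fn count_src_sketch(
    committed: &[EdgeRow],
    pending: &[EdgeRow],
    dedupe_by_id: bool,
) -> HashMap<String, u32> {
    let pending_ids: HashSet<&str> = if dedupe_by_id {
        pending.iter().map(|e| e.id.as_str()).collect()
    } else {
        HashSet::new()
    };
    let mut counts: HashMap<String, u32> = HashMap::new();
    for edge in committed {
        if pending_ids.contains(edge.id.as_str()) {
            continue; // shadowed by a pending update of the same edge
        }
        *counts.entry(edge.src.clone()).or_insert(0) += 1;
    }
    for edge in pending {
        *counts.entry(edge.src.clone()).or_insert(0) += 1;
    }
    counts
}

/// Check every per-source count against `min` and an optional `max`.
pub fn enforce_bounds_sketch(
    counts: &HashMap<String, u32>,
    min: u32,
    max: Option<u32>,
) -> Result<(), String> {
    for (src, &count) in counts {
        if let Some(max) = max {
            if count > max {
                return Err(format!("source '{src}' has {count} edges (max {max})"));
            }
        }
        if count < min {
            return Err(format!("source '{src}' has {count} edges (min {min})"));
        }
    }
    Ok(())
}
```

For an edge type with a maximum of one edge per source, a Merge-mode update of an existing edge is rejected without dedupe (the source is counted twice) and accepted with it.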

@ -521,6 +521,7 @@ async fn load_jsonl_reader<R: BufRead>(
edge_type,
&table_key,
&staging,
mode,
)
.await?;
} else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
@ -1553,87 +1554,49 @@ pub(crate) async fn validate_edge_cardinality(
/// Validate edge `@card` cardinality with in-memory pending edges visible.
///
/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
/// scans committed edges via Lance and unions counts with the pending
/// edge batches accumulated by the staged loader. Used by Append/Merge
/// loads (the Overwrite path uses `validate_edge_cardinality` which
/// opens the just-written Lance version).
/// opens the committed dataset at the pre-load snapshot version, then
/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
/// path uses `validate_edge_cardinality` which opens the just-written
/// Lance version).
///
/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
/// so committed edges that the load is *updating* (same edge id, possibly
/// changed `src`) are not double-counted (Cubic P1 finding on PR #68).
/// `LoadMode::Append` passes `None` because each line generates a fresh
/// ULID id that never collides with committed.
async fn validate_edge_cardinality_with_pending_loader(
db: &Omnigraph,
branch: Option<&str>,
edge_type: &omnigraph_compiler::catalog::EdgeType,
table_key: &str,
staging: &MutationStaging,
mode: LoadMode,
) -> Result<()> {
if edge_type.cardinality.is_default() {
return Ok(());
}
let mut counts: HashMap<String, u32> = HashMap::new();
// Committed side: open at pre-write version (the snapshot pinned at
// load entry; pending writes haven't committed yet).
let snapshot = db.snapshot_for_branch(branch).await?;
if let Some(entry) = snapshot.entry(table_key) {
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
let Some(entry) = snapshot.entry(table_key) else {
// No manifest entry — table doesn't exist yet. Pending-only is
// fine; the helper handles empty committed scans.
return Ok(());
};
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
.await?;
let dedupe_key = match mode {
LoadMode::Merge => Some("id"),
LoadMode::Append | LoadMode::Overwrite => None,
};
let counts =
crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
.await?;
let batches = db
.table_store()
.scan(&ds, Some(&["src"]), None, None)
.await?;
for batch in &batches {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
}
// Pending side: walk in-memory pending batches for `src`.
for batch in staging.pending_batches(table_key) {
let Some(col) = batch.column_by_name("src") else {
continue;
};
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
let card = &edge_type.cardinality;
for (src, count) in &counts {
if let Some(max) = card.max {
if *count > max {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (max {})",
edge_type.name, src, count, max
)));
}
}
if *count < card.min {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (min {})",
edge_type.name, src, count, card.min
)));
}
}
Ok(())
crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
}
/// Collect all valid node IDs for a given type, with in-memory pending

@ -841,6 +841,22 @@ impl TableStore {
/// pending side runs it through a fresh DataFusion `SessionContext`
/// with the batches registered as a `MemTable` named `pending`.
///
/// `key_column` controls how committed and pending are unioned:
/// - **`None` (union semantics)**: every committed row that matches
/// the filter and every pending row that matches the filter is
/// returned. Correct when committed and pending cannot share a
/// primary key — e.g., Append-mode loads with ULID-generated ids,
/// or any read where pending hasn't been used to update committed
/// rows.
/// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
/// `col` value appears in any pending batch are EXCLUDED from the
/// result; only pending's view of those rows is returned. Required
/// for Merge-mode reads (e.g., `execute_update` on the engine path)
/// so a chained `update` doesn't see stale committed values that
/// a prior op already updated in pending. Without this, a predicate
/// like `where age > 30` can match a row that an earlier
/// `set age = 20` already moved out of range.
///
/// When `pending_batches` is empty this delegates to the regular
/// scan path.
pub async fn scan_with_pending(
@ -850,11 +866,30 @@ impl TableStore {
pending_schema: Option<SchemaRef>,
projection: Option<&[&str]>,
filter: Option<&str>,
key_column: Option<&str>,
) -> Result<Vec<RecordBatch>> {
let mut out = self.scan(committed_ds, projection, filter, None).await?;
let committed = self.scan(committed_ds, projection, filter, None).await?;
if pending_batches.is_empty() {
return Ok(out);
return Ok(committed);
}
// Shadow committed rows whose key value also appears in pending.
// This makes scan_with_pending implement merge semantics rather
// than naive union: any row that has a pending update is
// represented ONLY by its pending value, never by both its
// (stale) committed value and its (current) pending value.
let committed = match key_column {
Some(key_col) => {
let pending_keys = collect_string_column_values(pending_batches, key_col)?;
if pending_keys.is_empty() {
committed
} else {
filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
}
}
None => committed,
};
let pending = scan_pending_batches(
pending_batches,
pending_schema,
@ -862,62 +897,12 @@ impl TableStore {
filter,
)
.await?;
let mut out = committed;
out.extend(pending);
Ok(out)
}
/// `count_rows` variant that respects in-memory pending batches via
/// DataFusion `MemTable`. Used by edge-cardinality validation that
/// needs to see staged edges before commit.
///
/// Cheaper than `scan_with_pending` for the count case because we
/// don't materialize columns on the pending side.
pub async fn count_rows_with_pending(
&self,
committed_ds: &Dataset,
pending_batches: &[RecordBatch],
pending_schema: Option<SchemaRef>,
filter: Option<String>,
) -> Result<usize> {
let committed = self.count_rows(committed_ds, filter.clone()).await?;
if pending_batches.is_empty() {
return Ok(committed);
}
// Count via DataFusion: COUNT(*) from pending [WHERE filter].
let schema =
pending_schema.unwrap_or_else(|| pending_batches[0].schema());
let ctx = datafusion::execution::context::SessionContext::new();
let mem = datafusion::datasource::MemTable::try_new(
schema,
vec![pending_batches.to_vec()],
)
.map_err(|e| OmniError::Lance(e.to_string()))?;
ctx.register_table("pending", Arc::new(mem))
.map_err(|e| OmniError::Lance(e.to_string()))?;
let where_clause = filter
.map(|f| format!("WHERE {f}"))
.unwrap_or_default();
let sql = format!("SELECT COUNT(*) AS c FROM pending {where_clause}");
let df = ctx
.sql(&sql)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let batches = df
.collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let pending_count = batches
.into_iter()
.filter(|b| b.num_rows() > 0)
.map(|b| {
use arrow_array::cast::AsArray;
let arr = b.column(0).as_primitive::<arrow_array::types::Int64Type>();
arr.value(0) as usize
})
.sum::<usize>();
Ok(committed + pending_count)
}
/// `count_rows` variant that respects staged writes. Used for
/// edge-cardinality validation that needs to see staged edges before
/// commit. Same `committed - removed + new` composition as
@ -1171,6 +1156,81 @@ fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<(
///
/// `pending_batches` must be non-empty (the caller short-circuits on
/// empty).
/// Collect the set of values in a Utf8 column across multiple batches.
/// Used by `scan_with_pending`'s merge-semantic path to identify
/// committed rows that are shadowed by pending writes. NULL values are
/// skipped.
fn collect_string_column_values(
batches: &[RecordBatch],
column: &str,
) -> Result<std::collections::HashSet<String>> {
use arrow_array::{Array, StringArray};
let mut out = std::collections::HashSet::new();
for batch in batches {
let Some(col) = batch.column_by_name(column) else {
return Err(OmniError::Lance(format!(
"scan_with_pending: pending batch missing key column '{}'",
column
)));
};
let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"scan_with_pending: key column '{}' is not Utf8",
column
))
})?;
for i in 0..arr.len() {
if arr.is_valid(i) {
out.insert(arr.value(i).to_string());
}
}
}
Ok(out)
}
/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
/// rows that pending has already updated. Returns the surviving rows.
fn filter_out_rows_where_string_in(
batches: Vec<RecordBatch>,
column: &str,
excluded: &std::collections::HashSet<String>,
) -> Result<Vec<RecordBatch>> {
use arrow_array::{Array, BooleanArray, StringArray};
let mut out = Vec::with_capacity(batches.len());
for batch in batches {
if batch.num_rows() == 0 {
out.push(batch);
continue;
}
let Some(col) = batch.column_by_name(column) else {
// The committed scan didn't project this column. We cannot
// shadow without it; pass the batch through unchanged.
out.push(batch);
continue;
};
let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"scan_with_pending: committed column '{}' is not Utf8",
column
))
})?;
let mask: BooleanArray = (0..arr.len())
.map(|i| {
if arr.is_valid(i) {
Some(!excluded.contains(arr.value(i)))
} else {
Some(true)
}
})
.collect();
let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
.map_err(|e| OmniError::Lance(e.to_string()))?;
out.push(filtered);
}
Ok(out)
}
async fn scan_pending_batches(
pending_batches: &[RecordBatch],
pending_schema: Option<SchemaRef>,

@ -140,6 +140,130 @@ async fn schema_apply_recovers_partial_rename() {
assert_no_staging_files(dir.path());
}
/// Pin the documented "finalize → publisher residual" from MR-794.
///
/// `MutationStaging::finalize` runs `commit_staged` per touched table
/// sequentially before the publisher commits the manifest. Lance has no
/// multi-dataset atomic commit primitive, so a failure between the
/// per-table staged commits and the manifest commit leaves Lance HEAD
/// advanced on the touched tables with no manifest update — and the
/// next mutation surfaces `ExpectedVersionMismatch` on those tables.
///
/// This isn't a code bug we can fix without an upstream Lance change;
/// it's the documented residual (see `docs/runs.md` "Finalize →
/// publisher residual"). The test pins the behavior so future code
/// changes catch any silent regression: if someone widens the residual
/// (e.g. failing earlier in finalize without rolling back), this test
/// will surface a different error than `ExpectedVersionMismatch`. If
/// someone narrows the residual (e.g. lance ships multi-dataset commit
/// and we plumb it), this test will start passing the next mutation
/// — and someone has to update the assertion + the docs.
#[tokio::test]
async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() {
use omnigraph::error::{ManifestConflictDetails, OmniError};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
.await
.unwrap();
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
// First mutation: finalize succeeds (commit_staged advances Lance
// HEAD on node:Person), then the failpoint kicks before the
// publisher's manifest commit. The caller sees the synthetic
// error.
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: mutation.post_finalize_pre_publisher"
),
"unexpected error: {err}"
);
}
// Failpoint dropped — subsequent calls are not synthetic-failed.
// Next mutation against the same table surfaces the documented
// residual: Lance HEAD on node:Person advanced (commit_staged ran),
// manifest didn't, so the publisher CAS at next-mutation time
// surfaces ExpectedVersionMismatch.
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.unwrap_err();
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
let Some(ManifestConflictDetails::ExpectedVersionMismatch {
ref table_key,
expected,
actual,
}) = manifest_err.details
else {
panic!(
"expected ExpectedVersionMismatch (the documented residual), got {:?}",
manifest_err.details
);
};
assert_eq!(
table_key, "node:Person",
"drift should be on the table the failed finalize touched"
);
assert!(
actual > expected,
"Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}",
);
}
/// Companion to the above — confirms that a finalize→publisher failure
/// on one table leaves OTHER tables untouched. Subsequent writes to
/// non-drifted tables proceed normally; the drift is contained.
#[tokio::test]
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
.await
.unwrap();
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let _ = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect_err("synthetic failpoint must fire");
}
// node:Person drifted. node:Company didn't — try a Company write.
use omnigraph::loader::{LoadMode, load_jsonl};
load_jsonl(
&mut db,
r#"{"type": "Company", "data": {"name": "Acme"}}"#,
LoadMode::Append,
)
.await
.expect("Company write on a non-drifted table should succeed");
}
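
The residual that the two tests above pin can be modeled in a few lines. This is a toy model under simplifying assumptions, not the engine's behavior in detail: `StoreSketch` and its fields are hypothetical, versions are integers, and the version check is placed at the start of the mutation rather than at the publisher's CAS.

```rust
use std::collections::HashMap;

/// Toy model: per-table Lance HEAD and the version pinned in the manifest.
#[derive(Default)]
pub struct StoreSketch {
    pub lance_head: HashMap<String, u64>,
    pub manifest_pinned: HashMap<String, u64>,
}

impl StoreSketch {
    /// Run one mutation touching `tables`. `fail_before_publish` plays the
    /// role of a failpoint between the staged commits and the manifest
    /// commit.
    pub fn mutate(&mut self, tables: &[&str], fail_before_publish: bool) -> Result<(), String> {
        // Fence: every touched table must have HEAD equal to the pinned version.
        for t in tables {
            let head = self.lance_head.get(*t).copied().unwrap_or(0);
            let pinned = self.manifest_pinned.get(*t).copied().unwrap_or(0);
            if head != pinned {
                return Err(format!(
                    "ExpectedVersionMismatch on {t}: expected={pinned} actual={head}"
                ));
            }
        }
        // Per-table staged commits run sequentially; each advances HEAD.
        for t in tables {
            *self.lance_head.entry(t.to_string()).or_insert(0) += 1;
        }
        // A failure here leaves HEAD advanced with no manifest update.
        if fail_before_publish {
            return Err("injected failpoint triggered".to_string());
        }
        // Manifest commit: pin the new versions.
        for t in tables {
            let head = self.lance_head[*t];
            self.manifest_pinned.insert(t.to_string(), head);
        }
        Ok(())
    }
}
```

In this model a failed mutation on `node:Person` makes the next `node:Person` mutation fail the version check, while a mutation on `node:Company` still succeeds, which mirrors the containment the second test asserts.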
fn assert_no_staging_files(repo: &std::path::Path) {
for name in [
"_schema.pg.staging",

@ -507,6 +507,19 @@ query mixed_insert_and_delete($name: String, $age: I32, $victim: String) {
insert Person { name: $name, age: $age }
delete Person where name = $victim
}
query update_then_filter_by_old_value(
$first_name: String, $first_new_age: I32,
$second_threshold: I32, $second_new_age: I32
) {
update Person set { age: $first_new_age } where name = $first_name
update Person set { age: $second_new_age } where age > $second_threshold
}
query delete_two_persons($first: String, $second: String) {
delete Person where name = $first
delete Person where name = $second
}
"#;
/// D₂: a query mixing inserts/updates with deletes is rejected at parse
@ -838,3 +851,272 @@ edge WorksAt: Person -> Company @card(0..1)
"follow-up load must succeed (no drift on edge table)",
);
}
// ─── PR #68 review-comment fixes — pinned coverage ──────────────────────────
/// Codex P1 / Cubic P1 #1: chained `update` ops in one query must respect
/// each previous op's view of the rows. Without merge-shadow semantics on
/// `scan_with_pending`, the second update sees the stale committed value
/// (the first update's row still appears in the Lance scan because the
/// pending side hasn't committed), the predicate matches it, and the
/// dedupe-last-wins step at finalize ends up applying the second update
/// to a row whose pending value should have shielded it.
///
/// Concretely: Alice starts at age=30 in TEST_DATA. Op-1 sets Alice to
/// age=99. Op-2 updates anyone with age > 50 to age=10. After op-1,
/// Alice's logical value is age=99, which is within op-2's predicate, so
/// op-2 SHOULD update Alice to age=10. The interesting case: op-2 must
/// see Alice at age=99 (op-1's pending value), not age=30 (committed).
/// If the helper unioned without shadowing, op-2's scan would contain
/// Alice twice: the stale committed row (age=30, which does not match
/// the predicate) and the pending row (age=99, which does), and dedupe
/// could pick either. The test asserts both ends: Alice ends at age=10,
/// and the publisher publishes once.
#[tokio::test]
async fn chained_updates_with_overlapping_predicate_respects_intermediate_value() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_version = version_main(&db).await.unwrap();
db.mutate(
"main",
STAGED_QUERIES,
"update_then_filter_by_old_value",
&mixed_params(
&[("$first_name", "Alice")],
&[
("$first_new_age", 99),
("$second_threshold", 50),
("$second_new_age", 10),
],
),
)
.await
.unwrap();
// After op-1: Alice = 99. After op-2 (where age > 50): Alice
// matches (99 > 50) → set to 10. End state: Alice = 10.
let batches = read_table(&db, "node:Person").await;
let mut alice_age: Option<i32> = None;
for batch in &batches {
let names = batch
.column_by_name("name")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
let ages = batch
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap();
for i in 0..batch.num_rows() {
if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) {
alice_age = Some(ages.value(i));
}
}
}
assert_eq!(
alice_age,
Some(10),
"chained-update final value must reflect the second update applied to op-1's pending value"
);
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"chained update must publish exactly once",
);
}
/// Cursor Bugbot HIGH: two `delete` ops on the same node table in one
/// query. Pre-fix, op-2's `open_table_for_mutation` went through
/// `open_for_mutation_on_branch` which trips `ensure_expected_version`
/// (Lance HEAD has advanced past the manifest's pinned version after
/// op-1's inline-commit, but the manifest hasn't moved). Post-fix,
/// `open_table_for_mutation` reopens via `inline_committed[table_key]`
/// at the post-delete Lance version. Test asserts both deletes succeed
/// in one query, both rows are gone, manifest version advances by 1.
#[tokio::test]
async fn multi_statement_delete_on_same_node_table() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_persons = count_rows(&db, "node:Person").await;
let pre_version = version_main(&db).await.unwrap();
db.mutate(
"main",
STAGED_QUERIES,
"delete_two_persons",
&params(&[("$first", "Alice"), ("$second", "Bob")]),
)
.await
.expect("multi-delete on same table must succeed");
assert_eq!(
count_rows(&db, "node:Person").await,
pre_persons - 2,
"both deletes must land",
);
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"multi-delete query publishes exactly once at end",
);
// Both rows actually gone:
for name in ["Alice", "Bob"] {
let qr = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
&params(&[("$name", name)]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 0, "{name} should be deleted");
}
}
/// Cursor Bugbot HIGH (cascade variant): deleting a node cascades to its
/// edges, advancing Lance HEAD on the edge table. A subsequent
/// `delete <Edge>` op in the same query must reopen at the
/// post-cascade-commit version of the edge table — not trip
/// `ensure_expected_version` against the manifest's pinned version.
#[tokio::test]
async fn cascade_delete_node_then_explicit_delete_edge_on_same_table() {
const QUERY: &str = r#"
query cascade_then_explicit($name: String, $other: String) {
delete Person where name = $name
delete Knows where from = $other
}
"#;
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_knows = count_rows(&db, "edge:Knows").await;
db.mutate(
"main",
QUERY,
"cascade_then_explicit",
&params(&[("$name", "Alice"), ("$other", "Bob")]),
)
.await
.expect("cascade-then-explicit-delete on same edge table must succeed");
let post_knows = count_rows(&db, "edge:Knows").await;
assert!(
post_knows < pre_knows,
"cascade + explicit delete should remove edges; pre={pre_knows} post={post_knows}",
);
}
/// Codex P2 / Cursor Bugbot LOW / Cubic P2: the engine cardinality path
/// must enforce `min` bounds. Pre-fix the engine path silently dropped
/// the min check (a `let _ = card.min;` line). The loader path always
/// enforced both. Post-fix, both paths route through
/// `enforce_cardinality_bounds` which checks both bounds.
///
/// Build a custom schema with `Knows: Person -> Person @card(2..)`.
/// Inserting a single Knows edge violates min=2. The mutation path must
/// reject.
#[tokio::test]
async fn mutation_insert_edge_enforces_min_cardinality() {
use omnigraph::loader::{LoadMode, load_jsonl};
const MIN_CARD_SCHEMA: &str = r#"
node Person {
name: String @key
}
edge Knows: Person -> Person @card(2..)
"#;
const MIN_CARD_QUERY: &str = r#"
query add_friend($from: String, $to: String) {
insert Knows { from: $from, to: $to }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, MIN_CARD_SCHEMA).await.unwrap();
let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Person", "data": {"name": "Bob"}}
"#;
load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap();
// Single insert: count=1 < min=2 → reject with clear message.
let err = db
.mutate(
"main",
MIN_CARD_QUERY,
"add_friend",
&params(&[("$from", "Alice"), ("$to", "Bob")]),
)
.await
.expect_err("min cardinality must reject the engine path");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("@card violation")
&& manifest_err.message.contains("min 2"),
"unexpected error: {}",
manifest_err.message,
);
}
/// Cubic P1 #2: `LoadMode::Merge` on edges must NOT double-count the
/// committed edge AND its updated pending replacement. Build a custom
/// schema where WorksAt has @card(0..1). Seed Alice with one WorksAt to
/// Acme. Then Merge-load the SAME edge id (so it's an update, not an
/// insert) pointing Alice's WorksAt at Bigco. Cardinality must count
/// Alice's edges as 1 (the post-merge count), not 2 (committed + pending).
#[tokio::test]
async fn load_merge_mode_dedupes_edge_for_cardinality_count() {
use omnigraph::loader::{LoadMode, load_jsonl};
const CARD_SCHEMA: &str = r#"
node Person {
name: String @key
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();
// Seed: Alice + Acme + Bigco + WorksAt(id=w1, Alice→Acme). Note the
// loader reads edge ids from the `data.id` field (not top-level), so
// we place the id inside `data` for both the seed and the update.
let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}}
"#;
load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap();
// Merge-update the same edge id w1 to point at Bigco. Counted naively
// as union, Alice has 2 WorksAt (committed Acme + pending Bigco) which
// would trip @card(0..1). With merge dedupe, Alice has 1 WorksAt.
let merge_data = r#"{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}}
"#;
load_jsonl(&mut db, merge_data, LoadMode::Merge)
.await
.expect("Merge update must dedupe the committed edge by id");
// Confirm there's exactly 1 WorksAt edge after merge.
assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}
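
The double-counting hazard pinned by the Merge-mode test, and the min-bound check pinned by the engine-path test, can be sketched in isolation. This is a minimal model under stated assumptions, not the code in `exec/staging`: `EdgeRow`, `count_per_src`, and `check_bounds` are hypothetical names, the key is hard-wired to an `id` field, and pending rows are assumed to carry unique ids.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical edge row: an id plus the source node the edge hangs off.
#[derive(Clone, Debug)]
struct EdgeRow {
    id: String,
    src: String,
}

/// Count edges per source across committed and pending rows.
/// With `dedupe_by_id = true` (Merge-style), a committed row whose id also
/// appears in `pending` is skipped, so an updated edge is counted once.
fn count_per_src(
    committed: &[EdgeRow],
    pending: &[EdgeRow],
    dedupe_by_id: bool,
) -> HashMap<String, usize> {
    let pending_ids: HashSet<&str> = if dedupe_by_id {
        pending.iter().map(|e| e.id.as_str()).collect()
    } else {
        HashSet::new()
    };
    let mut counts = HashMap::new();
    for edge in committed {
        if pending_ids.contains(edge.id.as_str()) {
            continue; // shadowed by its pending replacement
        }
        *counts.entry(edge.src.clone()).or_insert(0) += 1;
    }
    for edge in pending {
        *counts.entry(edge.src.clone()).or_insert(0) += 1;
    }
    counts
}

/// Check every counted source against inclusive bounds.
/// `max = None` means unbounded. Sources with no edges at all never
/// appear in `counts`, so this sketch does not check them against `min`.
fn check_bounds(
    counts: &HashMap<String, usize>,
    min: usize,
    max: Option<usize>,
) -> Result<(), String> {
    for (src, &n) in counts {
        if n < min {
            return Err(format!("@card violation on {src}: {n} < min {min}"));
        }
        if let Some(max) = max {
            if n > max {
                return Err(format!("@card violation on {src}: {n} > max {max}"));
            }
        }
    }
    Ok(())
}
```

Passing `dedupe_by_id = false` corresponds to the Append and engine cases, where generated ids are assumed never to collide with committed ones.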


@ -110,7 +110,7 @@ Specific defaults (timeout values, memory caps, TTL windows) are *configuration*
*Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.*
25. **Isolation: per-query snapshot; read-your-writes within and across queries in a session.** Each query reads from one consistent manifest version. Within a multi-statement mutation, the read subplan inside each write operator sees the writes from earlier statements. Across queries in a session, reads always resolve the latest manifest version — no reader pinning to older snapshots.
*Status: upheld for inserts/updates after MR-794 step 2+ — `MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (tracked: MR-793 / Lance-upstream).*
*Status: upheld for inserts/updates after MR-794 step 2+ — `MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan, with merge-shadow semantics for chained updates) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (tracked: MR-793 / Lance-upstream lance-format/lance#6658). The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures (the partial-failure test pins this), but a narrowed residual remains at the finalize→publisher boundary because Lance has no multi-dataset commit primitive — see [docs/runs.md](runs.md) "Finalize → publisher residual".*
26. **Durability before acknowledgement.** Commit returns only after the substrate has confirmed durable persistence. No "fast" or "fire-and-forget" durability levels.
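
The merge-shadow semantics mentioned in the status note for invariant 25 can be illustrated with a small in-memory model. This is a sketch under assumptions, not `TableStore::scan_with_pending` itself: the `Person` struct and `scan_merged` are hypothetical, the key column is fixed to `id`, and a `bool` stands in for the real `Option<&str>` key parameter.

```rust
use std::collections::HashSet;

/// Hypothetical row type standing in for a record batch row.
#[derive(Clone, Debug, PartialEq)]
struct Person {
    id: String,
    name: String,
    age: i32,
}

/// Return the rows a later op should see: every pending row, plus each
/// committed row whose key does not appear in any pending batch.
/// With `shadow = false` this degrades to a naive union.
fn scan_merged(committed: &[Person], pending: &[Vec<Person>], shadow: bool) -> Vec<Person> {
    // Keys touched by any earlier op in the same query.
    let pending_keys: HashSet<&str> = pending
        .iter()
        .flatten()
        .map(|p| p.id.as_str())
        .collect();
    let mut out: Vec<Person> = committed
        .iter()
        .filter(|p| !shadow || !pending_keys.contains(p.id.as_str()))
        .cloned()
        .collect();
    out.extend(pending.iter().flatten().cloned());
    out
}
```

With a committed Alice at age 40 and a pending Alice at age 20, a naive union still exposes the stale age-40 row to a later `age > 30` predicate; with shadowing, only the pending row is visible. The sketch does not collapse several pending versions of the same key, which a real implementation would also have to handle.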


@ -14,8 +14,13 @@ mutation proceeds normally.
`MutationStaging.pending` per touched table. No Lance HEAD advance
happens during op execution; one `stage_*` + `commit_staged` per
table runs at end-of-query, then `ManifestBatchPublisher::publish`
commits the manifest atomically. A mid-query failure leaves Lance
HEAD untouched on staged tables.
commits the manifest atomically. **For op-execution failures**
(validation errors, missing endpoints, parse-time D₂ rejection), Lance
HEAD on every staged table is untouched and the next mutation
proceeds normally. A narrowed residual remains at the
finalize→publisher boundary (multi-table `commit_staged` is not
atomic with the manifest commit) — see [docs/runs.md](../runs.md)
"Finalize → publisher residual" for details.
- **D₂ parse-time rule**: a single mutation query is either
insert/update-only or delete-only. Mixed → rejected with a clear
error directing the caller to split into two queries. Lance 4.0.0


@ -75,6 +75,42 @@ will replace it. Operator-driven (rare in agent workloads); document
permanently until Lance exposes `Operation::Overwrite { fragments }` as
a two-phase op.
### Finalize → publisher residual
The staged-write rewire eliminates one drift class **by construction at
the writer layer**: an op that fails before pushing to the in-memory
accumulator (validation errors, missing endpoints, parse-time D₂
rejection) leaves Lance HEAD untouched on every staged table. This is
the case the `partial_failure_leaves_target_queryable_and_unblocks_next_mutation`
test pins.
A second, narrower drift class remains. `MutationStaging::finalize`
runs `stage_*` + `commit_staged` per touched table sequentially, then
the publisher commits the manifest. Lance has no multi-dataset atomic
commit, so the per-table `commit_staged` calls are independent
operations: if `commit_staged` on table N+1 fails *after* `commit_staged`
on tables 1..N succeeded, or if the publisher's CAS pre-check rejects
*after* every `commit_staged` succeeded, the tables already committed are
left at `Lance HEAD = manifest_pinned + 1`. The next mutation against those
tables surfaces `ManifestConflictDetails::ExpectedVersionMismatch`,
the same loud failure mode the rewire was designed to make rare, just
no longer "unreachable."
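
The residual can be reproduced in a toy model. The following is a sketch only: `Store`, `finalize`, and `drifted` are hypothetical stand-ins for the real staging and publisher code, versions are plain counters starting at 0, and failures are injected by parameters rather than by real I/O errors.

```rust
use std::collections::BTreeMap;

/// Toy model: per-table Lance HEAD versions and the versions the manifest pins.
#[derive(Debug, Default)]
struct Store {
    lance_head: BTreeMap<String, u64>,
    manifest_pinned: BTreeMap<String, u64>,
}

impl Store {
    /// Commit each touched table in order, then publish the manifest.
    /// `fail_at = Some(i)` injects a failure before the i-th table commit;
    /// `fail_publish` injects a failure after every table commit succeeded.
    fn finalize(
        &mut self,
        touched: &[&str],
        fail_at: Option<usize>,
        fail_publish: bool,
    ) -> Result<(), String> {
        for (i, table) in touched.iter().enumerate() {
            if fail_at == Some(i) {
                // Earlier tables have already advanced; nothing rolls them back.
                return Err(format!("commit_staged failed on {table}"));
            }
            *self.lance_head.entry(table.to_string()).or_insert(0) += 1;
        }
        if fail_publish {
            return Err("publisher rejected".to_string());
        }
        // Only now does the manifest catch up with the per-table heads.
        for table in touched {
            let head = self.lance_head[*table];
            self.manifest_pinned.insert(table.to_string(), head);
        }
        Ok(())
    }

    /// Tables whose Lance HEAD is ahead of the manifest's pinned version.
    fn drifted(&self) -> Vec<String> {
        self.lance_head
            .iter()
            .filter(|(t, head)| **head > self.manifest_pinned.get(*t).copied().unwrap_or(0))
            .map(|(t, _)| t.clone())
            .collect()
    }
}
```

A failure injected at the second table leaves only the first table drifted by one version, while a publisher failure leaves every touched table drifted. Tables the query never touched stay aligned in both cases.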
Triggers: transient Lance write errors during finalize (object-store
retry budget exhaustion, disk full); persistent publisher contention
exceeding `PUBLISHER_RETRY_BUDGET = 5` retries. Closing this requires
either a Lance multi-dataset atomic-commit primitive (filed upstream
alongside the two-phase delete request) or a manifest-layer journal
that replays staged commits on next open. Both are heavyweight; the
v1 stance is "narrowed window, documented residual, surface the loud
error when it fires."
The publisher-CAS contract is unchanged: a *concurrent writer* that
advances any of our touched tables between snapshot capture and
publisher commit produces exactly one winner. The residual above is
about *our* abandoned commits in the failure path, not about
concurrency races.
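
The "exactly one winner" property of a compare-and-swap publish can be shown with standard-library atomics. This is an illustrative sketch, not the `ManifestBatchPublisher` implementation: `try_publish` and `race` are hypothetical, and a single `AtomicU64` stands in for the manifest's per-table version state.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Try to publish `snapshot + 1`. Succeeds only if the version is still
/// `snapshot`, i.e. nobody else published since the snapshot was captured.
/// On failure the error carries the version that was actually found.
fn try_publish(manifest_version: &AtomicU64, snapshot: u64) -> Result<u64, u64> {
    manifest_version
        .compare_exchange(snapshot, snapshot + 1, Ordering::SeqCst, Ordering::SeqCst)
        .map(|_| snapshot + 1)
}

/// Race `writers` threads that all captured the same snapshot and
/// return how many of them managed to publish.
fn race(writers: usize) -> usize {
    let version = Arc::new(AtomicU64::new(7));
    let snapshot = version.load(Ordering::SeqCst);
    let handles: Vec<_> = (0..writers)
        .map(|_| {
            let version = Arc::clone(&version);
            thread::spawn(move || try_publish(&version, snapshot).is_ok())
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|won| *won)
        .count()
}
```

Every losing writer observes the advanced version in the `Err` value and would have to re-snapshot before retrying, which is the role a bounded retry budget plays in the text above.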
## Conflict shape
Concurrent writers to the same `(table, branch)` produce exactly one