From 3223b51cf130ad5a97df3af7c71e9cf673caeef4 Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Fri, 1 May 2026 13:47:55 +0200
Subject: [PATCH] =?UTF-8?q?MR-794=20step=202:=20address=20PR=20#68=20revie?=
 =?UTF-8?q?w=20=E2=80=94=20merge=20semantics,=20cardinality,=20residual?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Five fixes from PR #68 review (Cursor Bugbot + Codex + Cubic):

* **scan_with_pending gains merge-shadow semantics** (Codex P1, Cubic
  P1#1): new `key_column: Option<&str>` parameter. When set, committed
  rows whose key value appears in any pending batch are excluded from
  the scan, which gives `scan_with_pending` merge semantics for chained
  updates instead of a naive union. `execute_update` calls it with
  `Some("id")`. Without this, a chained `update where age > 30` could
  match a row whose pending value already moved out of range.

* **Multi-delete on same table no longer trips ExpectedVersionMismatch**
  (Cursor Bugbot HIGH): open_table_for_mutation routes through
  reopen_for_mutation when staging.inline_committed has the table,
  using the post-inline-commit Lance version captured at record_inline
  time. The legacy open_for_mutation_on_branch fence (Lance HEAD ==
  manifest pinned) is correct cross-writer but wrong intra-query when
  deletes have already advanced HEAD on this table. The branch goes
  away when Lance ships two-phase delete (lance-format/lance#6658).

* **Cardinality validation consolidated** (Cursor LOW + Codex P2 + Cubic
  P1#2 + Cubic P2): new exec/staging::count_src_per_edge +
  enforce_cardinality_bounds shared by mutation and loader paths.
  Restores the missing min-cardinality check on the engine path. Loader
  Merge mode passes Some("id") to dedupe edges being updated by id (not
  double-count committed + pending). Loader Append mode and engine path
  pass None (ULID-generated ids never collide).

* **Dead count_rows_with_pending removed** (Cursor LOW): never called.

* **Misleading concat-helper comment fixed** (Cubic P3): the comment
  claimed schema normalization that the helper doesn't implement.
  Updated to match reality.

* **Documentation honesty** (Cubic P1#3): MR-794 narrows but doesn't
  eliminate the "Lance HEAD ahead of __manifest" drift class. Drift is
  unreachable for op-execution failures (the partial_failure test pins
  this), but a residual remains at the finalize→publisher boundary
  because Lance has no multi-dataset commit primitive: per-table
  commit_staged calls run sequentially before manifest commit. Updated
  docs/runs.md, docs/invariants.md §VI.25, docs/releases/v0.4.1.md to
  scope the claim precisely.

* **Failpoint test pinning the residual**: new
  mutation.post_finalize_pre_publisher failpoint + two tests in
  tests/failpoints.rs that confirm the documented residual behavior.
  Catches future regressions that widen the residual.

Test additions in tests/runs.rs:

* chained_updates_with_overlapping_predicate_respects_intermediate_value
* multi_statement_delete_on_same_node_table
* cascade_delete_node_then_explicit_delete_edge_on_same_table
* mutation_insert_edge_enforces_min_cardinality
* load_merge_mode_dedupes_edge_for_cardinality_count

113/113 engine integration tests pass (runs + end_to_end + consistency +
staged_writes + validators). Failpoints feature build runs in CI.
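
Illustration of the merge-shadow fix described above. This is a minimal sketch under stated assumptions, not the code in this patch: `Row`, `scan_with_pending_sketch`, and the `shadow_by_key` flag are hypothetical stand-ins (the real `scan_with_pending` works on Arrow `RecordBatch`es and takes `key_column: Option<&str>`).

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a table row; the real code operates on
/// Arrow record batches rather than structs.
#[derive(Clone, Debug, PartialEq)]
struct Row {
    id: String,
    age: i32,
}

/// Sketch of a pending-aware scan (assumed name and signature).
/// `shadow_by_key = false` models `key_column = None` (plain union);
/// `shadow_by_key = true` models `key_column = Some("id")` (merge-shadow).
fn scan_with_pending_sketch(
    committed: &[Row],
    pending: &[Row],
    predicate: impl Fn(&Row) -> bool,
    shadow_by_key: bool,
) -> Vec<Row> {
    // Ids that an earlier op in the same query has already rewritten.
    let pending_ids: HashSet<&str> = if shadow_by_key {
        pending.iter().map(|r| r.id.as_str()).collect()
    } else {
        HashSet::new()
    };

    // Committed side: apply the predicate, then drop rows that pending
    // supersedes so a stale committed value can never match.
    let mut out: Vec<Row> = committed
        .iter()
        .filter(|&r| predicate(r))
        .filter(|&r| !pending_ids.contains(r.id.as_str()))
        .cloned()
        .collect();

    // Pending side: the predicate runs against the current values.
    out.extend(pending.iter().filter(|&r| predicate(r)).cloned());
    out
}
```

With committed rows Alice (age 40) and Bob (age 35), a pending Alice at age 20, and the predicate `age > 30`, the union mode still returns the stale committed Alice alongside Bob, while the shadowing mode returns only Bob. That is the chained-update case the first bullet describes.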
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/omnigraph/src/exec/mutation.rs | 158 ++++++++-------
 crates/omnigraph/src/exec/staging.rs  | 147 ++++++++++++++
 crates/omnigraph/src/loader/mod.rs    | 101 +++------
 crates/omnigraph/src/table_store.rs   | 168 ++++++++++-----
 crates/omnigraph/tests/failpoints.rs  | 124 +++++++++++
 crates/omnigraph/tests/runs.rs        | 282 ++++++++++++++++++++++++++
 docs/invariants.md                    |   2 +-
 docs/releases/v0.4.1.md               |   9 +-
 docs/runs.md                          |  36 ++++
 9 files changed, 828 insertions(+), 199 deletions(-)

diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs
index 7397074..a588c68 100644
--- a/crates/omnigraph/src/exec/mutation.rs
+++ b/crates/omnigraph/src/exec/mutation.rs
@@ -565,19 +565,59 @@ use super::staging::{MutationStaging, PendingMode};
 /// first touch. The captured version is the publisher's CAS fence at
 /// end-of-query (per-table OCC).
 ///
-/// The dataset is always opened at HEAD on the requested branch. Under
-/// the staged-write rewire (MR-794 step 2+), no per-op commit advances
-/// HEAD, so subsequent opens within the same query observe the same
-/// version. The exception is the inline-commit delete path
-/// (`execute_delete_*`), which still advances HEAD per-op — but D₂ at
-/// parse time prevents inserts/updates and deletes from coexisting in
-/// one query, so this can't conflict with a pending-batch read.
+/// On first touch, opens the dataset at HEAD on the requested branch
+/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
+/// the manifest's pinned version — that fence is the engine's
+/// publisher-style OCC catching cross-writer drift before we make any
+/// changes.
+///
+/// On subsequent touches *within the same query*, behavior depends on
+/// whether the table has already been inline-committed by a delete op:
+///
+/// - **Insert / update path (no inline commit between touches).** Lance
+///   HEAD has not moved since first touch, so a fresh
+///   `open_for_mutation_on_branch` would still match the manifest
+///   pinned version. We just go through it again; `ensure_path` is a
+///   no-op (idempotent on the captured `expected_version`).
+/// - **Delete cascade or multi-delete on the same table.** A prior
+///   `delete_where` on this table has already advanced Lance HEAD past
+///   the manifest's pinned version (the manifest doesn't move until
+///   end-of-query). Going through `open_for_mutation_on_branch` again
+///   would trip its `ensure_expected_version` equality check
+///   (`actual = pinned + 1` vs `expected = pinned`). Instead we route
+///   through `reopen_for_mutation` at the post-inline-commit Lance
+///   version captured in `staging.inline_committed[table_key]`, which
+///   is the source of truth for "where is Lance HEAD right now on
+///   this table within this query."
+///
+/// The `inline_committed` reopen branch closes the cursor-bot HIGH
+/// "multi-delete fails on same table" finding. The branch goes away
+/// once Lance exposes a two-phase delete API
+/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
+/// and we can stage deletes on the same path as inserts/updates.
async fn open_table_for_mutation( db: &Omnigraph, staging: &mut MutationStaging, branch: Option<&str>, table_key: &str, ) -> Result<(Dataset, String, Option)> { + if let Some(prior) = staging.inline_committed.get(table_key) { + let path = staging.paths.get(table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "open_table_for_mutation: inline_committed[{}] without paths entry", + table_key + )) + })?; + let ds = db + .reopen_for_mutation( + table_key, + &path.full_path, + path.table_branch.as_deref(), + prior.table_version, + ) + .await?; + return Ok((ds, path.full_path.clone(), path.table_branch.clone())); + } let (ds, full_path, table_branch) = db.open_for_mutation_on_branch(branch, table_key).await?; let expected_version = ds.version().version; @@ -701,6 +741,16 @@ impl Omnigraph { let (updates, expected_versions) = staging .finalize(self, requested.as_deref()) .await?; + // Failpoint that wedges the documented finalize→publisher + // residual: per-table `commit_staged` calls already + // advanced Lance HEAD on every touched table; a failure + // injected here mirrors the production-rare case where + // the publisher's CAS pre-check rejects (or the manifest + // write throws) after staged commits succeeded. Used by + // `tests/failpoints.rs::finalize_publisher_residual_*` + // to pin the documented residual behavior. See + // `docs/runs.md` "Finalize → publisher residual". 
+ crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; self.commit_updates_on_branch_with_expected( requested.as_deref(), &updates, @@ -947,6 +997,12 @@ impl Omnigraph { (!blob_props.is_empty()).then_some(non_blob_cols.as_slice()); let pending_batches = staging.pending_batches(&table_key); let pending_schema = staging.pending_schema(&table_key); + // Use merge semantics on the union: a committed row whose `id` + // also appears in pending has been logically updated by an + // earlier op in this query and is shadowed from the scan, + // otherwise the predicate runs against stale committed values + // and a chained `update where ` can match a row whose + // pending value no longer satisfies . let batches = self .table_store() .scan_with_pending( @@ -955,6 +1011,7 @@ impl Omnigraph { pending_schema, projection, Some(&pred_sql), + Some("id"), ) .await?; @@ -965,11 +1022,14 @@ impl Omnigraph { }); } - // Concat the matched batches into one. The pending side may have - // a slightly different schema (e.g. Lance's `_rowid` column on - // the committed side, missing on the pending side). Normalize by - // dropping any `_rowid` / `_rowaddr` columns and reordering to - // the table's canonical schema. + // Concat the matched batches (committed + pending) into one. The + // helper trusts that both sides share a schema — Lance returns + // dataset-schema-ordered columns and DataFusion returns + // MemTable-schema-ordered columns; both should match the catalog's + // arrow_schema when the projection is consistent. If they + // diverge (typically a blob-table mid-schema-shift), the helper + // surfaces a clear error directing the caller to split the + // mutation. 
let matched = concat_match_batches_to_schema(&schema, &blob_props, batches)?; let affected_count = matched.num_rows(); @@ -1218,10 +1278,10 @@ fn concat_match_batches_to_schema( }) } -/// Validate that adding `pending` edges plus the committed edges does not -/// exceed the per-source cardinality bound on `edge_type`. Reads the `src` -/// column from both committed (Lance) and pending (in-memory) and counts -/// per src. +/// Validate `@card` bounds against committed (Lance) + pending (in-memory) +/// edges for one edge table. Engine path: each insert produces a fresh +/// ULID id, so committed and pending cannot share a primary key — no +/// dedup needed (`dedupe_key_column = None`). async fn validate_edge_cardinality_with_pending( db: &Omnigraph, committed_ds: &Dataset, @@ -1229,66 +1289,18 @@ async fn validate_edge_cardinality_with_pending( table_key: &str, edge_type: &omnigraph_compiler::catalog::EdgeType, ) -> Result<()> { - use std::collections::HashMap as StdHashMap; - if edge_type.cardinality.is_default() { return Ok(()); } - - let mut counts: StdHashMap = StdHashMap::new(); - - // Committed side: scan `src` column (cheap, no filter). - let committed = db - .table_store() - .scan(committed_ds, Some(&["src"]), None, None) - .await?; - for batch in &committed { - let srcs = batch - .column_by_name("src") - .ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))? - .as_any() - .downcast_ref::() - .ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?; - for i in 0..srcs.len() { - if srcs.is_valid(i) { - *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; - } - } - } - - // Pending side: walk in-memory pending batches for `src`. 
- for batch in staging.pending_batches(table_key) { - let Some(col) = batch.column_by_name("src") else { - continue; - }; - let Some(srcs) = col.as_any().downcast_ref::() else { - continue; - }; - for i in 0..srcs.len() { - if srcs.is_valid(i) { - *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; - } - } - } - - let card = &edge_type.cardinality; - for (src, count) in &counts { - if let Some(max) = card.max { - if *count > max { - return Err(OmniError::manifest(format!( - "@card violation on edge {}: source '{}' has {} edges (max {})", - edge_type.name, src, count, max - ))); - } - } - // Note: per-source minimum cardinality cannot be checked - // mid-query (a bound of `2..` requires both edges to be inserted - // before validation). The publisher path could re-validate at - // commit time; for now, defer to the loader's end-of-query check. - let _ = card.min; - } - - Ok(()) + let counts = super::staging::count_src_per_edge( + db, + committed_ds, + table_key, + staging, + None, + ) + .await?; + super::staging::enforce_cardinality_bounds(edge_type, &counts) } fn enrich_mutation_params(params: &ParamMap) -> Result { diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 2b2c193..033cb99 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -23,6 +23,8 @@ use std::sync::Arc; use arrow_array::{Array, RecordBatch, StringArray, UInt32Array}; use arrow_schema::SchemaRef; +use lance::Dataset; +use omnigraph_compiler::catalog::EdgeType; use crate::db::SubTableUpdate; use crate::error::{OmniError, Result}; @@ -389,3 +391,148 @@ fn dedupe_merge_batches_by_id( arrow_select::concat::concat_batches(schema, &sliced) .map_err(|e| OmniError::Lance(e.to_string())) } + +// ─── Cardinality helpers (shared by mutation + loader paths) ──────────────── + +/// Count edges per `src` value across committed (Lance scan) + pending +/// (in-memory). 
Caller supplies an opened committed dataset so the +/// mutation path (which already has one) and the loader path (which +/// opens via snapshot) share the same body. +/// +/// `dedupe_key_column` controls whether committed rows are shadowed by +/// pending: +/// - `None` — every committed row counts, every pending row counts. +/// Correct when committed and pending cannot share a primary key +/// (engine inserts always use fresh ULID edge ids; loader Append +/// mode uses fresh ids too). +/// - `Some(col)` — committed rows whose `col` value also appears in any +/// pending batch are EXCLUDED from the committed count, so a Merge-mode +/// load that *updates* an existing edge (potentially changing its +/// `src`) counts the post-update row exactly once. Without this, +/// `LoadMode::Merge` double-counts. +pub(crate) async fn count_src_per_edge( + db: &crate::db::Omnigraph, + committed_ds: &Dataset, + table_key: &str, + staging: &MutationStaging, + dedupe_key_column: Option<&str>, +) -> Result> { + let mut counts: HashMap = HashMap::new(); + + let pending_batches = staging.pending_batches(table_key); + + // Collect pending key values (for shadow-on-merge dedupe). Only when + // dedupe is requested AND there's anything pending. + let pending_keys: Option> = match dedupe_key_column { + Some(col) if !pending_batches.is_empty() => { + let mut set = HashSet::new(); + for batch in pending_batches { + if let Some(arr) = batch + .column_by_name(col) + .and_then(|c| c.as_any().downcast_ref::()) + { + for i in 0..arr.len() { + if arr.is_valid(i) { + set.insert(arr.value(i).to_string()); + } + } + } + } + Some(set) + } + _ => None, + }; + + // Committed side: scan `src` plus the dedupe key column when set, so + // we can both count and shadow in one pass. 
+ let projection: Vec<&str> = match dedupe_key_column { + Some(col) if pending_keys.as_ref().is_some_and(|s| !s.is_empty()) => vec!["src", col], + _ => vec!["src"], + }; + let committed = db + .table_store() + .scan(committed_ds, Some(&projection), None, None) + .await?; + for batch in &committed { + let srcs = batch + .column_by_name("src") + .ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?; + // Optional shadow-key column (only present when dedupe is on). + let key_arr = match (&pending_keys, dedupe_key_column) { + (Some(set), Some(col)) if !set.is_empty() => batch + .column_by_name(col) + .and_then(|c| c.as_any().downcast_ref::()), + _ => None, + }; + for i in 0..srcs.len() { + if !srcs.is_valid(i) { + continue; + } + // Shadow this committed row if its key is in pending. + if let (Some(arr), Some(set)) = (key_arr, pending_keys.as_ref()) { + if arr.is_valid(i) && set.contains(arr.value(i)) { + continue; + } + } + *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; + } + } + + // Pending side: walk in-memory batches for `src`. (No dedupe needed — + // `dedupe_merge_batches_by_id` runs at finalize-time so any same-id + // duplicates within pending are already collapsed by the time the + // publisher commits, but cardinality runs before finalize. The + // engine's per-op edge insert produces one row per op with a fresh + // ULID, so within-pending duplicates are not a concern here.) + for batch in pending_batches { + let Some(col) = batch.column_by_name("src") else { + continue; + }; + let Some(srcs) = col.as_any().downcast_ref::() else { + continue; + }; + for i in 0..srcs.len() { + if srcs.is_valid(i) { + *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; + } + } + } + + Ok(counts) +} + +/// Apply `@card(min..max)` bounds to a per-source count map. +/// +/// Both bounds are checked. 
The `min` check produces a misleading error +/// during a per-op insert mid-query (a bound of `2..` requires both +/// edges to be inserted before validation passes), but the historical +/// behavior was to enforce min per-op anyway — keeping users from +/// accidentally publishing a graph that violates the schema. Consumers +/// that need end-of-query semantics call this from after all edge ops +/// are accumulated (the loader does, via Phase 3). +pub(crate) fn enforce_cardinality_bounds( + edge_type: &EdgeType, + counts: &HashMap, +) -> Result<()> { + let card = &edge_type.cardinality; + for (src, count) in counts { + if let Some(max) = card.max { + if *count > max { + return Err(OmniError::manifest(format!( + "@card violation on edge {}: source '{}' has {} edges (max {})", + edge_type.name, src, count, max + ))); + } + } + if *count < card.min { + return Err(OmniError::manifest(format!( + "@card violation on edge {}: source '{}' has {} edges (min {})", + edge_type.name, src, count, card.min + ))); + } + } + Ok(()) +} diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index d05a315..a349a64 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -521,6 +521,7 @@ async fn load_jsonl_reader( edge_type, &table_key, &staging, + mode, ) .await?; } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) { @@ -1553,87 +1554,49 @@ pub(crate) async fn validate_edge_cardinality( /// Validate edge `@card` cardinality with in-memory pending edges visible. /// /// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`: -/// scans committed edges via Lance and unions counts with the pending -/// edge batches accumulated by the staged loader. Used by Append/Merge -/// loads (the Overwrite path uses `validate_edge_cardinality` which -/// opens the just-written Lance version). 
+/// opens the committed dataset at the pre-load snapshot version, then +/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds` +/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite +/// path uses `validate_edge_cardinality` which opens the just-written +/// Lance version). +/// +/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")` +/// so committed edges that the load is *updating* (same edge id, possibly +/// changed `src`) are not double-counted (Cubic P1 finding on PR #68). +/// `LoadMode::Append` passes `None` because each line generates a fresh +/// ULID id that never collides with committed. async fn validate_edge_cardinality_with_pending_loader( db: &Omnigraph, branch: Option<&str>, edge_type: &omnigraph_compiler::catalog::EdgeType, table_key: &str, staging: &MutationStaging, + mode: LoadMode, ) -> Result<()> { if edge_type.cardinality.is_default() { return Ok(()); } - - let mut counts: HashMap = HashMap::new(); - - // Committed side: open at pre-write version (the snapshot pinned at - // load entry; pending writes haven't committed yet). let snapshot = db.snapshot_for_branch(branch).await?; - if let Some(entry) = snapshot.entry(table_key) { - let ds = db - .open_dataset_at_state( - &entry.table_path, - entry.table_branch.as_deref(), - entry.table_version, - ) + let Some(entry) = snapshot.entry(table_key) else { + // No manifest entry — table doesn't exist yet. Pending-only is + // fine; the helper handles empty committed scans. 
+ return Ok(()); + }; + let ds = db + .open_dataset_at_state( + &entry.table_path, + entry.table_branch.as_deref(), + entry.table_version, + ) + .await?; + let dedupe_key = match mode { + LoadMode::Merge => Some("id"), + LoadMode::Append | LoadMode::Overwrite => None, + }; + let counts = + crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key) .await?; - let batches = db - .table_store() - .scan(&ds, Some(&["src"]), None, None) - .await?; - for batch in &batches { - let srcs = batch - .column_by_name("src") - .ok_or_else(|| OmniError::Lance("missing 'src' column".into()))? - .as_any() - .downcast_ref::() - .ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?; - for i in 0..srcs.len() { - if srcs.is_valid(i) { - *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; - } - } - } - } - - // Pending side: walk in-memory pending batches for `src`. - for batch in staging.pending_batches(table_key) { - let Some(col) = batch.column_by_name("src") else { - continue; - }; - let Some(srcs) = col.as_any().downcast_ref::() else { - continue; - }; - for i in 0..srcs.len() { - if srcs.is_valid(i) { - *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; - } - } - } - - let card = &edge_type.cardinality; - for (src, count) in &counts { - if let Some(max) = card.max { - if *count > max { - return Err(OmniError::manifest(format!( - "@card violation on edge {}: source '{}' has {} edges (max {})", - edge_type.name, src, count, max - ))); - } - } - if *count < card.min { - return Err(OmniError::manifest(format!( - "@card violation on edge {}: source '{}' has {} edges (min {})", - edge_type.name, src, count, card.min - ))); - } - } - - Ok(()) + crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts) } /// Collect all valid node IDs for a given type, with in-memory pending diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 52deb2f..d87858d 100644 --- 
a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -841,6 +841,22 @@ impl TableStore { /// pending side runs it through a fresh DataFusion `SessionContext` /// with the batches registered as a `MemTable` named `pending`. /// + /// `key_column` controls how committed and pending are unioned: + /// - **`None` (union semantics)**: every committed row that matches + /// the filter and every pending row that matches the filter is + /// returned. Correct when committed and pending cannot share a + /// primary key — e.g., Append-mode loads with ULID-generated ids, + /// or any read where pending hasn't been used to update committed + /// rows. + /// - **`Some(col)` (merge / shadow semantics)**: committed rows whose + /// `col` value appears in any pending batch are EXCLUDED from the + /// result; only pending's view of those rows is returned. Required + /// for Merge-mode reads (e.g., `execute_update` on the engine path) + /// so a chained `update` doesn't see stale committed values that + /// a prior op already updated in pending. Without this, a predicate + /// like `where age > 30` can match a row that an earlier + /// `set age = 20` already moved out of range. + /// /// When `pending_batches` is empty this delegates to the regular /// scan path. pub async fn scan_with_pending( @@ -850,11 +866,30 @@ impl TableStore { pending_schema: Option, projection: Option<&[&str]>, filter: Option<&str>, + key_column: Option<&str>, ) -> Result> { - let mut out = self.scan(committed_ds, projection, filter, None).await?; + let committed = self.scan(committed_ds, projection, filter, None).await?; if pending_batches.is_empty() { - return Ok(out); + return Ok(committed); } + + // Shadow committed rows whose key value also appears in pending. 
+ // This makes scan_with_pending implement merge semantics rather + // than naive union: any row that has a pending update is + // represented ONLY by its pending value, never by both its + // (stale) committed value and its (current) pending value. + let committed = match key_column { + Some(key_col) => { + let pending_keys = collect_string_column_values(pending_batches, key_col)?; + if pending_keys.is_empty() { + committed + } else { + filter_out_rows_where_string_in(committed, key_col, &pending_keys)? + } + } + None => committed, + }; + let pending = scan_pending_batches( pending_batches, pending_schema, @@ -862,62 +897,12 @@ impl TableStore { filter, ) .await?; + + let mut out = committed; out.extend(pending); Ok(out) } - /// `count_rows` variant that respects in-memory pending batches via - /// DataFusion `MemTable`. Used by edge-cardinality validation that - /// needs to see staged edges before commit. - /// - /// Cheaper than `scan_with_pending` for the count case because we - /// don't materialize columns on the pending side. - pub async fn count_rows_with_pending( - &self, - committed_ds: &Dataset, - pending_batches: &[RecordBatch], - pending_schema: Option, - filter: Option, - ) -> Result { - let committed = self.count_rows(committed_ds, filter.clone()).await?; - if pending_batches.is_empty() { - return Ok(committed); - } - // Count via DataFusion: COUNT(*) from pending [WHERE filter]. 
- let schema = - pending_schema.unwrap_or_else(|| pending_batches[0].schema()); - let ctx = datafusion::execution::context::SessionContext::new(); - let mem = datafusion::datasource::MemTable::try_new( - schema, - vec![pending_batches.to_vec()], - ) - .map_err(|e| OmniError::Lance(e.to_string()))?; - ctx.register_table("pending", Arc::new(mem)) - .map_err(|e| OmniError::Lance(e.to_string()))?; - let where_clause = filter - .map(|f| format!("WHERE {f}")) - .unwrap_or_default(); - let sql = format!("SELECT COUNT(*) AS c FROM pending {where_clause}"); - let df = ctx - .sql(&sql) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - let batches = df - .collect() - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - let pending_count = batches - .into_iter() - .filter(|b| b.num_rows() > 0) - .map(|b| { - use arrow_array::cast::AsArray; - let arr = b.column(0).as_primitive::(); - arr.value(0) as usize - }) - .sum::(); - Ok(committed + pending_count) - } - /// `count_rows` variant that respects staged writes. Used for /// edge-cardinality validation that needs to see staged edges before /// commit. Same `committed - removed + new` composition as @@ -1171,6 +1156,81 @@ fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<( /// /// `pending_batches` must be non-empty (the caller short-circuits on /// empty). +/// Collect the set of values in a Utf8 column across multiple batches. +/// Used by `scan_with_pending`'s merge-semantic path to identify +/// committed rows that are shadowed by pending writes. NULL values are +/// skipped. 
+fn collect_string_column_values( + batches: &[RecordBatch], + column: &str, +) -> Result> { + use arrow_array::{Array, StringArray}; + let mut out = std::collections::HashSet::new(); + for batch in batches { + let Some(col) = batch.column_by_name(column) else { + return Err(OmniError::Lance(format!( + "scan_with_pending: pending batch missing key column '{}'", + column + ))); + }; + let arr = col.as_any().downcast_ref::().ok_or_else(|| { + OmniError::Lance(format!( + "scan_with_pending: key column '{}' is not Utf8", + column + )) + })?; + for i in 0..arr.len() { + if arr.is_valid(i) { + out.insert(arr.value(i).to_string()); + } + } + } + Ok(out) +} + +/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`. +/// Used by `scan_with_pending`'s merge-semantic path to shadow committed +/// rows that pending has already updated. Returns the surviving rows. +fn filter_out_rows_where_string_in( + batches: Vec, + column: &str, + excluded: &std::collections::HashSet, +) -> Result> { + use arrow_array::{Array, BooleanArray, StringArray}; + let mut out = Vec::with_capacity(batches.len()); + for batch in batches { + if batch.num_rows() == 0 { + out.push(batch); + continue; + } + let Some(col) = batch.column_by_name(column) else { + // The committed scan didn't project this column. We cannot + // shadow without it; pass the batch through unchanged. 
+ out.push(batch); + continue; + }; + let arr = col.as_any().downcast_ref::().ok_or_else(|| { + OmniError::Lance(format!( + "scan_with_pending: committed column '{}' is not Utf8", + column + )) + })?; + let mask: BooleanArray = (0..arr.len()) + .map(|i| { + if arr.is_valid(i) { + Some(!excluded.contains(arr.value(i))) + } else { + Some(true) + } + }) + .collect(); + let filtered = arrow_select::filter::filter_record_batch(&batch, &mask) + .map_err(|e| OmniError::Lance(e.to_string()))?; + out.push(filtered); + } + Ok(out) +} + async fn scan_pending_batches( pending_batches: &[RecordBatch], pending_schema: Option, diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index bdc5f83..3adc15d 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -140,6 +140,130 @@ async fn schema_apply_recovers_partial_rename() { assert_no_staging_files(dir.path()); } +/// Pin the documented "finalize → publisher residual" from MR-794. +/// +/// `MutationStaging::finalize` runs `commit_staged` per touched table +/// sequentially before the publisher commits the manifest. Lance has no +/// multi-dataset atomic commit primitive, so a failure between the +/// per-table staged commits and the manifest commit leaves Lance HEAD +/// advanced on the touched tables with no manifest update — and the +/// next mutation surfaces `ExpectedVersionMismatch` on those tables. +/// +/// This isn't a code bug we can fix without an upstream Lance change; +/// it's the documented residual (see `docs/runs.md` "Finalize → +/// publisher residual"). The test pins the behavior so future code +/// changes catch any silent regression: if someone widens the residual +/// (e.g. failing earlier in finalize without rolling back), this test +/// will surface a different error than `ExpectedVersionMismatch`. If +/// someone narrows the residual (e.g. 
lance ships multi-dataset commit +/// and we plumb it), this test will start passing the next mutation +/// — and someone has to update the assertion + the docs. +#[tokio::test] +async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() { + use omnigraph::error::{ManifestConflictDetails, OmniError}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA) + .await + .unwrap(); + + { + let _failpoint = + ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + + // First mutation: finalize succeeds (commit_staged advances Lance + // HEAD on node:Person), then the failpoint kicks before the + // publisher's manifest commit. The caller sees the synthetic + // error. + let err = mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains( + "injected failpoint triggered: mutation.post_finalize_pre_publisher" + ), + "unexpected error: {err}" + ); + } + // Failpoint dropped — subsequent calls are not synthetic-failed. + + // Next mutation against the same table surfaces the documented + // residual: Lance HEAD on node:Person advanced (commit_staged ran), + // manifest didn't, so the publisher CAS at next-mutation time + // surfaces ExpectedVersionMismatch. 
+ let err = mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Frank")], &[("$age", 33)]), + ) + .await + .unwrap_err(); + let OmniError::Manifest(manifest_err) = err else { + panic!("expected Manifest error, got {err:?}"); + }; + let Some(ManifestConflictDetails::ExpectedVersionMismatch { + ref table_key, + expected, + actual, + }) = manifest_err.details + else { + panic!( + "expected ExpectedVersionMismatch (the documented residual), got {:?}", + manifest_err.details + ); + }; + assert_eq!( + table_key, "node:Person", + "drift should be on the table the failed finalize touched" + ); + assert!( + actual > expected, + "Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}", + ); +} + +/// Companion to the above — confirms that a finalize→publisher failure +/// on one table leaves OTHER tables untouched. Subsequent writes to +/// non-drifted tables proceed normally; the drift is contained. +#[tokio::test] +async fn finalize_publisher_residual_does_not_drift_untouched_tables() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA) + .await + .unwrap(); + + { + let _failpoint = + ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _ = mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .expect_err("synthetic failpoint must fire"); + } + + // node:Person drifted. node:Company didn't — try a Company write. 
+ use omnigraph::loader::{LoadMode, load_jsonl}; + load_jsonl( + &mut db, + r#"{"type": "Company", "data": {"name": "Acme"}}"#, + LoadMode::Append, + ) + .await + .expect("Company write on a non-drifted table should succeed"); +} + fn assert_no_staging_files(repo: &std::path::Path) { for name in [ "_schema.pg.staging", diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index 2e9b480..b9f18a8 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -507,6 +507,19 @@ query mixed_insert_and_delete($name: String, $age: I32, $victim: String) { insert Person { name: $name, age: $age } delete Person where name = $victim } + +query update_then_filter_by_old_value( + $first_name: String, $first_new_age: I32, + $second_threshold: I32, $second_new_age: I32 +) { + update Person set { age: $first_new_age } where name = $first_name + update Person set { age: $second_new_age } where age > $second_threshold +} + +query delete_two_persons($first: String, $second: String) { + delete Person where name = $first + delete Person where name = $second +} "#; /// D₂: a query mixing inserts/updates with deletes is rejected at parse @@ -838,3 +851,272 @@ edge WorksAt: Person -> Company @card(0..1) "follow-up load must succeed (no drift on edge table)", ); } + +// ─── PR #68 review-comment fixes — pinned coverage ────────────────────────── + +/// Codex P1 / Cubic P1 #1: chained `update` ops in one query must respect +/// each previous op's view of the rows. Without merge-shadow semantics on +/// `scan_with_pending`, the second update sees the stale committed value +/// (the first update's row still appears in the Lance scan because the +/// pending side hasn't committed), the predicate matches it, and the +/// dedupe-last-wins step at finalize ends up applying the second update +/// to a row whose pending value should have shielded it. +/// +/// Concretely: Alice starts at age=30 in TEST_DATA. Op-1 sets Alice to +/// age=99. 
Op-2 updates anyone with age > 50 to age=10. After op-1, +/// Alice's logical value is age=99 — within op-2's predicate. So op-2 +/// SHOULD update Alice to age=10. The interesting case is: op-2 must +/// see Alice at age=99 (op-1's pending value), not age=30 (committed). +/// If the helper unioned without shadowing, the scan would return Alice +/// twice: the stale committed row (age=30, outside the predicate) and +/// the pending row (age=99), and dedupe could pick either. The test +/// asserts both ends: Alice ends at age=10, the publisher publishes +/// once. +#[tokio::test] +async fn chained_updates_with_overlapping_predicate_respects_intermediate_value() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let pre_version = version_main(&db).await.unwrap(); + + db.mutate( + "main", + STAGED_QUERIES, + "update_then_filter_by_old_value", + &mixed_params( + &[("$first_name", "Alice")], + &[ + ("$first_new_age", 99), + ("$second_threshold", 50), + ("$second_new_age", 10), + ], + ), + ) + .await + .unwrap(); + + // After op-1: Alice = 99. After op-2 (where age > 50): Alice + // matches (99 > 50) → set to 10. End state: Alice = 10. 
+ let batches = read_table(&db, "node:Person").await; + let mut alice_age: Option<i32> = None; + for batch in &batches { + let names = batch + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let ages = batch + .column_by_name("age") + .unwrap() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + for i in 0..batch.num_rows() { + if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) { + alice_age = Some(ages.value(i)); + } + } + } + assert_eq!( + alice_age, + Some(10), + "chained-update final value must reflect the second update applied to op-1's pending value" + ); + + let post_version = version_main(&db).await.unwrap(); + assert_eq!( + post_version, + pre_version + 1, + "chained update must publish exactly once", + ); +} + +/// Cursor Bugbot HIGH: two `delete` ops on the same node table in one +/// query. Pre-fix, op-2's `open_table_for_mutation` went through +/// `open_for_mutation_on_branch` which trips `ensure_expected_version` +/// (Lance HEAD has advanced past the manifest's pinned version after +/// op-1's inline-commit, but the manifest hasn't moved). Post-fix, +/// `open_table_for_mutation` reopens via `inline_committed[table_key]` +/// at the post-delete Lance version. Test asserts both deletes succeed +/// in one query, both rows are gone, manifest version advances by 1. 
+#[tokio::test] +async fn multi_statement_delete_on_same_node_table() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let pre_persons = count_rows(&db, "node:Person").await; + let pre_version = version_main(&db).await.unwrap(); + + db.mutate( + "main", + STAGED_QUERIES, + "delete_two_persons", + &params(&[("$first", "Alice"), ("$second", "Bob")]), + ) + .await + .expect("multi-delete on same table must succeed"); + + assert_eq!( + count_rows(&db, "node:Person").await, + pre_persons - 2, + "both deletes must land", + ); + let post_version = version_main(&db).await.unwrap(); + assert_eq!( + post_version, + pre_version + 1, + "multi-delete query publishes exactly once at end", + ); + + // Both rows actually gone: + for name in ["Alice", "Bob"] { + let qr = db + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "get_person", + &params(&[("$name", name)]), + ) + .await + .unwrap(); + assert_eq!(qr.num_rows(), 0, "{name} should be deleted"); + } +} + +/// Cursor Bugbot HIGH (cascade variant): deleting a node cascades to its +/// edges, advancing Lance HEAD on the edge table. A subsequent +/// `delete ` op in the same query must reopen at the +/// post-cascade-commit version of the edge table — not trip +/// `ensure_expected_version` against the manifest's pinned version. 
+#[tokio::test] +async fn cascade_delete_node_then_explicit_delete_edge_on_same_table() { + const QUERY: &str = r#" +query cascade_then_explicit($name: String, $other: String) { + delete Person where name = $name + delete Knows where from = $other +} +"#; + + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let pre_knows = count_rows(&db, "edge:Knows").await; + + db.mutate( + "main", + QUERY, + "cascade_then_explicit", + &params(&[("$name", "Alice"), ("$other", "Bob")]), + ) + .await + .expect("cascade-then-explicit-delete on same edge table must succeed"); + + let post_knows = count_rows(&db, "edge:Knows").await; + assert!( + post_knows < pre_knows, + "cascade + explicit delete should remove edges; pre={pre_knows} post={post_knows}", + ); +} + +/// Codex P2 / Cursor Bugbot LOW / Cubic P2: the engine cardinality path +/// must enforce `min` bounds. Pre-fix the engine path silently dropped +/// the min check (a `let _ = card.min;` line). The loader path always +/// enforced both. Post-fix, both paths route through +/// `enforce_cardinality_bounds` which checks both bounds. +/// +/// Build a custom schema with `Knows: Person -> Person @card(2..*)`. +/// Inserting a single Knows edge violates min=2. The mutation path must +/// reject. +#[tokio::test] +async fn mutation_insert_edge_enforces_min_cardinality() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + const MIN_CARD_SCHEMA: &str = r#" +node Person { + name: String @key +} +edge Knows: Person -> Person @card(2..) 
+"#; + const MIN_CARD_QUERY: &str = r#" +query add_friend($from: String, $to: String) { + insert Knows { from: $from, to: $to } +} +"#; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, MIN_CARD_SCHEMA).await.unwrap(); + + let seed = r#"{"type": "Person", "data": {"name": "Alice"}} +{"type": "Person", "data": {"name": "Bob"}} +"#; + load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap(); + + // Single insert: count=1 < min=2 → reject with clear message. + let err = db + .mutate( + "main", + MIN_CARD_QUERY, + "add_friend", + &params(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .expect_err("min cardinality must reject the engine path"); + let OmniError::Manifest(manifest_err) = err else { + panic!("expected Manifest error, got {err:?}"); + }; + assert!( + manifest_err.message.contains("@card violation") + && manifest_err.message.contains("min 2"), + "unexpected error: {}", + manifest_err.message, + ); +} + +/// Cubic P1 #2: `LoadMode::Merge` on edges must NOT double-count the +/// committed edge AND its updated pending replacement. Build a custom +/// schema where WorksAt has @card(0..1). Seed Alice with one WorksAt to +/// Acme. Then Merge-load the SAME edge id (so it's an update, not an +/// insert) pointing Alice's WorksAt at Bigco. Cardinality must count +/// Alice's edges as 1 (the post-merge count), not 2 (committed + pending). +#[tokio::test] +async fn load_merge_mode_dedupes_edge_for_cardinality_count() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + const CARD_SCHEMA: &str = r#" +node Person { + name: String @key +} +node Company { + name: String @key +} +edge WorksAt: Person -> Company @card(0..1) +"#; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap(); + + // Seed: Alice + Acme + Bigco + WorksAt(id=w1, Alice→Acme). 
Note the + // loader reads edge ids from the `data.id` field (not top-level), so + // we place the id inside `data` for both the seed and the update. + let seed = r#"{"type": "Person", "data": {"name": "Alice"}} +{"type": "Company", "data": {"name": "Acme"}} +{"type": "Company", "data": {"name": "Bigco"}} +{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}} +"#; + load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap(); + + // Merge-update the same edge id w1 to point at Bigco. Counted naively + // as union, Alice has 2 WorksAt (committed Acme + pending Bigco) which + // would trip @card(0..1). With merge dedupe, Alice has 1 WorksAt. + let merge_data = r#"{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}} +"#; + load_jsonl(&mut db, merge_data, LoadMode::Merge) + .await + .expect("Merge update must dedupe the committed edge by id"); + + // Confirm there's exactly 1 WorksAt edge after merge. + assert_eq!(count_rows(&db, "edge:WorksAt").await, 1); +} diff --git a/docs/invariants.md b/docs/invariants.md index 3e3af86..996b5c0 100644 --- a/docs/invariants.md +++ b/docs/invariants.md @@ -110,7 +110,7 @@ Specific defaults (timeout values, memory caps, TTL windows) are *configuration* *Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.* 25. **Isolation: per-query snapshot; read-your-writes within and across queries in a session.** Each query reads from one consistent manifest version. Within a multi-statement mutation, the read subplan inside each write operator sees the writes from earlier statements. Across queries in a session, reads always resolve the latest manifest version — no reader pinning to older snapshots. 
- *Status: upheld for inserts/updates after MR-794 step 2+ — `MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (tracked: MR-793 / Lance-upstream).* + *Status: upheld for inserts/updates after MR-794 step 2+ — `MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan, with merge-shadow semantics for chained updates) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (tracked: MR-793 / Lance-upstream lance-format/lance#6658). The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures (the partial-failure test pins this), but a narrowed residual remains at the finalize→publisher boundary because Lance has no multi-dataset commit primitive — see [docs/runs.md](runs.md) "Finalize → publisher residual".* 26. **Durability before acknowledgement.** Commit returns only after the substrate has confirmed durable persistence. No "fast" or "fire-and-forget" durability levels. diff --git a/docs/releases/v0.4.1.md b/docs/releases/v0.4.1.md index 4c5d44a..031b2e7 100644 --- a/docs/releases/v0.4.1.md +++ b/docs/releases/v0.4.1.md @@ -14,8 +14,13 @@ mutation proceeds normally. `MutationStaging.pending` per touched table. No Lance HEAD advance happens during op execution; one `stage_*` + `commit_staged` per table runs at end-of-query, then `ManifestBatchPublisher::publish` - commits the manifest atomically. A mid-query failure leaves Lance - HEAD untouched on staged tables. + commits the manifest atomically. 
**For op-execution failures** + (validation errors, missing endpoints, parse-time D₂ rejection), Lance + HEAD on every staged table is untouched and the next mutation + proceeds normally. A narrowed residual remains at the + finalize→publisher boundary (multi-table `commit_staged` is not + atomic with the manifest commit) — see [docs/runs.md](../runs.md) + "Finalize → publisher residual" for details. - **D₂ parse-time rule**: a single mutation query is either insert/update-only or delete-only. Mixed → rejected with a clear error directing the caller to split into two queries. Lance 4.0.0 diff --git a/docs/runs.md b/docs/runs.md index 63295fd..459b7ca 100644 --- a/docs/runs.md +++ b/docs/runs.md @@ -75,6 +75,42 @@ will replace it. Operator-driven (rare in agent workloads); document permanently until Lance exposes `Operation::Overwrite { fragments }` as a two-phase op. +### Finalize → publisher residual + +The staged-write rewire eliminates one drift class **by construction at +the writer layer**: an op that fails before pushing to the in-memory +accumulator (validation errors, missing endpoints, parse-time D₂ +rejection) leaves Lance HEAD untouched on every staged table. This is +the case the `partial_failure_leaves_target_queryable_and_unblocks_next_mutation` +test pins. + +A second, narrower drift class remains. `MutationStaging::finalize` +runs `stage_*` + `commit_staged` per touched table sequentially, then +the publisher commits the manifest. Lance has no multi-dataset atomic +commit, so the per-table `commit_staged` calls are independent +operations: if commit_staged on table N+1 fails *after* commit_staged +on tables 1..N succeeded, or if the publisher's CAS pre-check rejects +*after* every commit_staged succeeded, tables 1..N are left at +`Lance HEAD = manifest_pinned + 1`. 
The next mutation against those +tables surfaces `ManifestConflictDetails::ExpectedVersionMismatch` — +the same loud failure mode the rewire was designed to make rare, just +no longer "unreachable." + +Triggers: transient Lance write errors during finalize (object-store +retry budget exhaustion, disk full); persistent publisher contention +exceeding `PUBLISHER_RETRY_BUDGET = 5` retries. Closing this requires +either a Lance multi-dataset atomic-commit primitive (filed upstream +alongside the two-phase delete request) or a manifest-layer journal +that replays staged commits on next open. Both are heavyweight; the +v1 stance is "narrowed window, documented residual, surface the loud +error when it fires." + +The publisher-CAS contract is unchanged: a *concurrent writer* that +advances any of our touched tables between snapshot capture and +publisher commit produces exactly one winner. The residual above is +about *our* abandoned commits in the failure path, not about +concurrency races. + ## Conflict shape Concurrent writers to the same `(table, branch)` produce exactly one