MR-794 step 2: address PR #68 review — merge semantics, cardinality, residual

Five fixes from PR #68 review (Cursor Bugbot + Codex + Cubic):

* **scan_with_pending gains merge-shadow semantics** (Codex P1, Cubic P1#1):
  new `key_column: Option<&str>` parameter. When set, committed rows
  whose key value appears in any pending batch are excluded from the
  scan — making `scan_with_pending` correctly merge-semantic for chained
  updates instead of naively unioning. execute_update calls with
  Some("id"). Without this, a chained `update where age > 30` could
  match a row whose pending value already moved out of range.

* **Multi-delete on same table no longer trips ExpectedVersionMismatch**
  (Cursor Bugbot HIGH): open_table_for_mutation routes through
  reopen_for_mutation when staging.inline_committed has the table,
  using the post-inline-commit Lance version captured at record_inline
  time. The legacy open_for_mutation_on_branch fence (Lance HEAD ==
  manifest pinned) is correct cross-writer but wrong intra-query when
  deletes have already advanced HEAD on this table. Branch goes away
  when Lance ships two-phase delete (lance-format/lance#6658).

* **Cardinality validation consolidated** (Cursor LOW + Codex P2 +
  Cubic P1#2 + Cubic P2): new exec/staging::count_src_per_edge +
  enforce_cardinality_bounds shared by mutation and loader paths.
  Restores the missing min-cardinality check on the engine path.
  Loader Merge mode passes Some("id") to dedupe edges being updated
  by id (not double-count committed + pending). Loader Append mode
  and engine path pass None (ULID-generated ids never collide).

* **Dead count_rows_with_pending removed** (Cursor LOW): never called.

* **Misleading concat-helper comment fixed** (Cubic P3): claimed
  schema normalization the helper doesn't implement. Updated to match
  reality.

* **Documentation honesty** (Cubic P1#3): MR-794 narrows but doesn't
  eliminate the "Lance HEAD ahead of __manifest" drift class. Drift is
  unreachable for op-execution failures (the partial_failure test pins
  this), but a residual remains at the finalize→publisher boundary
  because Lance has no multi-dataset commit primitive: per-table
  commit_staged calls run sequentially before manifest commit. Updated
  docs/runs.md, docs/invariants.md §VI.25, docs/releases/v0.4.1.md to
  scope the claim precisely.

* **Failpoint test pinning the residual**: new
  mutation.post_finalize_pre_publisher failpoint + two tests in
  tests/failpoints.rs that confirm the documented residual behavior.
  Catches future regressions that widen the residual.

Test additions on tests/runs.rs:

* chained_updates_with_overlapping_predicate_respects_intermediate_value
* multi_statement_delete_on_same_node_table
* cascade_delete_node_then_explicit_delete_edge_on_same_table
* mutation_insert_edge_enforces_min_cardinality
* load_merge_mode_dedupes_edge_for_cardinality_count

113/113 engine integration tests pass (runs + end_to_end + consistency
+ staged_writes + validators). Failpoints feature build runs in CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-01 13:47:55 +02:00
parent a61e82f47a
commit 3223b51cf1
No known key found for this signature in database
9 changed files with 828 additions and 199 deletions

View file

@ -521,6 +521,7 @@ async fn load_jsonl_reader<R: BufRead>(
edge_type,
&table_key,
&staging,
mode,
)
.await?;
} else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
@ -1553,87 +1554,49 @@ pub(crate) async fn validate_edge_cardinality(
/// Validate edge `@card` cardinality with in-memory pending edges visible.
///
/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
/// scans committed edges via Lance and unions counts with the pending
/// edge batches accumulated by the staged loader. Used by Append/Merge
/// loads (the Overwrite path uses `validate_edge_cardinality` which
/// opens the just-written Lance version).
/// opens the committed dataset at the pre-load snapshot version, then
/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
/// path uses `validate_edge_cardinality` which opens the just-written
/// Lance version).
///
/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
/// so committed edges that the load is *updating* (same edge id, possibly
/// changed `src`) are not double-counted (Cubic P1 finding on PR #68).
/// `LoadMode::Append` passes `None` because each line generates a fresh
/// ULID id that never collides with committed.
async fn validate_edge_cardinality_with_pending_loader(
db: &Omnigraph,
branch: Option<&str>,
edge_type: &omnigraph_compiler::catalog::EdgeType,
table_key: &str,
staging: &MutationStaging,
mode: LoadMode,
) -> Result<()> {
if edge_type.cardinality.is_default() {
return Ok(());
}
let mut counts: HashMap<String, u32> = HashMap::new();
// Committed side: open at pre-write version (the snapshot pinned at
// load entry; pending writes haven't committed yet).
let snapshot = db.snapshot_for_branch(branch).await?;
if let Some(entry) = snapshot.entry(table_key) {
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
let Some(entry) = snapshot.entry(table_key) else {
// No manifest entry — table doesn't exist yet. Pending-only is
// fine; the helper handles empty committed scans.
return Ok(());
};
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
.await?;
let dedupe_key = match mode {
LoadMode::Merge => Some("id"),
LoadMode::Append | LoadMode::Overwrite => None,
};
let counts =
crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
.await?;
let batches = db
.table_store()
.scan(&ds, Some(&["src"]), None, None)
.await?;
for batch in &batches {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
}
// Pending side: walk in-memory pending batches for `src`.
for batch in staging.pending_batches(table_key) {
let Some(col) = batch.column_by_name("src") else {
continue;
};
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
let card = &edge_type.cardinality;
for (src, count) in &counts {
if let Some(max) = card.max {
if *count > max {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (max {})",
edge_type.name, src, count, max
)));
}
}
if *count < card.min {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (min {})",
edge_type.name, src, count, card.min
)));
}
}
Ok(())
crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
}
/// Collect all valid node IDs for a given type, with in-memory pending