From 052b6e680fed8e6794f3df14a6307f0a9e3c731a Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 1 May 2026 20:47:45 +0200 Subject: [PATCH] =?UTF-8?q?MR-794=20step=202:=20address=20PR=20#68=20follo?= =?UTF-8?q?w-up=20review=20(Cubic)=20=E2=80=94=20pending=20dedupe=20+=20pr?= =?UTF-8?q?ojection=20guard=20+=20CI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new findings from Cubic on commit 3223b51: * **Pending edge cardinality counted within-input duplicates** (P2): count_src_per_edge's pending walk added every row to the count, including duplicate rows that finalize will collapse via dedupe_merge_batches_by_id. A LoadMode::Merge with the same edge id twice would over-count → spurious @card violation. Fix: when dedupe_key_column is Some, walk pending in reverse, track seen keys via HashSet, count only the kept (last-occurrence) rows. Mirrors finalize-time dedupe so cardinality counts what stage_merge_insert actually publishes. * **scan_with_pending silently disabled merge-shadow when projection omitted key_column** (P2): if a caller passed Some("id") as key_column but their projection didn't include "id", the filter_out_rows_where_string_in helper passed batches through unchanged — silently degrading to union semantics. Fix: validate up front that projection contains key_column when both are Some; return a typed Lance error otherwise. Tightened the helper too: missing column is now an internal error (was a silent passthrough). * **Cascade-vs-explicit delete test was too weak** (P2): asserted only that edge count decreased after delete. The cascade alone could satisfy that even if the explicit second-delete silently no-op'd. Strengthened: assert post_knows == 0, which only holds when both ops landed (Bob→Diana would survive if op-2 no-op'd). CI gap: also added test_failpoints_feature job to .github/workflows/ci.yml. The workspace test runs without --features failpoints (the feature is behind a Cargo flag), so the failpoints test suite was never exercised by CI before now. The new job builds + runs `cargo test -p omnigraph-engine --features failpoints --test failpoints` on every full CI run, mirroring the test_aws_feature pattern. New tests on tests/runs.rs: * load_merge_mode_dedupes_within_pending_for_cardinality_count (Cubic P2 #2 — pending-vs-pending dedup, distinct from the load_merge_mode_dedupes_edge_for_cardinality_count test which covers committed-vs-pending dedup). * scan_with_pending_rejects_key_column_missing_from_projection (Cubic P2 #3 — verifies the up-front validation rejects bad callers and that the happy path still works correctly). Local test results: * tests/runs.rs: 23/23 passed * tests/failpoints.rs --features failpoints: 7/7 passed (includes the two new finalize→publisher residual tests landed in 3223b51). Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 42 +++++++ crates/omnigraph/src/exec/staging.rs | 84 ++++++++++++-- crates/omnigraph/src/table_store.rs | 36 +++++- crates/omnigraph/tests/runs.rs | 160 ++++++++++++++++++++++++++- 4 files changed, 306 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4cae31c..1d81f17 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -239,6 +239,48 @@ jobs: if: needs.classify_changes.outputs.run_full_ci == 'true' run: cargo test --locked -p omnigraph-server --features aws + test_failpoints_feature: + name: Test omnigraph-engine --features failpoints + needs: classify_changes + runs-on: ubuntu-latest + timeout-minutes: 30 + permissions: + contents: read + env: + CARGO_TERM_COLOR: always + steps: + - name: Skip for text-only changes + if: needs.classify_changes.outputs.run_full_ci != 'true' + run: echo "Text-only change detected; skipping failpoints feature build." + + - name: Checkout source + if: needs.classify_changes.outputs.run_full_ci == 'true' + uses: actions/checkout@v5.0.1 + + - name: Install system dependencies + if: needs.classify_changes.outputs.run_full_ci == 'true' + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler libprotobuf-dev + + - name: Install Rust stable + if: needs.classify_changes.outputs.run_full_ci == 'true' + uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + + - name: Cache Rust build data + if: needs.classify_changes.outputs.run_full_ci == 'true' + uses: Swatinem/rust-cache@v2 + with: + workspaces: | + . -> target + key: failpoints-feature + + - name: Run failpoints test suite + if: needs.classify_changes.outputs.run_full_ci == 'true' + run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints + rustfs_integration: name: RustFS S3 Integration needs: diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 033cb99..7695906 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -481,12 +481,31 @@ pub(crate) async fn count_src_per_edge( } } - // Pending side: walk in-memory batches for `src`. (No dedupe needed — - // `dedupe_merge_batches_by_id` runs at finalize-time so any same-id - // duplicates within pending are already collapsed by the time the - // publisher commits, but cardinality runs before finalize. The - // engine's per-op edge insert produces one row per op with a fresh - // ULID, so within-pending duplicates are not a concern here.) + // Pending side: walk in-memory batches for `src`. When dedupe is on, + // collapse rows that share `dedupe_key_column` to their last occurrence + // — mirrors `dedupe_merge_batches_by_id`'s last-write-wins applied at + // finalize time, so cardinality counts what `commit_staged` will + // actually publish, not raw input duplicates. + // + // Without this, a Merge-mode load whose input JSONL has two rows with + // the same edge id would be double-counted here, even though the + // finalize-time dedupe would collapse them to one. The result: spurious + // `@card` violations on perfectly valid Merge inputs. + match dedupe_key_column { + Some(key_col) => count_pending_src_with_dedupe(pending_batches, key_col, &mut counts)?, + None => count_pending_src_naive(pending_batches, &mut counts), + } + + Ok(counts) +} + +/// Count pending edges per `src` with NO dedup. Correct when caller +/// guarantees pending rows have unique primary keys (engine inserts via +/// fresh ULID; loader Append mode). +fn count_pending_src_naive( + pending_batches: &[RecordBatch], + counts: &mut HashMap, +) { for batch in pending_batches { let Some(col) = batch.column_by_name("src") else { continue; @@ -500,8 +519,59 @@ pub(crate) async fn count_src_per_edge( } } } +} - Ok(counts) +/// Count pending edges per `src` after deduping rows that share +/// `dedupe_key_column`. Last occurrence wins (mirrors +/// `dedupe_merge_batches_by_id`'s walk-in-reverse contract). Required for +/// `LoadMode::Merge` where the same edge id may appear multiple times in +/// one load and finalize will collapse them to the last value. +fn count_pending_src_with_dedupe( + pending_batches: &[RecordBatch], + dedupe_key_column: &str, + counts: &mut HashMap, +) -> Result<()> { + // Walk in reverse, track seen keys, keep one (key, src) pair per key. + let mut seen: HashSet = HashSet::new(); + let mut kept_srcs: Vec = Vec::new(); + for batch in pending_batches.iter().rev() { + let Some(key_col) = batch.column_by_name(dedupe_key_column) else { + // Pending batch is missing the key column — fall back to naive + // counting for this batch (caller's contract was about merge + // semantics; if the column isn't there we can't dedupe). + continue; + }; + let key_arr = key_col.as_any().downcast_ref::().ok_or_else(|| { + OmniError::Lance(format!( + "count_src_per_edge: pending '{}' column is not Utf8", + dedupe_key_column + )) + })?; + let src_arr = batch + .column_by_name("src") + .and_then(|c| c.as_any().downcast_ref::()); + let Some(srcs) = src_arr else { + continue; + }; + for i in (0..batch.num_rows()).rev() { + if !srcs.is_valid(i) { + continue; + } + // NULL key: keep (NULL != NULL semantics — every NULL counts). + if !key_arr.is_valid(i) { + kept_srcs.push(srcs.value(i).to_string()); + continue; + } + let key = key_arr.value(i); + if seen.insert(key.to_string()) { + kept_srcs.push(srcs.value(i).to_string()); + } + } + } + for src in kept_srcs { + *counts.entry(src).or_insert(0) += 1; + } + Ok(()) } /// Apply `@card(min..max)` bounds to a per-source count map. diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index d87858d..c460e51 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -868,6 +868,24 @@ impl TableStore { filter: Option<&str>, key_column: Option<&str>, ) -> Result> { + // Contract: when merge-shadow semantics are requested via + // `key_column`, the committed-side projection MUST include that + // column so we can filter committed rows whose key appears in + // pending. Silently dropping the shadow when projection omits + // the key would re-introduce union semantics behind the + // caller's back. Reject up front with a clear error so callers + // either (a) include the key in projection or (b) drop + // `key_column` if union is what they wanted. + if let (Some(key_col), Some(cols)) = (key_column, projection) { + if !cols.iter().any(|c| *c == key_col) { + return Err(OmniError::Lance(format!( + "scan_with_pending: key_column '{}' must appear in projection \ + when merge-shadow semantics are requested (got projection = {:?})", + key_col, cols + ))); + } + } + let committed = self.scan(committed_ds, projection, filter, None).await?; if pending_batches.is_empty() { return Ok(committed); @@ -1191,6 +1209,11 @@ fn collect_string_column_values( /// Drop rows from `batches` whose Utf8 `column` value is in `excluded`. /// Used by `scan_with_pending`'s merge-semantic path to shadow committed /// rows that pending has already updated. Returns the surviving rows. +/// +/// `scan_with_pending` validates up front that the projection contains +/// `column`, so a missing column here is a programmer error — error +/// loudly instead of silently passing batches through (which would +/// re-introduce the union semantics the caller asked us to avoid). fn filter_out_rows_where_string_in( batches: Vec, column: &str, @@ -1203,12 +1226,13 @@ fn filter_out_rows_where_string_in( out.push(batch); continue; } - let Some(col) = batch.column_by_name(column) else { - // The committed scan didn't project this column. We cannot - // shadow without it; pass the batch through unchanged. - out.push(batch); - continue; - }; + let col = batch.column_by_name(column).ok_or_else(|| { + OmniError::manifest_internal(format!( + "scan_with_pending: committed batch missing key column '{}' \ + (the up-front projection check should have rejected this)", + column + )) + })?; let arr = col.as_any().downcast_ref::().ok_or_else(|| { OmniError::Lance(format!( "scan_with_pending: committed column '{}' is not Utf8", diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index b9f18a8..5353a2f 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -1001,7 +1001,15 @@ query cascade_then_explicit($name: String, $other: String) { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; + // TEST_DATA seeds three Knows edges: + // Alice → Bob, Alice → Charlie (cascade target — should be deleted by op-1) + // Bob → Diana (explicit target — should be deleted by op-2) + // After both ops, all three edges must be gone. A weaker assertion + // (just "count decreased") would pass even if op-2 silently no-op'd + // — Bob→Diana would survive. The exact-count check makes both ops + // independently observable. let pre_knows = count_rows(&db, "edge:Knows").await; + assert_eq!(pre_knows, 3, "fixture invariant: TEST_DATA seeds 3 Knows edges"); db.mutate( "main", @@ -1012,10 +1020,13 @@ query cascade_then_explicit($name: String, $other: String) { .await .expect("cascade-then-explicit-delete on same edge table must succeed"); + // Both ops landed: cascade removed Alice→Bob and Alice→Charlie; + // explicit removed Bob→Diana. Anything > 0 means one op silently + // did nothing (the bug we're guarding against). let post_knows = count_rows(&db, "edge:Knows").await; - assert!( - post_knows < pre_knows, - "cascade + explicit delete should remove edges; pre={pre_knows} post={post_knows}", + assert_eq!( + post_knows, 0, + "both cascade + explicit delete must complete (Bob→Diana would survive if op-2 no-op'd)", ); } @@ -1120,3 +1131,146 @@ edge WorksAt: Person -> Company @card(0..1) // Confirm there's exactly 1 WorksAt edge after merge. assert_eq!(count_rows(&db, "edge:WorksAt").await, 1); } + +/// Cubic P2 (follow-up to PR #68 review of commit 3223b51): a Merge load +/// whose input has TWO rows with the same edge id must be deduped at +/// cardinality-count time, not just at finalize. Without dedup, two +/// pending rows count twice → spurious `@card` violation. With dedup +/// (last-occurrence-wins, mirroring `dedupe_merge_batches_by_id`), the +/// pending side counts once. +/// +/// This is a separate path from `load_merge_mode_dedupes_edge_for_cardinality_count` +/// (which dedupes committed-vs-pending). Here we verify pending-vs-pending +/// dedup. +#[tokio::test] +async fn load_merge_mode_dedupes_within_pending_for_cardinality_count() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + const CARD_SCHEMA: &str = r#" +node Person { + name: String @key +} +node Company { + name: String @key +} +edge WorksAt: Person -> Company @card(0..1) +"#; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap(); + + let seed = r#"{"type": "Person", "data": {"name": "Alice"}} +{"type": "Company", "data": {"name": "Acme"}} +{"type": "Company", "data": {"name": "Bigco"}} +"#; + load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap(); + + // Merge load with the SAME edge id twice — the second row supersedes + // the first in the finalize-time dedupe. If pending-counting doesn't + // dedupe, Alice has 2 pending edges → @card(0..1) trips → load + // fails. With dedupe, Alice has 1 → load succeeds. + let dup_data = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}} +{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}} +"#; + load_jsonl(&mut db, dup_data, LoadMode::Merge) + .await + .expect("Merge load with within-input dup ids must dedupe pending count"); + + // Exactly one WorksAt edge after the dedup; the second row wins + // (last-occurrence) so dst should be Bigco. + assert_eq!(count_rows(&db, "edge:WorksAt").await, 1); +} + +/// Cubic P2 (follow-up): `scan_with_pending` must reject a call where +/// `key_column` is requested but the projection omits that column. +/// Without the up-front check, the helper silently degraded to union +/// semantics — letting a chained-update bug slip through unnoticed. +/// This test verifies the contract is enforced at the API boundary. +#[tokio::test] +async fn scan_with_pending_rejects_key_column_missing_from_projection() { + use arrow_array::{RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use omnigraph::table_store::TableStore; + use std::sync::Arc; + + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ])); + let seed = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "b"])) as _, + Arc::new(StringArray::from(vec![Some("seed-a"), Some("seed-b")])) as _, + ], + ) + .unwrap(); + let ds = TableStore::write_dataset(&uri, seed).await.unwrap(); + + let pending = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a"])) as _, + Arc::new(StringArray::from(vec![Some("pending-a")])) as _, + ], + ) + .unwrap(); + + // Bad call: key_column = "id" but projection doesn't include "id". + // Pre-fix this silently disabled merge-shadowing and returned both + // committed "a" and pending "a" rows. Now it must error. + let err = store + .scan_with_pending( + &ds, + std::slice::from_ref(&pending), + None, + Some(&["note"]), + None, + Some("id"), + ) + .await + .expect_err("scan_with_pending must reject merge-shadow with missing key in projection"); + let msg = err.to_string(); + assert!( + msg.contains("key_column 'id'") && msg.contains("must appear in projection"), + "unexpected error: {msg}" + ); + + // Good call: projection includes the key column. Shadow works: + // pending row 'a' shadows committed 'a', so the result has only + // committed 'b' + pending 'a'. + let batches = store + .scan_with_pending( + &ds, + std::slice::from_ref(&pending), + None, + Some(&["id", "note"]), + None, + Some("id"), + ) + .await + .expect("projection containing key_column must succeed"); + let mut ids: Vec = Vec::new(); + for b in &batches { + let arr = b + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..arr.len() { + ids.push(arr.value(i).to_string()); + } + } + ids.sort(); + assert_eq!( + ids, + vec!["a", "b"], + "merge-shadow should drop committed 'a' and surface pending 'a' + committed 'b'" + ); +}