MR-794 step 2: address PR #68 follow-up review (Cubic) — pending dedupe + projection guard + CI

Three new findings from Cubic on commit 3223b51:

* **Pending edge cardinality counted within-input duplicates** (P2):
  count_src_per_edge's pending walk added every row to the count,
  including duplicate rows that finalize will collapse via
  dedupe_merge_batches_by_id. A LoadMode::Merge with the same edge id
  twice would over-count → spurious @card violation. Fix: when
  dedupe_key_column is Some, walk pending in reverse, track seen keys
  via HashSet, count only the kept (last-occurrence) rows. Mirrors
  finalize-time dedupe so cardinality counts what stage_merge_insert
  actually publishes.

* **scan_with_pending silently disabled merge-shadow when projection
  omitted key_column** (P2): if a caller passed Some("id") as
  key_column but their projection didn't include "id", the
  filter_out_rows_where_string_in helper passed batches through
  unchanged — silently degrading to union semantics. Fix: validate
  up front that projection contains key_column when both are Some;
  return a typed Lance error otherwise. Tightened the helper too:
  missing column is now an internal error (was a silent passthrough).

* **Cascade-vs-explicit delete test was too weak** (P2): asserted
  only that edge count decreased after delete. The cascade alone
  could satisfy that even if the explicit second-delete silently
  no-op'd. Strengthened: assert post_knows == 0, which only holds
  when both ops landed (Bob→Diana would survive if op-2 no-op'd).

CI gap: also added test_failpoints_feature job to .github/workflows/ci.yml.
The workspace test runs without --features failpoints (the feature is
behind a Cargo flag), so the failpoints test suite was never exercised
by CI before now. The new job builds + runs
`cargo test -p omnigraph-engine --features failpoints --test failpoints`
on every full CI run, mirroring the test_aws_feature pattern.

New tests on tests/runs.rs:

* load_merge_mode_dedupes_within_pending_for_cardinality_count
  (Cubic P2 #2 — pending-vs-pending dedup, distinct from the
  load_merge_mode_dedupes_edge_for_cardinality_count test which
  covers committed-vs-pending dedup).
* scan_with_pending_rejects_key_column_missing_from_projection
  (Cubic P2 #3 — verifies the up-front validation rejects bad
  callers and that the happy path still works correctly).

Local test results:

* tests/runs.rs: 23/23 passed
* tests/failpoints.rs --features failpoints: 7/7 passed (includes the
  two new finalize→publisher residual tests landed in 3223b51).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-01 20:47:45 +02:00
parent 3223b51cf1
commit 052b6e680f
No known key found for this signature in database
4 changed files with 306 additions and 16 deletions

View file

@ -239,6 +239,48 @@ jobs:
if: needs.classify_changes.outputs.run_full_ci == 'true'
run: cargo test --locked -p omnigraph-server --features aws
test_failpoints_feature:
name: Test omnigraph-engine --features failpoints
needs: classify_changes
runs-on: ubuntu-latest
timeout-minutes: 30
permissions:
contents: read
env:
CARGO_TERM_COLOR: always
steps:
- name: Skip for text-only changes
if: needs.classify_changes.outputs.run_full_ci != 'true'
run: echo "Text-only change detected; skipping failpoints feature build."
- name: Checkout source
if: needs.classify_changes.outputs.run_full_ci == 'true'
uses: actions/checkout@v5.0.1
- name: Install system dependencies
if: needs.classify_changes.outputs.run_full_ci == 'true'
run: |
sudo apt-get update
sudo apt-get install -y protobuf-compiler libprotobuf-dev
- name: Install Rust stable
if: needs.classify_changes.outputs.run_full_ci == 'true'
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable
- name: Cache Rust build data
if: needs.classify_changes.outputs.run_full_ci == 'true'
uses: Swatinem/rust-cache@v2
with:
workspaces: |
. -> target
key: failpoints-feature
- name: Run failpoints test suite
if: needs.classify_changes.outputs.run_full_ci == 'true'
run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints
rustfs_integration:
name: RustFS S3 Integration
needs:

View file

@ -481,12 +481,31 @@ pub(crate) async fn count_src_per_edge(
}
}
// Pending side: walk in-memory batches for `src`. (No dedupe needed —
// `dedupe_merge_batches_by_id` runs at finalize-time so any same-id
// duplicates within pending are already collapsed by the time the
// publisher commits, but cardinality runs before finalize. The
// engine's per-op edge insert produces one row per op with a fresh
// ULID, so within-pending duplicates are not a concern here.)
// Pending side: walk in-memory batches for `src`. When dedupe is on,
// collapse rows that share `dedupe_key_column` to their last occurrence
// — mirrors `dedupe_merge_batches_by_id`'s last-write-wins applied at
// finalize time, so cardinality counts what `commit_staged` will
// actually publish, not raw input duplicates.
//
// Without this, a Merge-mode load whose input JSONL has two rows with
// the same edge id would be double-counted here, even though the
// finalize-time dedupe would collapse them to one. The result: spurious
// `@card` violations on perfectly valid Merge inputs.
match dedupe_key_column {
Some(key_col) => count_pending_src_with_dedupe(pending_batches, key_col, &mut counts)?,
None => count_pending_src_naive(pending_batches, &mut counts),
}
Ok(counts)
}
/// Count pending edges per `src` with NO dedup. Correct when caller
/// guarantees pending rows have unique primary keys (engine inserts via
/// fresh ULID; loader Append mode).
fn count_pending_src_naive(
pending_batches: &[RecordBatch],
counts: &mut HashMap<String, u32>,
) {
for batch in pending_batches {
let Some(col) = batch.column_by_name("src") else {
continue;
@ -500,8 +519,59 @@ pub(crate) async fn count_src_per_edge(
}
}
}
}
Ok(counts)
/// Count pending edges per `src` after deduping rows that share
/// `dedupe_key_column`. Last occurrence wins (mirrors
/// `dedupe_merge_batches_by_id`'s walk-in-reverse contract). Required for
/// `LoadMode::Merge` where the same edge id may appear multiple times in
/// one load and finalize will collapse them to the last value.
fn count_pending_src_with_dedupe(
pending_batches: &[RecordBatch],
dedupe_key_column: &str,
counts: &mut HashMap<String, u32>,
) -> Result<()> {
// Walk in reverse, track seen keys, keep one (key, src) pair per key.
let mut seen: HashSet<String> = HashSet::new();
let mut kept_srcs: Vec<String> = Vec::new();
for batch in pending_batches.iter().rev() {
let Some(key_col) = batch.column_by_name(dedupe_key_column) else {
// Pending batch is missing the key column — fall back to naive
// counting for this batch (caller's contract was about merge
// semantics; if the column isn't there we can't dedupe).
continue;
};
let key_arr = key_col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"count_src_per_edge: pending '{}' column is not Utf8",
dedupe_key_column
))
})?;
let src_arr = batch
.column_by_name("src")
.and_then(|c| c.as_any().downcast_ref::<StringArray>());
let Some(srcs) = src_arr else {
continue;
};
for i in (0..batch.num_rows()).rev() {
if !srcs.is_valid(i) {
continue;
}
// NULL key: keep (NULL != NULL semantics — every NULL counts).
if !key_arr.is_valid(i) {
kept_srcs.push(srcs.value(i).to_string());
continue;
}
let key = key_arr.value(i);
if seen.insert(key.to_string()) {
kept_srcs.push(srcs.value(i).to_string());
}
}
}
for src in kept_srcs {
*counts.entry(src).or_insert(0) += 1;
}
Ok(())
}
/// Apply `@card(min..max)` bounds to a per-source count map.

View file

@ -868,6 +868,24 @@ impl TableStore {
filter: Option<&str>,
key_column: Option<&str>,
) -> Result<Vec<RecordBatch>> {
// Contract: when merge-shadow semantics are requested via
// `key_column`, the committed-side projection MUST include that
// column so we can filter committed rows whose key appears in
// pending. Silently dropping the shadow when projection omits
// the key would re-introduce union semantics behind the
// caller's back. Reject up front with a clear error so callers
// either (a) include the key in projection or (b) drop
// `key_column` if union is what they wanted.
if let (Some(key_col), Some(cols)) = (key_column, projection) {
if !cols.iter().any(|c| *c == key_col) {
return Err(OmniError::Lance(format!(
"scan_with_pending: key_column '{}' must appear in projection \
when merge-shadow semantics are requested (got projection = {:?})",
key_col, cols
)));
}
}
let committed = self.scan(committed_ds, projection, filter, None).await?;
if pending_batches.is_empty() {
return Ok(committed);
@ -1191,6 +1209,11 @@ fn collect_string_column_values(
/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
/// rows that pending has already updated. Returns the surviving rows.
///
/// `scan_with_pending` validates up front that the projection contains
/// `column`, so a missing column here is a programmer error — error
/// loudly instead of silently passing batches through (which would
/// re-introduce the union semantics the caller asked us to avoid).
fn filter_out_rows_where_string_in(
batches: Vec<RecordBatch>,
column: &str,
@ -1203,12 +1226,13 @@ fn filter_out_rows_where_string_in(
out.push(batch);
continue;
}
let Some(col) = batch.column_by_name(column) else {
// The committed scan didn't project this column. We cannot
// shadow without it; pass the batch through unchanged.
out.push(batch);
continue;
};
let col = batch.column_by_name(column).ok_or_else(|| {
OmniError::manifest_internal(format!(
"scan_with_pending: committed batch missing key column '{}' \
(the up-front projection check should have rejected this)",
column
))
})?;
let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"scan_with_pending: committed column '{}' is not Utf8",

View file

@ -1001,7 +1001,15 @@ query cascade_then_explicit($name: String, $other: String) {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// TEST_DATA seeds three Knows edges:
// Alice → Bob, Alice → Charlie (cascade target — should be deleted by op-1)
// Bob → Diana (explicit target — should be deleted by op-2)
// After both ops, all three edges must be gone. A weaker assertion
// (just "count decreased") would pass even if op-2 silently no-op'd
// — Bob→Diana would survive. The exact-count check makes both ops
// independently observable.
let pre_knows = count_rows(&db, "edge:Knows").await;
assert_eq!(pre_knows, 3, "fixture invariant: TEST_DATA seeds 3 Knows edges");
db.mutate(
"main",
@ -1012,10 +1020,13 @@ query cascade_then_explicit($name: String, $other: String) {
.await
.expect("cascade-then-explicit-delete on same edge table must succeed");
// Both ops landed: cascade removed Alice→Bob and Alice→Charlie;
// explicit removed Bob→Diana. Anything > 0 means one op silently
// did nothing (the bug we're guarding against).
let post_knows = count_rows(&db, "edge:Knows").await;
assert!(
post_knows < pre_knows,
"cascade + explicit delete should remove edges; pre={pre_knows} post={post_knows}",
assert_eq!(
post_knows, 0,
"both cascade + explicit delete must complete (Bob→Diana would survive if op-2 no-op'd)",
);
}
@ -1120,3 +1131,146 @@ edge WorksAt: Person -> Company @card(0..1)
// Confirm there's exactly 1 WorksAt edge after merge.
assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}
/// Cubic P2 (follow-up to PR #68 review of commit 3223b51): a Merge load
/// whose input has TWO rows with the same edge id must be deduped at
/// cardinality-count time, not just at finalize. Without dedup, two
/// pending rows count twice → spurious `@card` violation. With dedup
/// (last-occurrence-wins, mirroring `dedupe_merge_batches_by_id`), the
/// pending side counts once.
///
/// This is a separate path from `load_merge_mode_dedupes_edge_for_cardinality_count`
/// (which dedupes committed-vs-pending). Here we verify pending-vs-pending
/// dedup.
#[tokio::test]
async fn load_merge_mode_dedupes_within_pending_for_cardinality_count() {
use omnigraph::loader::{LoadMode, load_jsonl};
const CARD_SCHEMA: &str = r#"
node Person {
name: String @key
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();
let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
"#;
load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap();
// Merge load with the SAME edge id twice — the second row supersedes
// the first in the finalize-time dedupe. If pending-counting doesn't
// dedupe, Alice has 2 pending edges → @card(0..1) trips → load
// fails. With dedupe, Alice has 1 → load succeeds.
let dup_data = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}}
{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}}
"#;
load_jsonl(&mut db, dup_data, LoadMode::Merge)
.await
.expect("Merge load with within-input dup ids must dedupe pending count");
// Exactly one WorksAt edge after the dedup; the second row wins
// (last-occurrence) so dst should be Bigco.
assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}
/// Cubic P2 (follow-up): `scan_with_pending` must reject a call where
/// `key_column` is requested but the projection omits that column.
/// Without the up-front check, the helper silently degraded to union
/// semantics — letting a chained-update bug slip through unnoticed.
/// This test verifies the contract is enforced at the API boundary.
#[tokio::test]
async fn scan_with_pending_rejects_key_column_missing_from_projection() {
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use omnigraph::table_store::TableStore;
use std::sync::Arc;
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("note", DataType::Utf8, true),
]));
let seed = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b"])) as _,
Arc::new(StringArray::from(vec![Some("seed-a"), Some("seed-b")])) as _,
],
)
.unwrap();
let ds = TableStore::write_dataset(&uri, seed).await.unwrap();
let pending = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a"])) as _,
Arc::new(StringArray::from(vec![Some("pending-a")])) as _,
],
)
.unwrap();
// Bad call: key_column = "id" but projection doesn't include "id".
// Pre-fix this silently disabled merge-shadowing and returned both
// committed "a" and pending "a" rows. Now it must error.
let err = store
.scan_with_pending(
&ds,
std::slice::from_ref(&pending),
None,
Some(&["note"]),
None,
Some("id"),
)
.await
.expect_err("scan_with_pending must reject merge-shadow with missing key in projection");
let msg = err.to_string();
assert!(
msg.contains("key_column 'id'") && msg.contains("must appear in projection"),
"unexpected error: {msg}"
);
// Good call: projection includes the key column. Shadow works:
// pending row 'a' shadows committed 'a', so the result has only
// committed 'b' + pending 'a'.
let batches = store
.scan_with_pending(
&ds,
std::slice::from_ref(&pending),
None,
Some(&["id", "note"]),
None,
Some("id"),
)
.await
.expect("projection containing key_column must succeed");
let mut ids: Vec<String> = Vec::new();
for b in &batches {
let arr = b
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
for i in 0..arr.len() {
ids.push(arr.value(i).to_string());
}
}
ids.sort();
assert_eq!(
ids,
vec!["a", "b"],
"merge-shadow should drop committed 'a' and surface pending 'a' + committed 'b'"
);
}