From b7f5276ab53abd3f7ae5e105a00255ae9a6c2064 Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 17:17:31 +0300 Subject: [PATCH] fix(loader): enforce composite @unique(a, b) as a true composite key (#133) * fix(loader): enforce composite @unique(a, b) as a true composite key Node/edge composite uniqueness constraints were flattened into a single list of property names, so @unique(a, b) was enforced as independent single-field checks @unique(a) AND @unique(b) at intake. Preserve the constraint grouping and check each group as a composite key, mirroring the merge-path enforcement. Error messages now name the full composite. MR-983 * docs: clarify unit-separator comment in composite unique check * docs: fix separator reference in composite unique comment (merge.rs also uses U+001F) * fix(merge): align composite @unique key separator with intake (U+001F) The branch-merge path (update_unique_constraints) joined composite key columns with '|', while intake joins with U+001F. The same @unique(a, b) was keyed two different ways, and '|'-join can raise phantom merge conflicts for values containing '|' (e.g. ('x|y','z') vs ('x','y|z')). Factor the tuple-join into one shared helper (loader::composite_unique_key) so the intake and merge paths cannot drift again. Add branching regression tests for edge @unique(src, dst) on the merge path. Refs MR-983. 
--------- Co-authored-by: Ragnor Comerford Co-authored-by: Andrew Altshuler --- crates/omnigraph/src/exec/merge.rs | 2 +- crates/omnigraph/src/exec/mutation.rs | 18 ++-- crates/omnigraph/src/loader/mod.rs | 123 +++++++++++++++++--------- crates/omnigraph/src/table_store.rs | 2 +- crates/omnigraph/tests/branching.rs | 101 +++++++++++++++++++++ crates/omnigraph/tests/consistency.rs | 53 ++++++++++- 6 files changed, 244 insertions(+), 55 deletions(-) diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index eb6c4a3..0e6434b 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -697,7 +697,7 @@ fn update_unique_constraints( if any_null { continue; } - let value = parts.join("|"); + let value = crate::loader::composite_unique_key(&parts); let row_id = row_id_at(batch, row)?; if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) { conflicts.push(MergeConflict { diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 985889a..0e7ded7 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -905,12 +905,12 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; crate::loader::validate_value_constraints(&batch, node_type)?; crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_node(node_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &batch, type_name, - &unique_props, + &unique_groups, )?; } let has_key = node_type.key_property().is_some(); @@ -946,12 +946,12 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; validate_edge_insert_endpoints(self, staging, branch, type_name, 
&resolved).await?; crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_edge(edge_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &batch, type_name, - &unique_props, + &unique_groups, )?; } let table_key = format!("edge:{}", type_name); @@ -1094,12 +1094,12 @@ impl Omnigraph { let node_type = &self.catalog().node_types[type_name]; crate::loader::validate_value_constraints(&updated, node_type)?; crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_node(node_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &updated, type_name, - &unique_props, + &unique_groups, )?; } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index d5d74c0..9a80b39 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -399,9 +399,9 @@ async fn load_jsonl_reader( let batch = build_node_batch(node_type, rows)?; validate_value_constraints(&batch, node_type)?; validate_enum_constraints(&batch, &node_type.properties, type_name)?; - let unique_props = unique_property_names_for_node(node_type); - if !unique_props.is_empty() { - enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?; + let unique_groups = unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { + enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?; } let loaded_count = batch.num_rows(); let table_key = format!("node:{}", type_name); @@ -510,9 +510,9 @@ async fn load_jsonl_reader( let edge_type = 
&catalog.edge_types[edge_name]; let batch = build_edge_batch(edge_type, rows)?; validate_enum_constraints(&batch, &edge_type.properties, edge_name)?; - let unique_props = unique_property_names_for_edge(edge_type); - if !unique_props.is_empty() { - enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?; + let unique_groups = unique_constraint_groups_for_edge(edge_type); + if !unique_groups.is_empty() { + enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?; } let loaded_count = batch.num_rows(); let table_key = format!("edge:{}", edge_name); @@ -1425,8 +1425,16 @@ pub(crate) fn validate_enum_constraints( Ok(()) } -/// Detect duplicate values within a single `RecordBatch` for any of the named -/// `unique_properties`. Returns an error on the first duplicate found. +/// Detect duplicate values within a single `RecordBatch` for any of the +/// `unique_constraints` groups. Each group is a list of one or more columns +/// that together form a uniqueness key: a violation occurs when two rows share +/// the same tuple of values across *all* columns in a group, so a composite +/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an +/// error on the first duplicate found. +/// +/// Rows where any column in a group is null are exempt (standard SQL semantics +/// for uniqueness over nullable columns), as is any group whose columns are +/// not all present in the batch (e.g. a partial-schema load). /// /// Note: this only catches duplicates *within* the batch. 
Cross-batch /// uniqueness against already-committed rows is not enforced here — that @@ -1434,22 +1442,39 @@ pub(crate) fn validate_enum_constraints( pub(crate) fn enforce_unique_constraints_intra_batch( batch: &RecordBatch, type_name: &str, - unique_properties: &[String], + unique_constraints: &[Vec<String>], ) -> Result<()> { - for property in unique_properties { - let Some(col_idx) = batch.schema().index_of(property).ok() else { + for columns in unique_constraints { + let Some(col_indices) = columns + .iter() + .map(|name| batch.schema().index_of(name).ok()) + .collect::<Option<Vec<_>>>() + else { continue; }; - let arr = batch.column(col_idx); let mut seen: HashMap<String, usize> = HashMap::new(); for row in 0..batch.num_rows() { - let Some(value) = scalar_to_string(arr, row) else { + let mut parts = Vec::with_capacity(col_indices.len()); + let mut any_null = false; + for &col_idx in &col_indices { + let Some(value) = scalar_to_string(batch.column(col_idx), row) else { + any_null = true; + break; + }; + parts.push(value); + } + if any_null { continue; - }; + } + let value = composite_unique_key(&parts); if let Some(prev_row) = seen.insert(value.clone(), row) { return Err(OmniError::manifest(format!( "@unique violation on {}.{}: value '{}' appears in rows {} and {}", - type_name, property, value, prev_row, row + type_name, + format_unique_columns(columns), + value, + prev_row, + row ))); } } @@ -1457,6 +1482,27 @@ pub(crate) fn enforce_unique_constraints_intra_batch( Ok(()) } +/// Join one row's rendered, non-null column values into a single composite +/// uniqueness key. The separator is the unit separator (U+001F) — a control +/// char highly unlikely to occur in real data, so distinct tuples like +/// `("a|b", "c")` and `("a", "b|c")` stay distinct rather than colliding. +/// +/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and +/// the branch-merge path (`exec/merge.rs::update_unique_constraints`) so the +/// two cannot silently drift to incompatible keyings.
+pub(crate) fn composite_unique_key(parts: &[String]) -> String { + parts.join("\u{1f}") +} + +/// Render a unique constraint's columns for error messages: a single column +/// as `col`, a composite as `(a, b)`. +fn format_unique_columns(columns: &[String]) -> String { + match columns { + [single] => single.clone(), + _ => format!("({})", columns.join(", ")), + } +} + /// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for /// uniqueness comparison. Returns `None` for null values (nulls are exempt /// from uniqueness in standard SQL semantics). @@ -1498,39 +1544,30 @@ fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> { None } -/// Build the flat list of property names that must be checked for uniqueness -/// on a node type. Includes both `@unique` properties (from -/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness). -pub(crate) fn unique_property_names_for_node( +/// Build the list of uniqueness constraint groups to enforce on a node type. +/// Each group is the column tuple of one constraint. Includes every +/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the +/// `@key` (which implies uniqueness over its column tuple). Grouping is +/// preserved so a composite `@unique(a, b)` is enforced as a composite key +/// rather than degraded into independent single-field checks. +pub(crate) fn unique_constraint_groups_for_node( node_type: &omnigraph_compiler::catalog::NodeType, -) -> Vec<String> { - let mut props: Vec<String> = node_type - .unique_constraints - .iter() - .flatten() - .cloned() - .collect(); - if let Some(key) = &node_type.key { - props.extend(key.iter().cloned()); +) -> Vec<Vec<String>> { + let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone(); + if let Some(key) = &node_type.key + && !groups.contains(key) + { + groups.push(key.clone()); } - props.sort(); - props.dedup(); - props + groups } -/// Same as [`unique_property_names_for_node`] but for an edge type.
-pub(crate) fn unique_property_names_for_edge( +/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges +/// have no `@key`). +pub(crate) fn unique_constraint_groups_for_edge( edge_type: &omnigraph_compiler::catalog::EdgeType, -) -> Vec<String> { - let mut props: Vec<String> = edge_type - .unique_constraints - .iter() - .flatten() - .cloned() - .collect(); - props.sort(); - props.dedup(); - props +) -> Vec<Vec<String>> { + edge_type.unique_constraints.clone() } fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option { diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 10123b0..4b52db6 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -732,7 +732,7 @@ impl TableStore { // before the FirstSeen setter has a chance to silently collapse // anything): // - Load path: `enforce_unique_constraints_intra_batch` - // (`loader/mod.rs:1453`) errors on intra-batch `@key` dups. + // (`loader/mod.rs:1471`) errors on intra-batch `@key` dups. // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`) // accumulates and dedupes by `id`.
// - Branch-merge path: `compute_source_delta` / diff --git a/crates/omnigraph/tests/branching.rs b/crates/omnigraph/tests/branching.rs index 5a0c47d..108702c 100644 --- a/crates/omnigraph/tests/branching.rs +++ b/crates/omnigraph/tests/branching.rs @@ -39,6 +39,26 @@ query insert_user($name: String, $email: String) { } "#; +const EDGE_UNIQUE_SCHEMA: &str = r#" +node Person { + name: String @key +} + +edge Knows: Person -> Person { + @unique(src, dst) +} +"#; + +const EDGE_UNIQUE_DATA: &str = r#"{"type":"Person","data":{"name":"Alice"}} +{"type":"Person","data":{"name":"Bob"}} +{"type":"Person","data":{"name":"Carol"}}"#; + +const EDGE_UNIQUE_MUTATIONS: &str = r#" +query add_knows($from: String, $to: String) { + insert Knows { from: $from, to: $to } +} +"#; + const CARDINALITY_SCHEMA: &str = r#" node Person { name: String @key @@ -1119,6 +1139,87 @@ async fn branch_merge_reports_unique_violation_conflict() { } } +/// Regression for the MR-983 follow-up: the branch-merge path must enforce an +/// edge composite `@unique(src, dst)` as a true composite key, consistent with +/// the intake path. Two branches inserting the *same* (src, dst) pair must +/// conflict on merge. 
+#[tokio::test] +async fn branch_merge_reports_composite_unique_violation_conflict() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await; + main.branch_create("feature").await.unwrap(); + + let mut feature = Omnigraph::open(uri).await.unwrap(); + + mutate_main( + &mut main, + EDGE_UNIQUE_MUTATIONS, + "add_knows", + &params(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .unwrap(); + + mutate_branch( + &mut feature, + "feature", + EDGE_UNIQUE_MUTATIONS, + "add_knows", + &params(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .unwrap(); + + let err = main.branch_merge("feature", "main").await.unwrap_err(); + match err { + OmniError::MergeConflicts(conflicts) => { + assert!(conflicts.iter().any(|conflict| { + conflict.table_key == "edge:Knows" + && conflict.kind == MergeConflictKind::UniqueViolation + })); + } + other => panic!("expected merge conflicts, got {other:?}"), + } +} + +/// Sibling to the above: pairs sharing `src` but differing on `dst` are unique +/// on the (src, dst) tuple and must merge cleanly. Guards against the composite +/// degrading back into a single-field `@unique(src)` on the merge path.
+#[tokio::test] +async fn branch_merge_allows_distinct_composite_unique_pairs() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await; + main.branch_create("feature").await.unwrap(); + + let mut feature = Omnigraph::open(uri).await.unwrap(); + + mutate_main( + &mut main, + EDGE_UNIQUE_MUTATIONS, + "add_knows", + &params(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .unwrap(); + + mutate_branch( + &mut feature, + "feature", + EDGE_UNIQUE_MUTATIONS, + "add_knows", + &params(&[("$from", "Alice"), ("$to", "Carol")]), + ) + .await + .unwrap(); + + main.branch_merge("feature", "main") + .await + .expect("distinct (src, dst) pairs are unique on the composite and must merge cleanly"); + assert_eq!(count_rows(&main, "edge:Knows").await, 2); +} + #[tokio::test] async fn branch_merge_reports_cardinality_violation_conflict() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/tests/consistency.rs b/crates/omnigraph/tests/consistency.rs index 26517db..729f2e8 100644 --- a/crates/omnigraph/tests/consistency.rs +++ b/crates/omnigraph/tests/consistency.rs @@ -188,7 +188,7 @@ node Thing { /// /// Defense in depth: /// 1. The loader's `enforce_unique_constraints_intra_batch` -/// (`loader/mod.rs:1453`), invoked unconditionally on any node type +/// (`loader/mod.rs:1471`), invoked unconditionally on any node type /// with a `@key`, errors on intra-batch duplicate `@key` values at /// intake — pinned by this test across every `LoadMode`. /// 2. The `check_batch_unique_by_keys` precondition at the top of @@ -229,6 +229,57 @@ node Thing { } } +/// Regression for MR-983: a node-level composite `@unique(a, b)` must be +/// enforced as a true composite key, not degraded into independent +/// single-field checks.
Pre-fix, `unique_property_names_for_node` flattened +/// every constraint group into one property list, so `@unique(source, +/// external_id)` was enforced as `@unique(source)` *and* `@unique(external_id)` +/// — rejecting rows that were unique on the composite key and naming only the +/// first field in the error. +#[tokio::test] +async fn loader_enforces_composite_unique_as_composite_key() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node ExternalID { + slug: String @key + source: String @index + external_id: String @index + @unique(source, external_id) +} +"#; + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + + // Same `source`, different `external_id` → unique on the composite key. + // This is the exact repro from MR-983 and must be accepted. + let composite_ok = r#"{"type":"ExternalID","data":{"slug":"a","source":"whatsapp","external_id":"+E.164"}} +{"type":"ExternalID","data":{"slug":"b","source":"whatsapp","external_id":"pn:12345"}} +"#; + load_jsonl(&mut db, composite_ok, LoadMode::Overwrite) + .await + .expect("rows unique on the composite (source, external_id) must be accepted"); + assert_eq!(count_rows(&db, "node:ExternalID").await, 2); + + // Both composite columns equal → genuine violation. The error must name + // the whole composite, not just the first field. + let composite_dupe = r#"{"type":"ExternalID","data":{"slug":"c","source":"whatsapp","external_id":"dup"}} +{"type":"ExternalID","data":{"slug":"d","source":"whatsapp","external_id":"dup"}} +"#; + let err = load_jsonl(&mut db, composite_dupe, LoadMode::Overwrite) + .await + .unwrap_err(); + let msg = err.to_string(); + // Columns are canonicalized to sorted order in the catalog, so the + // message reads `(external_id, source)`; assert order-agnostically that + // both composite columns are named (not just the first, as pre-fix). 
+ assert!( + msg.contains("@unique violation") + && msg.contains("source") + && msg.contains("external_id"), + "composite violation must name both columns (got: {msg})" + ); +} + /// Canary for the upstream Lance gap that the `FirstSeen` workaround /// in `table_store.rs` masks. The bug class is "Window 2": load → /// indices built explicitly → merge → merge. Even with the engine