From 779a6a039e3a2d10af4eeac8f989da57609a4099 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 31 May 2026 01:16:33 +0000 Subject: [PATCH] fix(loader): enforce composite @unique(a, b) as a true composite key Node/edge composite uniqueness constraints were flattened into a single list of property names, so @unique(a, b) was enforced as independent single-field checks @unique(a) AND @unique(b) at intake. Preserve the constraint grouping and check each group as a composite key, mirroring the merge-path enforcement. Error messages now name the full composite. MR-983 --- crates/omnigraph/src/exec/mutation.rs | 18 ++-- crates/omnigraph/src/loader/mod.rs | 113 ++++++++++++++++---------- crates/omnigraph/src/table_store.rs | 2 +- crates/omnigraph/tests/consistency.rs | 53 +++++++++++- 4 files changed, 132 insertions(+), 54 deletions(-) diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 02b2a21..a2679d5 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -904,12 +904,12 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; crate::loader::validate_value_constraints(&batch, node_type)?; crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_node(node_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &batch, type_name, - &unique_props, + &unique_groups, )?; } let has_key = node_type.key_property().is_some(); @@ -945,12 +945,12 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?; crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_edge(edge_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &batch, type_name, - &unique_props, + &unique_groups, )?; } let table_key = format!("edge:{}", type_name); @@ -1093,12 +1093,12 @@ impl Omnigraph { let node_type = &self.catalog().node_types[type_name]; crate::loader::validate_value_constraints(&updated, node_type)?; crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_node(node_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &updated, type_name, - &unique_props, + &unique_groups, )?; } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 46a46e2..6152072 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -396,9 +396,9 @@ async fn load_jsonl_reader( let batch = build_node_batch(node_type, rows)?; validate_value_constraints(&batch, node_type)?; validate_enum_constraints(&batch, &node_type.properties, type_name)?; - let unique_props = unique_property_names_for_node(node_type); - if !unique_props.is_empty() { - enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?; + let unique_groups = unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { + enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?; } let loaded_count = batch.num_rows(); let table_key = format!("node:{}", type_name); @@ -507,9 +507,9 @@ async fn load_jsonl_reader( let edge_type = &catalog.edge_types[edge_name]; let batch = build_edge_batch(edge_type, rows)?; validate_enum_constraints(&batch, &edge_type.properties, edge_name)?; - let unique_props = unique_property_names_for_edge(edge_type); - if !unique_props.is_empty() { - enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?; + let unique_groups = unique_constraint_groups_for_edge(edge_type); + if !unique_groups.is_empty() { + enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?; } let loaded_count = batch.num_rows(); let table_key = format!("edge:{}", edge_name); @@ -1422,8 +1422,16 @@ pub(crate) fn validate_enum_constraints( Ok(()) } -/// Detect duplicate values within a single `RecordBatch` for any of the named -/// `unique_properties`. Returns an error on the first duplicate found. +/// Detect duplicate values within a single `RecordBatch` for any of the +/// `unique_constraints` groups. Each group is a list of one or more columns +/// that together form a uniqueness key: a violation occurs when two rows share +/// the same tuple of values across *all* columns in a group, so a composite +/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an +/// error on the first duplicate found. +/// +/// Rows where any column in a group is null are exempt (standard SQL semantics +/// for uniqueness over nullable columns), as is any group whose columns are +/// not all present in the batch (e.g. a partial-schema load). /// /// Note: this only catches duplicates *within* the batch. Cross-batch /// uniqueness against already-committed rows is not enforced here — that @@ -1431,22 +1439,41 @@ pub(crate) fn validate_enum_constraints( pub(crate) fn enforce_unique_constraints_intra_batch( batch: &RecordBatch, type_name: &str, - unique_properties: &[String], + unique_constraints: &[Vec], ) -> Result<()> { - for property in unique_properties { - let Some(col_idx) = batch.schema().index_of(property).ok() else { + for columns in unique_constraints { + let Some(col_indices) = columns + .iter() + .map(|name| batch.schema().index_of(name).ok()) + .collect::>>() + else { continue; }; - let arr = batch.column(col_idx); let mut seen: HashMap = HashMap::new(); for row in 0..batch.num_rows() { - let Some(value) = scalar_to_string(arr, row) else { + let mut parts = Vec::with_capacity(col_indices.len()); + let mut any_null = false; + for &col_idx in &col_indices { + let Some(value) = scalar_to_string(batch.column(col_idx), row) else { + any_null = true; + break; + }; + parts.push(value); + } + if any_null { continue; - }; + } + // Unit separator (U+001F) keeps composite parts unambiguous; it + // cannot appear in the scalar renderings of supported types. + let value = parts.join("\u{1f}"); if let Some(prev_row) = seen.insert(value.clone(), row) { return Err(OmniError::manifest(format!( "@unique violation on {}.{}: value '{}' appears in rows {} and {}", - type_name, property, value, prev_row, row + type_name, + format_unique_columns(columns), + value, + prev_row, + row ))); } } @@ -1454,6 +1481,15 @@ pub(crate) fn enforce_unique_constraints_intra_batch( Ok(()) } +/// Render a unique constraint's columns for error messages: a single column +/// as `col`, a composite as `(a, b)`. +fn format_unique_columns(columns: &[String]) -> String { + match columns { + [single] => single.clone(), + _ => format!("({})", columns.join(", ")), + } +} + /// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for /// uniqueness comparison. Returns `None` for null values (nulls are exempt /// from uniqueness in standard SQL semantics). @@ -1495,39 +1531,30 @@ fn scalar_to_string(array: &ArrayRef, row: usize) -> Option { None } -/// Build the flat list of property names that must be checked for uniqueness -/// on a node type. Includes both `@unique` properties (from -/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness). -pub(crate) fn unique_property_names_for_node( +/// Build the list of uniqueness constraint groups to enforce on a node type. +/// Each group is the column tuple of one constraint. Includes every +/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the +/// `@key` (which implies uniqueness over its column tuple). Grouping is +/// preserved so a composite `@unique(a, b)` is enforced as a composite key +/// rather than degraded into independent single-field checks. +pub(crate) fn unique_constraint_groups_for_node( node_type: &omnigraph_compiler::catalog::NodeType, -) -> Vec { - let mut props: Vec = node_type - .unique_constraints - .iter() - .flatten() - .cloned() - .collect(); - if let Some(key) = &node_type.key { - props.extend(key.iter().cloned()); +) -> Vec> { + let mut groups: Vec> = node_type.unique_constraints.clone(); + if let Some(key) = &node_type.key + && !groups.contains(key) + { + groups.push(key.clone()); } - props.sort(); - props.dedup(); - props + groups } -/// Same as [`unique_property_names_for_node`] but for an edge type. -pub(crate) fn unique_property_names_for_edge( +/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges +/// have no `@key`). +pub(crate) fn unique_constraint_groups_for_edge( edge_type: &omnigraph_compiler::catalog::EdgeType, -) -> Vec { - let mut props: Vec = edge_type - .unique_constraints - .iter() - .flatten() - .cloned() - .collect(); - props.sort(); - props.dedup(); - props +) -> Vec> { + edge_type.unique_constraints.clone() } fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option { diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 46b15b0..7643cbc 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -690,7 +690,7 @@ impl TableStore { // before the FirstSeen setter has a chance to silently collapse // anything): // - Load path: `enforce_unique_constraints_intra_batch` - // (`loader/mod.rs:1453`) errors on intra-batch `@key` dups. + // (`loader/mod.rs:1471`) errors on intra-batch `@key` dups. // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`) // accumulates and dedupes by `id`. // - Branch-merge path: `compute_source_delta` / diff --git a/crates/omnigraph/tests/consistency.rs b/crates/omnigraph/tests/consistency.rs index 26517db..729f2e8 100644 --- a/crates/omnigraph/tests/consistency.rs +++ b/crates/omnigraph/tests/consistency.rs @@ -188,7 +188,7 @@ node Thing { /// /// Defense in depth: /// 1. The loader's `enforce_unique_constraints_intra_batch` -/// (`loader/mod.rs:1453`), invoked unconditionally on any node type +/// (`loader/mod.rs:1471`), invoked unconditionally on any node type /// with a `@key`, errors on intra-batch duplicate `@key` values at /// intake — pinned by this test across every `LoadMode`. /// 2. The `check_batch_unique_by_keys` precondition at the top of @@ -229,6 +229,57 @@ node Thing { } } +/// Regression for MR-983: a node-level composite `@unique(a, b)` must be +/// enforced as a true composite key, not degraded into independent +/// single-field checks. Pre-fix, `unique_property_names_for_node` flattened +/// every constraint group into one property list, so `@unique(source, +/// external_id)` was enforced as `@unique(source)` *and* `@unique(external_id)` +/// — rejecting rows that were unique on the composite key and naming only the +/// first field in the error. +#[tokio::test] +async fn loader_enforces_composite_unique_as_composite_key() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node ExternalID { + slug: String @key + source: String @index + external_id: String @index + @unique(source, external_id) +} +"#; + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + + // Same `source`, different `external_id` → unique on the composite key. + // This is the exact repro from MR-983 and must be accepted. + let composite_ok = r#"{"type":"ExternalID","data":{"slug":"a","source":"whatsapp","external_id":"+E.164"}} +{"type":"ExternalID","data":{"slug":"b","source":"whatsapp","external_id":"pn:12345"}} +"#; + load_jsonl(&mut db, composite_ok, LoadMode::Overwrite) + .await + .expect("rows unique on the composite (source, external_id) must be accepted"); + assert_eq!(count_rows(&db, "node:ExternalID").await, 2); + + // Both composite columns equal → genuine violation. The error must name + // the whole composite, not just the first field. + let composite_dupe = r#"{"type":"ExternalID","data":{"slug":"c","source":"whatsapp","external_id":"dup"}} +{"type":"ExternalID","data":{"slug":"d","source":"whatsapp","external_id":"dup"}} +"#; + let err = load_jsonl(&mut db, composite_dupe, LoadMode::Overwrite) + .await + .unwrap_err(); + let msg = err.to_string(); + // Columns are canonicalized to sorted order in the catalog, so the + // message reads `(external_id, source)`; assert order-agnostically that + // both composite columns are named (not just the first, as pre-fix). + assert!( + msg.contains("@unique violation") + && msg.contains("source") + && msg.contains("external_id"), + "composite violation must name both columns (got: {msg})" + ); +} + /// Canary for the upstream Lance gap that the `FirstSeen` workaround /// in `table_store.rs` masks. The bug class is "Window 2": load → /// indices built explicitly → merge → merge. Even with the engine