fix(loader): enforce composite @unique(a, b) as a true composite key

Node/edge composite uniqueness constraints were flattened into a single
list of property names, so @unique(a, b) was enforced as independent
single-field checks @unique(a) AND @unique(b) at intake. Preserve the
constraint grouping and check each group as a composite key, mirroring
the merge-path enforcement. Error messages now name the full composite.

MR-983
This commit is contained in:
Ragnor Comerford 2026-05-31 01:16:33 +00:00
parent 2d5c4b1202
commit 779a6a039e
4 changed files with 132 additions and 54 deletions

View file

@ -904,12 +904,12 @@ impl Omnigraph {
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
crate::loader::validate_value_constraints(&batch, node_type)?;
crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
if !unique_groups.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&batch,
type_name,
&unique_props,
&unique_groups,
)?;
}
let has_key = node_type.key_property().is_some();
@ -945,12 +945,12 @@ impl Omnigraph {
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_edge(edge_type);
if !unique_props.is_empty() {
let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type);
if !unique_groups.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&batch,
type_name,
&unique_props,
&unique_groups,
)?;
}
let table_key = format!("edge:{}", type_name);
@ -1093,12 +1093,12 @@ impl Omnigraph {
let node_type = &self.catalog().node_types[type_name];
crate::loader::validate_value_constraints(&updated, node_type)?;
crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
if !unique_groups.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&updated,
type_name,
&unique_props,
&unique_groups,
)?;
}

View file

@ -396,9 +396,9 @@ async fn load_jsonl_reader<R: BufRead>(
let batch = build_node_batch(node_type, rows)?;
validate_value_constraints(&batch, node_type)?;
validate_enum_constraints(&batch, &node_type.properties, type_name)?;
let unique_props = unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
let unique_groups = unique_constraint_groups_for_node(node_type);
if !unique_groups.is_empty() {
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
}
let loaded_count = batch.num_rows();
let table_key = format!("node:{}", type_name);
@ -507,9 +507,9 @@ async fn load_jsonl_reader<R: BufRead>(
let edge_type = &catalog.edge_types[edge_name];
let batch = build_edge_batch(edge_type, rows)?;
validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
let unique_props = unique_property_names_for_edge(edge_type);
if !unique_props.is_empty() {
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
let unique_groups = unique_constraint_groups_for_edge(edge_type);
if !unique_groups.is_empty() {
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
}
let loaded_count = batch.num_rows();
let table_key = format!("edge:{}", edge_name);
@ -1422,8 +1422,16 @@ pub(crate) fn validate_enum_constraints(
Ok(())
}
/// Detect duplicate values within a single `RecordBatch` for any of the named
/// `unique_properties`. Returns an error on the first duplicate found.
/// Detect duplicate values within a single `RecordBatch` for any of the
/// `unique_constraints` groups. Each group is a list of one or more columns
/// that together form a uniqueness key: a violation occurs when two rows share
/// the same tuple of values across *all* columns in a group, so a composite
/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an
/// error on the first duplicate found.
///
/// Rows where any column in a group is null are exempt (standard SQL semantics
/// for uniqueness over nullable columns), as is any group whose columns are
/// not all present in the batch (e.g. a partial-schema load).
///
/// Note: this only catches duplicates *within* the batch. Cross-batch
/// uniqueness against already-committed rows is not enforced here — that
@ -1431,22 +1439,41 @@ pub(crate) fn validate_enum_constraints(
pub(crate) fn enforce_unique_constraints_intra_batch(
batch: &RecordBatch,
type_name: &str,
unique_properties: &[String],
unique_constraints: &[Vec<String>],
) -> Result<()> {
for property in unique_properties {
let Some(col_idx) = batch.schema().index_of(property).ok() else {
for columns in unique_constraints {
let Some(col_indices) = columns
.iter()
.map(|name| batch.schema().index_of(name).ok())
.collect::<Option<Vec<usize>>>()
else {
continue;
};
let arr = batch.column(col_idx);
let mut seen: HashMap<String, usize> = HashMap::new();
for row in 0..batch.num_rows() {
let Some(value) = scalar_to_string(arr, row) else {
let mut parts = Vec::with_capacity(col_indices.len());
let mut any_null = false;
for &col_idx in &col_indices {
let Some(value) = scalar_to_string(batch.column(col_idx), row) else {
any_null = true;
break;
};
parts.push(value);
}
if any_null {
continue;
};
}
// Unit separator (U+001F) keeps composite parts unambiguous; it
// cannot appear in the scalar renderings of supported types.
let value = parts.join("\u{1f}");
if let Some(prev_row) = seen.insert(value.clone(), row) {
return Err(OmniError::manifest(format!(
"@unique violation on {}.{}: value '{}' appears in rows {} and {}",
type_name, property, value, prev_row, row
type_name,
format_unique_columns(columns),
value,
prev_row,
row
)));
}
}
@ -1454,6 +1481,15 @@ pub(crate) fn enforce_unique_constraints_intra_batch(
Ok(())
}
/// Render a unique constraint's columns for error messages: a single column
/// as `col`, a composite as `(a, b)`.
fn format_unique_columns(columns: &[String]) -> String {
match columns {
[single] => single.clone(),
_ => format!("({})", columns.join(", ")),
}
}
/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
/// uniqueness comparison. Returns `None` for null values (nulls are exempt
/// from uniqueness in standard SQL semantics).
@ -1495,39 +1531,30 @@ fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
None
}
/// Build the flat list of property names that must be checked for uniqueness
/// on a node type. Includes both `@unique` properties (from
/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
pub(crate) fn unique_property_names_for_node(
/// Build the list of uniqueness constraint groups to enforce on a node type.
/// Each group is the column tuple of one constraint. Includes every
/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the
/// `@key` (which implies uniqueness over its column tuple). Grouping is
/// preserved so a composite `@unique(a, b)` is enforced as a composite key
/// rather than degraded into independent single-field checks.
pub(crate) fn unique_constraint_groups_for_node(
node_type: &omnigraph_compiler::catalog::NodeType,
) -> Vec<String> {
let mut props: Vec<String> = node_type
.unique_constraints
.iter()
.flatten()
.cloned()
.collect();
if let Some(key) = &node_type.key {
props.extend(key.iter().cloned());
) -> Vec<Vec<String>> {
let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
if let Some(key) = &node_type.key
&& !groups.contains(key)
{
groups.push(key.clone());
}
props.sort();
props.dedup();
props
groups
}
/// Same as [`unique_property_names_for_node`] but for an edge type.
pub(crate) fn unique_property_names_for_edge(
/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges
/// have no `@key`).
pub(crate) fn unique_constraint_groups_for_edge(
edge_type: &omnigraph_compiler::catalog::EdgeType,
) -> Vec<String> {
let mut props: Vec<String> = edge_type
.unique_constraints
.iter()
.flatten()
.cloned()
.collect();
props.sort();
props.dedup();
props
) -> Vec<Vec<String>> {
edge_type.unique_constraints.clone()
}
fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {

View file

@ -690,7 +690,7 @@ impl TableStore {
// before the FirstSeen setter has a chance to silently collapse
// anything):
// - Load path: `enforce_unique_constraints_intra_batch`
// (`loader/mod.rs:1453`) errors on intra-batch `@key` dups.
// (`loader/mod.rs:1471`) errors on intra-batch `@key` dups.
// - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
// accumulates and dedupes by `id`.
// - Branch-merge path: `compute_source_delta` /

View file

@ -188,7 +188,7 @@ node Thing {
///
/// Defense in depth:
/// 1. The loader's `enforce_unique_constraints_intra_batch`
/// (`loader/mod.rs:1453`), invoked unconditionally on any node type
/// (`loader/mod.rs:1471`), invoked unconditionally on any node type
/// with a `@key`, errors on intra-batch duplicate `@key` values at
/// intake — pinned by this test across every `LoadMode`.
/// 2. The `check_batch_unique_by_keys` precondition at the top of
@ -229,6 +229,57 @@ node Thing {
}
}
/// Regression for MR-983: a node-level composite `@unique(a, b)` must be
/// enforced as a true composite key, not degraded into independent
/// single-field checks. Pre-fix, `unique_property_names_for_node` flattened
/// every constraint group into one property list, so `@unique(source,
/// external_id)` was enforced as `@unique(source)` *and* `@unique(external_id)`
/// — rejecting rows that were unique on the composite key and naming only the
/// first field in the error.
#[tokio::test]
async fn loader_enforces_composite_unique_as_composite_key() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = r#"
node ExternalID {
slug: String @key
source: String @index
external_id: String @index
@unique(source, external_id)
}
"#;
let mut db = Omnigraph::init(uri, schema).await.unwrap();
// Same `source`, different `external_id` → unique on the composite key.
// This is the exact repro from MR-983 and must be accepted.
let composite_ok = r#"{"type":"ExternalID","data":{"slug":"a","source":"whatsapp","external_id":"+E.164"}}
{"type":"ExternalID","data":{"slug":"b","source":"whatsapp","external_id":"pn:12345"}}
"#;
load_jsonl(&mut db, composite_ok, LoadMode::Overwrite)
.await
.expect("rows unique on the composite (source, external_id) must be accepted");
assert_eq!(count_rows(&db, "node:ExternalID").await, 2);
// Both composite columns equal → genuine violation. The error must name
// the whole composite, not just the first field.
let composite_dupe = r#"{"type":"ExternalID","data":{"slug":"c","source":"whatsapp","external_id":"dup"}}
{"type":"ExternalID","data":{"slug":"d","source":"whatsapp","external_id":"dup"}}
"#;
let err = load_jsonl(&mut db, composite_dupe, LoadMode::Overwrite)
.await
.unwrap_err();
let msg = err.to_string();
// Columns are canonicalized to sorted order in the catalog, so the
// message reads `(external_id, source)`; assert order-agnostically that
// both composite columns are named (not just the first, as pre-fix).
assert!(
msg.contains("@unique violation")
&& msg.contains("source")
&& msg.contains("external_id"),
"composite violation must name both columns (got: {msg})"
);
}
/// Canary for the upstream Lance gap that the `FirstSeen` workaround
/// in `table_store.rs` masks. The bug class is "Window 2": load →
/// indices built explicitly → merge → merge. Even with the engine