mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
fix(loader): enforce composite @unique(a, b) as a true composite key (#133)
* fix(loader): enforce composite @unique(a, b) as a true composite key
Node/edge composite uniqueness constraints were flattened into a single
list of property names, so @unique(a, b) was enforced as independent
single-field checks @unique(a) AND @unique(b) at intake. Preserve the
constraint grouping and check each group as a composite key, mirroring
the merge-path enforcement. Error messages now name the full composite.
MR-983
* docs: clarify unit-separator comment in composite unique check
* docs: fix separator reference in composite unique comment (merge.rs also uses U+001F)
* fix(merge): align composite @unique key separator with intake (U+001F)
The branch-merge path (update_unique_constraints) joined composite key
columns with '|', while intake joins with U+001F. The same @unique(a, b)
was keyed two different ways, and '|'-join can raise phantom merge
conflicts for values containing '|' (e.g. ('x|y','z') vs ('x','y|z')).
Factor the tuple-join into one shared helper (loader::composite_unique_key)
so the intake and merge paths cannot drift again. Add branching regression
tests for edge @unique(src, dst) on the merge path.
Refs MR-983.
---------
Co-authored-by: Ragnor Comerford <ragnor.comerford@gmail.com>
Co-authored-by: Andrew Altshuler <andrew@collectivelab.io>
This commit is contained in:
parent
131b78705d
commit
b7f5276ab5
6 changed files with 244 additions and 55 deletions
|
|
@ -697,7 +697,7 @@ fn update_unique_constraints(
|
|||
if any_null {
|
||||
continue;
|
||||
}
|
||||
let value = parts.join("|");
|
||||
let value = crate::loader::composite_unique_key(&parts);
|
||||
let row_id = row_id_at(batch, row)?;
|
||||
if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
|
||||
conflicts.push(MergeConflict {
|
||||
|
|
|
|||
|
|
@ -905,12 +905,12 @@ impl Omnigraph {
|
|||
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
|
||||
crate::loader::validate_value_constraints(&batch, node_type)?;
|
||||
crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
|
||||
let unique_props = crate::loader::unique_property_names_for_node(node_type);
|
||||
if !unique_props.is_empty() {
|
||||
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
|
||||
if !unique_groups.is_empty() {
|
||||
crate::loader::enforce_unique_constraints_intra_batch(
|
||||
&batch,
|
||||
type_name,
|
||||
&unique_props,
|
||||
&unique_groups,
|
||||
)?;
|
||||
}
|
||||
let has_key = node_type.key_property().is_some();
|
||||
|
|
@ -946,12 +946,12 @@ impl Omnigraph {
|
|||
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
|
||||
validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
|
||||
crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
|
||||
let unique_props = crate::loader::unique_property_names_for_edge(edge_type);
|
||||
if !unique_props.is_empty() {
|
||||
let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type);
|
||||
if !unique_groups.is_empty() {
|
||||
crate::loader::enforce_unique_constraints_intra_batch(
|
||||
&batch,
|
||||
type_name,
|
||||
&unique_props,
|
||||
&unique_groups,
|
||||
)?;
|
||||
}
|
||||
let table_key = format!("edge:{}", type_name);
|
||||
|
|
@ -1094,12 +1094,12 @@ impl Omnigraph {
|
|||
let node_type = &self.catalog().node_types[type_name];
|
||||
crate::loader::validate_value_constraints(&updated, node_type)?;
|
||||
crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
|
||||
let unique_props = crate::loader::unique_property_names_for_node(node_type);
|
||||
if !unique_props.is_empty() {
|
||||
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
|
||||
if !unique_groups.is_empty() {
|
||||
crate::loader::enforce_unique_constraints_intra_batch(
|
||||
&updated,
|
||||
type_name,
|
||||
&unique_props,
|
||||
&unique_groups,
|
||||
)?;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -399,9 +399,9 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
let batch = build_node_batch(node_type, rows)?;
|
||||
validate_value_constraints(&batch, node_type)?;
|
||||
validate_enum_constraints(&batch, &node_type.properties, type_name)?;
|
||||
let unique_props = unique_property_names_for_node(node_type);
|
||||
if !unique_props.is_empty() {
|
||||
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
|
||||
let unique_groups = unique_constraint_groups_for_node(node_type);
|
||||
if !unique_groups.is_empty() {
|
||||
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
|
||||
}
|
||||
let loaded_count = batch.num_rows();
|
||||
let table_key = format!("node:{}", type_name);
|
||||
|
|
@ -510,9 +510,9 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
let edge_type = &catalog.edge_types[edge_name];
|
||||
let batch = build_edge_batch(edge_type, rows)?;
|
||||
validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
|
||||
let unique_props = unique_property_names_for_edge(edge_type);
|
||||
if !unique_props.is_empty() {
|
||||
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
|
||||
let unique_groups = unique_constraint_groups_for_edge(edge_type);
|
||||
if !unique_groups.is_empty() {
|
||||
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
|
||||
}
|
||||
let loaded_count = batch.num_rows();
|
||||
let table_key = format!("edge:{}", edge_name);
|
||||
|
|
@ -1425,8 +1425,16 @@ pub(crate) fn validate_enum_constraints(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Detect duplicate values within a single `RecordBatch` for any of the named
|
||||
/// `unique_properties`. Returns an error on the first duplicate found.
|
||||
/// Detect duplicate values within a single `RecordBatch` for any of the
|
||||
/// `unique_constraints` groups. Each group is a list of one or more columns
|
||||
/// that together form a uniqueness key: a violation occurs when two rows share
|
||||
/// the same tuple of values across *all* columns in a group, so a composite
|
||||
/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an
|
||||
/// error on the first duplicate found.
|
||||
///
|
||||
/// Rows where any column in a group is null are exempt (standard SQL semantics
|
||||
/// for uniqueness over nullable columns), as is any group whose columns are
|
||||
/// not all present in the batch (e.g. a partial-schema load).
|
||||
///
|
||||
/// Note: this only catches duplicates *within* the batch. Cross-batch
|
||||
/// uniqueness against already-committed rows is not enforced here — that
|
||||
|
|
@ -1434,22 +1442,39 @@ pub(crate) fn validate_enum_constraints(
|
|||
pub(crate) fn enforce_unique_constraints_intra_batch(
|
||||
batch: &RecordBatch,
|
||||
type_name: &str,
|
||||
unique_properties: &[String],
|
||||
unique_constraints: &[Vec<String>],
|
||||
) -> Result<()> {
|
||||
for property in unique_properties {
|
||||
let Some(col_idx) = batch.schema().index_of(property).ok() else {
|
||||
for columns in unique_constraints {
|
||||
let Some(col_indices) = columns
|
||||
.iter()
|
||||
.map(|name| batch.schema().index_of(name).ok())
|
||||
.collect::<Option<Vec<usize>>>()
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let arr = batch.column(col_idx);
|
||||
let mut seen: HashMap<String, usize> = HashMap::new();
|
||||
for row in 0..batch.num_rows() {
|
||||
let Some(value) = scalar_to_string(arr, row) else {
|
||||
let mut parts = Vec::with_capacity(col_indices.len());
|
||||
let mut any_null = false;
|
||||
for &col_idx in &col_indices {
|
||||
let Some(value) = scalar_to_string(batch.column(col_idx), row) else {
|
||||
any_null = true;
|
||||
break;
|
||||
};
|
||||
parts.push(value);
|
||||
}
|
||||
if any_null {
|
||||
continue;
|
||||
};
|
||||
}
|
||||
let value = composite_unique_key(&parts);
|
||||
if let Some(prev_row) = seen.insert(value.clone(), row) {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"@unique violation on {}.{}: value '{}' appears in rows {} and {}",
|
||||
type_name, property, value, prev_row, row
|
||||
type_name,
|
||||
format_unique_columns(columns),
|
||||
value,
|
||||
prev_row,
|
||||
row
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
|
@ -1457,6 +1482,27 @@ pub(crate) fn enforce_unique_constraints_intra_batch(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Join one row's rendered, non-null column values into a single composite
|
||||
/// uniqueness key. The separator is the unit separator (U+001F) — a control
|
||||
/// char highly unlikely to occur in real data, so distinct tuples like
|
||||
/// `("a|b", "c")` and `("a", "b|c")` stay distinct rather than colliding.
|
||||
///
|
||||
/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and
|
||||
/// the branch-merge path (`exec/merge.rs::update_unique_constraints`) so the
|
||||
/// two cannot silently drift to incompatible keyings.
|
||||
pub(crate) fn composite_unique_key(parts: &[String]) -> String {
|
||||
parts.join("\u{1f}")
|
||||
}
|
||||
|
||||
/// Render a unique constraint's columns for error messages: a single column
|
||||
/// as `col`, a composite as `(a, b)`.
|
||||
fn format_unique_columns(columns: &[String]) -> String {
|
||||
match columns {
|
||||
[single] => single.clone(),
|
||||
_ => format!("({})", columns.join(", ")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
|
||||
/// uniqueness comparison. Returns `None` for null values (nulls are exempt
|
||||
/// from uniqueness in standard SQL semantics).
|
||||
|
|
@ -1498,39 +1544,30 @@ fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
|
|||
None
|
||||
}
|
||||
|
||||
/// Build the flat list of property names that must be checked for uniqueness
|
||||
/// on a node type. Includes both `@unique` properties (from
|
||||
/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
|
||||
pub(crate) fn unique_property_names_for_node(
|
||||
/// Build the list of uniqueness constraint groups to enforce on a node type.
|
||||
/// Each group is the column tuple of one constraint. Includes every
|
||||
/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the
|
||||
/// `@key` (which implies uniqueness over its column tuple). Grouping is
|
||||
/// preserved so a composite `@unique(a, b)` is enforced as a composite key
|
||||
/// rather than degraded into independent single-field checks.
|
||||
pub(crate) fn unique_constraint_groups_for_node(
|
||||
node_type: &omnigraph_compiler::catalog::NodeType,
|
||||
) -> Vec<String> {
|
||||
let mut props: Vec<String> = node_type
|
||||
.unique_constraints
|
||||
.iter()
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect();
|
||||
if let Some(key) = &node_type.key {
|
||||
props.extend(key.iter().cloned());
|
||||
) -> Vec<Vec<String>> {
|
||||
let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
|
||||
if let Some(key) = &node_type.key
|
||||
&& !groups.contains(key)
|
||||
{
|
||||
groups.push(key.clone());
|
||||
}
|
||||
props.sort();
|
||||
props.dedup();
|
||||
props
|
||||
groups
|
||||
}
|
||||
|
||||
/// Same as [`unique_property_names_for_node`] but for an edge type.
|
||||
pub(crate) fn unique_property_names_for_edge(
|
||||
/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges
|
||||
/// have no `@key`).
|
||||
pub(crate) fn unique_constraint_groups_for_edge(
|
||||
edge_type: &omnigraph_compiler::catalog::EdgeType,
|
||||
) -> Vec<String> {
|
||||
let mut props: Vec<String> = edge_type
|
||||
.unique_constraints
|
||||
.iter()
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect();
|
||||
props.sort();
|
||||
props.dedup();
|
||||
props
|
||||
) -> Vec<Vec<String>> {
|
||||
edge_type.unique_constraints.clone()
|
||||
}
|
||||
|
||||
fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
|
||||
|
|
|
|||
|
|
@ -732,7 +732,7 @@ impl TableStore {
|
|||
// before the FirstSeen setter has a chance to silently collapse
|
||||
// anything):
|
||||
// - Load path: `enforce_unique_constraints_intra_batch`
|
||||
// (`loader/mod.rs:1453`) errors on intra-batch `@key` dups.
|
||||
// (`loader/mod.rs:1471`) errors on intra-batch `@key` dups.
|
||||
// - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
|
||||
// accumulates and dedupes by `id`.
|
||||
// - Branch-merge path: `compute_source_delta` /
|
||||
|
|
|
|||
|
|
@ -39,6 +39,26 @@ query insert_user($name: String, $email: String) {
|
|||
}
|
||||
"#;
|
||||
|
||||
const EDGE_UNIQUE_SCHEMA: &str = r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
}
|
||||
|
||||
edge Knows: Person -> Person {
|
||||
@unique(src, dst)
|
||||
}
|
||||
"#;
|
||||
|
||||
const EDGE_UNIQUE_DATA: &str = r#"{"type":"Person","data":{"name":"Alice"}}
|
||||
{"type":"Person","data":{"name":"Bob"}}
|
||||
{"type":"Person","data":{"name":"Carol"}}"#;
|
||||
|
||||
const EDGE_UNIQUE_MUTATIONS: &str = r#"
|
||||
query add_knows($from: String, $to: String) {
|
||||
insert Knows { from: $from, to: $to }
|
||||
}
|
||||
"#;
|
||||
|
||||
const CARDINALITY_SCHEMA: &str = r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
|
|
@ -1119,6 +1139,87 @@ async fn branch_merge_reports_unique_violation_conflict() {
|
|||
}
|
||||
}
|
||||
|
||||
/// Regression for the MR-983 follow-up: the branch-merge path must enforce an
|
||||
/// edge composite `@unique(src, dst)` as a true composite key, consistent with
|
||||
/// the intake path. Two branches inserting the *same* (src, dst) pair must
|
||||
/// conflict on merge.
|
||||
#[tokio::test]
|
||||
async fn branch_merge_reports_composite_unique_violation_conflict() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await;
|
||||
main.branch_create("feature").await.unwrap();
|
||||
|
||||
let mut feature = Omnigraph::open(uri).await.unwrap();
|
||||
|
||||
mutate_main(
|
||||
&mut main,
|
||||
EDGE_UNIQUE_MUTATIONS,
|
||||
"add_knows",
|
||||
¶ms(&[("$from", "Alice"), ("$to", "Bob")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
mutate_branch(
|
||||
&mut feature,
|
||||
"feature",
|
||||
EDGE_UNIQUE_MUTATIONS,
|
||||
"add_knows",
|
||||
¶ms(&[("$from", "Alice"), ("$to", "Bob")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let err = main.branch_merge("feature", "main").await.unwrap_err();
|
||||
match err {
|
||||
OmniError::MergeConflicts(conflicts) => {
|
||||
assert!(conflicts.iter().any(|conflict| {
|
||||
conflict.table_key == "edge:Knows"
|
||||
&& conflict.kind == MergeConflictKind::UniqueViolation
|
||||
}));
|
||||
}
|
||||
other => panic!("expected merge conflicts, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sibling to the above: pairs sharing `src` but differing on `dst` are unique
|
||||
/// on the (src, dst) tuple and must merge cleanly. Guards against the composite
|
||||
/// degrading back into a single-field `@unique(src)` on the merge path.
|
||||
#[tokio::test]
|
||||
async fn branch_merge_allows_distinct_composite_unique_pairs() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await;
|
||||
main.branch_create("feature").await.unwrap();
|
||||
|
||||
let mut feature = Omnigraph::open(uri).await.unwrap();
|
||||
|
||||
mutate_main(
|
||||
&mut main,
|
||||
EDGE_UNIQUE_MUTATIONS,
|
||||
"add_knows",
|
||||
¶ms(&[("$from", "Alice"), ("$to", "Bob")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
mutate_branch(
|
||||
&mut feature,
|
||||
"feature",
|
||||
EDGE_UNIQUE_MUTATIONS,
|
||||
"add_knows",
|
||||
¶ms(&[("$from", "Alice"), ("$to", "Carol")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
main.branch_merge("feature", "main")
|
||||
.await
|
||||
.expect("distinct (src, dst) pairs are unique on the composite and must merge cleanly");
|
||||
assert_eq!(count_rows(&main, "edge:Knows").await, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn branch_merge_reports_cardinality_violation_conflict() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ node Thing {
|
|||
///
|
||||
/// Defense in depth:
|
||||
/// 1. The loader's `enforce_unique_constraints_intra_batch`
|
||||
/// (`loader/mod.rs:1453`), invoked unconditionally on any node type
|
||||
/// (`loader/mod.rs:1471`), invoked unconditionally on any node type
|
||||
/// with a `@key`, errors on intra-batch duplicate `@key` values at
|
||||
/// intake — pinned by this test across every `LoadMode`.
|
||||
/// 2. The `check_batch_unique_by_keys` precondition at the top of
|
||||
|
|
@ -229,6 +229,57 @@ node Thing {
|
|||
}
|
||||
}
|
||||
|
||||
/// Regression for MR-983: a node-level composite `@unique(a, b)` must be
|
||||
/// enforced as a true composite key, not degraded into independent
|
||||
/// single-field checks. Pre-fix, `unique_property_names_for_node` flattened
|
||||
/// every constraint group into one property list, so `@unique(source,
|
||||
/// external_id)` was enforced as `@unique(source)` *and* `@unique(external_id)`
|
||||
/// — rejecting rows that were unique on the composite key and naming only the
|
||||
/// first field in the error.
|
||||
#[tokio::test]
|
||||
async fn loader_enforces_composite_unique_as_composite_key() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let schema = r#"
|
||||
node ExternalID {
|
||||
slug: String @key
|
||||
source: String @index
|
||||
external_id: String @index
|
||||
@unique(source, external_id)
|
||||
}
|
||||
"#;
|
||||
let mut db = Omnigraph::init(uri, schema).await.unwrap();
|
||||
|
||||
// Same `source`, different `external_id` → unique on the composite key.
|
||||
// This is the exact repro from MR-983 and must be accepted.
|
||||
let composite_ok = r#"{"type":"ExternalID","data":{"slug":"a","source":"whatsapp","external_id":"+E.164"}}
|
||||
{"type":"ExternalID","data":{"slug":"b","source":"whatsapp","external_id":"pn:12345"}}
|
||||
"#;
|
||||
load_jsonl(&mut db, composite_ok, LoadMode::Overwrite)
|
||||
.await
|
||||
.expect("rows unique on the composite (source, external_id) must be accepted");
|
||||
assert_eq!(count_rows(&db, "node:ExternalID").await, 2);
|
||||
|
||||
// Both composite columns equal → genuine violation. The error must name
|
||||
// the whole composite, not just the first field.
|
||||
let composite_dupe = r#"{"type":"ExternalID","data":{"slug":"c","source":"whatsapp","external_id":"dup"}}
|
||||
{"type":"ExternalID","data":{"slug":"d","source":"whatsapp","external_id":"dup"}}
|
||||
"#;
|
||||
let err = load_jsonl(&mut db, composite_dupe, LoadMode::Overwrite)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let msg = err.to_string();
|
||||
// Columns are canonicalized to sorted order in the catalog, so the
|
||||
// message reads `(external_id, source)`; assert order-agnostically that
|
||||
// both composite columns are named (not just the first, as pre-fix).
|
||||
assert!(
|
||||
msg.contains("@unique violation")
|
||||
&& msg.contains("source")
|
||||
&& msg.contains("external_id"),
|
||||
"composite violation must name both columns (got: {msg})"
|
||||
);
|
||||
}
|
||||
|
||||
/// Canary for the upstream Lance gap that the `FirstSeen` workaround
|
||||
/// in `table_store.rs` masks. The bug class is "Window 2": load →
|
||||
/// indices built explicitly → merge → merge. Even with the engine
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue