diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 0e6434b..1068f90 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -670,36 +670,34 @@ fn update_unique_constraints( table_key: &str, batch: &RecordBatch, constraints: &[Vec], - seen: &mut [HashMap], + seen: &mut [HashMap, String>], conflicts: &mut Vec, ) -> Result<()> { for (constraint_idx, columns) in constraints.iter().enumerate() { let seen = &mut seen[constraint_idx]; - for row in 0..batch.num_rows() { - let mut parts = Vec::with_capacity(columns.len()); - let mut any_null = false; - for column_name in columns { - let column = batch.column_by_name(column_name).ok_or_else(|| { + // Resolve the group's columns once. The candidate dataset always + // carries the full table schema, so a missing column is an internal + // error rather than a skip. + let group_columns = columns + .iter() + .map(|column_name| { + batch.column_by_name(column_name).cloned().ok_or_else(|| { OmniError::manifest(format!( "table {} missing unique column '{}'", table_key, column_name )) - })?; - if column.is_null(row) { - any_null = true; - break; - } - parts.push( - array_value_to_string(column.as_ref(), row) - .map_err(|e| OmniError::Lance(e.to_string()))?, - ); - } - if any_null { + }) + }) + .collect::>>()?; + for row in 0..batch.num_rows() { + // Same tuple key as the intake path — one shared derivation in + // `crate::loader::composite_unique_key`, so the two cannot drift on + // separator or scalar conversion. Null rows are exempt. + let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else { continue; - } - let value = crate::loader::composite_unique_key(&parts); + }; let row_id = row_id_at(batch, row)?; - if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) { + if let Some(first_row_id) = seen.insert(key, row_id.clone()) { conflicts.push(MergeConflict { table_key: table_key.to_string(), row_id: Some(row_id.clone()), diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 9a80b39..707c46a 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -1445,34 +1445,32 @@ pub(crate) fn enforce_unique_constraints_intra_batch( unique_constraints: &[Vec], ) -> Result<()> { for columns in unique_constraints { - let Some(col_indices) = columns + // Resolve the group's columns once. A group whose columns aren't all + // present in this batch is skipped (e.g. a partial-schema load). + let Some(group_columns) = columns .iter() - .map(|name| batch.schema().index_of(name).ok()) - .collect::>>() + .map(|name| { + batch + .schema() + .index_of(name) + .ok() + .map(|i| batch.column(i).clone()) + }) + .collect::>>() else { continue; }; - let mut seen: HashMap = HashMap::new(); + let mut seen: HashMap, usize> = HashMap::new(); for row in 0..batch.num_rows() { - let mut parts = Vec::with_capacity(col_indices.len()); - let mut any_null = false; - for &col_idx in &col_indices { - let Some(value) = scalar_to_string(batch.column(col_idx), row) else { - any_null = true; - break; - }; - parts.push(value); - } - if any_null { + let Some(key) = composite_unique_key(&group_columns, row)? else { continue; - } - let value = composite_unique_key(&parts); - if let Some(prev_row) = seen.insert(value.clone(), row) { + }; + if let Some(prev_row) = seen.insert(key.clone(), row) { return Err(OmniError::manifest(format!( "@unique violation on {}.{}: value '{}' appears in rows {} and {}", type_name, - format_unique_columns(columns), - value, + format_tuple(columns), + format_tuple(&key), prev_row, row ))); @@ -1482,66 +1480,105 @@ pub(crate) fn enforce_unique_constraints_intra_batch( Ok(()) } -/// Join one row's rendered, non-null column values into a single composite -/// uniqueness key. The separator is the unit separator (U+001F) — a control -/// char highly unlikely to occur in real data, so distinct tuples like -/// `("a|b", "c")` and `("a", "b|c")` stay distinct rather than colliding. +/// Build the composite uniqueness key for `row` over a constraint group's +/// already-resolved columns (in declaration order). /// -/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and -/// the branch-merge path (`exec/merge.rs::update_unique_constraints`) so the -/// two cannot silently drift to incompatible keyings. -pub(crate) fn composite_unique_key(parts: &[String]) -> String { - parts.join("\u{1f}") +/// The key is the *tuple* of per-column scalar strings (`Vec`), keyed +/// directly in the dedup map — there is no separator, so no data value can +/// forge a collision (an earlier version joined on `U+001F`, which a value +/// containing that control char could still defeat). +/// +/// - `Ok(None)` if any column is null: the row is exempt (a partial tuple +/// can't violate uniqueness under SQL null semantics). +/// - `Ok(Some(tuple))` otherwise. +/// - `Err(..)` propagated from [`unique_key_scalar`] on an un-keyable value. +/// +/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and the +/// branch-merge path (`exec/merge.rs::update_unique_constraints`) so the two +/// derive identical keys and cannot drift on separator or scalar conversion. +pub(crate) fn composite_unique_key( + group_columns: &[ArrayRef], + row: usize, +) -> Result>> { + let mut parts = Vec::with_capacity(group_columns.len()); + for column in group_columns { + match unique_key_scalar(column, row)? { + Some(value) => parts.push(value), + None => return Ok(None), + } + } + Ok(Some(parts)) } -/// Render a unique constraint's columns for error messages: a single column -/// as `col`, a composite as `(a, b)`. -fn format_unique_columns(columns: &[String]) -> String { - match columns { +/// Render a constraint's column tuple for error messages: a single item as +/// `col`, a composite as `(a, b)`. Used for both the column list and the +/// offending value tuple, which share the same shape. +fn format_tuple(items: &[String]) -> String { + match items { [single] => single.clone(), - _ => format!("({})", columns.join(", ")), + _ => format!("({})", items.join(", ")), } } -/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for -/// uniqueness comparison. Returns `None` for null values (nulls are exempt -/// from uniqueness in standard SQL semantics). -fn scalar_to_string(array: &ArrayRef, row: usize) -> Option { - use arrow_array::Array; +/// Reduce a single Arrow scalar at (`array`, `row`) to its uniqueness-key +/// string. +/// +/// - `Ok(None)` for a null value: nulls are exempt from uniqueness (standard +/// SQL semantics over nullable columns). +/// - `Ok(Some(s))` for every scalar type a `@unique` / `@key` column can hold. +/// Strings are covered in all three physical Arrow encodings (`Utf8`, +/// `LargeUtf8`, `Utf8View`), so a legal string column is always keyable +/// regardless of how Lance materializes it on read-back. +/// - `Err(..)` for a non-null value whose Arrow type can't be reduced to a key +/// (a list, blob, or vector column). This fails loudly rather than silently +/// exempting the row, and because every legal scalar encoding is handled +/// above, the error fires only for a genuinely un-keyable column type — never +/// for a legal value that merely arrived in an unenumerated encoding. +fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result> { + use arrow_array::{Array, LargeStringArray, StringViewArray}; if array.is_null(row) { - return None; + return Ok(None); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Ok(Some(a.value(row).to_string())); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } if let Some(a) = array.as_any().downcast_ref::() { - return Some(a.value(row).to_string()); + return Ok(Some(a.value(row).to_string())); } - None + Err(OmniError::manifest(format!( + "uniqueness key: unsupported column type {:?} for @unique/@key enforcement", + array.data_type() + ))) } /// Build the list of uniqueness constraint groups to enforce on a node type. @@ -2209,4 +2246,66 @@ edge WorksAt: Person -> Company let err = result.unwrap_err().to_string(); assert!(err.contains("NaN"), "error should mention NaN: {}", err); } + + #[test] + fn composite_unique_key_builds_tuple_and_exempts_null() { + let a: ArrayRef = Arc::new(StringArray::from(vec![Some("x|y"), Some("x"), None])); + let b: ArrayRef = Arc::new(StringArray::from(vec![Some("z"), Some("y|z"), Some("q")])); + let cols = [a, b]; + + // Tuple key, so `("x|y", "z")` and `("x", "y|z")` stay distinct — + // a separator-joined key (the old `|` join) would collapse both to + // `x|y|z`. + assert_eq!( + composite_unique_key(&cols, 0).unwrap(), + Some(vec!["x|y".to_string(), "z".to_string()]) + ); + assert_eq!( + composite_unique_key(&cols, 1).unwrap(), + Some(vec!["x".to_string(), "y|z".to_string()]) + ); + assert_ne!( + composite_unique_key(&cols, 0).unwrap(), + composite_unique_key(&cols, 1).unwrap() + ); + + // Any null column → the whole row is exempt (SQL null semantics). + assert_eq!(composite_unique_key(&cols, 2).unwrap(), None); + } + + #[test] + fn unique_key_scalar_errors_loudly_on_unkeyable_type() { + use arrow_array::LargeBinaryArray; + // A binary/blob column can't be reduced to a uniqueness key. Before the + // hardening this returned `None`, so a `@unique` on such a column was + // silently un-enforced; now it errors instead of weakening the + // constraint in silence. + let blob: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(&b"abc"[..])])); + let err = unique_key_scalar(&blob, 0).unwrap_err(); + assert!( + err.to_string().contains("unsupported column type"), + "un-keyable type must fail loudly (got: {err})" + ); + } + + #[test] + fn unique_key_scalar_handles_all_string_encodings() { + use arrow_array::{LargeStringArray, StringViewArray}; + // A legal string column is keyable in every physical Arrow encoding + // Lance might hand back (Utf8 / LargeUtf8 / Utf8View). None of these may + // fall through to the loud `Err` path — that branch is reserved for + // genuinely un-keyable column types, not a legal value in an + // unenumerated encoding. + let utf8: ArrayRef = Arc::new(StringArray::from(vec![Some("v")])); + let large: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("v")])); + let view: ArrayRef = Arc::new(StringViewArray::from(vec![Some("v")])); + for array in [&utf8, &large, &view] { + assert_eq!( + unique_key_scalar(array, 0).unwrap(), + Some("v".to_string()), + "string array {:?} must render, not error", + array.data_type() + ); + } + } } diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index bdf0dd5..d786fc4 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -856,7 +856,7 @@ impl TableStore { // before the FirstSeen setter has a chance to silently collapse // anything): // - Load path: `enforce_unique_constraints_intra_batch` - // (`loader/mod.rs:1471`) errors on intra-batch `@key` dups. + // (`loader/mod.rs:1442`) errors on intra-batch `@key` dups. // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`) // accumulates and dedupes by `id`. // - Branch-merge path: `compute_source_delta` / diff --git a/crates/omnigraph/tests/consistency.rs b/crates/omnigraph/tests/consistency.rs index 729f2e8..b16aff9 100644 --- a/crates/omnigraph/tests/consistency.rs +++ b/crates/omnigraph/tests/consistency.rs @@ -188,7 +188,7 @@ node Thing { /// /// Defense in depth: /// 1. The loader's `enforce_unique_constraints_intra_batch` -/// (`loader/mod.rs:1471`), invoked unconditionally on any node type +/// (`loader/mod.rs:1442`), invoked unconditionally on any node type /// with a `@key`, errors on intra-batch duplicate `@key` values at /// intake — pinned by this test across every `LoadMode`. /// 2. The `check_batch_unique_by_keys` precondition at the top of @@ -280,6 +280,71 @@ node ExternalID { ); } +/// Guard: the intake path (load/insert/update) and the branch-merge path must +/// derive the same composite `@unique(a, b)` key, so a pair of rows unique on +/// the tuple is accepted by BOTH. Both paths now key on the tuple itself (no +/// separator), so a value containing any byte — including the `|` that an +/// earlier merge-path join used as its separator — can't forge a collision. +/// `("x|y", "z")` and `("x", "y|z")` are distinct tuples and must survive a +/// load-on-branch then merge without a phantom `UniqueViolation`. This pins the +/// cross-path consistency against any future drift in the shared keying. +#[tokio::test] +async fn composite_unique_key_is_consistent_across_intake_and_merge() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node Item { + slug: String @key + a: String @index + b: String @index + @unique(a, b) +} +"#; + let insert_item = r#" +query insert_item($slug: String, $a: String, $b: String) { + insert Item { slug: $slug, a: $a, b: $b } +} +"#; + let main = Omnigraph::init(uri, schema).await.unwrap(); + main.branch_create("feature").await.unwrap(); + + // Two rows unique on the composite (a, b), where `a`/`b` carry a literal + // `|`. Distinct under a tuple key; identical (`x|y|z`) under a `|`-join. + let feature = Omnigraph::open(uri).await.unwrap(); + feature + .mutate( + "feature", + insert_item, + "insert_item", + ¶ms(&[("$slug", "r1"), ("$a", "x|y"), ("$b", "z")]), + ) + .await + .expect("intake must accept the first composite-unique row"); + feature + .mutate( + "feature", + insert_item, + "insert_item", + ¶ms(&[("$slug", "r2"), ("$a", "x"), ("$b", "y|z")]), + ) + .await + .expect("intake must accept the second composite-unique row (distinct on the tuple)"); + + // The merge re-validates uniqueness over the adopted source rows. Both + // rows are unique on (a, b), so this must merge cleanly with no phantom + // conflict — intake and merge must key the tuple identically. + let merge_result = feature.branch_merge("feature", "main").await; + assert!( + merge_result.is_ok(), + "rows unique on the composite (a, b) must merge cleanly; \ + intake and merge must key the tuple the same way (got: {:?})", + merge_result.err() + ); + + let reopened = Omnigraph::open(uri).await.unwrap(); + assert_eq!(count_rows(&reopened, "node:Item").await, 2); +} + /// Canary for the upstream Lance gap that the `FirstSeen` workaround /// in `table_store.rs` masks. The bug class is "Window 2": load → /// indices built explicitly → merge → merge. Even with the engine diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index b29d740..4baff5e 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -101,7 +101,7 @@ Use it this way: | Deletes | Inline-commit residual; delete-only queries allowed, mixed insert/update/delete rejected by D2 | [query-language.md](../user/query-language.md), [writes.md](writes.md) | | Branch delete | Manifest is the single authority, flipped atomically first; per-table forks + commit-graph branch are derived state, reclaimed best-effort (`force_delete_branch`) with the `cleanup` reconciler as the guaranteed backstop. Reusing a name whose reclaim failed before `cleanup` surfaces an actionable error | [branches-commits.md](../user/branches-commits.md), [maintenance.md](../user/maintenance.md) | | Schema validation | Type checks, required fields, defaults, edge endpoint checks, and edge cardinality are enforced on write paths | [schema-language.md](../user/schema-language.md), [execution.md](execution.md) | -| Unique constraints | Intra-batch and write-path checks exist; full cross-version uniqueness is still a gap | [schema-language.md](../user/schema-language.md) | +| Unique constraints | Intra-batch and write-path checks exist; intake and branch-merge derive the composite key through one shared function (`loader::composite_unique_key`, a separator-free `Vec` tuple) and fail loudly on an un-keyable column type rather than silently exempting it; full cross-version uniqueness against already-committed rows is still a gap | [schema-language.md](../user/schema-language.md) | | Storage trait | `TableStorage` exists as the sealed staged-write surface; full call-site migration and capability/stat surfaces are incomplete | [writes.md](writes.md), [architecture.md](architecture.md) | | Index lifecycle | `ensure_indices` is explicit today; reconciler-based convergence is roadmap | [indexes.md](../user/indexes.md), [maintenance.md](../user/maintenance.md) | | Traversal IDs | Runtime still builds `TypeIndex`; Lance stable row-id based graph IDs are roadmap | [architecture.md](architecture.md), [query-language.md](../user/query-language.md) |