fix(unique): collision-free tuple key shared by intake and merge, loud on un-keyable types (#160)

* fix(unique): collision-free tuple key shared by intake and merge, loud on un-keyable types

Hardening on top of #133. That PR introduced a shared
`loader::composite_unique_key(parts)` joining per-column scalars with U+001F
and routed both intake and branch-merge through it, closing the original
'|' vs U+001F separator drift. This takes the shared keying the rest of the
way to correct-by-design:

- Collision-free by construction: the key is now the tuple of per-column
  scalar strings (Vec<String>) keyed directly, no separator, so no data value
  (not even a literal U+001F) can forge a collision.
- One scalar converter across both paths: intake used an explicit type-match,
  merge used Arrow's array_value_to_string. Both now derive the key through
  composite_unique_key(group_columns, row), so they can't drift on conversion.
- Loud on un-keyable types: the scalar converter returned None for any Arrow
  type it didn't recognize, and the caller treated None as null-exempt, so a
  @unique on a column type it couldn't reduce (list, blob) was silently
  un-enforced. It now returns Err, surfacing the constraint it can't enforce
  instead of weakening it in silence.

Tests:
- consistency::composite_unique_key_is_consistent_across_intake_and_merge pins
  that intake and merge key the tuple identically (load-on-branch then merge
  of values containing '|').
- loader unit tests pin tuple keying + null exemption and the loud error on an
  un-keyable (binary) column.

Docs: invariants truth-matrix updated; stale loader/mod.rs line pointers fixed.
Scope unchanged: intra-batch / merge-candidate-set only; cross-version
uniqueness against committed rows stays a documented gap.

* fix(unique): cover all string encodings; make format_tuple private (PR #160 review)

Addresses two Greptile P2 comments on PR #160:

- unique_key_scalar handled only StringArray (Utf8). The loud-on-unknown-type
  behavior turned any legal string column that read back as LargeUtf8 or
  Utf8View into a hard write failure (the old code silently returned None). Add
  LargeStringArray and StringViewArray arms so a legal string column is keyable
  in every physical Arrow encoding; the Err path now fires only for a genuinely
  un-keyable logical type (list/blob/vector), never a legal value in an
  unenumerated encoding.
- format_tuple was pub(crate) but only used within loader/mod.rs; make it a
  private fn (matches the old format_unique_columns it replaced, minimal
  exposed surface).

New unit test unique_key_scalar_handles_all_string_encodings pins that Utf8 /
LargeUtf8 / Utf8View all render rather than error.
This commit is contained in:
Ragnor Comerford 2026-06-09 19:28:21 +02:00 committed by GitHub
parent dbfdddc952
commit e0d88d1295
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 235 additions and 73 deletions

View file

@ -670,36 +670,34 @@ fn update_unique_constraints(
table_key: &str,
batch: &RecordBatch,
constraints: &[Vec<String>],
seen: &mut [HashMap<String, String>],
seen: &mut [HashMap<Vec<String>, String>],
conflicts: &mut Vec<MergeConflict>,
) -> Result<()> {
for (constraint_idx, columns) in constraints.iter().enumerate() {
let seen = &mut seen[constraint_idx];
for row in 0..batch.num_rows() {
let mut parts = Vec::with_capacity(columns.len());
let mut any_null = false;
for column_name in columns {
let column = batch.column_by_name(column_name).ok_or_else(|| {
// Resolve the group's columns once. The candidate dataset always
// carries the full table schema, so a missing column is an internal
// error rather than a skip.
let group_columns = columns
.iter()
.map(|column_name| {
batch.column_by_name(column_name).cloned().ok_or_else(|| {
OmniError::manifest(format!(
"table {} missing unique column '{}'",
table_key, column_name
))
})?;
if column.is_null(row) {
any_null = true;
break;
}
parts.push(
array_value_to_string(column.as_ref(), row)
.map_err(|e| OmniError::Lance(e.to_string()))?,
);
}
if any_null {
})
})
.collect::<Result<Vec<_>>>()?;
for row in 0..batch.num_rows() {
// Same tuple key as the intake path — one shared derivation in
// `crate::loader::composite_unique_key`, so the two cannot drift on
// separator or scalar conversion. Null rows are exempt.
let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
continue;
}
let value = crate::loader::composite_unique_key(&parts);
};
let row_id = row_id_at(batch, row)?;
if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) {
if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
conflicts.push(MergeConflict {
table_key: table_key.to_string(),
row_id: Some(row_id.clone()),

View file

@ -1445,34 +1445,32 @@ pub(crate) fn enforce_unique_constraints_intra_batch(
unique_constraints: &[Vec<String>],
) -> Result<()> {
for columns in unique_constraints {
let Some(col_indices) = columns
// Resolve the group's columns once. A group whose columns aren't all
// present in this batch is skipped (e.g. a partial-schema load).
let Some(group_columns) = columns
.iter()
.map(|name| batch.schema().index_of(name).ok())
.collect::<Option<Vec<usize>>>()
.map(|name| {
batch
.schema()
.index_of(name)
.ok()
.map(|i| batch.column(i).clone())
})
.collect::<Option<Vec<ArrayRef>>>()
else {
continue;
};
let mut seen: HashMap<String, usize> = HashMap::new();
let mut seen: HashMap<Vec<String>, usize> = HashMap::new();
for row in 0..batch.num_rows() {
let mut parts = Vec::with_capacity(col_indices.len());
let mut any_null = false;
for &col_idx in &col_indices {
let Some(value) = scalar_to_string(batch.column(col_idx), row) else {
any_null = true;
break;
};
parts.push(value);
}
if any_null {
let Some(key) = composite_unique_key(&group_columns, row)? else {
continue;
}
let value = composite_unique_key(&parts);
if let Some(prev_row) = seen.insert(value.clone(), row) {
};
if let Some(prev_row) = seen.insert(key.clone(), row) {
return Err(OmniError::manifest(format!(
"@unique violation on {}.{}: value '{}' appears in rows {} and {}",
type_name,
format_unique_columns(columns),
value,
format_tuple(columns),
format_tuple(&key),
prev_row,
row
)));
@ -1482,66 +1480,105 @@ pub(crate) fn enforce_unique_constraints_intra_batch(
Ok(())
}
/// Join one row's rendered, non-null column values into a single composite
/// uniqueness key. The separator is the unit separator (U+001F) — a control
/// char highly unlikely to occur in real data, so distinct tuples like
/// `("a|b", "c")` and `("a", "b|c")` stay distinct rather than colliding.
/// Build the composite uniqueness key for `row` over a constraint group's
/// already-resolved columns (in declaration order).
///
/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and
/// the branch-merge path (`exec/merge.rs::update_unique_constraints`) so the
/// two cannot silently drift to incompatible keyings.
pub(crate) fn composite_unique_key(parts: &[String]) -> String {
parts.join("\u{1f}")
/// The key is the *tuple* of per-column scalar strings (`Vec<String>`), keyed
/// directly in the dedup map — there is no separator, so no data value can
/// forge a collision (an earlier version joined on `U+001F`, which a value
/// containing that control char could still defeat).
///
/// - `Ok(None)` if any column is null: the row is exempt (a partial tuple
/// can't violate uniqueness under SQL null semantics).
/// - `Ok(Some(tuple))` otherwise.
/// - `Err(..)` propagated from [`unique_key_scalar`] on an un-keyable value.
///
/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and the
/// branch-merge path (`exec/merge.rs::update_unique_constraints`) so the two
/// derive identical keys and cannot drift on separator or scalar conversion.
pub(crate) fn composite_unique_key(
group_columns: &[ArrayRef],
row: usize,
) -> Result<Option<Vec<String>>> {
let mut parts = Vec::with_capacity(group_columns.len());
for column in group_columns {
match unique_key_scalar(column, row)? {
Some(value) => parts.push(value),
None => return Ok(None),
}
}
Ok(Some(parts))
}
/// Render a unique constraint's columns for error messages: a single column
/// as `col`, a composite as `(a, b)`.
fn format_unique_columns(columns: &[String]) -> String {
match columns {
/// Render a constraint's column tuple for error messages: a single item as
/// `col`, a composite as `(a, b)`. Used for both the column list and the
/// offending value tuple, which share the same shape.
fn format_tuple(items: &[String]) -> String {
match items {
[single] => single.clone(),
_ => format!("({})", columns.join(", ")),
_ => format!("({})", items.join(", ")),
}
}
/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
/// uniqueness comparison. Returns `None` for null values (nulls are exempt
/// from uniqueness in standard SQL semantics).
fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
use arrow_array::Array;
/// Reduce a single Arrow scalar at (`array`, `row`) to its uniqueness-key
/// string.
///
/// - `Ok(None)` for a null value: nulls are exempt from uniqueness (standard
/// SQL semantics over nullable columns).
/// - `Ok(Some(s))` for every scalar type a `@unique` / `@key` column can hold.
/// Strings are covered in all three physical Arrow encodings (`Utf8`,
/// `LargeUtf8`, `Utf8View`), so a legal string column is always keyable
/// regardless of how Lance materializes it on read-back.
/// - `Err(..)` for a non-null value whose Arrow type can't be reduced to a key
/// (a list, blob, or vector column). This fails loudly rather than silently
/// exempting the row, and because every legal scalar encoding is handled
/// above, the error fires only for a genuinely un-keyable column type — never
/// for a legal value that merely arrived in an unenumerated encoding.
fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result<Option<String>> {
use arrow_array::{Array, LargeStringArray, StringViewArray};
if array.is_null(row) {
return None;
return Ok(None);
}
if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
return Some(a.value(row).to_string());
return Ok(Some(a.value(row).to_string()));
}
None
Err(OmniError::manifest(format!(
"uniqueness key: unsupported column type {:?} for @unique/@key enforcement",
array.data_type()
)))
}
/// Build the list of uniqueness constraint groups to enforce on a node type.
@ -2209,4 +2246,66 @@ edge WorksAt: Person -> Company
let err = result.unwrap_err().to_string();
assert!(err.contains("NaN"), "error should mention NaN: {}", err);
}
#[test]
fn composite_unique_key_builds_tuple_and_exempts_null() {
let a: ArrayRef = Arc::new(StringArray::from(vec![Some("x|y"), Some("x"), None]));
let b: ArrayRef = Arc::new(StringArray::from(vec![Some("z"), Some("y|z"), Some("q")]));
let cols = [a, b];
// Tuple key, so `("x|y", "z")` and `("x", "y|z")` stay distinct —
// a separator-joined key (the old `|` join) would collapse both to
// `x|y|z`.
assert_eq!(
composite_unique_key(&cols, 0).unwrap(),
Some(vec!["x|y".to_string(), "z".to_string()])
);
assert_eq!(
composite_unique_key(&cols, 1).unwrap(),
Some(vec!["x".to_string(), "y|z".to_string()])
);
assert_ne!(
composite_unique_key(&cols, 0).unwrap(),
composite_unique_key(&cols, 1).unwrap()
);
// Any null column → the whole row is exempt (SQL null semantics).
assert_eq!(composite_unique_key(&cols, 2).unwrap(), None);
}
#[test]
fn unique_key_scalar_errors_loudly_on_unkeyable_type() {
use arrow_array::LargeBinaryArray;
// A binary/blob column can't be reduced to a uniqueness key. Before the
// hardening this returned `None`, so a `@unique` on such a column was
// silently un-enforced; now it errors instead of weakening the
// constraint in silence.
let blob: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(&b"abc"[..])]));
let err = unique_key_scalar(&blob, 0).unwrap_err();
assert!(
err.to_string().contains("unsupported column type"),
"un-keyable type must fail loudly (got: {err})"
);
}
#[test]
fn unique_key_scalar_handles_all_string_encodings() {
use arrow_array::{LargeStringArray, StringViewArray};
// A legal string column is keyable in every physical Arrow encoding
// Lance might hand back (Utf8 / LargeUtf8 / Utf8View). None of these may
// fall through to the loud `Err` path — that branch is reserved for
// genuinely un-keyable column types, not a legal value in an
// unenumerated encoding.
let utf8: ArrayRef = Arc::new(StringArray::from(vec![Some("v")]));
let large: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("v")]));
let view: ArrayRef = Arc::new(StringViewArray::from(vec![Some("v")]));
for array in [&utf8, &large, &view] {
assert_eq!(
unique_key_scalar(array, 0).unwrap(),
Some("v".to_string()),
"string array {:?} must render, not error",
array.data_type()
);
}
}
}

View file

@ -856,7 +856,7 @@ impl TableStore {
// before the FirstSeen setter has a chance to silently collapse
// anything):
// - Load path: `enforce_unique_constraints_intra_batch`
// (`loader/mod.rs:1471`) errors on intra-batch `@key` dups.
// (`loader/mod.rs:1442`) errors on intra-batch `@key` dups.
// - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`)
// accumulates and dedupes by `id`.
// - Branch-merge path: `compute_source_delta` /

View file

@ -188,7 +188,7 @@ node Thing {
///
/// Defense in depth:
/// 1. The loader's `enforce_unique_constraints_intra_batch`
/// (`loader/mod.rs:1471`), invoked unconditionally on any node type
/// (`loader/mod.rs:1442`), invoked unconditionally on any node type
/// with a `@key`, errors on intra-batch duplicate `@key` values at
/// intake — pinned by this test across every `LoadMode`.
/// 2. The `check_batch_unique_by_keys` precondition at the top of
@ -280,6 +280,71 @@ node ExternalID {
);
}
/// Guard: the intake path (load/insert/update) and the branch-merge path must
/// derive the same composite `@unique(a, b)` key, so a pair of rows unique on
/// the tuple is accepted by BOTH. Both paths now key on the tuple itself (no
/// separator), so a value containing any byte — including the `|` that an
/// earlier merge-path join used as its separator — can't forge a collision.
/// `("x|y", "z")` and `("x", "y|z")` are distinct tuples and must survive a
/// load-on-branch then merge without a phantom `UniqueViolation`. This pins the
/// cross-path consistency against any future drift in the shared keying.
#[tokio::test]
async fn composite_unique_key_is_consistent_across_intake_and_merge() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = r#"
node Item {
slug: String @key
a: String @index
b: String @index
@unique(a, b)
}
"#;
let insert_item = r#"
query insert_item($slug: String, $a: String, $b: String) {
insert Item { slug: $slug, a: $a, b: $b }
}
"#;
let main = Omnigraph::init(uri, schema).await.unwrap();
main.branch_create("feature").await.unwrap();
// Two rows unique on the composite (a, b), where `a`/`b` carry a literal
// `|`. Distinct under a tuple key; identical (`x|y|z`) under a `|`-join.
let feature = Omnigraph::open(uri).await.unwrap();
feature
.mutate(
"feature",
insert_item,
"insert_item",
&params(&[("$slug", "r1"), ("$a", "x|y"), ("$b", "z")]),
)
.await
.expect("intake must accept the first composite-unique row");
feature
.mutate(
"feature",
insert_item,
"insert_item",
&params(&[("$slug", "r2"), ("$a", "x"), ("$b", "y|z")]),
)
.await
.expect("intake must accept the second composite-unique row (distinct on the tuple)");
// The merge re-validates uniqueness over the adopted source rows. Both
// rows are unique on (a, b), so this must merge cleanly with no phantom
// conflict — intake and merge must key the tuple identically.
let merge_result = feature.branch_merge("feature", "main").await;
assert!(
merge_result.is_ok(),
"rows unique on the composite (a, b) must merge cleanly; \
intake and merge must key the tuple the same way (got: {:?})",
merge_result.err()
);
let reopened = Omnigraph::open(uri).await.unwrap();
assert_eq!(count_rows(&reopened, "node:Item").await, 2);
}
/// Canary for the upstream Lance gap that the `FirstSeen` workaround
/// in `table_store.rs` masks. The bug class is "Window 2": load →
/// indices built explicitly → merge → merge. Even with the engine

View file

@ -101,7 +101,7 @@ Use it this way:
| Deletes | Inline-commit residual; delete-only queries allowed, mixed insert/update/delete rejected by D2 | [query-language.md](../user/query-language.md), [writes.md](writes.md) |
| Branch delete | Manifest is the single authority, flipped atomically first; per-table forks + commit-graph branch are derived state, reclaimed best-effort (`force_delete_branch`) with the `cleanup` reconciler as the guaranteed backstop. Reusing a name whose reclaim failed before `cleanup` surfaces an actionable error | [branches-commits.md](../user/branches-commits.md), [maintenance.md](../user/maintenance.md) |
| Schema validation | Type checks, required fields, defaults, edge endpoint checks, and edge cardinality are enforced on write paths | [schema-language.md](../user/schema-language.md), [execution.md](execution.md) |
| Unique constraints | Intra-batch and write-path checks exist; full cross-version uniqueness is still a gap | [schema-language.md](../user/schema-language.md) |
| Unique constraints | Intra-batch and write-path checks exist; intake and branch-merge derive the composite key through one shared function (`loader::composite_unique_key`, a separator-free `Vec<String>` tuple) and fail loudly on an un-keyable column type rather than silently exempting it; full cross-version uniqueness against already-committed rows is still a gap | [schema-language.md](../user/schema-language.md) |
| Storage trait | `TableStorage` exists as the sealed staged-write surface; full call-site migration and capability/stat surfaces are incomplete | [writes.md](writes.md), [architecture.md](architecture.md) |
| Index lifecycle | `ensure_indices` is explicit today; reconciler-based convergence is roadmap | [indexes.md](../user/indexes.md), [maintenance.md](../user/maintenance.md) |
| Traversal IDs | Runtime still builds `TypeIndex`; Lance stable row-id based graph IDs are roadmap | [architecture.md](architecture.md), [query-language.md](../user/query-language.md) |