From 56b63191970f78a2038f56c735ddd351b8590863 Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Tue, 28 Apr 2026 04:51:10 +0300 Subject: [PATCH] Enforce schema validators on every write path (#59) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Several validators were defined but only called from a subset of write paths, so writes that violated @unique, @range, @check, enum, or @cardinality constraints could silently succeed and corrupt data. Adds two new helpers in loader/mod.rs: - validate_enum_constraints — batch-level enum check, scans Arrow string columns (and list-of-string columns) for values outside the declared set - enforce_unique_constraints_intra_batch — single-batch duplicate detection over named columns; partial enforcement (does not check against committed rows yet — cross-batch enforcement is a separate effort) Wires the validators into: - load_jsonl_reader nodes (alongside the existing validate_value_constraints call) and edges (which had no enum or unique check at all) - exec/mutation.rs node insert, edge insert, and update paths - mutation edge insert now also calls validate_edge_cardinality after the row lands but before the manifest commit, matching the loader's Phase 3 behavior A new tests/validators.rs suite asserts rejection on every entry path for invalid enum values, @range violations, intra-batch @unique duplicates, and edge @card excesses. 
Co-authored-by: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/exec/mutation.rs | 40 +++- crates/omnigraph/src/loader/mod.rs | 189 ++++++++++++++++++- crates/omnigraph/tests/validators.rs | 255 ++++++++++++++++++++++++++ 3 files changed, 482 insertions(+), 2 deletions(-) create mode 100644 crates/omnigraph/tests/validators.rs diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index b437606..d4df8c5 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -712,6 +712,15 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; crate::loader::validate_value_constraints(&batch, node_type)?; + crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?; + let unique_props = crate::loader::unique_property_names_for_node(node_type); + if !unique_props.is_empty() { + crate::loader::enforce_unique_constraints_intra_batch( + &batch, + type_name, + &unique_props, + )?; + } let has_key = node_type.key_property().is_some(); let (state, table_branch) = if has_key { self.upsert_batch(type_name, true, schema, batch).await? 
@@ -741,8 +750,27 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; validate_edge_insert_endpoints(self, type_name, &resolved).await?; + crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?; + let unique_props = crate::loader::unique_property_names_for_edge(edge_type); + if !unique_props.is_empty() { + crate::loader::enforce_unique_constraints_intra_batch( + &batch, + type_name, + &unique_props, + )?; + } + let active_branch = self.active_branch().map(str::to_string); let (state, table_branch) = self.append_batch(type_name, false, schema, batch).await?; + crate::loader::validate_edge_cardinality( + self, + active_branch.as_deref(), + type_name, + state.version, + table_branch.as_deref(), + ) + .await?; + let table_key = format!("edge:{}", type_name); self.commit_updates(&[crate::db::SubTableUpdate { table_key, @@ -885,7 +913,17 @@ impl Omnigraph { resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?); } let updated = apply_assignments(&schema, &matched, &resolved, &blob_props)?; - crate::loader::validate_value_constraints(&updated, &self.catalog().node_types[type_name])?; + let node_type = &self.catalog().node_types[type_name]; + crate::loader::validate_value_constraints(&updated, node_type)?; + crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?; + let unique_props = crate::loader::unique_property_names_for_node(node_type); + if !unique_props.is_empty() { + crate::loader::enforce_unique_constraints_intra_batch( + &updated, + type_name, + &unique_props, + )?; + } // Re-open for merge_insert (scan consumed the dataset; // version guard was already applied by open_for_mutation above) diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 0e29c2a..83d1118 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -343,6 +343,11 @@ async fn load_jsonl_reader( let node_type = 
&catalog.node_types[type_name]; let batch = build_node_batch(node_type, rows)?; validate_value_constraints(&batch, node_type)?; + validate_enum_constraints(&batch, &node_type.properties, type_name)?; + let unique_props = unique_property_names_for_node(node_type); + if !unique_props.is_empty() { + enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?; + } let loaded_count = batch.num_rows(); let table_key = format!("node:{}", type_name); snapshot @@ -416,6 +421,11 @@ async fn load_jsonl_reader( for (edge_name, rows) in &edge_rows { let edge_type = &catalog.edge_types[edge_name]; let batch = build_edge_batch(edge_type, rows)?; + validate_enum_constraints(&batch, &edge_type.properties, edge_name)?; + let unique_props = unique_property_names_for_edge(edge_type); + if !unique_props.is_empty() { + enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?; + } let loaded_count = batch.num_rows(); let table_key = format!("edge:{}", edge_name); snapshot @@ -1177,6 +1187,183 @@ pub(crate) fn validate_value_constraints( Ok(()) } +/// Validate that every enum-typed property in `properties` only contains values +/// from its declared enum value set. Operates on a single `RecordBatch` so it +/// can be called from any write path that already holds a batch. +/// +/// Scalar string enums are checked directly. List-of-enum properties are +/// checked element-by-element across the underlying string values. 
+pub(crate) fn validate_enum_constraints( + batch: &RecordBatch, + properties: &HashMap, + type_name: &str, +) -> Result<()> { + use arrow_array::{Array, ListArray}; + + for (prop_name, prop_type) in properties { + let Some(allowed) = prop_type.enum_values.as_ref() else { + continue; + }; + let Some(col) = batch.column_by_name(prop_name) else { + continue; + }; + if prop_type.list { + let Some(list_col) = col.as_any().downcast_ref::() else { + continue; + }; + for row in 0..list_col.len() { + if list_col.is_null(row) { + continue; + } + let item_arr = list_col.value(row); + let Some(str_arr) = item_arr.as_any().downcast_ref::() else { + continue; + }; + for i in 0..str_arr.len() { + if str_arr.is_null(i) { + continue; + } + let val = str_arr.value(i); + if !allowed.iter().any(|a| a.as_str() == val) { + return Err(OmniError::manifest(format!( + "invalid enum value '{}' for {}.{} (expected: {})", + val, + type_name, + prop_name, + allowed.join(", ") + ))); + } + } + } + } else if let Some(str_col) = col.as_any().downcast_ref::() { + for row in 0..str_col.len() { + if str_col.is_null(row) { + continue; + } + let val = str_col.value(row); + if !allowed.iter().any(|a| a.as_str() == val) { + return Err(OmniError::manifest(format!( + "invalid enum value '{}' for {}.{} (expected: {})", + val, + type_name, + prop_name, + allowed.join(", ") + ))); + } + } + } + } + Ok(()) +} + +/// Detect duplicate values within a single `RecordBatch` for any of the named +/// `unique_properties`. Returns an error on the first duplicate found. +/// +/// Note: this only catches duplicates *within* the batch. Cross-batch +/// uniqueness against already-committed rows is not enforced here — that +/// requires a dataset scan and is tracked separately. 
+pub(crate) fn enforce_unique_constraints_intra_batch( + batch: &RecordBatch, + type_name: &str, + unique_properties: &[String], +) -> Result<()> { + for property in unique_properties { + let Some(col_idx) = batch.schema().index_of(property).ok() else { + continue; + }; + let arr = batch.column(col_idx); + let mut seen: HashMap = HashMap::new(); + for row in 0..batch.num_rows() { + let Some(value) = scalar_to_string(arr, row) else { + continue; + }; + if let Some(prev_row) = seen.insert(value.clone(), row) { + return Err(OmniError::manifest(format!( + "@unique violation on {}.{}: value '{}' appears in rows {} and {}", + type_name, property, value, prev_row, row + ))); + } + } + } + Ok(()) +} + +/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for +/// uniqueness comparison. Returns `None` for null values (nulls are exempt +/// from uniqueness in standard SQL semantics). +fn scalar_to_string(array: &ArrayRef, row: usize) -> Option { + use arrow_array::Array; + if array.is_null(row) { + return None; + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return Some(a.value(row).to_string()); + } + if let Some(a) = array.as_any().downcast_ref::() { + return 
Some(a.value(row).to_string()); + } + None +} + +/// Build the flat list of property names that must be checked for uniqueness +/// on a node type. Includes both `@unique` properties (from +/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness). +pub(crate) fn unique_property_names_for_node( + node_type: &omnigraph_compiler::catalog::NodeType, +) -> Vec { + let mut props: Vec = node_type + .unique_constraints + .iter() + .flatten() + .cloned() + .collect(); + if let Some(key) = &node_type.key { + props.extend(key.iter().cloned()); + } + props.sort(); + props.dedup(); + props +} + +/// Same as [`unique_property_names_for_node`] but for an edge type. +pub(crate) fn unique_property_names_for_edge( + edge_type: &omnigraph_compiler::catalog::EdgeType, +) -> Vec { + let mut props: Vec = edge_type + .unique_constraints + .iter() + .flatten() + .cloned() + .collect(); + props.sort(); + props.dedup(); + props +} + fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option { use arrow_array::{ Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array, @@ -1212,7 +1399,7 @@ fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 { // ─── Edge cardinality validation ───────────────────────────────────────────── -async fn validate_edge_cardinality( +pub(crate) async fn validate_edge_cardinality( db: &crate::db::Omnigraph, branch: Option<&str>, edge_name: &str, diff --git a/crates/omnigraph/tests/validators.rs b/crates/omnigraph/tests/validators.rs new file mode 100644 index 0000000..96483d3 --- /dev/null +++ b/crates/omnigraph/tests/validators.rs @@ -0,0 +1,255 @@ +// Cross-path validator wire-up tests: each validator (enum, intra-batch +// unique, range, edge cardinality) must reject invalid data on every write +// path — JSONL load, mutation insert (node + edge where applicable), +// mutation update. 
+ +mod helpers; + +use omnigraph::db::Omnigraph; +use omnigraph::loader::{LoadMode, load_jsonl}; + +use helpers::{mutate_main, params}; + +const ENUM_SCHEMA: &str = r#" +node Person { + name: String @key + role: enum(admin, guest, member) +} +"#; + +const ENUM_VALID_SEED: &str = r#"{"type":"Person","data":{"name":"Alice","role":"admin"}}"#; + +const ENUM_MUTATIONS: &str = r#" +query insert_person($name: String, $role: String) { + insert Person { name: $name, role: $role } +} + +query set_role($name: String, $role: String) { + update Person set { role: $role } where name = $name +} +"#; + +const RANGE_SCHEMA: &str = r#" +node Person { + name: String @key + age: I32? + @range(age, 0..120) +} +"#; + +const RANGE_MUTATIONS: &str = r#" +query insert_person($name: String, $age: I32) { + insert Person { name: $name, age: $age } +} + +query set_age($name: String, $age: I32) { + update Person set { age: $age } where name = $name +} +"#; + +const UNIQUE_SCHEMA: &str = r#" +node User { + name: String @key + email: String? 
+ @unique(email) +} +"#; + +const CARDINALITY_SCHEMA: &str = r#" +node Person { name: String @key } +node Company { name: String @key } +edge WorksAt: Person -> Company @card(0..1) +"#; + +const CARDINALITY_SEED: &str = r#"{"type":"Person","data":{"name":"Alice"}} +{"type":"Company","data":{"name":"Acme"}} +{"type":"Company","data":{"name":"Beta"}}"#; + +const CARDINALITY_MUTATIONS: &str = r#" +query add_employment($person: String, $company: String) { + insert WorksAt { from: $person, to: $company } +} +"#; + +async fn init_with(schema: &str, data: &str) -> (tempfile::TempDir, Omnigraph) { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + if !data.is_empty() { + load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap(); + } + (dir, db) +} + +// ─── Enum validation ───────────────────────────────────────────────────────── + +#[tokio::test] +async fn enum_rejected_on_jsonl_load() { + let (_dir, mut db) = init_with(ENUM_SCHEMA, "").await; + let bad = r#"{"type":"Person","data":{"name":"Alice","role":"superadmin"}}"#; + let err = load_jsonl(&mut db, bad, LoadMode::Overwrite) + .await + .unwrap_err(); + assert!( + err.to_string().contains("invalid enum value 'superadmin'"), + "got: {}", + err + ); +} + +#[tokio::test] +async fn enum_rejected_on_mutation_insert() { + let (_dir, mut db) = init_with(ENUM_SCHEMA, ENUM_VALID_SEED).await; + let err = mutate_main( + &mut db, + ENUM_MUTATIONS, + "insert_person", + &params(&[("$name", "Bob"), ("$role", "superadmin")]), + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("invalid enum value 'superadmin'"), + "got: {}", + err + ); +} + +#[tokio::test] +async fn enum_rejected_on_mutation_update() { + let (_dir, mut db) = init_with(ENUM_SCHEMA, ENUM_VALID_SEED).await; + let err = mutate_main( + &mut db, + ENUM_MUTATIONS, + "set_role", + &params(&[("$name", "Alice"), ("$role", "superadmin")]), + ) + .await + .unwrap_err(); + 
assert!( + err.to_string().contains("invalid enum value 'superadmin'"), + "got: {}", + err + ); +} + +// ─── Range validation ──────────────────────────────────────────────────────── + +#[tokio::test] +async fn range_rejected_on_jsonl_load() { + let (_dir, mut db) = init_with(RANGE_SCHEMA, "").await; + let bad = r#"{"type":"Person","data":{"name":"Alice","age":250}}"#; + let err = load_jsonl(&mut db, bad, LoadMode::Overwrite) + .await + .unwrap_err(); + assert!(err.to_string().contains("@range violation"), "got: {}", err); +} + +#[tokio::test] +async fn range_rejected_on_mutation_insert() { + let (_dir, mut db) = init_with( + RANGE_SCHEMA, + r#"{"type":"Person","data":{"name":"Alice","age":30}}"#, + ) + .await; + let err = mutate_main( + &mut db, + RANGE_MUTATIONS, + "insert_person", + &helpers::mixed_params(&[("$name", "Bob")], &[("$age", 250)]), + ) + .await + .unwrap_err(); + assert!(err.to_string().contains("@range violation"), "got: {}", err); +} + +#[tokio::test] +async fn range_rejected_on_mutation_update() { + let (_dir, mut db) = init_with( + RANGE_SCHEMA, + r#"{"type":"Person","data":{"name":"Alice","age":30}}"#, + ) + .await; + let err = mutate_main( + &mut db, + RANGE_MUTATIONS, + "set_age", + &helpers::mixed_params(&[("$name", "Alice")], &[("$age", 250)]), + ) + .await + .unwrap_err(); + assert!(err.to_string().contains("@range violation"), "got: {}", err); +} + +// ─── Intra-batch unique validation ─────────────────────────────────────────── + +#[tokio::test] +async fn intra_batch_unique_rejected_on_jsonl_load() { + let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await; + let bad = r#"{"type":"User","data":{"name":"Alice","email":"dup@example.com"}} +{"type":"User","data":{"name":"Bob","email":"dup@example.com"}}"#; + let err = load_jsonl(&mut db, bad, LoadMode::Overwrite) + .await + .unwrap_err(); + assert!( + err.to_string().contains("@unique violation on User.email"), + "got: {}", + err + ); +} + +// Note: single-row mutation insert can't violate 
intra-batch uniqueness +// (only one row in the batch). Cross-batch uniqueness against committed rows +// is out of scope for this wire-up — see the unified write-validator effort. + +// ─── Edge cardinality ──────────────────────────────────────────────────────── + +#[tokio::test] +async fn cardinality_rejected_on_mutation_insert_edge() { + let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await; + + // First WorksAt edge — within @card(0..1). + mutate_main( + &mut db, + CARDINALITY_MUTATIONS, + "add_employment", + &params(&[("$person", "Alice"), ("$company", "Acme")]), + ) + .await + .unwrap(); + + // Second WorksAt for the same source — exceeds max=1. + let err = mutate_main( + &mut db, + CARDINALITY_MUTATIONS, + "add_employment", + &params(&[("$person", "Alice"), ("$company", "Beta")]), + ) + .await + .unwrap_err(); + assert!( + err.to_string().to_lowercase().contains("cardinality") + || err.to_string().to_lowercase().contains("@card"), + "got: {}", + err + ); +} + +#[tokio::test] +async fn cardinality_rejected_on_jsonl_load() { + // Already covered by existing loader Phase 3 logic but assert the + // same error surface as the mutation path so a regression is caught + // even if only one path changes. + let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await; + let bad = r#"{"edge":"WorksAt","from":"Alice","to":"Acme"} +{"edge":"WorksAt","from":"Alice","to":"Beta"}"#; + let err = load_jsonl(&mut db, bad, LoadMode::Append) + .await + .unwrap_err(); + assert!( + err.to_string().to_lowercase().contains("cardinality") + || err.to_string().to_lowercase().contains("@card"), + "got: {}", + err + ); +}