Enforce schema validators on every write path (#59)

Several validators were defined but only called from a subset of write
paths, so writes that violated @unique, @range, @check, enum, or
@cardinality constraints could silently succeed and corrupt data.

Adds two new helpers in loader/mod.rs:

- validate_enum_constraints — batch-level enum check, scans Arrow
  string columns (and list-of-string columns) for values outside the
  declared set
- enforce_unique_constraints_intra_batch — single-batch duplicate
  detection over named columns; partial enforcement (does not check
  against committed rows yet — cross-batch enforcement is a separate
  effort)

Wires the validators into:

- load_jsonl_reader nodes (alongside the existing
  validate_value_constraints call) and edges (which had no enum or
  unique check at all)
- exec/mutation.rs node insert, edge insert, and update paths
- mutation edge insert now also calls validate_edge_cardinality after
  the row lands but before the manifest commit, matching the loader's
  Phase 3 behavior

A new tests/validators.rs suite asserts rejection on every entry path
for invalid enum values, @range violations, intra-batch @unique
duplicates, and edge @card excesses.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-04-28 04:51:10 +03:00 committed by GitHub
parent c8047b6620
commit 56b6319197
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 482 additions and 2 deletions

View file

@ -712,6 +712,15 @@ impl Omnigraph {
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
crate::loader::validate_value_constraints(&batch, node_type)?;
crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&batch,
type_name,
&unique_props,
)?;
}
let has_key = node_type.key_property().is_some();
let (state, table_branch) = if has_key {
self.upsert_batch(type_name, true, schema, batch).await?
@ -741,8 +750,27 @@ impl Omnigraph {
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
validate_edge_insert_endpoints(self, type_name, &resolved).await?;
crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_edge(edge_type);
if !unique_props.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&batch,
type_name,
&unique_props,
)?;
}
let active_branch = self.active_branch().map(str::to_string);
let (state, table_branch) = self.append_batch(type_name, false, schema, batch).await?;
crate::loader::validate_edge_cardinality(
self,
active_branch.as_deref(),
type_name,
state.version,
table_branch.as_deref(),
)
.await?;
let table_key = format!("edge:{}", type_name);
self.commit_updates(&[crate::db::SubTableUpdate {
table_key,
@ -885,7 +913,17 @@ impl Omnigraph {
resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
}
let updated = apply_assignments(&schema, &matched, &resolved, &blob_props)?;
crate::loader::validate_value_constraints(&updated, &self.catalog().node_types[type_name])?;
let node_type = &self.catalog().node_types[type_name];
crate::loader::validate_value_constraints(&updated, node_type)?;
crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
let unique_props = crate::loader::unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
crate::loader::enforce_unique_constraints_intra_batch(
&updated,
type_name,
&unique_props,
)?;
}
// Re-open for merge_insert (scan consumed the dataset;
// version guard was already applied by open_for_mutation above)

View file

@ -343,6 +343,11 @@ async fn load_jsonl_reader<R: BufRead>(
let node_type = &catalog.node_types[type_name];
let batch = build_node_batch(node_type, rows)?;
validate_value_constraints(&batch, node_type)?;
validate_enum_constraints(&batch, &node_type.properties, type_name)?;
let unique_props = unique_property_names_for_node(node_type);
if !unique_props.is_empty() {
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?;
}
let loaded_count = batch.num_rows();
let table_key = format!("node:{}", type_name);
snapshot
@ -416,6 +421,11 @@ async fn load_jsonl_reader<R: BufRead>(
for (edge_name, rows) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let batch = build_edge_batch(edge_type, rows)?;
validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
let unique_props = unique_property_names_for_edge(edge_type);
if !unique_props.is_empty() {
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?;
}
let loaded_count = batch.num_rows();
let table_key = format!("edge:{}", edge_name);
snapshot
@ -1177,6 +1187,183 @@ pub(crate) fn validate_value_constraints(
Ok(())
}
/// Validate that every enum-typed property in `properties` only contains values
/// from its declared enum value set. Operates on a single `RecordBatch` so it
/// can be called from any write path that already holds a batch.
///
/// Scalar string enums are checked directly. List-of-enum properties are
/// checked element-by-element across the underlying string values.
pub(crate) fn validate_enum_constraints(
batch: &RecordBatch,
properties: &HashMap<String, omnigraph_compiler::types::PropType>,
type_name: &str,
) -> Result<()> {
use arrow_array::{Array, ListArray};
for (prop_name, prop_type) in properties {
let Some(allowed) = prop_type.enum_values.as_ref() else {
continue;
};
let Some(col) = batch.column_by_name(prop_name) else {
continue;
};
if prop_type.list {
let Some(list_col) = col.as_any().downcast_ref::<ListArray>() else {
continue;
};
for row in 0..list_col.len() {
if list_col.is_null(row) {
continue;
}
let item_arr = list_col.value(row);
let Some(str_arr) = item_arr.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..str_arr.len() {
if str_arr.is_null(i) {
continue;
}
let val = str_arr.value(i);
if !allowed.iter().any(|a| a.as_str() == val) {
return Err(OmniError::manifest(format!(
"invalid enum value '{}' for {}.{} (expected: {})",
val,
type_name,
prop_name,
allowed.join(", ")
)));
}
}
}
} else if let Some(str_col) = col.as_any().downcast_ref::<StringArray>() {
for row in 0..str_col.len() {
if str_col.is_null(row) {
continue;
}
let val = str_col.value(row);
if !allowed.iter().any(|a| a.as_str() == val) {
return Err(OmniError::manifest(format!(
"invalid enum value '{}' for {}.{} (expected: {})",
val,
type_name,
prop_name,
allowed.join(", ")
)));
}
}
}
}
Ok(())
}
/// Detect duplicate values within a single `RecordBatch` for any of the named
/// `unique_properties`. Returns an error on the first duplicate found.
///
/// Note: this only catches duplicates *within* the batch. Cross-batch
/// uniqueness against already-committed rows is not enforced here — that
/// requires a dataset scan and is tracked separately.
pub(crate) fn enforce_unique_constraints_intra_batch(
batch: &RecordBatch,
type_name: &str,
unique_properties: &[String],
) -> Result<()> {
for property in unique_properties {
let Some(col_idx) = batch.schema().index_of(property).ok() else {
continue;
};
let arr = batch.column(col_idx);
let mut seen: HashMap<String, usize> = HashMap::new();
for row in 0..batch.num_rows() {
let Some(value) = scalar_to_string(arr, row) else {
continue;
};
if let Some(prev_row) = seen.insert(value.clone(), row) {
return Err(OmniError::manifest(format!(
"@unique violation on {}.{}: value '{}' appears in rows {} and {}",
type_name, property, value, prev_row, row
)));
}
}
}
Ok(())
}
/// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for
/// uniqueness comparison. Returns `None` for null values (nulls are exempt
/// from uniqueness in standard SQL semantics).
fn scalar_to_string(array: &ArrayRef, row: usize) -> Option<String> {
use arrow_array::Array;
if array.is_null(row) {
return None;
}
if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<Int32Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<Int64Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<UInt32Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<UInt64Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<Float32Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<Float64Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<BooleanArray>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<Date32Array>() {
return Some(a.value(row).to_string());
}
if let Some(a) = array.as_any().downcast_ref::<Date64Array>() {
return Some(a.value(row).to_string());
}
None
}
/// Build the flat list of property names that must be checked for uniqueness
/// on a node type. Includes both `@unique` properties (from
/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness).
pub(crate) fn unique_property_names_for_node(
node_type: &omnigraph_compiler::catalog::NodeType,
) -> Vec<String> {
let mut props: Vec<String> = node_type
.unique_constraints
.iter()
.flatten()
.cloned()
.collect();
if let Some(key) = &node_type.key {
props.extend(key.iter().cloned());
}
props.sort();
props.dedup();
props
}
/// Same as [`unique_property_names_for_node`] but for an edge type.
pub(crate) fn unique_property_names_for_edge(
edge_type: &omnigraph_compiler::catalog::EdgeType,
) -> Vec<String> {
let mut props: Vec<String> = edge_type
.unique_constraints
.iter()
.flatten()
.cloned()
.collect();
props.sort();
props.dedup();
props
}
fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
use arrow_array::{
Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
@ -1212,7 +1399,7 @@ fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
// ─── Edge cardinality validation ─────────────────────────────────────────────
async fn validate_edge_cardinality(
pub(crate) async fn validate_edge_cardinality(
db: &crate::db::Omnigraph,
branch: Option<&str>,
edge_name: &str,

View file

@ -0,0 +1,255 @@
// Cross-path validator wire-up tests: each validator (enum, intra-batch
// unique, range, edge cardinality) must reject invalid data on every write
// path — JSONL load, mutation insert (node + edge where applicable),
// mutation update.
mod helpers;
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::{mutate_main, params};
const ENUM_SCHEMA: &str = r#"
node Person {
name: String @key
role: enum(admin, guest, member)
}
"#;
const ENUM_VALID_SEED: &str = r#"{"type":"Person","data":{"name":"Alice","role":"admin"}}"#;
const ENUM_MUTATIONS: &str = r#"
query insert_person($name: String, $role: String) {
insert Person { name: $name, role: $role }
}
query set_role($name: String, $role: String) {
update Person set { role: $role } where name = $name
}
"#;
const RANGE_SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
@range(age, 0..120)
}
"#;
const RANGE_MUTATIONS: &str = r#"
query insert_person($name: String, $age: I32) {
insert Person { name: $name, age: $age }
}
query set_age($name: String, $age: I32) {
update Person set { age: $age } where name = $name
}
"#;
const UNIQUE_SCHEMA: &str = r#"
node User {
name: String @key
email: String?
@unique(email)
}
"#;
const CARDINALITY_SCHEMA: &str = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company @card(0..1)
"#;
const CARDINALITY_SEED: &str = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Company","data":{"name":"Acme"}}
{"type":"Company","data":{"name":"Beta"}}"#;
const CARDINALITY_MUTATIONS: &str = r#"
query add_employment($person: String, $company: String) {
insert WorksAt { from: $person, to: $company }
}
"#;
async fn init_with(schema: &str, data: &str) -> (tempfile::TempDir, Omnigraph) {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
if !data.is_empty() {
load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();
}
(dir, db)
}
// ─── Enum validation ─────────────────────────────────────────────────────────
#[tokio::test]
async fn enum_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(ENUM_SCHEMA, "").await;
let bad = r#"{"type":"Person","data":{"name":"Alice","role":"superadmin"}}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
.await
.unwrap_err();
assert!(
err.to_string().contains("invalid enum value 'superadmin'"),
"got: {}",
err
);
}
#[tokio::test]
async fn enum_rejected_on_mutation_insert() {
let (_dir, mut db) = init_with(ENUM_SCHEMA, ENUM_VALID_SEED).await;
let err = mutate_main(
&mut db,
ENUM_MUTATIONS,
"insert_person",
&params(&[("$name", "Bob"), ("$role", "superadmin")]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("invalid enum value 'superadmin'"),
"got: {}",
err
);
}
#[tokio::test]
async fn enum_rejected_on_mutation_update() {
let (_dir, mut db) = init_with(ENUM_SCHEMA, ENUM_VALID_SEED).await;
let err = mutate_main(
&mut db,
ENUM_MUTATIONS,
"set_role",
&params(&[("$name", "Alice"), ("$role", "superadmin")]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("invalid enum value 'superadmin'"),
"got: {}",
err
);
}
// ─── Range validation ────────────────────────────────────────────────────────
#[tokio::test]
async fn range_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(RANGE_SCHEMA, "").await;
let bad = r#"{"type":"Person","data":{"name":"Alice","age":250}}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
.await
.unwrap_err();
assert!(err.to_string().contains("@range violation"), "got: {}", err);
}
#[tokio::test]
async fn range_rejected_on_mutation_insert() {
let (_dir, mut db) = init_with(
RANGE_SCHEMA,
r#"{"type":"Person","data":{"name":"Alice","age":30}}"#,
)
.await;
let err = mutate_main(
&mut db,
RANGE_MUTATIONS,
"insert_person",
&helpers::mixed_params(&[("$name", "Bob")], &[("$age", 250)]),
)
.await
.unwrap_err();
assert!(err.to_string().contains("@range violation"), "got: {}", err);
}
#[tokio::test]
async fn range_rejected_on_mutation_update() {
let (_dir, mut db) = init_with(
RANGE_SCHEMA,
r#"{"type":"Person","data":{"name":"Alice","age":30}}"#,
)
.await;
let err = mutate_main(
&mut db,
RANGE_MUTATIONS,
"set_age",
&helpers::mixed_params(&[("$name", "Alice")], &[("$age", 250)]),
)
.await
.unwrap_err();
assert!(err.to_string().contains("@range violation"), "got: {}", err);
}
// ─── Intra-batch unique validation ───────────────────────────────────────────
#[tokio::test]
async fn intra_batch_unique_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
let bad = r#"{"type":"User","data":{"name":"Alice","email":"dup@example.com"}}
{"type":"User","data":{"name":"Bob","email":"dup@example.com"}}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
.await
.unwrap_err();
assert!(
err.to_string().contains("@unique violation on User.email"),
"got: {}",
err
);
}
// Note: single-row mutation insert can't violate intra-batch uniqueness
// (only one row in the batch). Cross-batch uniqueness against committed rows
// is out of scope for this wire-up — see the unified write-validator effort.
// ─── Edge cardinality ────────────────────────────────────────────────────────
#[tokio::test]
async fn cardinality_rejected_on_mutation_insert_edge() {
let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await;
// First WorksAt edge — within @card(0..1).
mutate_main(
&mut db,
CARDINALITY_MUTATIONS,
"add_employment",
&params(&[("$person", "Alice"), ("$company", "Acme")]),
)
.await
.unwrap();
// Second WorksAt for the same source — exceeds max=1.
let err = mutate_main(
&mut db,
CARDINALITY_MUTATIONS,
"add_employment",
&params(&[("$person", "Alice"), ("$company", "Beta")]),
)
.await
.unwrap_err();
assert!(
err.to_string().to_lowercase().contains("cardinality")
|| err.to_string().to_lowercase().contains("@card"),
"got: {}",
err
);
}
#[tokio::test]
async fn cardinality_rejected_on_jsonl_load() {
// Already covered by existing loader Phase 3 logic but assert the
// same error surface as the mutation path so a regression is caught
// even if only one path changes.
let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await;
let bad = r#"{"edge":"WorksAt","from":"Alice","to":"Acme"}
{"edge":"WorksAt","from":"Alice","to":"Beta"}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Append)
.await
.unwrap_err();
assert!(
err.to_string().to_lowercase().contains("cardinality")
|| err.to_string().to_lowercase().contains("@card"),
"got: {}",
err
);
}