mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-07-03 02:51:04 +02:00
feat(engine): unify constraint validation across all write surfaces (#314)
* feat(engine): unify constraint validation across all write surfaces
Constraint enforcement (value/range/check, enum, uniqueness, edge
referential integrity, cardinality) was implemented three times — once
each in the bulk loader, the mutation executor, and the branch-merge
path — and had drifted: merge validated @range/@check but not enum, and
neither the mutation nor the load path enforced cross-version uniqueness
against already-committed rows.
Introduce one catalog-derived evaluator (`crate::validate`) that all
three surfaces route through. It is delta-scoped (checks only the change
set, not the whole graph) and index-backed (probes committed state
through the @key/@unique/src/dst BTREEs instead of full-scanning every
catalog table), reusing the existing leaf checks
(validate_value_constraints, validate_enum_constraints,
composite_unique_key) so the surfaces cannot drift again. A one-row-delta
merge now opens ~3 data tables instead of ~6+, and validation cost is
flat in graph size rather than O(V+E).
Behavior changes (all stricter, none relaxed):
- Enum constraints are now enforced on the merge path (was a gap).
- A write or load whose @unique value collides with an already-committed
different row is now rejected (cross-version uniqueness); re-upserting
an existing @key still upserts.
- Uniqueness distinguishes a duplicate key WITHIN one input batch (two
distinct records -> rejected, e.g. a bulk load listing a @key twice)
from the SAME id reappearing ACROSS batches (ordered supersession of
one logical row -> coalesced, e.g. a mutation insert-then-update).
- Overwrite loads validate per-table: a touched table's committed view is
its replacement image (empty), but a table absent from the batch keeps
its committed rows, so an edges-only overwrite still resolves
referential integrity against retained nodes.
Remove the per-surface validation orchestration the evaluator supersedes,
and the now-orphaned version-pinned dataset opener from the sealed
storage trait (reads route through the snapshot path). Docs (invariants,
testing) updated; full engine suite green.
* test(engine): pin orphan-edge validation on adopt-by-pointer merge
Regression for a gap in the unified merge validation: when a table is
adopted by pointer switch (AdoptSourceState) — source on main, target on a
branch — build_merge_changeset skips it, so referential integrity is never
checked for it. Merging main into a branch that deleted a node while main
added an edge to that node silently publishes the orphan edge.
This test merges main -> feature where feature deleted Bob and main added
Knows Alice->Bob, and asserts an OrphanEdge conflict. Red against HEAD
(merge returns Merged); turns green with the AdoptSourceState validation fix.
* fix(engine): validate adopt-by-pointer merge tables (AdoptSourceState)
The unified merge validator skipped any table classified AdoptSourceState
(a pointer switch / fork), so referential integrity, uniqueness, and
cardinality were never checked for it. Merging main into a branch that
deleted a node while main added an edge to that node silently published the
orphan edge — the prior full-scan validation caught it.
Root cause: classify_adopt keyed AdoptSourceState on the publish mechanism
("does it advance Lance HEAD") and returned before computing any delta, and
build_merge_changeset then skipped the table. Fix decouples the validation
input from the publish mechanism: classify_adopt now always computes the
source-vs-target delta (base == target on this path, so it is the right
validation delta) and carries it as AdoptSourceState { validation_delta };
build_merge_changeset validates it exactly like AdoptWithDelta. The publish
stays a pointer/fork (delta ignored) and remains excluded from recovery
pins, so publish/recovery semantics are unchanged — only validation is
restored. Closes the class: no publish optimization can bypass validation.
Turns the orphan-edge regression test green.
* test(engine): pin typed committed-uniqueness probe on non-String columns
The cross-version @unique check pushes a committed-state filter built from
the stringified key. On a non-String @unique column (e.g. Date) this compares
a Date32 column to a Utf8 literal — and the stringified key is the raw day
count, so the probe raises "Cannot cast string '20633' to Date32" for ANY
second write to the table (colliding or not).
Two regressions: a colliding Date value must surface a proper "@unique
violation" (not a coercion error), and a non-colliding write must succeed.
Both red against HEAD; green with the typed-literal probe fix.
* fix(engine): build committed uniqueness probe from typed column values
The cross-version @unique check pushed a Lance filter built with a
stringified key (lit(String)) against the real, typed column. On a
non-String @unique column this compared a Date32/numeric/bool column to a
Utf8 literal: a coercion error on Date/Bool (failing every write to the
table) or a silent miss on Float. For Date the stringified key was even the
raw day count, so the literal could never parse.
unique_holders now takes typed ScalarValues, built at the call site via
ScalarValue::try_from_array(group_column, row), so the pushed-down predicate
compares like-typed for any scalar @unique. The in-memory intra-delta dedup
keeps the stringified key (a type-agnostic equality grouping, unaffected).
Turns the Date @unique cross-version regression tests green.
* test(engine): pin id-keyed cardinality on merge-load edge moves/dups
Two cardinality drifts between validation and what commit persists:
- Move (B): a Merge-load that moves an edge to a new src only recounts the
new src, so vacating a src and dropping it below @card min is missed —
moving Alice's only WorksAt to Bob silently succeeds under @card(1..).
- Dup (A): a Merge-load batch listing one edge id under two srcs counts it
under both, but commit dedupes by id (last-wins). Alice gets a phantom
second edge and a spurious "has 2 edges (max 1)" violation under @card(0..1).
Both red against HEAD; green with the id-keyed last-wins cardinality model.
* fix(engine): key merge/load cardinality by edge id, last-wins
@card validation diverged from what commit persists in two ways: (1) it only
recounted the new src of a delta edge, so a Merge-load that moves an edge to a
new src never rechecked the vacated src and missed a drop below @card min; (2)
it counted raw delta rows, so the same edge id under two srcs in one batch was
counted under both, while commit dedupes by id (last-wins) — a phantom edge
and a spurious max violation.
evaluate_cardinality now coalesces the delta by edge id (last-wins, matching
dedupe_merge_batches_by_id) and builds the affected-src set from both the new
src of each delta edge AND the old committed src of each changed/deleted edge
id; a committed edge is dropped from its src when the delta deletes or
re-places it. The validated edge set per src now equals the committed image.
Turns the edge-move and duplicate-id cardinality regression tests green.
* docs(rfcs): add RFC 0001 — branch merge by fragment adoption
Proposed design for the by-design fix to merge cost/OOM: adopt the source
branch's Lance fragments by reference (base_paths) instead of re-materializing
rows, with a re-home reconciler + branch-delete reference guard closing the
dangling-reference lifecycle, and a reachability-complete cleanup sweep. Grounded
in the public Lance 7.0.0 multi-base APIs and the prior art (Delta shallow/deep
clone, Iceberg/lakeFS reachability GC). Status: Proposed.
* test(engine): pin @card validation on direct edge delete
Deletes stage as predicates, not constructive batches, so a delete-only
mutation produces an empty change-set and validate_changeset no-ops — a
`delete WorksAt where from = X` that removes a source's only edge commits
below @card(1..), while the merge path (which carries deleted_ids) rejects it.
Red against HEAD (the delete commits); green once the delete path resolves
its predicates into the validation change-set.
* fix(engine): validate edge cardinality on delete via resolved predicates
A delete-only mutation produced an empty change-set (deletes stage as
predicates, not constructive batches), so validate_changeset no-op'd and a
`delete Edge` that dropped a source below @card min committed silently — while
the merge path, which carries deleted_ids, rejects it.
validate_staged_mutation now resolves each staged delete predicate against the
live committed table (CommittedState::deleted_ids_matching, a SQL-filter scan
projecting id) and folds the matched ids into the change-set's deleted_ids for
that table. The existing evaluator then recounts the srcs a delete empties
(@card min) and sees removed rows for RI/node-delete — the same faithful
change-set the merge path already builds, so validation matches what commits.
Covers direct edge deletes, node deletes, and node-delete edge cascades
uniformly (all are staged predicates).
Turns the direct-edge-delete @card regression test green.
* refactor(engine): capture deleted ids at delete time, drop validation re-scan
The delete-cardinality fix resolved staged delete predicates a second time at
validation. Instead, capture the removed ids during the delete op's own scan:
execute_delete_edge and the node-delete edge cascade now scan id (not
count_rows), record the ids via MutationStaging::record_deleted_ids, and
to_changeset() folds them into the change-set's deleted_ids. validate_staged_
mutation reverts to plain to_changeset(); CommittedState::deleted_ids_matching
and scan_filtered_sql are removed.
Behavior-preserving (the @card-on-delete test stays green) and strictly fewer
scans — one scan at delete time replaces count-here + resolve-at-validation.
Node deletes already scanned their ids; this reuses that via a shared
ids_from_batches helper. Full engine suite green; workspace builds clean.
* test(engine): pin overwrite-removal RI + coalesced-unique final image
Two reviewer findings, both red against HEAD:
- F1 (High): overwriting a node table removes nodes without expressing them as
deleted_ids, so a retained edge in a non-overwritten table that references a
removed node is published as an orphan (edge-RI path-b never runs).
overwrite_node_removal_rejects_retained_orphan_edge.
- F2 (Medium): evaluate_unique accumulates superseded keys across batches, so a
mutation that frees a @unique value (Alice.email temp -> final) and reuses it
(insert Carol.email = temp) false-rejects a valid final image.
chained_unique_update_then_reuse_freed_value_is_not_a_violation.
* fix(engine): validate overwrite removals (orphan edges, emptied srcs)
An Overwrite load replaces each touched table, but to_changeset() only recorded
the new batch, never the committed rows the overwrite removes. So overwriting
node:Person to drop Bob while a retained edge:Knows(Alice->Bob) referenced him
published an orphan edge unchecked — edge-RI path-b is gated on the node's
deleted_ids, which were empty.
The loader now computes per overwritten table the removed ids (committed ids in
the pinned base minus the replacement batch's ids, via validate::
overwrite_removed_ids) and folds them into the change-set's deleted_ids. The
evaluator then runs RI path-b and cardinality against them — the same faithful
change-set the merge path builds. Overwrite is per-table, so a table absent from
the batch is untouched; a removed node referenced by a retained edge is now a
loud OrphanEdge.
Updates two tests that asserted the old silent-orphan behavior to
self-consistent overwrites (per-table Overwrite can't drop edge endpoints
without also overwriting the edge tables): end_to_end::overwrite_replaces_data
and writes::load_overwrite_with_bad_edge_reference_unblocks_next_load. The
orphan-rejection case itself is pinned by the new validators test.
* fix(engine): evaluate @unique against the coalesced final delta image
evaluate_unique iterated the raw delta batches and accumulated every key it saw
into one cross-batch map, so a coalesced write that frees then reuses a @unique
value within a query — update a row's email to 'temp', update the same row to
'final', insert a new row with 'temp' — false-rejected: 'temp' lingered in the
seen-set from the superseded first write though it no longer holds in the final
image that commits.
Restructure to validate the final coalesced image — the bytes that actually
publish:
- Pass 1 coalesces the delta by id (last-wins) into each id's final key, and
flags genuine within-ONE-batch duplicates (two distinct input records — the
bulk-load contract) before coalescing, so an unordered load batch with a real
dup still rejects.
- Pass 2 checks two distinct final ids holding the same key.
- Pass 3 does the committed cross-version lookup, excluding the delta's own ids.
Entries are sorted by id before the cross-row/committed passes so violation
order never depends on HashMap iteration. Coalescing first also drops the
redundant committed probes a superseded key used to issue.
Pinned by the chained-update red test; preserves intra-batch dup rejection
(consistency::loader_rejects_intra_batch_duplicate_keys) and cross-version
uniqueness (validators).
* style(engine): drop trailing blank line at staging.rs EOF
Left by a block-delete in an earlier refactor; flagged by git diff --check.
* docs(engine): refresh validate.rs module doc to current consumers
The module doc still said the merge path was the only consumer and the write
path a later, mechanical migration, and listed cardinality as a later
increment. Mutation and bulk load have since migrated onto the evaluator and
cardinality ships — correct both so the doc reflects that all three write
surfaces route through one evaluator.
This commit is contained in:
parent
4afb513700
commit
0dce7c8d18
18 changed files with 2467 additions and 1102 deletions
|
|
@ -1786,15 +1786,6 @@ impl Omnigraph {
|
|||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn open_dataset_at_state(
|
||||
&self,
|
||||
table_path: &str,
|
||||
table_branch: Option<&str>,
|
||||
table_version: u64,
|
||||
) -> Result<SnapshotHandle> {
|
||||
table_ops::open_dataset_at_state(self, table_path, table_branch, table_version).await
|
||||
}
|
||||
|
||||
pub(crate) async fn build_indices_on_dataset(
|
||||
&self,
|
||||
table_key: &str,
|
||||
|
|
|
|||
|
|
@ -936,17 +936,6 @@ pub(super) async fn reopen_for_mutation(
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn open_dataset_at_state(
|
||||
db: &Omnigraph,
|
||||
table_path: &str,
|
||||
table_branch: Option<&str>,
|
||||
table_version: u64,
|
||||
) -> Result<SnapshotHandle> {
|
||||
db.storage()
|
||||
.open_dataset_at_state(table_path, table_branch, table_version)
|
||||
.await
|
||||
}
|
||||
|
||||
/// A declared index the builder could not materialize on this pass. Today the
|
||||
/// only such case is a vector (IVF) column with no trainable vectors yet
|
||||
/// (KMeans needs >=1 vector), e.g. the load-before-embed window. Reported, not
|
||||
|
|
|
|||
|
|
@ -6,8 +6,17 @@ const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
|
|||
#[derive(Debug)]
|
||||
enum CandidateTableState {
|
||||
/// Adopt the source's table state via a pointer switch or a branch fork —
|
||||
/// no data HEAD advance, so nothing to pin for recovery.
|
||||
AdoptSourceState,
|
||||
/// no data HEAD advance, so nothing to pin for recovery. `validation_delta`
|
||||
/// carries the source-vs-target row delta (added/changed/deleted) for the
|
||||
/// evaluator ONLY — the publish is still a pointer/fork — so a pointer-adopt
|
||||
/// whose source diverged is still validated (RI/uniqueness/cardinality)
|
||||
/// against the merged state instead of being silently published. `None` when
|
||||
/// the source matched the target (nothing to validate). Decoupling the
|
||||
/// validation delta from the publish mechanism keeps the publish O(1) while
|
||||
/// closing the unvalidated-adopt gap.
|
||||
AdoptSourceState {
|
||||
validation_delta: Option<AdoptDelta>,
|
||||
},
|
||||
/// Adopt the source's state by applying a non-empty delta onto the target's
|
||||
/// lineage (append new + upsert changed + delete removed). The delta is
|
||||
/// pre-computed at classification so this candidate can be recovery-pinned:
|
||||
|
|
@ -24,7 +33,6 @@ struct StagedTable {
|
|||
|
||||
#[derive(Debug)]
|
||||
struct StagedMergeResult {
|
||||
full_staged: StagedTable,
|
||||
delta_staged: Option<StagedTable>,
|
||||
deleted_ids: Vec<String>,
|
||||
}
|
||||
|
|
@ -446,7 +454,6 @@ async fn stage_streaming_table_merge(
|
|||
conflicts: &mut Vec<MergeConflict>,
|
||||
) -> Result<Option<StagedMergeResult>> {
|
||||
let schema = schema_for_table_key(catalog, table_key)?;
|
||||
let mut full_writer = StagedTableWriter::new(&format!("{}_full", table_key), schema.clone())?;
|
||||
let mut delta_writer = StagedTableWriter::new(&format!("{}_delta", table_key), schema)?;
|
||||
let mut deleted_ids: Vec<String> = Vec::new();
|
||||
let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
|
||||
|
|
@ -514,9 +521,9 @@ async fn stage_streaming_table_merge(
|
|||
}
|
||||
|
||||
if let Some(selection) = selection {
|
||||
// Always write to full (for validation)
|
||||
full_writer.push_row(selection).await?;
|
||||
// Only write changed rows to delta (for publish)
|
||||
// Only changed rows go to the delta (for publish). The full merged
|
||||
// table is no longer staged — validation works off this delta plus
|
||||
// the committed target via index lookups, not a full re-scan.
|
||||
if selection.signature.as_str() != target_sig.unwrap_or("") {
|
||||
delta_writer.push_row(selection).await?;
|
||||
needs_update = true;
|
||||
|
|
@ -538,7 +545,6 @@ async fn stage_streaming_table_merge(
|
|||
};
|
||||
|
||||
Ok(Some(StagedMergeResult {
|
||||
full_staged: full_writer.finish().await?,
|
||||
delta_staged,
|
||||
deleted_ids,
|
||||
}))
|
||||
|
|
@ -621,297 +627,138 @@ fn row_signature(batch: &RecordBatch, row: usize) -> Result<String> {
|
|||
Ok(values.join("\u{1f}"))
|
||||
}
|
||||
|
||||
async fn scan_validation_stream(ds: &Dataset) -> Result<DatasetRecordBatchStream> {
|
||||
crate::table_store::TableStore::scan_stream_with(ds, None, None, None, false, |_| Ok(())).await
|
||||
/// Build the per-table [`ChangeSet`](crate::validate::ChangeSet) for a merge from
|
||||
/// the classified candidates — the new/changed rows (from the staged deltas) and
|
||||
/// removed ids the validator evaluates, instead of re-scanning whole tables.
|
||||
/// `AdoptSourceState` is published as a pointer/fork but still carries a
|
||||
/// `validation_delta` (the source-vs-target rows) when its source diverged, so
|
||||
/// it is validated like `AdoptWithDelta`; only an empty-delta adopt is skipped.
|
||||
async fn build_merge_changeset(
|
||||
db: &Omnigraph,
|
||||
candidates: &HashMap<String, CandidateTableState>,
|
||||
) -> Result<crate::validate::ChangeSet> {
|
||||
let catalog = db.catalog();
|
||||
let mut changeset = crate::validate::ChangeSet::new();
|
||||
for (table_key, candidate) in candidates {
|
||||
// Validation reads only id/src/dst + scalar constraint columns; project
|
||||
// out Vector/Blob so the change-set never holds embeddings (holding the
|
||||
// delta with embeddings would re-introduce the memory pressure the
|
||||
// streaming append exists to avoid).
|
||||
let projection = validation_projection(&catalog, table_key);
|
||||
let projection: Vec<&str> = projection.iter().map(String::as_str).collect();
|
||||
let mut change = crate::validate::TableChange::default();
|
||||
match candidate {
|
||||
// Pointer/fork adopt whose source matched the target: nothing to
|
||||
// validate. A pointer/fork adopt whose source diverged carries a
|
||||
// `validation_delta` and is validated exactly like `AdoptWithDelta`
|
||||
// (only the publish differs — pointer vs HEAD-advancing).
|
||||
CandidateTableState::AdoptSourceState {
|
||||
validation_delta: None,
|
||||
} => continue,
|
||||
CandidateTableState::AdoptSourceState {
|
||||
validation_delta: Some(delta),
|
||||
}
|
||||
| CandidateTableState::AdoptWithDelta(delta) => {
|
||||
if let Some(table) = &delta.appends {
|
||||
change
|
||||
.added
|
||||
.extend(scan_staged_for_validation(db, table, &projection).await?);
|
||||
}
|
||||
if let Some(table) = &delta.upserts {
|
||||
change
|
||||
.changed
|
||||
.extend(scan_staged_for_validation(db, table, &projection).await?);
|
||||
}
|
||||
change.deleted_ids = delta.deleted_ids.clone();
|
||||
}
|
||||
CandidateTableState::RewriteMerged(staged) => {
|
||||
if let Some(table) = &staged.delta_staged {
|
||||
change
|
||||
.changed
|
||||
.extend(scan_staged_for_validation(db, table, &projection).await?);
|
||||
}
|
||||
change.deleted_ids = staged.deleted_ids.clone();
|
||||
}
|
||||
}
|
||||
changeset.insert(table_key.clone(), change);
|
||||
}
|
||||
Ok(changeset)
|
||||
}
|
||||
|
||||
/// Columns validation needs from a staged delta: `id` (+ `src`/`dst` for edges)
|
||||
/// plus scalar/enum property columns. Vector and Blob columns are excluded — no
|
||||
/// constraint reads them, and keeping them out of the change-set keeps validation
|
||||
/// memory bounded regardless of embedding width.
|
||||
fn validation_projection(catalog: &Catalog, table_key: &str) -> Vec<String> {
|
||||
use omnigraph_compiler::types::{PropType, ScalarType};
|
||||
let is_heavy = |ty: &PropType| matches!(ty.scalar, ScalarType::Vector(_) | ScalarType::Blob);
|
||||
let mut cols = vec!["id".to_string()];
|
||||
if let Some(name) = table_key.strip_prefix("node:") {
|
||||
if let Some(node_type) = catalog.node_types.get(name) {
|
||||
for (prop, ty) in &node_type.properties {
|
||||
if !is_heavy(ty) {
|
||||
cols.push(prop.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if let Some(name) = table_key.strip_prefix("edge:") {
|
||||
cols.push("src".to_string());
|
||||
cols.push("dst".to_string());
|
||||
if let Some(edge_type) = catalog.edge_types.get(name) {
|
||||
for (prop, ty) in &edge_type.properties {
|
||||
if !is_heavy(ty) {
|
||||
cols.push(prop.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
cols
|
||||
}
|
||||
|
||||
/// Scan a staged delta table for validation, projected to the constraint columns
|
||||
/// (no embeddings) and kept batch-shaped — never concatenated into one batch, so
|
||||
/// it does not reintroduce the whole-delta materialization the streaming append
|
||||
/// avoids. Empty batches are dropped.
|
||||
async fn scan_staged_for_validation(
|
||||
db: &Omnigraph,
|
||||
table: &StagedTable,
|
||||
projection: &[&str],
|
||||
) -> Result<Vec<RecordBatch>> {
|
||||
let snapshot = SnapshotHandle::new(table.dataset.clone());
|
||||
let batches = db
|
||||
.storage()
|
||||
.scan(&snapshot, Some(projection), None, None)
|
||||
.await?;
|
||||
Ok(batches
|
||||
.into_iter()
|
||||
.filter(|batch| batch.num_rows() > 0)
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn validate_merge_candidates(
|
||||
db: &Omnigraph,
|
||||
source_snapshot: &Snapshot,
|
||||
target_snapshot: &Snapshot,
|
||||
candidates: &HashMap<String, CandidateTableState>,
|
||||
changeset: &crate::validate::ChangeSet,
|
||||
) -> Result<()> {
|
||||
let mut conflicts = Vec::new();
|
||||
let mut node_ids: HashMap<String, HashSet<String>> = HashMap::new();
|
||||
|
||||
for (type_name, node_type) in &db.catalog().node_types {
|
||||
let table_key = format!("node:{}", type_name);
|
||||
let mut values = HashSet::new();
|
||||
let mut unique_seen = vec![HashMap::new(); node_type.unique_constraints.len()];
|
||||
|
||||
if let Some(ds) =
|
||||
candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
|
||||
{
|
||||
let mut stream = scan_validation_stream(&ds).await?;
|
||||
while let Some(batch) = stream
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
{
|
||||
if let Err(err) = crate::loader::validate_value_constraints(&batch, node_type) {
|
||||
conflicts.push(MergeConflict {
|
||||
table_key: table_key.clone(),
|
||||
row_id: None,
|
||||
kind: MergeConflictKind::ValueConstraintViolation,
|
||||
message: err.to_string(),
|
||||
});
|
||||
}
|
||||
update_unique_constraints(
|
||||
&table_key,
|
||||
&batch,
|
||||
&node_type.unique_constraints,
|
||||
&mut unique_seen,
|
||||
&mut conflicts,
|
||||
)?;
|
||||
let ids = batch
|
||||
.column_by_name("id")
|
||||
.ok_or_else(|| {
|
||||
OmniError::manifest(format!("table {} missing id column", table_key))
|
||||
})?
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| {
|
||||
OmniError::manifest(format!("table {} id column is not Utf8", table_key))
|
||||
})?;
|
||||
for row in 0..ids.len() {
|
||||
values.insert(ids.value(row).to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
node_ids.insert(type_name.clone(), values);
|
||||
}
|
||||
|
||||
for (edge_name, edge_type) in &db.catalog().edge_types {
|
||||
let table_key = format!("edge:{}", edge_name);
|
||||
let mut unique_seen = vec![HashMap::new(); edge_type.unique_constraints.len()];
|
||||
let mut src_counts = HashMap::new();
|
||||
|
||||
if let Some(ds) =
|
||||
candidate_dataset(source_snapshot, target_snapshot, candidates, &table_key).await?
|
||||
{
|
||||
let mut stream = scan_validation_stream(&ds).await?;
|
||||
while let Some(batch) = stream
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
{
|
||||
update_unique_constraints(
|
||||
&table_key,
|
||||
&batch,
|
||||
&edge_type.unique_constraints,
|
||||
&mut unique_seen,
|
||||
&mut conflicts,
|
||||
)?;
|
||||
accumulate_edge_cardinality(&batch, &mut src_counts, &table_key)?;
|
||||
conflicts.extend(validate_orphan_edges_batch(
|
||||
&table_key, edge_type, &batch, &node_ids,
|
||||
)?);
|
||||
}
|
||||
}
|
||||
|
||||
conflicts.extend(finalize_edge_cardinality_conflicts(
|
||||
&table_key,
|
||||
edge_name,
|
||||
edge_type.cardinality.min,
|
||||
edge_type.cardinality.max,
|
||||
src_counts,
|
||||
));
|
||||
}
|
||||
|
||||
if conflicts.is_empty() {
|
||||
// Δ-scoped, index-backed validation: the declared constraints are evaluated
|
||||
// over the merge delta against the committed target (queried through its
|
||||
// BTREE indexes), not by re-scanning every catalog table. Value/enum,
|
||||
// uniqueness, edge-RI, and cardinality all route through one evaluator shared
|
||||
// with (eventually) the write path — closing the merge-vs-write drift.
|
||||
let committed = crate::validate::CommittedState::merge(target_snapshot);
|
||||
let constraints = crate::validate::constraints_for(&db.catalog());
|
||||
let violations =
|
||||
crate::validate::evaluate(&constraints, changeset, &committed, &db.catalog()).await?;
|
||||
if violations.is_empty() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(OmniError::MergeConflicts(conflicts))
|
||||
}
|
||||
}
|
||||
|
||||
async fn candidate_dataset(
|
||||
source_snapshot: &Snapshot,
|
||||
target_snapshot: &Snapshot,
|
||||
candidates: &HashMap<String, CandidateTableState>,
|
||||
table_key: &str,
|
||||
) -> Result<Option<Dataset>> {
|
||||
if let Some(candidate) = candidates.get(table_key) {
|
||||
return match candidate {
|
||||
CandidateTableState::AdoptSourceState | CandidateTableState::AdoptWithDelta(_) => {
|
||||
match source_snapshot.entry(table_key) {
|
||||
Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
CandidateTableState::RewriteMerged(staged) => {
|
||||
Ok(Some(staged.full_staged.dataset.clone()))
|
||||
}
|
||||
};
|
||||
}
|
||||
match target_snapshot.entry(table_key) {
|
||||
Some(_) => Ok(Some(target_snapshot.open(table_key).await?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_unique_constraints(
|
||||
table_key: &str,
|
||||
batch: &RecordBatch,
|
||||
constraints: &[Vec<String>],
|
||||
seen: &mut [HashMap<Vec<String>, String>],
|
||||
conflicts: &mut Vec<MergeConflict>,
|
||||
) -> Result<()> {
|
||||
for (constraint_idx, columns) in constraints.iter().enumerate() {
|
||||
let seen = &mut seen[constraint_idx];
|
||||
// Resolve the group's columns once. The candidate dataset always
|
||||
// carries the full table schema, so a missing column is an internal
|
||||
// error rather than a skip.
|
||||
let group_columns = columns
|
||||
.iter()
|
||||
.map(|column_name| {
|
||||
batch.column_by_name(column_name).cloned().ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"table {} missing unique column '{}'",
|
||||
table_key, column_name
|
||||
))
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
for row in 0..batch.num_rows() {
|
||||
// Same tuple key as the intake path — one shared derivation in
|
||||
// `crate::loader::composite_unique_key`, so the two cannot drift on
|
||||
// separator or scalar conversion. Null rows are exempt.
|
||||
let Some(key) = crate::loader::composite_unique_key(&group_columns, row)? else {
|
||||
continue;
|
||||
};
|
||||
let row_id = row_id_at(batch, row)?;
|
||||
if let Some(first_row_id) = seen.insert(key, row_id.clone()) {
|
||||
conflicts.push(MergeConflict {
|
||||
table_key: table_key.to_string(),
|
||||
row_id: Some(row_id.clone()),
|
||||
kind: MergeConflictKind::UniqueViolation,
|
||||
message: format!(
|
||||
"unique constraint {:?} violated by '{}' and '{}'",
|
||||
columns, first_row_id, row_id
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn accumulate_edge_cardinality(
|
||||
batch: &RecordBatch,
|
||||
counts: &mut HashMap<String, u32>,
|
||||
table_key: &str,
|
||||
) -> Result<()> {
|
||||
let srcs = batch
|
||||
.column_by_name("src")
|
||||
.ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| {
|
||||
OmniError::manifest(format!("table {} src column is not Utf8", table_key))
|
||||
})?;
|
||||
for row in 0..srcs.len() {
|
||||
*counts.entry(srcs.value(row).to_string()).or_insert(0_u32) += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn finalize_edge_cardinality_conflicts(
|
||||
table_key: &str,
|
||||
edge_name: &str,
|
||||
min: u32,
|
||||
max: Option<u32>,
|
||||
counts: HashMap<String, u32>,
|
||||
) -> Vec<MergeConflict> {
|
||||
let mut conflicts = Vec::new();
|
||||
for (src, count) in counts {
|
||||
if let Some(max) = max {
|
||||
if count > max {
|
||||
conflicts.push(MergeConflict {
|
||||
table_key: table_key.to_string(),
|
||||
row_id: None,
|
||||
kind: MergeConflictKind::CardinalityViolation,
|
||||
message: format!(
|
||||
"@card violation on edge {}: source '{}' has {} edges (max {})",
|
||||
edge_name, src, count, max
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
if count < min {
|
||||
conflicts.push(MergeConflict {
|
||||
table_key: table_key.to_string(),
|
||||
row_id: None,
|
||||
kind: MergeConflictKind::CardinalityViolation,
|
||||
message: format!(
|
||||
"@card violation on edge {}: source '{}' has {} edges (min {})",
|
||||
edge_name, src, count, min
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
conflicts
|
||||
}
|
||||
|
||||
fn validate_orphan_edges_batch(
|
||||
table_key: &str,
|
||||
edge_type: &omnigraph_compiler::catalog::EdgeType,
|
||||
batch: &RecordBatch,
|
||||
node_ids: &HashMap<String, HashSet<String>>,
|
||||
) -> Result<Vec<MergeConflict>> {
|
||||
let srcs = batch
|
||||
.column_by_name("src")
|
||||
.ok_or_else(|| OmniError::manifest(format!("table {} missing src column", table_key)))?
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| {
|
||||
OmniError::manifest(format!("table {} src column is not Utf8", table_key))
|
||||
})?;
|
||||
let dsts = batch
|
||||
.column_by_name("dst")
|
||||
.ok_or_else(|| OmniError::manifest(format!("table {} missing dst column", table_key)))?
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| {
|
||||
OmniError::manifest(format!("table {} dst column is not Utf8", table_key))
|
||||
})?;
|
||||
|
||||
let from_ids = node_ids.get(&edge_type.from_type).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing candidate node ids for {}",
|
||||
edge_type.from_type
|
||||
Err(OmniError::MergeConflicts(
|
||||
violations
|
||||
.into_iter()
|
||||
.map(crate::validate::Violation::into_merge_conflict)
|
||||
.collect(),
|
||||
))
|
||||
})?;
|
||||
let to_ids = node_ids.get(&edge_type.to_type).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing candidate node ids for {}",
|
||||
edge_type.to_type
|
||||
))
|
||||
})?;
|
||||
|
||||
let mut conflicts = Vec::new();
|
||||
for row in 0..batch.num_rows() {
|
||||
let row_id = row_id_at(batch, row)?;
|
||||
let src = srcs.value(row);
|
||||
let dst = dsts.value(row);
|
||||
if !from_ids.contains(src) {
|
||||
conflicts.push(MergeConflict {
|
||||
table_key: table_key.to_string(),
|
||||
row_id: Some(row_id.clone()),
|
||||
kind: MergeConflictKind::OrphanEdge,
|
||||
message: format!("src '{}' not found in {}", src, edge_type.from_type),
|
||||
});
|
||||
}
|
||||
if !to_ids.contains(dst) {
|
||||
conflicts.push(MergeConflict {
|
||||
table_key: table_key.to_string(),
|
||||
row_id: Some(row_id),
|
||||
kind: MergeConflictKind::OrphanEdge,
|
||||
message: format!("dst '{}' not found in {}", dst, edge_type.to_type),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(conflicts)
|
||||
}
|
||||
|
||||
fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
|
||||
|
|
@ -944,7 +791,10 @@ async fn classify_adopt(
|
|||
table_key: &str,
|
||||
) -> Result<CandidateTableState> {
|
||||
let Some(source_entry) = source_snapshot.entry(table_key) else {
|
||||
return Ok(CandidateTableState::AdoptSourceState);
|
||||
// Source has no such table — nothing to adopt or validate.
|
||||
return Ok(CandidateTableState::AdoptSourceState {
|
||||
validation_delta: None,
|
||||
});
|
||||
};
|
||||
let target_entry = target_snapshot.entry(table_key);
|
||||
let target_active = target_db.active_branch().await;
|
||||
|
|
@ -961,12 +811,21 @@ async fn classify_adopt(
|
|||
// Source on main (pointer switch) or target doesn't own (fork): no advance.
|
||||
_ => false,
|
||||
};
|
||||
if !advances_head {
|
||||
return Ok(CandidateTableState::AdoptSourceState);
|
||||
}
|
||||
match compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await? {
|
||||
Some(delta) => Ok(CandidateTableState::AdoptWithDelta(delta)),
|
||||
None => Ok(CandidateTableState::AdoptSourceState),
|
||||
// Compute the source-vs-target delta UNCONDITIONALLY — it is the validation
|
||||
// input the evaluator needs, independent of how the table is published.
|
||||
// (`classify_adopt` is only reached when base == target, so the
|
||||
// base-vs-source delta equals the target-vs-source delta.) A HEAD-advancing
|
||||
// publish consumes it as the write payload (`AdoptWithDelta`); a pointer/fork
|
||||
// publish ignores it and only validates it (`AdoptSourceState`), so a
|
||||
// pointer-adopt whose source diverged is still checked for
|
||||
// RI/uniqueness/cardinality against the merged state.
|
||||
let validation_delta =
|
||||
compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
|
||||
match (advances_head, validation_delta) {
|
||||
(true, Some(delta)) => Ok(CandidateTableState::AdoptWithDelta(delta)),
|
||||
(_, validation_delta) => {
|
||||
Ok(CandidateTableState::AdoptSourceState { validation_delta })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1588,7 +1447,8 @@ impl Omnigraph {
|
|||
return Err(OmniError::MergeConflicts(conflicts));
|
||||
}
|
||||
|
||||
validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
|
||||
let changeset = build_merge_changeset(self, &candidates).await?;
|
||||
validate_merge_candidates(self, &target_snapshot, &changeset).await?;
|
||||
|
||||
// Recovery sidecar: protect the per-table commit_staged loop.
|
||||
// Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance
|
||||
|
|
@ -1627,7 +1487,7 @@ impl Omnigraph {
|
|||
matches!(
|
||||
candidates.get(*table_key),
|
||||
Some(CandidateTableState::RewriteMerged(_))
|
||||
| Some(CandidateTableState::AdoptSourceState)
|
||||
| Some(CandidateTableState::AdoptSourceState { .. })
|
||||
| Some(CandidateTableState::AdoptWithDelta(_))
|
||||
)
|
||||
})
|
||||
|
|
@ -1643,7 +1503,7 @@ impl Omnigraph {
|
|||
if !matches!(
|
||||
candidate,
|
||||
CandidateTableState::RewriteMerged(_)
|
||||
| CandidateTableState::AdoptSourceState
|
||||
| CandidateTableState::AdoptSourceState { .. }
|
||||
| CandidateTableState::AdoptWithDelta(_)
|
||||
) {
|
||||
continue;
|
||||
|
|
@ -1746,7 +1606,7 @@ impl Omnigraph {
|
|||
continue;
|
||||
};
|
||||
let update = match candidate_state {
|
||||
CandidateTableState::AdoptSourceState => {
|
||||
CandidateTableState::AdoptSourceState { .. } => {
|
||||
publish_adopted_source_state(self, source_snapshot, &target_snapshot, table_key)
|
||||
.await?
|
||||
}
|
||||
|
|
|
|||
|
|
@ -338,112 +338,6 @@ fn build_insert_batch(
|
|||
RecordBatch::try_new(schema.clone(), columns).map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
async fn validate_edge_insert_endpoints(
|
||||
db: &Omnigraph,
|
||||
staging: &MutationStaging,
|
||||
branch: Option<&str>,
|
||||
edge_name: &str,
|
||||
assignments: &HashMap<String, Literal>,
|
||||
) -> Result<()> {
|
||||
let catalog = db.catalog();
|
||||
let edge_type = catalog
|
||||
.edge_types
|
||||
.get(edge_name)
|
||||
.ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_name)))?;
|
||||
let from = match assignments.get("from") {
|
||||
Some(Literal::String(value)) => value.as_str(),
|
||||
Some(other) => {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"edge {} from endpoint must be a string id, got {}",
|
||||
edge_name,
|
||||
literal_to_sql(other)
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"edge {} missing 'from' endpoint",
|
||||
edge_name
|
||||
)));
|
||||
}
|
||||
};
|
||||
let to = match assignments.get("to") {
|
||||
Some(Literal::String(value)) => value.as_str(),
|
||||
Some(other) => {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"edge {} to endpoint must be a string id, got {}",
|
||||
edge_name,
|
||||
literal_to_sql(other)
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"edge {} missing 'to' endpoint",
|
||||
edge_name
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
ensure_node_id_exists(db, staging, branch, &edge_type.from_type, from, "src").await?;
|
||||
ensure_node_id_exists(db, staging, branch, &edge_type.to_type, to, "dst").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Quick scan of pending batches for an `id` value match. Used by the
|
||||
/// mutation path's edge endpoint validation to satisfy read-your-writes
|
||||
/// for same-query inserts before they're committed to Lance.
|
||||
fn pending_batches_contain_id(batches: &[RecordBatch], id: &str) -> bool {
|
||||
for batch in batches {
|
||||
let Some(col) = batch.column_by_name("id") else {
|
||||
continue;
|
||||
};
|
||||
let Some(arr) = col.as_any().downcast_ref::<StringArray>() else {
|
||||
continue;
|
||||
};
|
||||
for i in 0..arr.len() {
|
||||
if arr.is_valid(i) && arr.value(i) == id {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn ensure_node_id_exists(
|
||||
db: &Omnigraph,
|
||||
staging: &MutationStaging,
|
||||
branch: Option<&str>,
|
||||
node_type: &str,
|
||||
id: &str,
|
||||
label: &str,
|
||||
) -> Result<()> {
|
||||
let table_key = format!("node:{}", node_type);
|
||||
|
||||
// Prefer the in-query pending accumulator so a same-query insert of
|
||||
// the referenced node is visible to this validation. Fall back to
|
||||
// the pre-mutation manifest snapshot when nothing pending matches.
|
||||
let pending = staging.pending_batches(&table_key);
|
||||
if pending_batches_contain_id(pending, id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let filter = format!("id = '{}'", id.replace('\'', "''"));
|
||||
let snapshot = db.snapshot_for_branch(branch).await?;
|
||||
let ds = db
|
||||
.storage()
|
||||
.open_snapshot_at_table(&snapshot, &table_key)
|
||||
.await?;
|
||||
let exists = db.storage().count_rows(&ds, Some(filter)).await? > 0;
|
||||
|
||||
if exists {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(OmniError::manifest(format!(
|
||||
"{} '{}' not found in {}",
|
||||
label, id, node_type
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert an IRMutationPredicate to a Lance SQL filter string.
|
||||
fn predicate_to_sql(
|
||||
predicate: &IRMutationPredicate,
|
||||
|
|
@ -588,40 +482,6 @@ use super::staging::{MutationStaging, PendingMode};
|
|||
/// them into one staged delete — there is no post-inline-commit reopen to
|
||||
/// special-case anymore.
|
||||
impl Omnigraph {
|
||||
/// Resolve a LIVE-HEAD read handle for an edge table's committed-state `@card`
|
||||
/// scan when collapse #1 skipped the accumulation open. The edge-insert path no
|
||||
/// longer opens the edge dataset (non-strict op + txn), but cardinality is
|
||||
/// validated ONCE (never rechecked at commit), so the scan must observe the
|
||||
/// freshest committed edges — NOT the pinned `txn.base`. A concurrent writer can
|
||||
/// commit edges to this table after `txn` capture; counting against the stale
|
||||
/// base undercounts and lets a violating insert through (invariant 9). The table
|
||||
/// LOCATION is read from the pinned entry (stable across versions); the dataset is
|
||||
/// opened at live HEAD via `open_dataset_head_for_write` (a read here despite the
|
||||
/// name — no lock/stage), restoring the pre-3b image (the mutation's own open).
|
||||
/// The residual validate→commit race (a writer committing between this scan and
|
||||
/// the end-of-query commit) is the §7.1 gap, closed by RFC-013 step 4.
|
||||
async fn edge_cardinality_read_handle(
|
||||
&self,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
table_key: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let branch = txn.and_then(|t| t.branch.as_deref());
|
||||
match txn.and_then(|t| t.base.entry(table_key)) {
|
||||
Some(entry) => {
|
||||
let full_path = self.storage().dataset_uri(&entry.table_path);
|
||||
self.storage()
|
||||
.open_dataset_head_for_write(table_key, &full_path, branch)
|
||||
.await
|
||||
}
|
||||
// Unreachable today (the `None` handle only reaches here under a txn whose
|
||||
// base contains the table). Defensive: resolve the table fresh (live)
|
||||
// without the schema re-validation `snapshot_for_branch` would re-run.
|
||||
None => {
|
||||
let snapshot = self.fresh_snapshot_for_branch_unchecked(branch).await?;
|
||||
self.storage().open_snapshot_at_table(&snapshot, table_key).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn open_table_for_mutation(
|
||||
|
|
@ -776,6 +636,32 @@ impl Omnigraph {
|
|||
.await
|
||||
}
|
||||
|
||||
/// End-of-query validation for a constructive mutation: build the change-set
|
||||
/// from the accumulated staging and run the unified evaluator (value/enum,
|
||||
/// uniqueness incl. cross-version, edge-RI, cardinality) against committed
|
||||
/// state. Read-your-writes is inherent — every same-query insert is already
|
||||
/// in the change-set. Destructive queries (D2) stage no constructive batches,
|
||||
/// so the change-set is empty and this is a no-op (deletes cascade).
|
||||
async fn validate_staged_mutation(
|
||||
&self,
|
||||
staging: &MutationStaging,
|
||||
txn: &crate::db::WriteTxn,
|
||||
) -> Result<()> {
|
||||
// RI/uniqueness read the write's already-validated pinned base (`txn.base`),
|
||||
// NOT a fresh `snapshot_for_branch` — which would re-run the schema-contract
|
||||
// validation the WriteTxn already did once (RFC-013 step 3b capture-once).
|
||||
// Cardinality reads LIVE HEAD per edge table (the #298 stale-handle fix) via
|
||||
// the live opener in `CommittedState::write`.
|
||||
let committed =
|
||||
crate::validate::CommittedState::write(&txn.base, self, txn.branch.as_deref());
|
||||
// `to_changeset` carries both constructive batches and the ids the delete
|
||||
// ops captured from their own scans (`deleted_ids`), so the evaluator
|
||||
// recounts the srcs a delete empties (`@card`) and sees removed rows for
|
||||
// RI — the faithful change-set the merge path also builds.
|
||||
crate::validate::validate_changeset(&staging.to_changeset(), &committed, &self.catalog())
|
||||
.await
|
||||
}
|
||||
|
||||
async fn mutate_with_current_actor(
|
||||
&self,
|
||||
branch: &str,
|
||||
|
|
@ -872,6 +758,7 @@ impl Omnigraph {
|
|||
Err(e) => Err(e),
|
||||
Ok(total) if staging.is_empty() => Ok(total),
|
||||
Ok(total) => {
|
||||
self.validate_staged_mutation(&staging, &txn).await?;
|
||||
let staged = staging.stage_all(self, requested.as_deref()).await?;
|
||||
// `_queue_guards` holds per-(table_key, branch) write
|
||||
// queues acquired inside `commit_all`. Held across the
|
||||
|
|
@ -1100,16 +987,7 @@ impl Omnigraph {
|
|||
};
|
||||
|
||||
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
|
||||
crate::loader::validate_value_constraints(&batch, node_type)?;
|
||||
crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?;
|
||||
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
|
||||
if !unique_groups.is_empty() {
|
||||
crate::loader::enforce_unique_constraints_intra_batch(
|
||||
&batch,
|
||||
type_name,
|
||||
&unique_groups,
|
||||
)?;
|
||||
}
|
||||
// Validation (value/enum/unique) runs end-of-query via the evaluator.
|
||||
let has_key = node_type.key_property().is_some();
|
||||
let table_key = format!("node:{}", type_name);
|
||||
// Capture pre-write metadata on first touch (no Lance write).
|
||||
|
|
@ -1145,21 +1023,14 @@ impl Omnigraph {
|
|||
let id = ulid::Ulid::new().to_string();
|
||||
|
||||
let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
|
||||
validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
|
||||
crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?;
|
||||
let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type);
|
||||
if !unique_groups.is_empty() {
|
||||
crate::loader::enforce_unique_constraints_intra_batch(
|
||||
&batch,
|
||||
type_name,
|
||||
&unique_groups,
|
||||
)?;
|
||||
}
|
||||
// Validation (edge-RI, enum, unique, @card against LIVE HEAD) runs
|
||||
// end-of-query via the evaluator.
|
||||
let table_key = format!("edge:{}", type_name);
|
||||
// Capture pre-write metadata on first touch. Edge inserts are
|
||||
// non-strict, so with a `WriteTxn` this opens NOTHING (collapse #1)
|
||||
// and returns `None`.
|
||||
let (handle, _full_path, _table_branch) = open_table_for_mutation(
|
||||
// Capture pre-write metadata on first touch (ensure_path). Edge
|
||||
// inserts are non-strict, so with a `WriteTxn` this opens NOTHING
|
||||
// (collapse #1) and the handle is discarded — validation, including
|
||||
// `@card` against LIVE HEAD, runs end-of-query via the evaluator.
|
||||
let (_handle, _full_path, _table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
|
|
@ -1170,32 +1041,7 @@ impl Omnigraph {
|
|||
.await?;
|
||||
// Accumulate the new edge row. Edge IDs are ULID-generated so
|
||||
// Append mode is correct (no key-based dedup needed).
|
||||
staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?;
|
||||
|
||||
// Edge cardinality validation: scan committed edges via Lance
|
||||
// + iterate pending edges in-memory for the `src` column,
|
||||
// group-by-src. The pending side already includes the row
|
||||
// we just appended (above). When the open was skipped (collapse
|
||||
// #1), resolve a read handle for the committed scan at LIVE HEAD
|
||||
// (`edge_cardinality_read_handle`, #298) — NOT the pinned txn.base,
|
||||
// which would undercount edges a concurrent writer committed since
|
||||
// capture. Only when cardinality is non-default, so the common
|
||||
// default-cardinality edge keeps the open-free path. (The residual
|
||||
// validate→commit race is the §7.1 gap — step 4.)
|
||||
if !edge_type.cardinality.is_default() {
|
||||
let committed_ds = match handle {
|
||||
Some(h) => h,
|
||||
None => self.edge_cardinality_read_handle(txn, &table_key).await?,
|
||||
};
|
||||
validate_edge_cardinality_with_pending(
|
||||
self,
|
||||
&committed_ds,
|
||||
staging,
|
||||
&table_key,
|
||||
edge_type,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
staging.append_batch(&table_key, schema, PendingMode::Append, batch)?;
|
||||
|
||||
self.invalidate_graph_index().await;
|
||||
|
||||
|
|
@ -1318,17 +1164,7 @@ impl Omnigraph {
|
|||
resolved.insert(a.property.clone(), resolve_expr_value(&a.value, params)?);
|
||||
}
|
||||
let updated = apply_assignments(&schema, &matched, &resolved, &blob_props)?;
|
||||
let node_type = &self.catalog().node_types[type_name];
|
||||
crate::loader::validate_value_constraints(&updated, node_type)?;
|
||||
crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?;
|
||||
let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type);
|
||||
if !unique_groups.is_empty() {
|
||||
crate::loader::enforce_unique_constraints_intra_batch(
|
||||
&updated,
|
||||
type_name,
|
||||
&unique_groups,
|
||||
)?;
|
||||
}
|
||||
// Validation (value/enum/unique) runs end-of-query via the evaluator.
|
||||
|
||||
// Accumulate the updated batch into the Merge-mode pending stream.
|
||||
// The accumulator may now contain entries with the same id as a
|
||||
|
|
@ -1402,19 +1238,7 @@ impl Omnigraph {
|
|||
.scan(&ds, Some(&["id"]), Some(&scan_filter), None)
|
||||
.await?;
|
||||
|
||||
let deleted_ids: Vec<String> = batches
|
||||
.iter()
|
||||
.flat_map(|batch| {
|
||||
let ids = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
(0..ids.len())
|
||||
.map(|i| ids.value(i).to_string())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect();
|
||||
let deleted_ids: Vec<String> = ids_from_batches(&batches);
|
||||
|
||||
if deleted_ids.is_empty() {
|
||||
return Ok(MutationResult {
|
||||
|
|
@ -1433,6 +1257,7 @@ impl Omnigraph {
|
|||
// `open_table_for_mutation` above already captured the table's
|
||||
// path/version/op-kind via `ensure_path`.
|
||||
crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE)?;
|
||||
staging.record_deleted_ids(&table_key, &deleted_ids);
|
||||
staging.record_delete(&table_key, pred_sql.clone());
|
||||
|
||||
let mut affected_edges = 0usize;
|
||||
|
|
@ -1487,13 +1312,20 @@ impl Omnigraph {
|
|||
// delete removes the union); skip only when nothing NEW matches.
|
||||
let count_filter =
|
||||
dedup_delete_filter(&cascade_filter, staging.recorded_delete_predicates(&edge_table_key));
|
||||
let matched = self
|
||||
.storage()
|
||||
.count_rows(&edge_ds, Some(count_filter))
|
||||
.await?;
|
||||
// Scan (not count) the cascade-removed edge ids so validation
|
||||
// recounts the OTHER endpoint's @card after the cascade; `len()` is
|
||||
// the affected count.
|
||||
let matched_ids = ids_from_batches(
|
||||
&self
|
||||
.storage()
|
||||
.scan(&edge_ds, Some(&["id"]), Some(&count_filter), None)
|
||||
.await?,
|
||||
);
|
||||
let matched = matched_ids.len();
|
||||
affected_edges += matched;
|
||||
|
||||
if matched > 0 {
|
||||
staging.record_deleted_ids(&edge_table_key, &matched_ids);
|
||||
staging.record_delete(&edge_table_key, cascade_filter);
|
||||
}
|
||||
}
|
||||
|
|
@ -1540,12 +1372,20 @@ impl Omnigraph {
|
|||
// union); only record when something NEW matches.
|
||||
let count_filter =
|
||||
dedup_delete_filter(&pred_sql, staging.recorded_delete_predicates(&table_key));
|
||||
let affected = self
|
||||
.storage()
|
||||
.count_rows(&ds, Some(count_filter))
|
||||
.await?;
|
||||
// Scan the matched edge ids (not just count): the ids feed validation so
|
||||
// a delete emptying a src below @card min is rejected; `len()` is the
|
||||
// affected count. One scan replaces the former count-here + resolve-at-
|
||||
// validation re-scan.
|
||||
let deleted_ids = ids_from_batches(
|
||||
&self
|
||||
.storage()
|
||||
.scan(&ds, Some(&["id"]), Some(&count_filter), None)
|
||||
.await?,
|
||||
);
|
||||
let affected = deleted_ids.len();
|
||||
|
||||
if affected > 0 {
|
||||
staging.record_deleted_ids(&table_key, &deleted_ids);
|
||||
staging.record_delete(&table_key, pred_sql.clone());
|
||||
self.invalidate_graph_index().await;
|
||||
}
|
||||
|
|
@ -1557,6 +1397,25 @@ impl Omnigraph {
|
|||
}
|
||||
}
|
||||
|
||||
/// Extract the `id` column (projection index 0) from scanned batches. Used by
|
||||
/// the delete paths to capture the rows they remove, so validation recounts a
|
||||
/// src a delete empties without re-resolving the predicate.
|
||||
fn ids_from_batches(batches: &[RecordBatch]) -> Vec<String> {
|
||||
batches
|
||||
.iter()
|
||||
.flat_map(|batch| {
|
||||
let ids = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
(0..ids.len())
|
||||
.map(|i| ids.value(i).to_string())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Concat the matched batches from `scan_with_pending` into a single batch.
|
||||
/// `scan_with_pending` returns committed-side and pending-side batches in
|
||||
/// order; both should share a schema if pending was produced through
|
||||
|
|
@ -1583,25 +1442,6 @@ fn concat_match_batches_to_schema(
|
|||
})
|
||||
}
|
||||
|
||||
/// Validate `@card` bounds against committed (Lance) + pending (in-memory)
|
||||
/// edges for one edge table. Engine path: each insert produces a fresh
|
||||
/// ULID id, so committed and pending cannot share a primary key — no
|
||||
/// dedup needed (`dedupe_key_column = None`).
|
||||
async fn validate_edge_cardinality_with_pending(
|
||||
db: &Omnigraph,
|
||||
committed_ds: &SnapshotHandle,
|
||||
staging: &MutationStaging,
|
||||
table_key: &str,
|
||||
edge_type: &omnigraph_compiler::catalog::EdgeType,
|
||||
) -> Result<()> {
|
||||
if edge_type.cardinality.is_default() {
|
||||
return Ok(());
|
||||
}
|
||||
let counts =
|
||||
super::staging::count_src_per_edge(db, committed_ds, table_key, staging, None).await?;
|
||||
super::staging::enforce_cardinality_bounds(edge_type, &counts)
|
||||
}
|
||||
|
||||
fn enrich_mutation_params(params: &ParamMap) -> Result<ParamMap> {
|
||||
let mut resolved = params.clone();
|
||||
if !resolved.contains_key(NOW_PARAM_NAME) {
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ use crate::storage_layer::{SnapshotHandle, StagedHandle};
|
|||
use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
|
||||
use arrow_schema::SchemaRef;
|
||||
use futures::stream::StreamExt;
|
||||
use omnigraph_compiler::catalog::EdgeType;
|
||||
|
||||
use crate::db::manifest::{
|
||||
RecoverySidecarHandle, SidecarKind, SidecarTablePin, new_sidecar, write_sidecar,
|
||||
|
|
@ -98,6 +97,13 @@ pub(crate) struct MutationStaging {
|
|||
/// `pending`. Staged as one combined `stage_delete` per table at
|
||||
/// end-of-query (no inline HEAD advance) — see `stage_delete_table`.
|
||||
pub(crate) delete_predicates: HashMap<String, Vec<String>>,
|
||||
/// Ids removed per table, captured by the delete ops as they scan their
|
||||
/// matched rows (so validation recounts the srcs a delete empties without
|
||||
/// re-resolving the predicates). Disjoint from `pending` by D₂; flows into
|
||||
/// the validation [`ChangeSet`](crate::validate::ChangeSet) via
|
||||
/// [`to_changeset`](Self::to_changeset). The combined `stage_delete` at
|
||||
/// commit still removes by predicate — these ids are validation-only.
|
||||
pub(crate) deleted_ids: HashMap<String, Vec<String>>,
|
||||
/// Strictest [`MutationOpKind`] seen per table within this query. Drives
|
||||
/// the op-kind-aware drift check in [`StagedMutation::commit_all`]: for
|
||||
/// tables whose first or any subsequent touch was a strict op
|
||||
|
|
@ -227,6 +233,20 @@ impl MutationStaging {
|
|||
.push(predicate);
|
||||
}
|
||||
|
||||
/// Record ids removed by a delete op on `table_key`, captured from the op's
|
||||
/// own scan, for validation (so cardinality recounts an emptied src). The
|
||||
/// caller scans with a dedup filter that excludes prior-scheduled matches, so
|
||||
/// no id is recorded twice across statements.
|
||||
pub(crate) fn record_deleted_ids(&mut self, table_key: &str, ids: &[String]) {
|
||||
if ids.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.deleted_ids
|
||||
.entry(table_key.to_string())
|
||||
.or_default()
|
||||
.extend(ids.iter().cloned());
|
||||
}
|
||||
|
||||
/// Delete predicates already recorded for `table_key` by earlier delete
|
||||
/// statements in this query. Read before recording the current statement's
|
||||
/// predicate so its `affected_*` count can exclude rows a prior statement
|
||||
|
|
@ -249,9 +269,31 @@ impl MutationStaging {
|
|||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
/// Accumulator mode for `table_key`, if this query has touched it.
|
||||
pub(crate) fn pending_mode(&self, table_key: &str) -> Option<PendingMode> {
|
||||
self.pending.get(table_key).map(|p| p.mode)
|
||||
/// Build the validation [`ChangeSet`](crate::validate::ChangeSet) for this
|
||||
/// staging: every touched table's accumulated rows as the `changed` delta
|
||||
/// (record-batch clone is Arc-cheap — no data copy). Shared by the mutation
|
||||
/// and loader write paths so their validation input cannot drift.
|
||||
pub(crate) fn to_changeset(&self) -> crate::validate::ChangeSet {
|
||||
let mut changeset = crate::validate::ChangeSet::new();
|
||||
for table_key in self.pending.keys() {
|
||||
let batches = self.pending_batches(table_key);
|
||||
if batches.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let mut change = crate::validate::TableChange::default();
|
||||
change.changed.extend(batches.iter().cloned());
|
||||
changeset.insert(table_key.clone(), change);
|
||||
}
|
||||
// Deletes (disjoint from `pending` by D₂) carry their removed ids so the
|
||||
// evaluator recounts the srcs a delete empties (`@card`) and sees removed
|
||||
// rows for RI — the faithful change-set the merge path also builds.
|
||||
for (table_key, ids) in &self.deleted_ids {
|
||||
if ids.is_empty() {
|
||||
continue;
|
||||
}
|
||||
changeset.entry(table_key.clone()).or_default().deleted_ids = ids.clone();
|
||||
}
|
||||
changeset
|
||||
}
|
||||
|
||||
/// Schema of the accumulated batches for `table_key`, or `None` if no
|
||||
|
|
@ -307,6 +349,8 @@ impl MutationStaging {
|
|||
paths,
|
||||
pending,
|
||||
delete_predicates,
|
||||
// Validation-only; consumed before staging, nothing to commit here.
|
||||
deleted_ids: _,
|
||||
op_kinds,
|
||||
} = self;
|
||||
|
||||
|
|
@ -983,232 +1027,3 @@ fn dedupe_merge_batches_by_id(
|
|||
arrow_select::concat::concat_batches(schema, &sliced)
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
// ─── Cardinality helpers (shared by mutation + loader paths) ────────────────
|
||||
|
||||
/// Count edges per `src` value across committed (Lance scan) + pending
|
||||
/// (in-memory). Caller supplies an opened committed dataset so the
|
||||
/// mutation path (which already has one) and the loader path (which
|
||||
/// opens via snapshot) share the same body. For overwrite staging, the
|
||||
/// pending batches are the replacement table image, so committed rows are
|
||||
/// intentionally skipped.
|
||||
///
|
||||
/// `dedupe_key_column` controls whether committed rows are shadowed by
|
||||
/// pending:
|
||||
/// - `None` — every committed row counts, every pending row counts.
|
||||
/// Correct when committed and pending cannot share a primary key
|
||||
/// (engine inserts always use fresh ULID edge ids; loader Append
|
||||
/// mode uses fresh ids too).
|
||||
/// - `Some(col)` — committed rows whose `col` value also appears in any
|
||||
/// pending batch are EXCLUDED from the committed count, so a Merge-mode
|
||||
/// load that *updates* an existing edge (potentially changing its
|
||||
/// `src`) counts the post-update row exactly once. Without this,
|
||||
/// `LoadMode::Merge` double-counts.
|
||||
pub(crate) async fn count_src_per_edge(
|
||||
db: &crate::db::Omnigraph,
|
||||
committed_ds: &SnapshotHandle,
|
||||
table_key: &str,
|
||||
staging: &MutationStaging,
|
||||
dedupe_key_column: Option<&str>,
|
||||
) -> Result<HashMap<String, u32>> {
|
||||
let mut counts: HashMap<String, u32> = HashMap::new();
|
||||
|
||||
let pending_batches = staging.pending_batches(table_key);
|
||||
|
||||
// Collect pending key values (for shadow-on-merge dedupe). Only when
|
||||
// dedupe is requested AND there's anything pending.
|
||||
let pending_keys: Option<HashSet<String>> = match dedupe_key_column {
|
||||
Some(col) if !pending_batches.is_empty() => {
|
||||
let mut set = HashSet::new();
|
||||
for batch in pending_batches {
|
||||
if let Some(arr) = batch
|
||||
.column_by_name(col)
|
||||
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
|
||||
{
|
||||
for i in 0..arr.len() {
|
||||
if arr.is_valid(i) {
|
||||
set.insert(arr.value(i).to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(set)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let replace_committed = staging.pending_mode(table_key) == Some(PendingMode::Overwrite);
|
||||
if !replace_committed {
|
||||
// Committed side: scan `src` plus the dedupe key column when set, so
|
||||
// we can both count and shadow in one pass.
|
||||
let projection: Vec<&str> = match dedupe_key_column {
|
||||
Some(col) if pending_keys.as_ref().is_some_and(|s| !s.is_empty()) => vec!["src", col],
|
||||
_ => vec!["src"],
|
||||
};
|
||||
let committed = db
|
||||
.storage()
|
||||
.scan(committed_ds, Some(&projection), None, None)
|
||||
.await?;
|
||||
for batch in &committed {
|
||||
let srcs = batch
|
||||
.column_by_name("src")
|
||||
.ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))?
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
|
||||
// Optional shadow-key column (only present when dedupe is on).
|
||||
let key_arr = match (&pending_keys, dedupe_key_column) {
|
||||
(Some(set), Some(col)) if !set.is_empty() => batch
|
||||
.column_by_name(col)
|
||||
.and_then(|c| c.as_any().downcast_ref::<StringArray>()),
|
||||
_ => None,
|
||||
};
|
||||
for i in 0..srcs.len() {
|
||||
if !srcs.is_valid(i) {
|
||||
continue;
|
||||
}
|
||||
// Shadow this committed row if its key is in pending.
|
||||
if let (Some(arr), Some(set)) = (key_arr, pending_keys.as_ref()) {
|
||||
if arr.is_valid(i) && set.contains(arr.value(i)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Pending side: walk in-memory batches for `src`. When dedupe is on,
|
||||
// collapse rows that share `dedupe_key_column` to their last occurrence
|
||||
// — mirrors `dedupe_merge_batches_by_id`'s last-write-wins applied at
|
||||
// finalize time, so cardinality counts what `commit_staged` will
|
||||
// actually publish, not raw input duplicates.
|
||||
//
|
||||
// Without this, a Merge-mode load whose input JSONL has two rows with
|
||||
// the same edge id would be double-counted here, even though the
|
||||
// finalize-time dedupe would collapse them to one. The result: spurious
|
||||
// `@card` violations on perfectly valid Merge inputs.
|
||||
match dedupe_key_column {
|
||||
Some(key_col) => count_pending_src_with_dedupe(pending_batches, key_col, &mut counts)?,
|
||||
None => count_pending_src_naive(pending_batches, &mut counts),
|
||||
}
|
||||
|
||||
Ok(counts)
|
||||
}
|
||||
|
||||
/// Count pending edges per `src` with NO dedup. Correct when caller
|
||||
/// guarantees pending rows have unique primary keys (engine inserts via
|
||||
/// fresh ULID; loader Append mode).
|
||||
fn count_pending_src_naive(pending_batches: &[RecordBatch], counts: &mut HashMap<String, u32>) {
|
||||
for batch in pending_batches {
|
||||
let Some(col) = batch.column_by_name("src") else {
|
||||
continue;
|
||||
};
|
||||
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
|
||||
continue;
|
||||
};
|
||||
for i in 0..srcs.len() {
|
||||
if srcs.is_valid(i) {
|
||||
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Count pending edges per `src` after deduping rows that share
|
||||
/// `dedupe_key_column`. Last occurrence wins (mirrors
|
||||
/// `dedupe_merge_batches_by_id`'s walk-in-reverse contract). Required for
|
||||
/// `LoadMode::Merge` where the same edge id may appear multiple times in
|
||||
/// one load and finalize will collapse them to the last value.
|
||||
fn count_pending_src_with_dedupe(
|
||||
pending_batches: &[RecordBatch],
|
||||
dedupe_key_column: &str,
|
||||
counts: &mut HashMap<String, u32>,
|
||||
) -> Result<()> {
|
||||
// Walk in reverse, track seen keys, keep one (key, src) pair per key.
|
||||
let mut seen: HashSet<String> = HashSet::new();
|
||||
let mut kept_srcs: Vec<String> = Vec::new();
|
||||
for batch in pending_batches.iter().rev() {
|
||||
let Some(key_col) = batch.column_by_name(dedupe_key_column) else {
|
||||
// Pending batch is missing the key column. By construction
|
||||
// this is unreachable: callers in dedupe mode always push
|
||||
// batches whose schema contains the key (loader Merge mode
|
||||
// builds via build_edge_batch which always emits `id`; the
|
||||
// append_batch schema-compatibility check at the call site
|
||||
// would also reject a heterogeneous mix). If it ever fires
|
||||
// it's a programmer error — fail loudly rather than skip
|
||||
// counting (which would let `@card` violations slip).
|
||||
return Err(OmniError::manifest_internal(format!(
|
||||
"count_pending_src_with_dedupe: pending batch missing dedup key column '{}' \
|
||||
(schema-compat check at append_batch should have rejected this)",
|
||||
dedupe_key_column
|
||||
)));
|
||||
};
|
||||
let key_arr = key_col
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| {
|
||||
OmniError::Lance(format!(
|
||||
"count_src_per_edge: pending '{}' column is not Utf8",
|
||||
dedupe_key_column
|
||||
))
|
||||
})?;
|
||||
let src_arr = batch
|
||||
.column_by_name("src")
|
||||
.and_then(|c| c.as_any().downcast_ref::<StringArray>());
|
||||
let Some(srcs) = src_arr else {
|
||||
continue;
|
||||
};
|
||||
for i in (0..batch.num_rows()).rev() {
|
||||
if !srcs.is_valid(i) {
|
||||
continue;
|
||||
}
|
||||
// NULL key: keep (NULL != NULL semantics — every NULL counts).
|
||||
if !key_arr.is_valid(i) {
|
||||
kept_srcs.push(srcs.value(i).to_string());
|
||||
continue;
|
||||
}
|
||||
let key = key_arr.value(i);
|
||||
if seen.insert(key.to_string()) {
|
||||
kept_srcs.push(srcs.value(i).to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
for src in kept_srcs {
|
||||
*counts.entry(src).or_insert(0) += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply `@card(min..max)` bounds to a per-source count map.
|
||||
///
|
||||
/// Both bounds are checked. The `min` check produces a misleading error
|
||||
/// during a per-op insert mid-query (a bound of `2..` requires both
|
||||
/// edges to be inserted before validation passes), but the historical
|
||||
/// behavior was to enforce min per-op anyway — keeping users from
|
||||
/// accidentally publishing a graph that violates the schema. Consumers
|
||||
/// that need end-of-query semantics call this from after all edge ops
|
||||
/// are accumulated (the loader does, via Phase 3).
|
||||
pub(crate) fn enforce_cardinality_bounds(
|
||||
edge_type: &EdgeType,
|
||||
counts: &HashMap<String, u32>,
|
||||
) -> Result<()> {
|
||||
let card = &edge_type.cardinality;
|
||||
for (src, count) in counts {
|
||||
if let Some(max) = card.max {
|
||||
if *count > max {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"@card violation on edge {}: source '{}' has {} edges (max {})",
|
||||
edge_type.name, src, count, max
|
||||
)));
|
||||
}
|
||||
}
|
||||
if *count < card.min {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"@card violation on edge {}: source '{}' has {} edges (min {})",
|
||||
edge_type.name, src, count, card.min
|
||||
)));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,3 +20,4 @@ pub mod runtime_cache;
|
|||
pub mod storage;
|
||||
pub mod storage_layer;
|
||||
pub mod table_store;
|
||||
pub(crate) mod validate;
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use std::io::{BufRead, BufReader, Cursor};
|
||||
use std::sync::Arc;
|
||||
|
|
@ -476,12 +476,7 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
for (type_name, rows) in &node_rows {
|
||||
let node_type = &catalog.node_types[type_name];
|
||||
let batch = build_node_batch(node_type, rows)?;
|
||||
validate_value_constraints(&batch, node_type)?;
|
||||
validate_enum_constraints(&batch, &node_type.properties, type_name)?;
|
||||
let unique_groups = unique_constraint_groups_for_node(node_type);
|
||||
if !unique_groups.is_empty() {
|
||||
enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?;
|
||||
}
|
||||
// Validation (value/enum/unique) runs end-of-load via the evaluator.
|
||||
let loaded_count = batch.num_rows();
|
||||
let table_key = format!("node:{}", type_name);
|
||||
let _entry = snapshot
|
||||
|
|
@ -512,52 +507,14 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
result.nodes_loaded.insert(type_name, loaded_count);
|
||||
}
|
||||
|
||||
// Phase 2c: Validate edge referential integrity — every src/dst must
|
||||
// reference an existing node ID in the appropriate type. For
|
||||
// Append/Merge the lookup unions snapshot-committed IDs with the
|
||||
// in-memory pending batches. For Overwrite, a touched node table's
|
||||
// pending batch is the replacement image, so committed rows are not
|
||||
// included for that table.
|
||||
for (edge_name, rows) in &edge_rows {
|
||||
let edge_type = &catalog.edge_types[edge_name];
|
||||
let from_ids =
|
||||
collect_node_ids_with_pending(db, branch, &edge_type.from_type, &staging).await?;
|
||||
let to_ids =
|
||||
collect_node_ids_with_pending(db, branch, &edge_type.to_type, &staging).await?;
|
||||
|
||||
for (i, (src, dst, _)) in rows.iter().enumerate() {
|
||||
if !from_ids.contains(src.as_str()) {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"edge {} row {}: src '{}' not found in {}",
|
||||
edge_name,
|
||||
i + 1,
|
||||
src,
|
||||
edge_type.from_type
|
||||
)));
|
||||
}
|
||||
if !to_ids.contains(dst.as_str()) {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"edge {} row {}: dst '{}' not found in {}",
|
||||
edge_name,
|
||||
i + 1,
|
||||
dst,
|
||||
edge_type.to_type
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2d: build edge batches.
|
||||
// Phase 2d: build edge batches. Edge referential integrity (and the rest)
|
||||
// runs end-of-load via the unified evaluator, below.
|
||||
let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
|
||||
Vec::with_capacity(edge_rows.len());
|
||||
for (edge_name, rows) in &edge_rows {
|
||||
let edge_type = &catalog.edge_types[edge_name];
|
||||
let batch = build_edge_batch(edge_type, rows)?;
|
||||
validate_enum_constraints(&batch, &edge_type.properties, edge_name)?;
|
||||
let unique_groups = unique_constraint_groups_for_edge(edge_type);
|
||||
if !unique_groups.is_empty() {
|
||||
enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?;
|
||||
}
|
||||
// Validation (enum/unique, edge-RI, @card) runs end-of-load via the evaluator.
|
||||
let loaded_count = batch.num_rows();
|
||||
let table_key = format!("edge:{}", edge_name);
|
||||
let _entry = snapshot
|
||||
|
|
@ -585,18 +542,40 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
result.edges_loaded.insert(edge_name, loaded_count);
|
||||
}
|
||||
|
||||
// Phase 3: Validate edge cardinality constraints (before commit —
|
||||
// invalid data must not be committed). The helper scans committed
|
||||
// edges via Lance + iterates pending edges in-memory; for Overwrite it
|
||||
// treats the pending edge batches as the replacement table image.
|
||||
for (edge_name, _) in &edge_rows {
|
||||
let edge_type = &catalog.edge_types[edge_name];
|
||||
let table_key = format!("edge:{}", edge_name);
|
||||
validate_edge_cardinality_with_pending_loader(
|
||||
db, branch, edge_type, &table_key, &staging, mode,
|
||||
)
|
||||
.await?;
|
||||
// Phase 3: end-of-load validation — one unified evaluator pass over the
|
||||
// accumulated staging (value/enum, uniqueness incl. cross-version, edge-RI,
|
||||
// cardinality) against the pinned pre-load base. `Overwrite` validates each
|
||||
// touched table as its whole new image (that table's committed view empty),
|
||||
// but is PER-TABLE — a table absent from the batch keeps `base`, so an
|
||||
// edges-only overwrite still resolves RI against committed nodes;
|
||||
// `Append`/`Merge` keep `base` everywhere. This shares the evaluator with the
|
||||
// mutation + merge paths, so the surfaces cannot drift.
|
||||
let mut changeset = staging.to_changeset();
|
||||
// Overwrite replaces each touched table; a committed row absent from the new
|
||||
// batch is REMOVED but is not in `to_changeset` (which only records the new
|
||||
// batch). Express those removals as `deleted_ids` so edge-RI (path-b) and
|
||||
// cardinality recompute against them — e.g. overwriting `node:Person` to drop
|
||||
// Bob while a retained `edge:Knows(Alice->Bob)` would otherwise publish an
|
||||
// orphan. (Per-table, like the rest of Overwrite handling.)
|
||||
if mode == LoadMode::Overwrite {
|
||||
let keys: Vec<String> = changeset.keys().cloned().collect();
|
||||
for table_key in keys {
|
||||
let removed = crate::validate::overwrite_removed_ids(
|
||||
&snapshot,
|
||||
&table_key,
|
||||
changeset.get(&table_key).expect("key from this changeset"),
|
||||
)
|
||||
.await?;
|
||||
if !removed.is_empty() {
|
||||
changeset
|
||||
.get_mut(&table_key)
|
||||
.expect("key from this changeset")
|
||||
.deleted_ids = removed;
|
||||
}
|
||||
}
|
||||
}
|
||||
let committed = crate::validate::CommittedState::load(&snapshot, mode, &changeset);
|
||||
crate::validate::validate_changeset(&changeset, &committed, &catalog).await?;
|
||||
|
||||
// Phase 4: Atomic manifest commit with publisher-level OCC.
|
||||
let staged = staging
|
||||
|
|
@ -1361,61 +1340,6 @@ pub(crate) fn validate_enum_constraints(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Detect duplicate values within a single `RecordBatch` for any of the
|
||||
/// `unique_constraints` groups. Each group is a list of one or more columns
|
||||
/// that together form a uniqueness key: a violation occurs when two rows share
|
||||
/// the same tuple of values across *all* columns in a group, so a composite
|
||||
/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an
|
||||
/// error on the first duplicate found.
|
||||
///
|
||||
/// Rows where any column in a group is null are exempt (standard SQL semantics
|
||||
/// for uniqueness over nullable columns), as is any group whose columns are
|
||||
/// not all present in the batch (e.g. a partial-schema load).
|
||||
///
|
||||
/// Note: this only catches duplicates *within* the batch. Cross-batch
|
||||
/// uniqueness against already-committed rows is not enforced here — that
|
||||
/// requires a dataset scan and is tracked separately.
|
||||
pub(crate) fn enforce_unique_constraints_intra_batch(
|
||||
batch: &RecordBatch,
|
||||
type_name: &str,
|
||||
unique_constraints: &[Vec<String>],
|
||||
) -> Result<()> {
|
||||
for columns in unique_constraints {
|
||||
// Resolve the group's columns once. A group whose columns aren't all
|
||||
// present in this batch is skipped (e.g. a partial-schema load).
|
||||
let Some(group_columns) = columns
|
||||
.iter()
|
||||
.map(|name| {
|
||||
batch
|
||||
.schema()
|
||||
.index_of(name)
|
||||
.ok()
|
||||
.map(|i| batch.column(i).clone())
|
||||
})
|
||||
.collect::<Option<Vec<ArrayRef>>>()
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let mut seen: HashMap<Vec<String>, usize> = HashMap::new();
|
||||
for row in 0..batch.num_rows() {
|
||||
let Some(key) = composite_unique_key(&group_columns, row)? else {
|
||||
continue;
|
||||
};
|
||||
if let Some(prev_row) = seen.insert(key.clone(), row) {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"@unique violation on {}.{}: value '{}' appears in rows {} and {}",
|
||||
type_name,
|
||||
format_tuple(columns),
|
||||
format_tuple(&key),
|
||||
prev_row,
|
||||
row
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the composite uniqueness key for `row` over a constraint group's
|
||||
/// already-resolved columns (in declaration order).
|
||||
///
|
||||
|
|
@ -1429,9 +1353,10 @@ pub(crate) fn enforce_unique_constraints_intra_batch(
|
|||
/// - `Ok(Some(tuple))` otherwise.
|
||||
/// - `Err(..)` propagated from [`unique_key_scalar`] on an un-keyable value.
|
||||
///
|
||||
/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and the
|
||||
/// branch-merge path (`exec/merge.rs::update_unique_constraints`) so the two
|
||||
/// derive identical keys and cannot drift on separator or scalar conversion.
|
||||
/// Shared by every write surface through the unified validation evaluator
|
||||
/// (`crate::validate::evaluate_unique`, used by the loader, mutation, and
|
||||
/// branch-merge paths) so they derive identical keys and cannot drift on
|
||||
/// separator or scalar conversion.
|
||||
pub(crate) fn composite_unique_key(
|
||||
group_columns: &[ArrayRef],
|
||||
row: usize,
|
||||
|
|
@ -1449,7 +1374,7 @@ pub(crate) fn composite_unique_key(
|
|||
/// Render a constraint's column tuple for error messages: a single item as
|
||||
/// `col`, a composite as `(a, b)`. Used for both the column list and the
|
||||
/// offending value tuple, which share the same shape.
|
||||
fn format_tuple(items: &[String]) -> String {
|
||||
pub(crate) fn format_tuple(items: &[String]) -> String {
|
||||
match items {
|
||||
[single] => single.clone(),
|
||||
_ => format!("({})", items.join(", ")),
|
||||
|
|
@ -1517,32 +1442,6 @@ fn unique_key_scalar(array: &ArrayRef, row: usize) -> Result<Option<String>> {
|
|||
)))
|
||||
}
|
||||
|
||||
/// Build the list of uniqueness constraint groups to enforce on a node type.
|
||||
/// Each group is the column tuple of one constraint. Includes every
|
||||
/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the
|
||||
/// `@key` (which implies uniqueness over its column tuple). Grouping is
|
||||
/// preserved so a composite `@unique(a, b)` is enforced as a composite key
|
||||
/// rather than degraded into independent single-field checks.
|
||||
pub(crate) fn unique_constraint_groups_for_node(
|
||||
node_type: &omnigraph_compiler::catalog::NodeType,
|
||||
) -> Vec<Vec<String>> {
|
||||
let mut groups: Vec<Vec<String>> = node_type.unique_constraints.clone();
|
||||
if let Some(key) = &node_type.key
|
||||
&& !groups.contains(key)
|
||||
{
|
||||
groups.push(key.clone());
|
||||
}
|
||||
groups
|
||||
}
|
||||
|
||||
/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges
|
||||
/// have no `@key`).
|
||||
pub(crate) fn unique_constraint_groups_for_edge(
|
||||
edge_type: &omnigraph_compiler::catalog::EdgeType,
|
||||
) -> Vec<Vec<String>> {
|
||||
edge_type.unique_constraints.clone()
|
||||
}
|
||||
|
||||
fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option<f64> {
|
||||
use arrow_array::{
|
||||
Array, Float32Array, Float64Array, Int32Array, Int64Array, UInt32Array, UInt64Array,
|
||||
|
|
@ -1576,130 +1475,6 @@ fn literal_value_to_f64(v: &omnigraph_compiler::catalog::LiteralValue) -> f64 {
|
|||
}
|
||||
}
|
||||
|
||||
// ─── Edge cardinality validation ─────────────────────────────────────────────
|
||||
|
||||
/// Validate edge `@card` cardinality with in-memory pending edges visible.
|
||||
///
|
||||
/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
|
||||
/// opens the committed dataset at the pre-load snapshot version, then
|
||||
/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
|
||||
/// helpers in `exec::staging`. Used by every load mode; for `LoadMode::Overwrite`
|
||||
/// it treats the pending edge batches as the replacement table image (the
|
||||
/// committed rows are being replaced, so only the pending set is counted).
|
||||
///
|
||||
/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
|
||||
/// so committed edges that the load is *updating* (same edge id,
|
||||
/// possibly changed `src`) are not double-counted. `LoadMode::Append`
|
||||
/// passes `None` because each line generates a fresh ULID id that
|
||||
/// never collides with committed.
|
||||
async fn validate_edge_cardinality_with_pending_loader(
|
||||
db: &Omnigraph,
|
||||
branch: Option<&str>,
|
||||
edge_type: &omnigraph_compiler::catalog::EdgeType,
|
||||
table_key: &str,
|
||||
staging: &MutationStaging,
|
||||
mode: LoadMode,
|
||||
) -> Result<()> {
|
||||
if edge_type.cardinality.is_default() {
|
||||
return Ok(());
|
||||
}
|
||||
let snapshot = db.snapshot_for_branch(branch).await?;
|
||||
let Some(entry) = snapshot.entry(table_key) else {
|
||||
// No manifest entry — table doesn't exist yet. Pending-only is
|
||||
// fine; the helper handles empty committed scans.
|
||||
return Ok(());
|
||||
};
|
||||
let ds = db
|
||||
.open_dataset_at_state(
|
||||
&entry.table_path,
|
||||
entry.table_branch.as_deref(),
|
||||
entry.table_version,
|
||||
)
|
||||
.await?;
|
||||
let dedupe_key = match mode {
|
||||
LoadMode::Merge => Some("id"),
|
||||
LoadMode::Append | LoadMode::Overwrite => None,
|
||||
};
|
||||
let counts =
|
||||
crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key).await?;
|
||||
crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
|
||||
}
|
||||
|
||||
/// Collect all valid node IDs for a given type, with in-memory pending
|
||||
/// node inserts visible. Used by the staged loader's Phase 2c
|
||||
/// referential-integrity validation.
|
||||
///
|
||||
/// Union of:
|
||||
/// - IDs from the staged loader's pending batches (in-memory; just-staged
|
||||
/// inserts of this type)
|
||||
/// - IDs from the committed sub-table at the pre-load snapshot version
|
||||
///
|
||||
/// For `LoadMode::Overwrite`, if the node table is touched then the pending
|
||||
/// batches are the replacement image. In that case committed IDs are not
|
||||
/// included, so edge RI is validated against exactly what the overwrite will
|
||||
/// publish.
|
||||
async fn collect_node_ids_with_pending(
|
||||
db: &Omnigraph,
|
||||
branch: Option<&str>,
|
||||
type_name: &str,
|
||||
staging: &MutationStaging,
|
||||
) -> Result<HashSet<String>> {
|
||||
let mut ids = HashSet::new();
|
||||
let table_key = format!("node:{}", type_name);
|
||||
|
||||
// From staging.pending: walk the in-memory accumulator's id column.
|
||||
for batch in staging.pending_batches(&table_key) {
|
||||
if let Some(col) = batch.column_by_name("id") {
|
||||
if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
|
||||
for i in 0..arr.len() {
|
||||
if arr.is_valid(i) {
|
||||
ids.insert(arr.value(i).to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if staging.pending_mode(&table_key) == Some(PendingMode::Overwrite) {
|
||||
return Ok(ids);
|
||||
}
|
||||
|
||||
// From the committed Lance sub-table at the pre-load snapshot version.
|
||||
let snapshot = db.snapshot_for_branch(branch).await?;
|
||||
let Some(entry) = snapshot.entry(&table_key) else {
|
||||
return Ok(ids);
|
||||
};
|
||||
let ds = db
|
||||
.open_dataset_at_state(
|
||||
&entry.table_path,
|
||||
entry.table_branch.as_deref(),
|
||||
entry.table_version,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let batches = db.storage().scan(&ds, Some(&["id"]), None, None).await?;
|
||||
|
||||
for batch in &batches {
|
||||
let id_col = batch
|
||||
.column_by_name("id")
|
||||
.ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
|
||||
for i in 0..batch.num_rows() {
|
||||
// Defensive: `id` is the @key column on every node type and
|
||||
// is non-nullable by schema, but a committed-row corruption
|
||||
// (or future schema change) could surface a NULL. Skip
|
||||
// rather than insert "" — pending-side does the same.
|
||||
if id_col.is_valid(i) {
|
||||
ids.insert(id_col.value(i).to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ids)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -240,13 +240,6 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
|
|||
branch: Option<&str>,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn open_dataset_at_state(
|
||||
&self,
|
||||
table_path: &str,
|
||||
branch: Option<&str>,
|
||||
version: u64,
|
||||
) -> Result<SnapshotHandle>;
|
||||
|
||||
async fn fork_branch_from_state(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
|
|
@ -515,17 +508,6 @@ impl TableStorage for TableStore {
|
|||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn open_dataset_at_state(
|
||||
&self,
|
||||
table_path: &str,
|
||||
branch: Option<&str>,
|
||||
version: u64,
|
||||
) -> Result<SnapshotHandle> {
|
||||
TableStore::open_dataset_at_state(self, table_path, branch, version)
|
||||
.await
|
||||
.map(SnapshotHandle::new)
|
||||
}
|
||||
|
||||
async fn fork_branch_from_state(
|
||||
&self,
|
||||
dataset_uri: &str,
|
||||
|
|
|
|||
|
|
@ -284,20 +284,6 @@ impl TableStore {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn open_dataset_at_state(
|
||||
&self,
|
||||
table_path: &str,
|
||||
branch: Option<&str>,
|
||||
version: u64,
|
||||
) -> Result<Dataset> {
|
||||
let ds = self
|
||||
.open_dataset_head(&self.dataset_uri(table_path), branch)
|
||||
.await?;
|
||||
ds.checkout_version(version)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))
|
||||
}
|
||||
|
||||
pub fn ensure_expected_version(
|
||||
&self,
|
||||
ds: &Dataset,
|
||||
|
|
|
|||
1085
crates/omnigraph/src/validate.rs
Normal file
1085
crates/omnigraph/src/validate.rs
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -1428,6 +1428,72 @@ async fn branch_merge_reports_cardinality_violation_conflict() {
|
|||
}
|
||||
}
|
||||
|
||||
/// Fix C regression: a table adopted by pointer switch (`AdoptSourceState`)
|
||||
/// must still be validated. Merging `main` -> `feature` where `feature` deleted
|
||||
/// a node and `main` added an edge referencing it classifies the edge table as
|
||||
/// `AdoptSourceState` (source on main, target on a branch). The unified
|
||||
/// evaluator must see the adopted edge and reject the orphan; before the fix it
|
||||
/// skipped the table entirely and silently published the dangling edge.
|
||||
#[tokio::test]
|
||||
async fn merge_main_into_branch_validates_adopted_edge_against_branch_node_delete() {
|
||||
const MUTATIONS: &str = r#"
|
||||
query add_knows($from: String, $to: String) {
|
||||
insert Knows { from: $from, to: $to }
|
||||
}
|
||||
|
||||
query delete_person($name: String) {
|
||||
delete Person where name = $name
|
||||
}
|
||||
"#;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await;
|
||||
main.branch_create("feature").await.unwrap();
|
||||
let mut feature = Omnigraph::open(uri).await.unwrap();
|
||||
|
||||
// main (merge source): add an edge referencing Bob.
|
||||
mutate_main(
|
||||
&mut main,
|
||||
MUTATIONS,
|
||||
"add_knows",
|
||||
¶ms(&[("$from", "Alice"), ("$to", "Bob")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// feature (merge target): delete Bob.
|
||||
mutate_branch(
|
||||
&mut feature,
|
||||
"feature",
|
||||
MUTATIONS,
|
||||
"delete_person",
|
||||
¶ms(&[("$name", "Bob")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Merge main -> feature: edge:Knows is adopted by pointer switch
|
||||
// (AdoptSourceState). The adopted edge Alice->Bob references Bob, which the
|
||||
// target branch deleted, so the merge must reject with OrphanEdge.
|
||||
let err = feature
|
||||
.branch_merge("main", "feature")
|
||||
.await
|
||||
.expect_err("adopting main's edge into a branch that deleted its endpoint must conflict");
|
||||
match err {
|
||||
OmniError::MergeConflicts(conflicts) => {
|
||||
assert!(
|
||||
conflicts
|
||||
.iter()
|
||||
.any(|c| c.table_key == "edge:Knows"
|
||||
&& c.kind == MergeConflictKind::OrphanEdge),
|
||||
"expected OrphanEdge on edge:Knows, got {conflicts:?}"
|
||||
);
|
||||
}
|
||||
other => panic!("expected merge conflicts, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn branch_api_rejects_reserved_main_and_same_source_target_merge() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -233,8 +233,15 @@ async fn overwrite_replaces_data() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
// Overwrite with just one person
|
||||
let small = r#"{"type": "Person", "data": {"name": "Zara", "age": 40}}"#;
|
||||
// Overwrite to a small SELF-CONSISTENT image. Overwrite is per-table, so a
|
||||
// Person-only overwrite would drop Alice/Bob while the retained Knows/WorksAt
|
||||
// edges still reference them — a now-rejected orphan (see
|
||||
// `validators::overwrite_node_removal_rejects_retained_orphan_edge`). To
|
||||
// replace the graph, overwrite the edge tables too; Company stays retained
|
||||
// and Zara->Acme references it.
|
||||
let small = r#"{"type": "Person", "data": {"name": "Zara", "age": 40}}
|
||||
{"edge": "Knows", "from": "Zara", "to": "Zara"}
|
||||
{"edge": "WorksAt", "from": "Zara", "to": "Acme"}"#;
|
||||
load_jsonl(&mut db, small, LoadMode::Overwrite)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
|||
186
crates/omnigraph/tests/merge_cost.rs
Normal file
186
crates/omnigraph/tests/merge_cost.rs
Normal file
|
|
@ -0,0 +1,186 @@
|
|||
//! EMPIRICAL VALIDATION of the branch-merge latency analysis (investigation
|
||||
//! artifact). Two claims from the code review, measured at the object-store
|
||||
//! boundary with the shared `helpers::cost` harness:
|
||||
//!
|
||||
//! 1. `merge_validation_opens_untouched_tables` — `validate_merge_candidates`
|
||||
//! loops EVERY catalog node/edge type and opens each (untouched tables fall
|
||||
//! through to the target snapshot), so a merge whose delta touches ONE table
|
||||
//! still opens ALL tables. Cost ∝ #types (whole graph), not ∝ delta.
|
||||
//!
|
||||
//! 2. `merge_manifest_cost_grows_with_history` — a merge does several cold
|
||||
//! coordinator opens (head lookups + merge_base + base/source/target
|
||||
//! snapshots), each an O(history) `__manifest` scan, so on an un-compacted
|
||||
//! graph merge `__manifest` reads grow with commit depth (Regime A — the
|
||||
//! production RustFS/S3 case).
|
||||
//!
|
||||
//! Both bodies run on a 64 MiB-stack thread: the debug-build merge future plus
|
||||
//! the `cost_harness`/`measure` task-local layers overflow the default 2 MiB test
|
||||
//! stack (the same reason these cost tests raise `recursion_limit`).
|
||||
#![recursion_limit = "512"]
|
||||
|
||||
mod helpers;
|
||||
|
||||
use std::future::Future;
|
||||
|
||||
use helpers::cost::{
|
||||
IoCounts, assert_flat, assert_grows, cost_harness, local_graph, measure, measure_with_staged,
|
||||
};
|
||||
use helpers::{MUTATION_QUERIES, commit_many, mixed_params};
|
||||
|
||||
/// Run an async test body on a thread with a large stack. The debug merge future
|
||||
/// is deep enough to overflow the default test-thread stack under the cost
|
||||
/// harness's extra async layers.
|
||||
fn on_big_stack<F>(body: impl FnOnce() -> F + Send + 'static)
|
||||
where
|
||||
F: Future<Output = ()>,
|
||||
{
|
||||
std::thread::Builder::new()
|
||||
.stack_size(64 * 1024 * 1024)
|
||||
.spawn(move || {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(body());
|
||||
})
|
||||
.unwrap()
|
||||
.join()
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// CLAIM 1 (post-#5): merge validation is Δ-scoped, not whole-graph. The fixture
|
||||
/// has 4 tables (Person, Company, Knows, WorksAt). A merge whose only change is
|
||||
/// one inserted Person must NOT open the untouched tables for validation — cost
|
||||
/// follows the delta, not the catalog. Pre-#5 this opened ~6 tables via a
|
||||
/// full-graph validation scan; the index-backed evaluator probes only the
|
||||
/// committed Person table (for uniqueness) plus the delta.
|
||||
#[test]
|
||||
fn merge_validation_is_delta_scoped() {
|
||||
on_big_stack(|| async {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let db = local_graph(&dir).await;
|
||||
|
||||
// Control: a 1-row insert on main — the write path opens only the
|
||||
// touched table (Person).
|
||||
let (ctrl_res, ctrl) = measure(db.mutate(
|
||||
"main",
|
||||
MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", "ctrl")], &[("$age", 30)]),
|
||||
))
|
||||
.await;
|
||||
ctrl_res.unwrap();
|
||||
|
||||
// Branch + a one-row change touching ONLY Person.
|
||||
db.branch_create("feature").await.unwrap();
|
||||
db.mutate(
|
||||
"feature",
|
||||
MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", "f1")], &[("$age", 41)]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Measure the merge.
|
||||
let (res, io, staged) = measure_with_staged(db.branch_merge("feature", "main")).await;
|
||||
res.unwrap();
|
||||
|
||||
eprintln!(
|
||||
"CONTROL 1-row insert on main : data_open_count={} data_reads={} manifest_reads={}",
|
||||
ctrl.data_open_count, ctrl.data_reads, ctrl.manifest_reads
|
||||
);
|
||||
eprintln!(
|
||||
"MERGE 1-Person-row delta : data_open_count={} data_reads={} manifest_reads={} \
|
||||
[stage_append={} stage_merge_insert={} create_vector_index={}]",
|
||||
io.data_open_count,
|
||||
io.data_reads,
|
||||
io.manifest_reads,
|
||||
staged.stage_append,
|
||||
staged.stage_merge_insert,
|
||||
staged.create_vector_index,
|
||||
);
|
||||
|
||||
// The proof: only Person changed, so the merge opens only Person-related
|
||||
// tables (the delta + the committed-target index probe for uniqueness) —
|
||||
// never the untouched Company / Knows / WorksAt. Pre-#5 this was ~6
|
||||
// (every catalog table, full-scanned).
|
||||
assert!(
|
||||
io.data_open_count <= 3,
|
||||
"merge of a 1-Person delta opened {} data tables; expected <= 3 (Δ-scoped). \
|
||||
Pre-#5 it opened every catalog table (~6) via a whole-graph validation scan.",
|
||||
io.data_open_count
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/// CLAIM 2: a merge's `__manifest` cost grows with commit-history depth on an
|
||||
/// un-compacted graph (the several cold coordinator opens each scan O(fragments)
|
||||
/// of `__manifest`). Contrast with `write_cost.rs`, where a single write's
|
||||
/// manifest scan is held FLAT *after compaction* — here we deliberately do NOT
|
||||
/// compact, modelling the production graph that has grown its `_versions/` and
|
||||
/// `__manifest` fragments without GC.
|
||||
#[test]
|
||||
fn merge_manifest_cost_grows_with_history() {
|
||||
on_big_stack(|| {
|
||||
cost_harness(async {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = local_graph(&dir).await;
|
||||
|
||||
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
|
||||
let mut current = 0u64;
|
||||
for d in [5u64, 80] {
|
||||
if d > current {
|
||||
commit_many(&mut db, (d - current) as usize).await;
|
||||
current = d;
|
||||
}
|
||||
let br = format!("feat_{d}");
|
||||
db.branch_create(&br).await.unwrap();
|
||||
db.mutate(
|
||||
&br,
|
||||
MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", &format!("p_{d}"))], &[("$age", 30)]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
current += 1; // the branch write advanced depth
|
||||
|
||||
// Control single write at this depth, to quantify the merge's
|
||||
// manifest-open multiplication vs a normal write.
|
||||
let (cres, ctrl) = measure(db.mutate(
|
||||
"main",
|
||||
MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", &format!("c_{d}"))], &[("$age", 30)]),
|
||||
))
|
||||
.await;
|
||||
cres.unwrap();
|
||||
current += 1;
|
||||
|
||||
let (res, io) = measure(db.branch_merge(&br, "main")).await;
|
||||
res.unwrap();
|
||||
current += 1; // the merge advanced depth
|
||||
|
||||
eprintln!(
|
||||
"depth~{d}: MERGE manifest_reads={} data_reads={} data_open_count={} | \
|
||||
single-write manifest_reads={} (merge/write ratio = {:.1}x)",
|
||||
io.manifest_reads,
|
||||
io.data_reads,
|
||||
io.data_open_count,
|
||||
ctrl.manifest_reads,
|
||||
io.manifest_reads as f64 / ctrl.manifest_reads.max(1) as f64,
|
||||
);
|
||||
curve.push((d, io));
|
||||
}
|
||||
|
||||
// Regime A: merge __manifest cost still grows with history — the cold
|
||||
// cross-branch coordinator opens, a separate amplification not
|
||||
// addressed by the validation change.
|
||||
assert_grows(&curve, |c| c.manifest_reads, 1, "merge __manifest scan");
|
||||
// But validation table-opens are now Δ-scoped: flat across history
|
||||
// depth (the merge no longer scans the catalog's tables per merge).
|
||||
assert_flat(&curve, |c| c.data_open_count, 1, "merge data-table opens");
|
||||
})
|
||||
});
|
||||
}
|
||||
|
|
@ -8,7 +8,7 @@ mod helpers;
|
|||
use omnigraph::db::Omnigraph;
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
|
||||
use helpers::{mutate_main, params};
|
||||
use helpers::{count_rows, mutate_main, params};
|
||||
|
||||
const ENUM_SCHEMA: &str = r#"
|
||||
node Person {
|
||||
|
|
@ -55,6 +55,22 @@ node User {
|
|||
}
|
||||
"#;
|
||||
|
||||
const UNIQUE_MUTATIONS: &str = r#"
|
||||
query insert_user($name: String, $email: String) {
|
||||
insert User { name: $name, email: $email }
|
||||
}
|
||||
"#;
|
||||
|
||||
// A non-String `@unique` column: the committed cross-version probe must build a
|
||||
// typed literal, not a stringified key, or it compares a Date32 column to a Utf8
|
||||
// value (a DataFusion coercion error that breaks every write to the table).
|
||||
const DATE_UNIQUE_SCHEMA: &str = r#"
|
||||
node Task {
|
||||
name: String @key
|
||||
due: Date @unique
|
||||
}
|
||||
"#;
|
||||
|
||||
const CARDINALITY_SCHEMA: &str = r#"
|
||||
node Person { name: String @key }
|
||||
node Company { name: String @key }
|
||||
|
|
@ -71,6 +87,19 @@ query add_employment($person: String, $company: String) {
|
|||
}
|
||||
"#;
|
||||
|
||||
// A non-zero @card min so a move that vacates a src can drop it below the floor.
|
||||
const CARD_MIN_SCHEMA: &str = r#"
|
||||
node Person { name: String @key }
|
||||
node Company { name: String @key }
|
||||
edge WorksAt: Person -> Company @card(1..)
|
||||
"#;
|
||||
|
||||
const CARD_MIN_DELETE_MUTATIONS: &str = r#"
|
||||
query drop_employment($person: String) {
|
||||
delete WorksAt where from = $person
|
||||
}
|
||||
"#;
|
||||
|
||||
async fn init_with(schema: &str, data: &str) -> (tempfile::TempDir, Omnigraph) {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
|
@ -200,9 +229,324 @@ async fn intra_batch_unique_rejected_on_jsonl_load() {
|
|||
);
|
||||
}
|
||||
|
||||
// Note: single-row mutation insert can't violate intra-batch uniqueness
|
||||
// (only one row in the batch). Cross-batch uniqueness against committed rows
|
||||
// is out of scope for this wire-up — see the unified write-validator effort.
|
||||
// Single-row mutation insert can't violate INTRA-BATCH uniqueness (one row).
|
||||
// CROSS-VERSION uniqueness against already-committed rows IS now enforced on the
|
||||
// mutation path via the unified evaluator (#1/#2); the loader path's
|
||||
// cross-version check lands with the loader migration.
|
||||
|
||||
/// Cross-version uniqueness, closed by the write-path evaluator migration:
|
||||
/// two SEPARATE mutations inserting distinct rows with the same `@unique` value —
|
||||
/// the second is rejected against the committed first (previously a gap).
|
||||
#[tokio::test]
|
||||
async fn cross_version_unique_rejected_on_mutation_insert() {
|
||||
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
|
||||
mutate_main(
|
||||
&mut db,
|
||||
UNIQUE_MUTATIONS,
|
||||
"insert_user",
|
||||
¶ms(&[("$name", "Bob"), ("$email", "dup@example.com")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let err = mutate_main(
|
||||
&mut db,
|
||||
UNIQUE_MUTATIONS,
|
||||
"insert_user",
|
||||
¶ms(&[("$name", "Carol"), ("$email", "dup@example.com")]),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("@unique violation on User.email"),
|
||||
"got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
/// The cross-version unique check must NOT flag a row updating itself: an upsert
|
||||
/// of an existing `@key` (same id) is an update, not a duplicate. Re-inserting
|
||||
/// the same key with its own `@unique` value must succeed (the evaluator excludes
|
||||
/// the committed same-id holder).
|
||||
#[tokio::test]
|
||||
async fn reinsert_existing_key_is_upsert_not_unique_violation() {
|
||||
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
|
||||
mutate_main(
|
||||
&mut db,
|
||||
UNIQUE_MUTATIONS,
|
||||
"insert_user",
|
||||
¶ms(&[("$name", "Alice"), ("$email", "alice@example.com")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
mutate_main(
|
||||
&mut db,
|
||||
UNIQUE_MUTATIONS,
|
||||
"insert_user",
|
||||
¶ms(&[("$name", "Alice"), ("$email", "alice@example.com")]),
|
||||
)
|
||||
.await
|
||||
.expect("re-inserting an existing @key upserts; it is not a unique violation");
|
||||
}
|
||||
|
||||
// ─── Cross-version uniqueness + RI on the LOADER path (Slice 3) ───────────────
|
||||
|
||||
const RI_SCHEMA: &str = r#"
|
||||
node Person { name: String @key }
|
||||
edge Knows: Person -> Person
|
||||
"#;
|
||||
|
||||
/// Cross-version uniqueness is now enforced on the bulk-load path too: a second
|
||||
/// Append load duplicating a committed `@unique` value is rejected.
|
||||
#[tokio::test]
|
||||
async fn cross_version_unique_rejected_on_append_load() {
|
||||
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"User","data":{"name":"Bob","email":"dup@example.com"}}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let err = load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"User","data":{"name":"Carol","email":"dup@example.com"}}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("@unique violation on User.email"),
|
||||
"got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
/// Fix D: the cross-version `@unique` probe must use a typed literal on a
|
||||
/// non-String column. A second-version row colliding with a committed `Date`
|
||||
/// value must surface a proper `@unique` violation — not a Date32-vs-Utf8
|
||||
/// coercion error (the red symptom before the fix).
|
||||
#[tokio::test]
|
||||
async fn cross_version_unique_rejected_on_date_column() {
|
||||
let (_dir, mut db) = init_with(DATE_UNIQUE_SCHEMA, "").await;
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"Task","data":{"name":"T1","due":"2026-06-29"}}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let err = load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"Task","data":{"name":"T2","due":"2026-06-29"}}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("@unique violation on Task.due"),
|
||||
"got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
/// Fix D companion: a non-colliding write to a `Date @unique` table must
|
||||
/// succeed. Before the fix the committed probe raised a coercion error for
|
||||
/// ANY second write (it compared Date32 to a Utf8 literal regardless of a
|
||||
/// match), so this happy path failed too.
|
||||
#[tokio::test]
|
||||
async fn noncolliding_write_to_date_unique_column_succeeds() {
|
||||
let (_dir, mut db) = init_with(DATE_UNIQUE_SCHEMA, "").await;
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"Task","data":{"name":"T1","due":"2026-06-29"}}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"Task","data":{"name":"T2","due":"2026-07-01"}}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.expect("a distinct Date value must not collide and must not raise a coercion error");
|
||||
assert_eq!(count_rows(&db, "node:Task").await, 2);
|
||||
}
|
||||
|
||||
/// Fix B: a Merge-load that MOVES an edge to a new src must recount the OLD
|
||||
/// src. Moving Alice's only WorksAt to Bob drops Alice to zero, below
|
||||
/// @card(1..). Before the fix only the new src (Bob) was in the affected set,
|
||||
/// so Alice's underflow was missed and the load silently succeeded.
|
||||
#[tokio::test]
|
||||
async fn merge_load_edge_src_move_rechecks_vacated_src_cardinality() {
|
||||
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
|
||||
{"type":"Person","data":{"name":"Bob"}}
|
||||
{"type":"Company","data":{"name":"Acme"}}
|
||||
{"edge":"WorksAt","from":"Alice","to":"Acme","data":{"id":"E1"}}"#;
|
||||
let (_dir, mut db) = init_with(CARD_MIN_SCHEMA, seed).await;
|
||||
|
||||
let err = load_jsonl(
|
||||
&mut db,
|
||||
r#"{"edge":"WorksAt","from":"Bob","to":"Acme","data":{"id":"E1"}}"#,
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.expect_err("moving Alice's only edge to Bob drops Alice below @card(1..)");
|
||||
assert!(
|
||||
err.to_string().contains("@card violation") && err.to_string().contains("Alice"),
|
||||
"got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
/// Fix A: a Merge-load batch listing the same edge id twice with different srcs
|
||||
/// must be counted ONCE (commit dedupes by id, last-wins). Alice keeps her one
|
||||
/// committed edge and Bob gets the (deduped) E1, both within @card(0..1), so the
|
||||
/// load must succeed. Before the fix E1 was counted under both srcs, giving
|
||||
/// Alice a phantom second edge and a spurious max violation.
|
||||
#[tokio::test]
|
||||
async fn merge_load_duplicate_edge_id_counts_once_per_card() {
|
||||
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
|
||||
{"type":"Person","data":{"name":"Bob"}}
|
||||
{"type":"Company","data":{"name":"Acme"}}
|
||||
{"type":"Company","data":{"name":"Beta"}}
|
||||
{"edge":"WorksAt","from":"Alice","to":"Acme","data":{"id":"E0"}}"#;
|
||||
let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, seed).await;
|
||||
|
||||
// Same edge id E1 under two srcs in one batch: commit keeps the last
|
||||
// (Bob->Beta). Alice stays at her one committed edge (E0).
|
||||
let batch = r#"{"edge":"WorksAt","from":"Alice","to":"Beta","data":{"id":"E1"}}
|
||||
{"edge":"WorksAt","from":"Bob","to":"Beta","data":{"id":"E1"}}"#;
|
||||
load_jsonl(&mut db, batch, LoadMode::Merge)
|
||||
.await
|
||||
.expect("a deduped edge id must not double-count Alice into a @card(0..1) violation");
|
||||
assert_eq!(count_rows(&db, "edge:WorksAt").await, 2);
|
||||
}
|
||||
|
||||
/// A direct edge DELETE must recount the source it empties. Deleting Alice's
|
||||
/// only WorksAt drops her to zero, below @card(1..), and must be rejected.
|
||||
/// Deletes stage as predicates (absent from the constructive change-set), so
|
||||
/// before the fix the mutation committed without any cardinality check — while
|
||||
/// the merge path, which carries deleted_ids, would have caught it.
|
||||
#[tokio::test]
|
||||
async fn mutation_delete_edge_below_card_min_rejected() {
|
||||
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
|
||||
{"type":"Company","data":{"name":"Acme"}}
|
||||
{"edge":"WorksAt","from":"Alice","to":"Acme","data":{"id":"E1"}}"#;
|
||||
let (_dir, mut db) = init_with(CARD_MIN_SCHEMA, seed).await;
|
||||
|
||||
let err = mutate_main(
|
||||
&mut db,
|
||||
CARD_MIN_DELETE_MUTATIONS,
|
||||
"drop_employment",
|
||||
¶ms(&[("$person", "Alice")]),
|
||||
)
|
||||
.await
|
||||
.expect_err("deleting Alice's only WorksAt drops her below @card(1..)");
|
||||
assert!(
|
||||
err.to_string().contains("@card violation") && err.to_string().contains("Alice"),
|
||||
"got: {}",
|
||||
err
|
||||
);
|
||||
assert_eq!(
|
||||
count_rows(&db, "edge:WorksAt").await,
|
||||
1,
|
||||
"the rejected delete must not have removed the edge"
|
||||
);
|
||||
}
|
||||
|
||||
/// A Merge load re-upserting an existing `@key` with its own `@unique` value is
|
||||
/// an update, not a duplicate — it must NOT false-trigger the cross-version check.
|
||||
#[tokio::test]
|
||||
async fn merge_load_reupsert_existing_key_is_not_unique_violation() {
|
||||
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
|
||||
let row = r#"{"type":"User","data":{"name":"Alice","email":"alice@example.com"}}"#;
|
||||
load_jsonl(&mut db, row, LoadMode::Merge).await.unwrap();
|
||||
load_jsonl(&mut db, row, LoadMode::Merge)
|
||||
.await
|
||||
.expect("merge-load re-upserting an existing @key is not a unique violation");
|
||||
}
|
||||
|
||||
/// `Overwrite` replaces the touched tables, so edge RI must validate against the
|
||||
/// NEW batch image, not the replaced committed one. An edge to a node that exists
|
||||
/// only in the new batch loads cleanly (regression against using the old image).
|
||||
#[tokio::test]
|
||||
async fn overwrite_load_validates_ri_against_new_image() {
|
||||
let (_dir, mut db) = init_with(RI_SCHEMA, r#"{"type":"Person","data":{"name":"Alice"}}"#).await;
|
||||
let batch = r#"{"type":"Person","data":{"name":"Carol"}}
|
||||
{"edge":"Knows","from":"Carol","to":"Carol"}"#;
|
||||
load_jsonl(&mut db, batch, LoadMode::Overwrite)
|
||||
.await
|
||||
.expect("Overwrite RI validates against the new batch image, not the replaced committed");
|
||||
}
|
||||
|
||||
/// And an Append load whose edge references a non-existent node is still rejected
|
||||
/// (edge-RI enforced via the evaluator).
|
||||
#[tokio::test]
|
||||
async fn append_load_rejects_orphan_edge() {
|
||||
let (_dir, mut db) = init_with(RI_SCHEMA, r#"{"type":"Person","data":{"name":"Alice"}}"#).await;
|
||||
let err = load_jsonl(
|
||||
&mut db,
|
||||
r#"{"edge":"Knows","from":"Alice","to":"Ghost"}"#,
|
||||
LoadMode::Append,
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("not found"),
|
||||
"orphan edge must be rejected, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
/// Finding 1: overwriting a NODE table can strand a retained edge in a
|
||||
/// non-overwritten table. Seed Alice, Bob + Knows(Alice->Bob); Overwrite-load
|
||||
/// node:Person with only Alice (Bob removed), leaving edge:Knows untouched ->
|
||||
/// Knows(Alice->Bob) is now an orphan and must be rejected. The overwrite-removed
|
||||
/// Bob is not expressed as a deleted_id, so edge-RI path-b never runs.
|
||||
#[tokio::test]
|
||||
async fn overwrite_node_removal_rejects_retained_orphan_edge() {
|
||||
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
|
||||
{"type":"Person","data":{"name":"Bob"}}
|
||||
{"edge":"Knows","from":"Alice","to":"Bob"}"#;
|
||||
let (_dir, mut db) = init_with(RI_SCHEMA, seed).await;
|
||||
|
||||
let err = load_jsonl(
|
||||
&mut db,
|
||||
r#"{"type":"Person","data":{"name":"Alice"}}"#,
|
||||
LoadMode::Overwrite,
|
||||
)
|
||||
.await
|
||||
.expect_err("removing Bob via overwrite while Knows(Alice->Bob) is retained orphans the edge");
|
||||
assert!(
|
||||
err.to_string().contains("not found"),
|
||||
"retained edge to an overwrite-removed node must be rejected, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
/// Finding 2: uniqueness must evaluate the final coalesced image per id, not
|
||||
/// accumulate superseded keys. One mutation updates Alice.email temp -> final,
|
||||
/// then inserts Carol.email = temp. The final state (Alice=final, Carol=temp) is
|
||||
/// valid, but the validator retains the stale Alice->temp and false-rejects Carol.
|
||||
#[tokio::test]
|
||||
async fn chained_unique_update_then_reuse_freed_value_is_not_a_violation() {
|
||||
let (_dir, mut db) = init_with(
|
||||
UNIQUE_SCHEMA,
|
||||
r#"{"type":"User","data":{"name":"Alice","email":"orig"}}"#,
|
||||
)
|
||||
.await;
|
||||
const Q: &str = r#"
|
||||
query reassign() {
|
||||
update User set { email: "temp" } where name = "Alice"
|
||||
update User set { email: "final" } where name = "Alice"
|
||||
insert User { name: "Carol", email: "temp" }
|
||||
}
|
||||
"#;
|
||||
mutate_main(&mut db, Q, "reassign", ¶ms(&[]))
|
||||
.await
|
||||
.expect("Alice ends at 'final' and Carol takes the freed 'temp' — final image has no collision");
|
||||
}
|
||||
|
||||
// ─── Edge cardinality ────────────────────────────────────────────────────────
|
||||
|
||||
|
|
|
|||
|
|
@ -926,9 +926,14 @@ async fn load_overwrite_with_bad_edge_reference_unblocks_next_load() {
|
|||
assert_eq!(count_rows(&db, "node:Person").await, pre_persons);
|
||||
assert_eq!(count_rows(&db, "edge:Knows").await, pre_edges);
|
||||
|
||||
// The good overwrite must be self-consistent: it replaces Person, so it also
|
||||
// replaces every edge table that referenced the old Persons. WorksAt is in the
|
||||
// batch (pointing the surviving Company at a new Person) so the retained
|
||||
// WorksAt rows that named Alice/Bob don't strand against the new node image.
|
||||
let good = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}
|
||||
{"type": "Person", "data": {"name": "Quinn", "age": 56}}
|
||||
{"edge": "Knows", "from": "Pat", "to": "Quinn"}
|
||||
{"edge": "WorksAt", "from": "Pat", "to": "Acme"}
|
||||
"#;
|
||||
load_jsonl(&mut db, good, LoadMode::Overwrite)
|
||||
.await
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue