From 5cca914822df8738be6be9293ccae8b733d8d520 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Tue, 9 Jun 2026 15:09:16 +0200 Subject: [PATCH] fix(engine): collision-free anti-join correlation tag for nested negation The set-oriented anti-join tagged the outer batch with a fixed column name and read it back by name. Under a nested slow-path anti-join the enclosing tag rides through the inner pipeline, so the inner call produced a duplicate field; Arrow permits duplicate names and column_by_name returns the first, so the inner negation mis-correlated against the outer row indices. Choose a tag name not already present in the batch (suffix-incremented), so each nesting level reads its own correlation column. Turns the fan-out regression green; the existing nested/fast-vs-slow/proptest anti-join invariants still pass. --- crates/omnigraph/src/exec/query.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index ab0ce01..2379028 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -1621,18 +1621,29 @@ async fn execute_anti_join( // The tag rides through the inner pipeline: Expand's hconcat preserves // existing columns and Filter only drops rows, so each surviving row carries - // its originating outer-row index. The `__`-prefixed name cannot collide with - // the `var.prop` columns the pipeline produces, and correlating on the row - // index (not `outer_var.id`) stays correct even if a dst-filter references - // other outer bindings. - const TAG_COL: &str = "__antijoin_outer_row"; + // its originating outer-row index. Correlating on the row index (not + // `outer_var.id`) stays correct even if a dst-filter references other outer + // bindings. 
Nested anti-joins reuse this slow path and an enclosing tag rides + // through too; Arrow allows duplicate field names and `column_by_name` + // returns the FIRST match, so choose a tag name not already present (each + // nesting level then reads its own) instead of a fixed one. + let tag_col: String = { + let mut n = 0usize; + loop { + let candidate = format!("__antijoin_outer_row_{n}"); + if wide.schema().column_with_name(&candidate).is_none() { + break candidate; + } + n += 1; + } + }; let mut fields: Vec<Field> = wide .schema() .fields() .iter() .map(|f| f.as_ref().clone()) .collect(); - fields.push(Field::new(TAG_COL, DataType::UInt32, false)); + fields.push(Field::new(tag_col.as_str(), DataType::UInt32, false)); let mut columns: Vec<ArrayRef> = wide.columns().to_vec(); columns.push(Arc::new(UInt32Array::from_iter_values(0..num_rows as u32))); let tagged = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) @@ -1658,7 +1669,7 @@ async fn execute_anti_join( if let Some(batch) = inner_wide { if batch.num_rows() > 0 { let tags = batch - .column_by_name(TAG_COL) + .column_by_name(tag_col.as_str()) .ok_or_else(|| { OmniError::manifest( "anti-join inner pipeline dropped the correlation column".to_string(), @@ -1667,7 +1678,7 @@ async fn execute_anti_join( .as_any() .downcast_ref::<UInt32Array>() .ok_or_else(|| { - OmniError::manifest(format!("'{}' column is not UInt32", TAG_COL)) + OmniError::manifest(format!("'{}' column is not UInt32", tag_col)) })?; for i in 0..tags.len() { matched.insert(tags.value(i));