fix(engine): collision-free anti-join correlation tag for nested negation

The set-oriented anti-join tagged the outer batch with a fixed column name and
read it back by name. Under a nested slow-path anti-join the enclosing tag rides
through the inner pipeline, so the inner call appended a second field with the
same name. Arrow permits duplicate field names and `column_by_name` returns the
first match, so the inner negation read the enclosing level's tag and
mis-correlated against the outer row indices.

Choose a tag name not already present in the batch (`__antijoin_outer_row_{n}`
with the smallest unused `n`), so each nesting level reads its own correlation
column. Turns the fan-out regression green; the existing
nested/fast-vs-slow/proptest anti-join invariants still pass.
This commit is contained in:
Ragnor Comerford 2026-06-09 15:09:16 +02:00
parent 4cfa94d079
commit 5cca914822
No known key found for this signature in database

View file

@ -1621,18 +1621,29 @@ async fn execute_anti_join(
// The tag rides through the inner pipeline: Expand's hconcat preserves
// existing columns and Filter only drops rows, so each surviving row carries
// its originating outer-row index. The `__`-prefixed name cannot collide with
// the `var.prop` columns the pipeline produces, and correlating on the row
// index (not `outer_var.id`) stays correct even if a dst-filter references
// other outer bindings.
const TAG_COL: &str = "__antijoin_outer_row";
// its originating outer-row index. Correlating on the row index (not
// `outer_var.id`) stays correct even if a dst-filter references other outer
// bindings. Nested anti-joins reuse this slow path and an enclosing tag rides
// through too; Arrow allows duplicate field names and `column_by_name`
// returns the FIRST match, so choose a tag name not already present (each
// nesting level then reads its own) instead of a fixed one.
let tag_col: String = {
let mut n = 0usize;
loop {
let candidate = format!("__antijoin_outer_row_{n}");
if wide.schema().column_with_name(&candidate).is_none() {
break candidate;
}
n += 1;
}
};
let mut fields: Vec<Field> = wide
.schema()
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect();
fields.push(Field::new(TAG_COL, DataType::UInt32, false));
fields.push(Field::new(tag_col.as_str(), DataType::UInt32, false));
let mut columns: Vec<ArrayRef> = wide.columns().to_vec();
columns.push(Arc::new(UInt32Array::from_iter_values(0..num_rows as u32)));
let tagged = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
@ -1658,7 +1669,7 @@ async fn execute_anti_join(
if let Some(batch) = inner_wide {
if batch.num_rows() > 0 {
let tags = batch
.column_by_name(TAG_COL)
.column_by_name(tag_col.as_str())
.ok_or_else(|| {
OmniError::manifest(
"anti-join inner pipeline dropped the correlation column".to_string(),
@ -1667,7 +1678,7 @@ async fn execute_anti_join(
.as_any()
.downcast_ref::<UInt32Array>()
.ok_or_else(|| {
OmniError::manifest(format!("'{}' column is not UInt32", TAG_COL))
OmniError::manifest(format!("'{}' column is not UInt32", tag_col))
})?;
for i in 0..tags.len() {
matched.insert(tags.value(i));