perf(engine): set-oriented filtered anti-join, remove per-row dispatch

The filtered slow path of execute_anti_join sliced the outer batch into one-row
batches and re-ran the inner pipeline for each, so every 1-row inner Expand
dispatched to the indexed path — one Lance scan per outer row — while the CSR
built up front sat unused.

Replace it with a set-oriented anti-semi-join: tag each outer row with a
synthetic index column, run the inner pipeline once over the whole frontier (the
tag survives Expand's hconcat and Filter's row-drop), then exclude outer rows
whose tag survived. The inner Expand now runs as a single set-at-a-time traversal
over the full frontier, and config is read once per operator rather than once per
row (so the earlier nit about env reads no longer applies). A produced-but-untagged
inner batch fails loudly rather than silently keeping every row. Results are
unchanged (the predicated-negation tests exercise this path over a multi-row outer
batch with dst-filters).
This commit is contained in:
Ragnor Comerford 2026-06-09 13:09:22 +02:00
parent f6a0e53737
commit bdc899f03d
No known key found for this signature in database

View file

@ -1607,36 +1607,75 @@ async fn execute_anti_join(
return Ok(());
}
// Slow path: per-row inner pipeline execution
// Slow path (filtered / non-bulk inner): run the inner pipeline ONCE over the
// whole frontier — a set-oriented anti-semi-join — instead of row-by-row.
// Each outer row is tagged with a synthetic index; an outer row matches iff
// it produced at least one surviving inner row. No per-row dispatch, so the
// inner Expand runs as a single set-at-a-time traversal (one scan over the
// full frontier, reusing the already-built CSR) rather than one Lance scan
// per outer row.
let num_rows = wide.num_rows();
let mut keep_mask = vec![true; num_rows];
if num_rows == 0 {
return Ok(());
}
for i in 0..num_rows {
let single_row = wide.slice(i, 1);
let mut inner_wide: Option<RecordBatch> = Some(single_row);
// The tag rides through the inner pipeline: Expand's hconcat preserves
// existing columns and Filter only drops rows, so each surviving row carries
// its originating outer-row index. The `__`-prefixed name cannot collide with
// the `var.prop` columns the pipeline produces, and correlating on the row
// index (not `outer_var.id`) stays correct even if a dst-filter references
// other outer bindings.
const TAG_COL: &str = "__antijoin_outer_row";
let mut fields: Vec<Field> = wide
.schema()
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect();
fields.push(Field::new(TAG_COL, DataType::UInt32, false));
let mut columns: Vec<ArrayRef> = wide.columns().to_vec();
columns.push(Arc::new(UInt32Array::from_iter_values(0..num_rows as u32)));
let tagged = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
.map_err(|e| OmniError::Lance(e.to_string()))?;
let no_search = SearchMode::default();
execute_pipeline(
inner_pipeline,
params,
snapshot,
graph_index,
catalog,
&mut inner_wide,
&no_search,
)
.await?;
let mut inner_wide: Option<RecordBatch> = Some(tagged);
let no_search = SearchMode::default();
execute_pipeline(
inner_pipeline,
params,
snapshot,
graph_index,
catalog,
&mut inner_wide,
&no_search,
)
.await?;
let has_match = inner_wide
.as_ref()
.map(|batch| batch.num_rows() > 0)
.unwrap_or(false);
if has_match {
keep_mask[i] = false;
// Outer rows whose tag survived have >= 1 match. A produced-but-untagged
// batch means the inner pipeline dropped the correlation column — fail loudly
// rather than silently keeping every row (which would corrupt the anti-join).
let mut matched: HashSet<u32> = HashSet::new();
if let Some(batch) = inner_wide {
if batch.num_rows() > 0 {
let tags = batch
.column_by_name(TAG_COL)
.ok_or_else(|| {
OmniError::manifest(
"anti-join inner pipeline dropped the correlation column".to_string(),
)
})?
.as_any()
.downcast_ref::<UInt32Array>()
.ok_or_else(|| {
OmniError::manifest(format!("'{}' column is not UInt32", TAG_COL))
})?;
for i in 0..tags.len() {
matched.insert(tags.value(i));
}
}
}
let keep_mask: Vec<bool> = (0..num_rows as u32).map(|i| !matched.contains(&i)).collect();
let mask = BooleanArray::from(keep_mask);
*wide = arrow_select::filter::filter_record_batch(wide, &mask)
.map_err(|e| OmniError::Lance(e.to_string()))?;