From bdc899f03d4bf47050768d1db3ea07982315eb17 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Tue, 9 Jun 2026 13:09:22 +0200 Subject: [PATCH] perf(engine): set-oriented filtered anti-join, remove per-row dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit execute_anti_join's filtered slow path sliced the outer batch to one row at a time and re-ran the inner pipeline per row, so each 1-row inner Expand dispatched to the indexed path — one Lance scan per outer row, while the CSR realized up front sat unused. Replace it with a set-oriented anti-semi-join: tag each outer row with a synthetic index column, run the inner pipeline once over the whole frontier (the tag survives Expand's hconcat and Filter's row-drop), then exclude outer rows whose tag survived. The inner Expand now runs as a single set-at-a-time traversal over the full frontier; config is read once per operator, not per row (the env nit is mooted). A produced-but-untagged inner batch fails loudly rather than silently keeping every row. Results are unchanged (the predicated-negation tests exercise the path over a multi-row outer with dst-filters). --- crates/omnigraph/src/exec/query.rs | 85 ++++++++++++++++++++++-------- 1 file changed, 62 insertions(+), 23 deletions(-) diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index d69c03f..ab0ce01 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -1607,36 +1607,75 @@ async fn execute_anti_join( return Ok(()); } - // Slow path: per-row inner pipeline execution + // Slow path (filtered / non-bulk inner): run the inner pipeline ONCE over the + // whole frontier — a set-oriented anti-semi-join — instead of row-by-row. + // Each outer row is tagged with a synthetic index; an outer row matches iff + // it produced at least one surviving inner row. No per-row dispatch, so the + // inner Expand runs as a single set-at-a-time traversal (one scan over the + // full frontier, reusing the already-built CSR) rather than one Lance scan + // per outer row. let num_rows = wide.num_rows(); - let mut keep_mask = vec![true; num_rows]; + if num_rows == 0 { + return Ok(()); + } - for i in 0..num_rows { - let single_row = wide.slice(i, 1); - let mut inner_wide: Option = Some(single_row); + // The tag rides through the inner pipeline: Expand's hconcat preserves + // existing columns and Filter only drops rows, so each surviving row carries + // its originating outer-row index. The `__`-prefixed name cannot collide with + // the `var.prop` columns the pipeline produces, and correlating on the row + // index (not `outer_var.id`) stays correct even if a dst-filter references + // other outer bindings. + const TAG_COL: &str = "__antijoin_outer_row"; + let mut fields: Vec = wide + .schema() + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect(); + fields.push(Field::new(TAG_COL, DataType::UInt32, false)); + let mut columns: Vec = wide.columns().to_vec(); + columns.push(Arc::new(UInt32Array::from_iter_values(0..num_rows as u32))); + let tagged = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) + .map_err(|e| OmniError::Lance(e.to_string()))?; - let no_search = SearchMode::default(); - execute_pipeline( - inner_pipeline, - params, - snapshot, - graph_index, - catalog, - &mut inner_wide, - &no_search, - ) - .await?; + let mut inner_wide: Option = Some(tagged); + let no_search = SearchMode::default(); + execute_pipeline( + inner_pipeline, + params, + snapshot, + graph_index, + catalog, + &mut inner_wide, + &no_search, + ) + .await?; - let has_match = inner_wide - .as_ref() - .map(|batch| batch.num_rows() > 0) - .unwrap_or(false); - - if has_match { - keep_mask[i] = false; + // Outer rows whose tag survived have >= 1 match. A produced-but-untagged + // batch means the inner pipeline dropped the correlation column — fail loudly + // rather than silently keeping every row (which would corrupt the anti-join). + let mut matched: HashSet = HashSet::new(); + if let Some(batch) = inner_wide { + if batch.num_rows() > 0 { + let tags = batch + .column_by_name(TAG_COL) + .ok_or_else(|| { + OmniError::manifest( + "anti-join inner pipeline dropped the correlation column".to_string(), + ) + })? + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OmniError::manifest(format!("'{}' column is not UInt32", TAG_COL)) + })?; + for i in 0..tags.len() { + matched.insert(tags.value(i)); + } } } + let keep_mask: Vec = (0..num_rows as u32).map(|i| !matched.contains(&i)).collect(); let mask = BooleanArray::from(keep_mask); *wide = arrow_select::filter::filter_record_batch(wide, &mask) .map_err(|e| OmniError::Lance(e.to_string()))?;