perf(engine): realize anti-join CSR lazily + reuse a warm CSR in the chooser

Two CSR build/reuse fixes flagged on the set-oriented anti-join work (results
unchanged — plan/perf accuracy):

- execute_anti_join called graph_index.get() (the O(|E|) whole-graph CSR build)
  unconditionally, but only the bulk fast path consumes it; a filtered/nested
  slow-path anti-join's inner Expand picks its own access path. Gate the build on
  a pure shape predicate (bulk_anti_join_applies) so a selective anti-join over a
  large graph no longer pays a build it won't use.
- gather_cost_inputs hardcoded csr_cached=false, so once an earlier op realized
  the CSR, later Expands still cost it as a cold build and could pick per-hop
  indexed scans over reusing the warm in-memory CSR. Add GraphIndexHandle::
  is_built() and thread it through so the chooser reuses a materialized CSR.

Anti-join, cross-type, proptest-equivalence, and chooser unit tests stay green.
This commit is contained in:
Ragnor Comerford 2026-06-09 16:27:18 +02:00
parent bdf31afb4c
commit fb16c66285
No known key found for this signature in database

View file

@ -741,6 +741,13 @@ impl<'a> GraphIndexHandle<'a> {
.await?;
Ok(built.as_deref())
}
/// Whether the in-memory CSR is already materialized for this query (a prior
/// Expand or bulk AntiJoin realized it), so reusing it is ~free. Lets the
/// cost chooser prefer the warm CSR over per-hop indexed scans.
fn is_built(&self) -> bool {
matches!(self.cell.get(), Some(Some(_)))
}
}
/// Explicit traversal-mode override. `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr`
@ -893,6 +900,7 @@ fn gather_cost_inputs(
frontier_rows: usize,
effective_max_hops: u32,
coverage: crate::table_store::IndexCoverage,
csr_cached: bool,
) -> Option<ExpandCostInputs> {
let edge_entry = snapshot.entry(&format!("edge:{}", edge_type))?;
let edge_def = catalog.edge_types.get(edge_type)?;
@ -915,7 +923,7 @@ fn gather_cost_inputs(
max_hops_cap: expand_indexed_max_hops(),
max_frontier_cap: expand_indexed_max_frontier(),
coverage,
csr_cached: false,
csr_cached,
})
}
@ -1008,6 +1016,7 @@ async fn execute_expand(
frontier_rows,
effective_max_hops,
crate::table_store::IndexCoverage::Indexed,
graph_index.is_built(),
) {
Some(inputs) => choose_expand_mode(&inputs) == ExpandMode::IndexedScan,
// Manifest counts absent (e.g. not-yet-materialized table): fall back
@ -1054,6 +1063,7 @@ async fn execute_expand(
frontier_rows,
effective_max_hops,
coverage_for_decision(&coverage),
graph_index.is_built(),
) {
if choose_expand_mode(&inputs) == ExpandMode::Csr {
tracing::debug!(
@ -1541,6 +1551,18 @@ async fn hydrate_nodes(
Ok(scan_result)
}
/// Whether the inner pipeline is the bulk-anti-join shape: a single Expand from
/// the outer var with no destination filters (the only shape the CSR
/// `has_neighbors` fast path can serve). Pure — it does not touch the CSR — so
/// the caller can decide whether to realize the O(|E|) graph index at all.
fn bulk_anti_join_applies(inner_pipeline: &[IROp], outer_var: &str) -> bool {
matches!(
inner_pipeline,
[IROp::Expand { src_var, dst_filters, .. }]
if src_var == outer_var && dst_filters.is_empty()
)
}
/// Try bulk anti-join via CSR existence check. Returns Some(mask) if the inner
/// pipeline is a single Expand from outer_var (the common negation pattern).
fn try_bulk_anti_join_mask(
@ -1550,27 +1572,17 @@ fn try_bulk_anti_join_mask(
catalog: &Catalog,
outer_var: &str,
) -> Option<BooleanArray> {
if inner_pipeline.len() != 1 {
if !bulk_anti_join_applies(inner_pipeline, outer_var) {
return None;
}
let IROp::Expand {
src_var,
edge_type,
direction,
dst_filters,
..
} = &inner_pipeline[0]
else {
return None;
};
if src_var != outer_var {
return None;
}
// Bulk CSR check only tests neighbor existence, not destination
// properties. Fall back to the slow path when dst_filters are present.
if !dst_filters.is_empty() {
return None;
}
let gi = graph_index?;
let edge_def = catalog.edge_types.get(edge_type.as_str())?;
@ -1613,9 +1625,15 @@ async fn execute_anti_join(
catalog: &Catalog,
outer_var: &str,
) -> Result<()> {
// AntiJoin negation is served by the CSR `has_neighbors` check, so build the
// index up front (this is the one place the lazy handle is always realized).
let gi = graph_index.get().await?;
// Only the bulk fast path consumes the CSR; the slow path's inner Expand
// chooses its own access path. Realize the O(|E|) graph index ONLY when the
// inner-pipeline shape qualifies for the bulk check — a filtered/nested
// anti-join over a large graph must not pay a whole-graph build it won't use.
let gi = if bulk_anti_join_applies(inner_pipeline, outer_var) {
graph_index.get().await?
} else {
None
};
// Fast path: bulk CSR existence check (O(N), zero Lance I/O)
if let Some(mask) = try_bulk_anti_join_mask(wide, inner_pipeline, gi, catalog, outer_var) {
*wide = arrow_select::filter::filter_record_batch(wide, &mask)
@ -1627,8 +1645,8 @@ async fn execute_anti_join(
// whole frontier — a set-oriented anti-semi-join — instead of row-by-row.
// Each outer row is tagged with a synthetic index; an outer row matches iff
// it produced at least one surviving inner row. No per-row dispatch, so the
// inner Expand runs as a single set-at-a-time traversal (one scan over the
// full frontier, reusing the already-built CSR) rather than one Lance scan
// inner Expand runs as a single set-at-a-time traversal over the full
// frontier (its own chooser picks indexed vs CSR) rather than one Lance scan
// per outer row.
let num_rows = wide.num_rows();
if num_rows == 0 {