diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index a106cfc..c5c0b12 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -741,6 +741,13 @@ impl<'a> GraphIndexHandle<'a> { .await?; Ok(built.as_deref()) } + + /// Whether the in-memory CSR is already materialized for this query (a prior + /// Expand or bulk AntiJoin realized it), so reusing it is ~free. Lets the + /// cost chooser prefer the warm CSR over per-hop indexed scans. + fn is_built(&self) -> bool { + matches!(self.cell.get(), Some(Some(_))) + } } /// Explicit traversal-mode override. `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` @@ -893,6 +900,7 @@ fn gather_cost_inputs( frontier_rows: usize, effective_max_hops: u32, coverage: crate::table_store::IndexCoverage, + csr_cached: bool, ) -> Option { let edge_entry = snapshot.entry(&format!("edge:{}", edge_type))?; let edge_def = catalog.edge_types.get(edge_type)?; @@ -915,7 +923,7 @@ fn gather_cost_inputs( max_hops_cap: expand_indexed_max_hops(), max_frontier_cap: expand_indexed_max_frontier(), coverage, - csr_cached: false, + csr_cached, }) } @@ -1008,6 +1016,7 @@ async fn execute_expand( frontier_rows, effective_max_hops, crate::table_store::IndexCoverage::Indexed, + graph_index.is_built(), ) { Some(inputs) => choose_expand_mode(&inputs) == ExpandMode::IndexedScan, // Manifest counts absent (e.g. not-yet-materialized table): fall back @@ -1054,6 +1063,7 @@ async fn execute_expand( frontier_rows, effective_max_hops, coverage_for_decision(&coverage), + graph_index.is_built(), ) { if choose_expand_mode(&inputs) == ExpandMode::Csr { tracing::debug!( @@ -1541,6 +1551,18 @@ async fn hydrate_nodes( Ok(scan_result) } +/// Whether the inner pipeline is the bulk-anti-join shape: a single Expand from +/// the outer var with no destination filters (the only shape the CSR +/// `has_neighbors` fast path can serve). Pure — it does not touch the CSR — so +/// the caller can decide whether to realize the O(|E|) graph index at all. +fn bulk_anti_join_applies(inner_pipeline: &[IROp], outer_var: &str) -> bool { + matches!( + inner_pipeline, + [IROp::Expand { src_var, dst_filters, .. }] + if src_var == outer_var && dst_filters.is_empty() + ) +} + /// Try bulk anti-join via CSR existence check. Returns Some(mask) if the inner /// pipeline is a single Expand from outer_var (the common negation pattern). fn try_bulk_anti_join_mask( @@ -1550,27 +1572,17 @@ fn try_bulk_anti_join_mask( catalog: &Catalog, outer_var: &str, ) -> Option { - if inner_pipeline.len() != 1 { + if !bulk_anti_join_applies(inner_pipeline, outer_var) { return None; } let IROp::Expand { - src_var, edge_type, direction, - dst_filters, .. } = &inner_pipeline[0] else { return None; }; - if src_var != outer_var { - return None; - } - // Bulk CSR check only tests neighbor existence, not destination - // properties. Fall back to the slow path when dst_filters are present. - if !dst_filters.is_empty() { - return None; - } let gi = graph_index?; let edge_def = catalog.edge_types.get(edge_type.as_str())?; @@ -1613,9 +1625,15 @@ async fn execute_anti_join( catalog: &Catalog, outer_var: &str, ) -> Result<()> { - // AntiJoin negation is served by the CSR `has_neighbors` check, so build the - // index up front (this is the one place the lazy handle is always realized). - let gi = graph_index.get().await?; + // Only the bulk fast path consumes the CSR; the slow path's inner Expand + // chooses its own access path. Realize the O(|E|) graph index ONLY when the + // inner-pipeline shape qualifies for the bulk check — a filtered/nested + // anti-join over a large graph must not pay a whole-graph build it won't use. + let gi = if bulk_anti_join_applies(inner_pipeline, outer_var) { + graph_index.get().await? + } else { + None + }; // Fast path: bulk CSR existence check (O(N), zero Lance I/O) if let Some(mask) = try_bulk_anti_join_mask(wide, inner_pipeline, gi, catalog, outer_var) { *wide = arrow_select::filter::filter_record_batch(wide, &mask) @@ -1627,8 +1645,8 @@ async fn execute_anti_join( // whole frontier — a set-oriented anti-semi-join — instead of row-by-row. // Each outer row is tagged with a synthetic index; an outer row matches iff // it produced at least one surviving inner row. No per-row dispatch, so the - // inner Expand runs as a single set-at-a-time traversal (one scan over the - // full frontier, reusing the already-built CSR) rather than one Lance scan + // inner Expand runs as a single set-at-a-time traversal over the full + // frontier (its own chooser picks indexed vs CSR) rather than one Lance scan // per outer row. let num_rows = wide.num_rows(); if num_rows == 0 {