feat(engine): add BTREE-indexed Expand traversal path

Split execute_expand into a dispatcher over execute_expand_csr (the existing
in-memory CSR BFS, unchanged) and a new execute_expand_indexed that serves each
hop by batching the frontier into one scan_edges_by_endpoint call against the
persisted src/dst BTREE (index-search -> take), then fans out per source row.
Both share expand_hydrate_and_align — the destination hydration + alignment +
hconcat + in-memory non-pushable filters — which now aligns by string id (a
HashMap) instead of a dense row-id vec, so one tail serves both modes.

Mode selection is OMNIGRAPH_TRAVERSAL_MODE for now (default csr); the
frontier-size auto policy and lazy CSR build follow. AntiJoin stays on CSR.

tests/traversal_indexed.rs (its own #[serial] binary, so env writes never race a
reader) asserts the indexed path matches CSR for one-hop, multi-hop, cross-type,
and no-match cases, and that a freshly-appended unindexed edge is still found
(partial index coverage — fast_search=false unindexed-fragment scan).
This commit is contained in:
Ragnor Comerford 2026-06-08 10:43:35 +02:00
parent 7d5fba3a45
commit 7337be60c8
No known key found for this signature in database
2 changed files with 429 additions and 68 deletions

View file

@ -653,13 +653,10 @@ fn execute_pipeline<'a>(
max_hops,
dst_filters,
} => {
let gi = graph_index.ok_or_else(|| {
OmniError::manifest("graph index required for traversal".to_string())
})?;
if let Some(batch) = wide.as_mut() {
execute_expand(
batch,
gi,
graph_index,
snapshot,
catalog,
src_var,
@ -688,8 +685,269 @@ fn execute_pipeline<'a>(
})
}
/// Execute a graph traversal (Expand).
/// Traversal mode override for `execute_expand`. `OMNIGRAPH_TRAVERSAL_MODE`:
/// `indexed` forces the BTREE-backed path, `csr` forces the in-memory CSR path.
/// Unset (the default) currently selects CSR; the frontier-size auto policy and
/// lazy CSR build land with the dispatcher work. Read per call so tests can
/// force a mode — both modes are semantically identical, so a racing read only
/// changes which path runs, never the result.
fn traversal_indexed_override() -> Option<bool> {
match std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok().as_deref() {
Some("indexed") => Some(true),
Some("csr") => Some(false),
_ => None,
}
}
/// Execute a graph traversal (Expand). Dispatches to the BTREE-indexed path
/// (selective traversals — neighbor lookups via the persisted src/dst index) or
/// the in-memory CSR path (dense / whole-graph traversals). The CSR index is
/// required only by the CSR path.
async fn execute_expand(
wide: &mut RecordBatch,
graph_index: Option<&GraphIndex>,
snapshot: &Snapshot,
catalog: &Catalog,
src_var: &str,
dst_var: &str,
edge_type: &str,
direction: Direction,
dst_type: &str,
min_hops: u32,
max_hops: Option<u32>,
dst_filters: &[IRFilter],
params: &ParamMap,
) -> Result<()> {
let use_indexed = traversal_indexed_override().unwrap_or(false);
if use_indexed {
execute_expand_indexed(
wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops,
max_hops, dst_filters, params,
)
.await
} else {
let gi = graph_index.ok_or_else(|| {
OmniError::manifest("graph index required for CSR traversal".to_string())
})?;
execute_expand_csr(
wide, gi, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type,
min_hops, max_hops, dst_filters, params,
)
.await
}
}
/// BTREE-indexed graph traversal: per hop, batch the current frontier into one
/// `scan_edges_by_endpoint` call against the persisted src/dst index, then fan
/// out per source row. Cost scales with the frontier, not |E|. Produces the
/// same `(src_row, dst_id)` pairs as the CSR path and shares its hydrate+align
/// tail. Multi-hop only advances for same-type edges; cross-type frontiers go
/// empty after one hop (no edges key off the destination type), matching CSR.
async fn execute_expand_indexed(
wide: &mut RecordBatch,
snapshot: &Snapshot,
catalog: &Catalog,
src_var: &str,
dst_var: &str,
edge_type: &str,
direction: Direction,
dst_type: &str,
min_hops: u32,
max_hops: Option<u32>,
dst_filters: &[IRFilter],
params: &ParamMap,
) -> Result<()> {
let src_id_col_name = format!("{}.id", src_var);
let src_ids = wide
.column_by_name(&src_id_col_name)
.ok_or_else(|| {
OmniError::manifest(format!("wide batch missing '{}' column", src_id_col_name))
})?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest(format!("'{}' column is not Utf8", src_id_col_name)))?
.clone();
let edge_def = catalog
.edge_types
.get(edge_type)
.ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_type)))?;
let same_type = edge_def.from_type == edge_def.to_type;
// Out-traversal follows src -> dst (key on src); In-traversal follows the
// reverse (key on dst). The persisted BTREE exists on both columns.
let (key_col, opp_col) = match direction {
Direction::Out => ("src", "dst"),
Direction::In => ("dst", "src"),
};
let edge_table_key = format!("edge:{}", edge_type);
let edge_ds = snapshot.open(&edge_table_key).await?;
let max = max_hops.unwrap_or(min_hops.max(1));
// Per-source BFS state in string-id space (no dense interning needed).
let n = src_ids.len();
let mut frontiers: Vec<Vec<String>> = Vec::with_capacity(n);
let mut visited: Vec<HashSet<String>> = Vec::with_capacity(n);
let mut seen_dst: Vec<HashSet<String>> = Vec::with_capacity(n);
for i in 0..n {
let sid = src_ids.value(i).to_string();
let mut v = HashSet::new();
if same_type {
v.insert(sid.clone());
}
frontiers.push(vec![sid]);
visited.push(v);
seen_dst.push(HashSet::new());
}
let mut src_indices: Vec<u32> = Vec::new();
let mut dst_ids: Vec<String> = Vec::new();
for hop in 1..=max {
// One batched indexed scan per hop over the union of all live frontiers.
let mut union: Vec<String> = Vec::new();
{
let mut seen: HashSet<&str> = HashSet::new();
for f in &frontiers {
for node in f {
if seen.insert(node.as_str()) {
union.push(node.clone());
}
}
}
}
if union.is_empty() {
break;
}
let batches = crate::table_store::TableStore::scan_edges_by_endpoint(
&edge_ds, key_col, opp_col, &union,
)
.await?;
// key -> neighbors (scan order; duplicates preserved, like CSR multi-edges).
let mut neighbor_map: HashMap<String, Vec<String>> = HashMap::new();
for batch in &batches {
let keys = batch
.column_by_name(key_col)
.ok_or_else(|| OmniError::manifest(format!("edge batch missing '{}'", key_col)))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest(format!("edge '{}' is not Utf8", key_col)))?;
let opps = batch
.column_by_name(opp_col)
.ok_or_else(|| OmniError::manifest(format!("edge batch missing '{}'", opp_col)))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest(format!("edge '{}' is not Utf8", opp_col)))?;
for r in 0..batch.num_rows() {
neighbor_map
.entry(keys.value(r).to_string())
.or_default()
.push(opps.value(r).to_string());
}
}
// Advance each source row's frontier independently.
for i in 0..n {
let cur = std::mem::take(&mut frontiers[i]);
let mut next: Vec<String> = Vec::new();
for node in &cur {
let Some(neighbors) = neighbor_map.get(node) else {
continue;
};
for neighbor in neighbors {
if !same_type || visited[i].insert(neighbor.clone()) {
next.push(neighbor.clone());
if hop >= min_hops && seen_dst[i].insert(neighbor.clone()) {
src_indices.push(i as u32);
dst_ids.push(neighbor.clone());
}
}
}
}
frontiers[i] = next;
}
}
expand_hydrate_and_align(
wide, src_indices, dst_ids, snapshot, catalog, dst_type, dst_var, dst_filters, params,
)
.await
}
/// Shared tail for both Expand modes: hydrate the unique destination ids, align
/// the `(src_row, dst_id)` pairs back onto `wide`, hconcat, and apply
/// non-pushable destination filters in memory.
async fn expand_hydrate_and_align(
wide: &mut RecordBatch,
src_indices: Vec<u32>,
dst_ids: Vec<String>,
snapshot: &Snapshot,
catalog: &Catalog,
dst_type: &str,
dst_var: &str,
dst_filters: &[IRFilter],
params: &ParamMap,
) -> Result<()> {
// Pushable destination filters are applied by `hydrate_nodes`; the rest
// (`ir_filter_to_expr` → None) are applied in memory after hconcat.
let non_pushable: Vec<&IRFilter> = dst_filters
.iter()
.filter(|f| ir_filter_to_expr(f, params).is_none())
.collect();
// Unique destination ids (first-seen order) for one batched hydration.
let mut unique_dst_list: Vec<String> = Vec::new();
{
let mut seen: HashSet<&str> = HashSet::with_capacity(dst_ids.len());
for id in &dst_ids {
if seen.insert(id.as_str()) {
unique_dst_list.push(id.clone());
}
}
}
let dst_batch =
hydrate_nodes(snapshot, catalog, dst_type, &unique_dst_list, dst_filters, params).await?;
// id -> row index in the hydrated batch.
let dst_batch_id_col = dst_batch
.column_by_name("id")
.ok_or_else(|| OmniError::manifest("hydrated batch missing 'id' column".to_string()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest("hydrated 'id' column is not Utf8".to_string()))?;
let mut id_to_row: HashMap<&str, u32> = HashMap::with_capacity(dst_batch_id_col.len());
for row in 0..dst_batch_id_col.len() {
id_to_row.insert(dst_batch_id_col.value(row), row as u32);
}
// Align pairs to (src_row, hydrated_dst_row), dropping ids hydration filtered out.
let mut final_src_indices: Vec<u32> = Vec::with_capacity(src_indices.len());
let mut dst_indices: Vec<u32> = Vec::with_capacity(src_indices.len());
for (&src_idx, dst_id) in src_indices.iter().zip(dst_ids.iter()) {
if let Some(&dst_row) = id_to_row.get(dst_id.as_str()) {
final_src_indices.push(src_idx);
dst_indices.push(dst_row);
}
}
let src_take = UInt32Array::from(final_src_indices);
let dst_take = UInt32Array::from(dst_indices);
let expanded_wide = take_batch(wide, &src_take)?;
let dst_prefixed = prefix_batch(&dst_batch, dst_var)?;
let aligned_dst = take_batch(&dst_prefixed, &dst_take)?;
*wide = hconcat_batches(&expanded_wide, &aligned_dst)?;
for f in &non_pushable {
apply_filter(wide, f, params)?;
}
Ok(())
}
/// CSR-backed graph traversal: BFS over the in-memory adjacency index. Used for
/// dense / whole-graph traversals; selective traversals use
/// `execute_expand_indexed`. Both share `expand_hydrate_and_align`.
async fn execute_expand_csr(
wide: &mut RecordBatch,
graph_index: &GraphIndex,
snapshot: &Snapshot,
@ -785,77 +1043,30 @@ async fn execute_expand(
}
}
// Destination-binding filters: pushable ones lower to a DataFusion `Expr`
// and are applied by `hydrate_nodes` (AND'd with the id IN-list, routed
// through the BTREE); the rest (`ir_filter_to_expr` → None) are applied in
// memory post-hconcat.
let non_pushable: Vec<&IRFilter> = dst_filters
.iter()
.filter(|f| ir_filter_to_expr(f, params).is_none())
.collect();
// Dedup dst dense ids globally across source rows, then stringify once
// for the Lance IN-list. The post-hydrate alignment fans rows back out to
// the original (src, dst) pairs via a dense-indexed lookup below.
let mut unique_dst_list: Vec<String> = Vec::new();
{
let mut seen: HashSet<u32> = HashSet::with_capacity(dst_dense_list.len());
for &d in &dst_dense_list {
if seen.insert(d) {
if let Some(id) = dst_type_idx.to_id(d) {
unique_dst_list.push(id.to_string());
}
}
// Map BFS-produced dense destination ids to string ids for the shared
// hydrate+align tail. Dense ids always resolve (they came from the index);
// drop any that don't, keeping the (src, dst) arrays parallel.
let mut tail_src_indices: Vec<u32> = Vec::with_capacity(src_indices.len());
let mut dst_ids: Vec<String> = Vec::with_capacity(dst_dense_list.len());
for (&s, &d) in src_indices.iter().zip(dst_dense_list.iter()) {
if let Some(id) = dst_type_idx.to_id(d) {
tail_src_indices.push(s);
dst_ids.push(id.to_string());
}
}
let dst_batch = hydrate_nodes(
expand_hydrate_and_align(
wide,
tail_src_indices,
dst_ids,
snapshot,
catalog,
dst_type,
&unique_dst_list,
dst_var,
dst_filters,
params,
)
.await?;
// Build dense → row-in-hydrated-batch via a direct-indexed array.
let dst_batch_id_col = dst_batch
.column_by_name("id")
.ok_or_else(|| OmniError::manifest("hydrated batch missing 'id' column".to_string()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest("hydrated 'id' column is not Utf8".to_string()))?;
let mut dense_to_row: Vec<Option<u32>> = vec![None; dst_type_idx.len()];
for row in 0..dst_batch_id_col.len() {
let id_str = dst_batch_id_col.value(row);
if let Some(dense) = dst_type_idx.to_dense(id_str) {
dense_to_row[dense as usize] = Some(row as u32);
}
}
// Build aligned src/dst index arrays (only for ids that exist in hydrated batch)
let mut final_src_indices: Vec<u32> = Vec::new();
let mut dst_indices: Vec<u32> = Vec::new();
for (src_idx, dst_dense) in src_indices.iter().zip(dst_dense_list.iter()) {
if let Some(dst_row) = dense_to_row[*dst_dense as usize] {
final_src_indices.push(*src_idx);
dst_indices.push(dst_row);
}
}
let src_take = UInt32Array::from(final_src_indices);
let dst_take = UInt32Array::from(dst_indices);
let expanded_wide = take_batch(wide, &src_take)?;
let dst_prefixed = prefix_batch(&dst_batch, dst_var)?;
let aligned_dst = take_batch(&dst_prefixed, &dst_take)?;
*wide = hconcat_batches(&expanded_wide, &aligned_dst)?;
// Apply any non-pushable destination filters (e.g. list-contains) in memory
for f in &non_pushable {
apply_filter(wide, f, params)?;
}
Ok(())
.await
}
/// Load full node rows for a set of IDs from a snapshot.

View file

@ -0,0 +1,150 @@
//! BTREE-indexed Expand path (`execute_expand_indexed`) coverage.
//!
//! These tests force the Expand execution mode via `OMNIGRAPH_TRAVERSAL_MODE`
//! and assert the indexed path matches the CSR path (both are semantically
//! identical — the indexed path just serves neighbor lookups from the persisted
//! src/dst BTREE instead of an in-memory CSR). They live in their own test
//! binary and are all `#[serial]`, so the env writes never race a concurrent
//! reader: within this process serial execution serializes every env read, and
//! other test binaries (e.g. `traversal.rs`) are separate processes whose env
//! stays unset (→ CSR), validating the shared hydrate/align tail on the CSR path.
mod helpers;
use arrow_array::{Array, StringArray};
use omnigraph::db::Omnigraph;
use omnigraph_compiler::ir::ParamMap;
use serial_test::serial;
use helpers::*;
fn set_mode(mode: &str) {
// SAFE: every test here is #[serial] and this binary has no non-serial
// env reader, so no thread reads the environment during this write.
unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) };
}
fn clear_mode() {
unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") };
}
/// Run a name-returning query and return its first column, sorted.
async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec<String> {
let result = query_main(db, queries, name, params).await.unwrap();
if result.num_rows() == 0 {
return Vec::new();
}
let batch = result.concat_batches().unwrap();
let col = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect();
v.sort();
v
}
/// Run the same query under CSR then indexed mode; assert identical results and
/// return them.
async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec<String> {
set_mode("csr");
let csr = sorted_names(db, queries, name, params).await;
set_mode("indexed");
let indexed = sorted_names(db, queries, name, params).await;
clear_mode();
assert_eq!(
indexed, csr,
"indexed Expand must produce identical results to CSR for query '{name}'"
);
indexed
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_one_hop_same_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// friends_of: `$p knows $f` (Person -> Person, single hop).
let got = both_modes(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])).await;
assert_eq!(got, vec!["Bob", "Charlie"], "Alice knows Bob and Charlie");
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_multi_hop_same_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query reach($name: String) {
match {
$p: Person { name: $name }
$p knows{1,2} $f
}
return { $f.name }
}
"#;
// Alice -> Bob, Charlie (1 hop); Bob -> Diana (2 hops).
let got = both_modes(&mut db, queries, "reach", &params(&[("$name", "Alice")])).await;
assert_eq!(got, vec!["Bob", "Charlie", "Diana"]);
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_cross_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query employer($name: String) {
match {
$p: Person { name: $name }
$p worksAt $c
}
return { $c.name }
}
"#;
let got = both_modes(&mut db, queries, "employer", &params(&[("$name", "Alice")])).await;
assert_eq!(got, vec!["Acme"], "Alice works at Acme");
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_no_match() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Diana has no outgoing Knows edges → empty in both modes.
let got = both_modes(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Diana")])).await;
assert!(got.is_empty(), "Diana knows no one");
}
#[tokio::test]
#[serial]
async fn indexed_finds_unindexed_appended_edge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Append Alice -> Diana AFTER the initial load. `ensure_indices`' existence
// guard means the src/dst BTREE built on the first load does NOT cover this
// new fragment. The indexed path must still find it via Lance's
// unindexed-fragment scan (fast_search=false default), so partial index
// coverage never silently drops rows.
mutate_main(
&mut db,
MUTATION_QUERIES,
"add_friend",
&params(&[("$from", "Alice"), ("$to", "Diana")]),
)
.await
.unwrap();
set_mode("indexed");
let got = sorted_names(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])).await;
clear_mode();
assert_eq!(
got,
vec!["Bob", "Charlie", "Diana"],
"indexed traversal must see the freshly-appended, unindexed edge"
);
}