perf(engine): dense-id BFS on the indexed traversal path (C3)

execute_expand_indexed ran its per-source BFS in string space
(Vec<HashSet<String>>, HashMap<String,Vec<String>>, ~4 String clones per neighbor
occurrence). Intern node ids to u32 once via a per-traversal TypeIndex (no
GraphIndex/CSR build — laziness preserved) and run visited/seen/frontier/
neighbor-map in dense u32 space, mirroring the CSR path; de-intern only for the
per-hop IN-list and the emitted dst ids handed to the hydrate+align tail.

Behavior-preserving — the traversal_indexed CSR-vs-indexed equivalence tests are
the guard (results are identical, the key type just changes String -> u32).
This commit is contained in:
Ragnor Comerford 2026-06-08 14:23:20 +02:00
parent 5a7ab6d427
commit e7d02ea181
No known key found for this signature in database

View file

@ -893,16 +893,22 @@ async fn execute_expand_indexed(
let max = max_hops.unwrap_or(min_hops.max(1));
// Per-source BFS state in string-id space (no dense interning needed).
// Per-source BFS state in DENSE id space: intern node ids to u32 once via a
// per-traversal interner so visited/seen/frontier/neighbor-map avoid string
// hashing + cloning in the hot loop (mirrors the CSR path's TypeIndex). The
// GraphIndex/CSR is NOT built — only a local id↔u32 dictionary. Strings
// survive at the substrate edges only: the per-hop IN-list to Lance, and the
// emitted dst ids handed to the string-keyed hydrate+align tail.
let mut interner = crate::graph_index::TypeIndex::new();
let n = src_ids.len();
let mut frontiers: Vec<Vec<String>> = Vec::with_capacity(n);
let mut visited: Vec<HashSet<String>> = Vec::with_capacity(n);
let mut seen_dst: Vec<HashSet<String>> = Vec::with_capacity(n);
let mut frontiers: Vec<Vec<u32>> = Vec::with_capacity(n);
let mut visited: Vec<HashSet<u32>> = Vec::with_capacity(n);
let mut seen_dst: Vec<HashSet<u32>> = Vec::with_capacity(n);
for i in 0..n {
let sid = src_ids.value(i).to_string();
let sid = interner.get_or_insert(src_ids.value(i));
let mut v = HashSet::new();
if same_type {
v.insert(sid.clone());
v.insert(sid);
}
frontiers.push(vec![sid]);
visited.push(v);
@ -910,32 +916,41 @@ async fn execute_expand_indexed(
}
let mut src_indices: Vec<u32> = Vec::new();
let mut dst_ids: Vec<String> = Vec::new();
let mut dst_dense: Vec<u32> = Vec::new();
for hop in 1..=max {
// One batched indexed scan per hop over the union of all live frontiers.
let mut union: Vec<String> = Vec::new();
// Union of all live frontiers (dense), de-interned once for the IN-list.
let mut union_dense: Vec<u32> = Vec::new();
{
let mut seen: HashSet<&str> = HashSet::new();
let mut seen: HashSet<u32> = HashSet::new();
for f in &frontiers {
for node in f {
if seen.insert(node.as_str()) {
union.push(node.clone());
for &node in f {
if seen.insert(node) {
union_dense.push(node);
}
}
}
}
if union.is_empty() {
if union_dense.is_empty() {
break;
}
let union_keys: Vec<String> = union_dense
.iter()
.map(|&u| {
interner
.to_id(u)
.expect("interned frontier id must resolve")
.to_string()
})
.collect();
let batches = crate::table_store::TableStore::scan_edges_by_endpoint(
&edge_ds, key_col, opp_col, &union,
&edge_ds, key_col, opp_col, &union_keys,
)
.await?;
// key -> neighbors (scan order; duplicates preserved, like CSR multi-edges).
let mut neighbor_map: HashMap<String, Vec<String>> = HashMap::new();
// dense key -> dense neighbors (scan order; duplicates preserved, like CSR multi-edges).
let mut neighbor_map: HashMap<u32, Vec<u32>> = HashMap::new();
for batch in &batches {
let keys = batch
.column_by_name(key_col)
@ -950,27 +965,26 @@ async fn execute_expand_indexed(
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest(format!("edge '{}' is not Utf8", opp_col)))?;
for r in 0..batch.num_rows() {
neighbor_map
.entry(keys.value(r).to_string())
.or_default()
.push(opps.value(r).to_string());
let k = interner.get_or_insert(keys.value(r));
let o = interner.get_or_insert(opps.value(r));
neighbor_map.entry(k).or_default().push(o);
}
}
// Advance each source row's frontier independently.
// Advance each source row's frontier independently (dense ids).
for i in 0..n {
let cur = std::mem::take(&mut frontiers[i]);
let mut next: Vec<String> = Vec::new();
for node in &cur {
let Some(neighbors) = neighbor_map.get(node) else {
let mut next: Vec<u32> = Vec::new();
for &node in &cur {
let Some(neighbors) = neighbor_map.get(&node) else {
continue;
};
for neighbor in neighbors {
if !same_type || visited[i].insert(neighbor.clone()) {
next.push(neighbor.clone());
if hop >= min_hops && seen_dst[i].insert(neighbor.clone()) {
for &neighbor in neighbors {
if !same_type || visited[i].insert(neighbor) {
next.push(neighbor);
if hop >= min_hops && seen_dst[i].insert(neighbor) {
src_indices.push(i as u32);
dst_ids.push(neighbor.clone());
dst_dense.push(neighbor);
}
}
}
@ -979,6 +993,18 @@ async fn execute_expand_indexed(
}
}
// De-intern emitted destination ids (parallel to src_indices) for the
// string-keyed hydrate+align tail, exactly as the CSR path does.
let dst_ids: Vec<String> = dst_dense
.iter()
.map(|&d| {
interner
.to_id(d)
.expect("interned dst id must resolve")
.to_string()
})
.collect();
expand_hydrate_and_align(
wide, src_indices, dst_ids, snapshot, catalog, dst_type, dst_var, dst_filters, params,
)