Pass dense u32 ids through expand instead of round-tripping via String

BFS now emits Vec<u32> dense ids directly with HashSet<u32> per-source
dedup. Only the deduped set is stringified for Lance's IN-list. The
post-hydrate alignment uses a dense-indexed Vec<Option<u32>> instead of
HashMap<&str, usize>, giving O(1) lookup without repeated string hashing.

End-to-end on the bench_expand harness (release, M-series):

  query     baseline    after     speedup
  1k  hop3   460.2 ms    23.7 ms     19x
  10k hop2     4.21 s   139.9 ms     30x
  10k hop3    40.59 s    898.5 ms    45x
  30k hop2    11.71 s    490.2 ms    24x
  30k hop3   197.38 s      3.22 s    61x

The cost lived in stringifying every (src,dst) pair and re-hashing the
strings during alignment; once dense ids stay dense, the BFS inner loop
and the final fan-out both collapse to integer ops.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-25 11:59:32 +02:00
parent 628bc2e607
commit 53d7f47909
No known key found for this signature in database

View file

@ -724,9 +724,11 @@ async fn execute_expand(
let same_type = src_type_name == dst_type_name;
// BFS to collect (src_row_idx, dst_id) pairs with per-source dedup
// BFS to collect (src_row_idx, dst_dense) pairs with per-source dedup.
// Dense u32 ids stay in hand through BFS, dedup, and align — we only
// stringify the unique set for Lance's SQL IN-list.
let mut src_indices: Vec<u32> = Vec::new();
let mut dst_id_list: Vec<String> = Vec::new();
let mut dst_dense_list: Vec<u32> = Vec::new();
for i in 0..src_ids.len() {
let src_id = src_ids.value(i);
let Some(src_dense) = src_type_idx.to_dense(src_id) else {
@ -736,7 +738,7 @@ async fn execute_expand(
// BFS with hop tracking
let mut frontier: Vec<u32> = vec![src_dense];
let mut visited: HashSet<u32> = HashSet::new();
let mut seen_dst_ids: HashSet<String> = HashSet::new();
let mut seen_dst_dense: HashSet<u32> = HashSet::new();
// Only track visited in the destination namespace for same-type edges
// (to avoid revisiting the source). For cross-type edges, dense indices
// are in different namespaces so collision is impossible.
@ -750,14 +752,9 @@ async fn execute_expand(
for &neighbor in adj.neighbors(node) {
if !same_type || visited.insert(neighbor) {
next_frontier.push(neighbor);
if hop >= min_hops {
if let Some(dst_id) = dst_type_idx.to_id(neighbor) {
let dst_id = dst_id.to_string();
if seen_dst_ids.insert(dst_id.clone()) {
src_indices.push(i as u32);
dst_id_list.push(dst_id);
}
}
if hop >= min_hops && seen_dst_dense.insert(neighbor) {
src_indices.push(i as u32);
dst_dense_list.push(neighbor);
}
}
}
@ -776,19 +773,20 @@ async fn execute_expand(
.filter(|f| ir_filter_to_sql(f, params).is_none())
.collect();
// Hydrate destination nodes from the snapshot (with pushed-down filters).
// Dedupe dst ids before the scan — the post-hydrate alignment HashMap fans
// them back out to the original (src, dst) pairs. For multi-hop on a small
// destination universe this collapses the IN-list from O(pairs) to
// O(unique_dst), often by 100500×.
let unique_dst_list: Vec<String> = {
let mut seen: HashSet<&str> = HashSet::with_capacity(dst_id_list.len());
dst_id_list
.iter()
.filter(|s| seen.insert(s.as_str()))
.cloned()
.collect()
};
// Dedup dst dense ids globally across source rows, then stringify once
// for the Lance IN-list. The post-hydrate alignment fans rows back out to
// the original (src, dst) pairs via a dense-indexed lookup below.
let mut unique_dst_list: Vec<String> = Vec::new();
{
let mut seen: HashSet<u32> = HashSet::with_capacity(dst_dense_list.len());
for &d in &dst_dense_list {
if seen.insert(d) {
if let Some(id) = dst_type_idx.to_id(d) {
unique_dst_list.push(id.to_string());
}
}
}
}
let dst_batch = hydrate_nodes(
snapshot,
catalog,
@ -798,24 +796,28 @@ async fn execute_expand(
)
.await?;
// Build a mapping from dst_id to row index in dst_batch
// Build dense → row-in-hydrated-batch via a direct-indexed array.
let dst_batch_id_col = dst_batch
.column_by_name("id")
.ok_or_else(|| OmniError::manifest("hydrated batch missing 'id' column".to_string()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest("hydrated 'id' column is not Utf8".to_string()))?;
let dst_id_to_row: HashMap<&str, usize> = (0..dst_batch_id_col.len())
.map(|i| (dst_batch_id_col.value(i), i))
.collect();
let mut dense_to_row: Vec<Option<u32>> = vec![None; dst_type_idx.len()];
for row in 0..dst_batch_id_col.len() {
let id_str = dst_batch_id_col.value(row);
if let Some(dense) = dst_type_idx.to_dense(id_str) {
dense_to_row[dense as usize] = Some(row as u32);
}
}
// Build aligned src/dst index arrays (only for IDs that exist in hydrated batch)
// Build aligned src/dst index arrays (only for ids that exist in hydrated batch)
let mut final_src_indices: Vec<u32> = Vec::new();
let mut dst_indices: Vec<u32> = Vec::new();
for (src_idx, dst_id) in src_indices.iter().zip(dst_id_list.iter()) {
if let Some(&dst_row) = dst_id_to_row.get(dst_id.as_str()) {
for (src_idx, dst_dense) in src_indices.iter().zip(dst_dense_list.iter()) {
if let Some(dst_row) = dense_to_row[*dst_dense as usize] {
final_src_indices.push(*src_idx);
dst_indices.push(dst_row as u32);
dst_indices.push(dst_row);
}
}