Merge pull request #47 from ModernRelay/perf/expand-dense-ids

perf(expand): dense u32 ids end-to-end (follow-up to #45)
This commit is contained in:
Ragnor Comerford 2026-04-25 23:54:03 +02:00 committed by GitHub
commit 189caf893c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -724,9 +724,11 @@ async fn execute_expand(
let same_type = src_type_name == dst_type_name;
// BFS to collect (src_row_idx, dst_id) pairs with per-source dedup
// BFS to collect (src_row_idx, dst_dense) pairs with per-source dedup.
// Dense u32 ids stay in hand through BFS, dedup, and align — we only
// stringify the unique set for Lance's SQL IN-list.
let mut src_indices: Vec<u32> = Vec::new();
let mut dst_id_list: Vec<String> = Vec::new();
let mut dst_dense_list: Vec<u32> = Vec::new();
for i in 0..src_ids.len() {
let src_id = src_ids.value(i);
let Some(src_dense) = src_type_idx.to_dense(src_id) else {
@ -736,7 +738,7 @@ async fn execute_expand(
// BFS with hop tracking
let mut frontier: Vec<u32> = vec![src_dense];
let mut visited: HashSet<u32> = HashSet::new();
let mut seen_dst_ids: HashSet<String> = HashSet::new();
let mut seen_dst_dense: HashSet<u32> = HashSet::new();
// Only track visited in the destination namespace for same-type edges
// (to avoid revisiting the source). For cross-type edges, dense indices
// are in different namespaces so collision is impossible.
@ -750,14 +752,9 @@ async fn execute_expand(
for &neighbor in adj.neighbors(node) {
if !same_type || visited.insert(neighbor) {
next_frontier.push(neighbor);
if hop >= min_hops {
if let Some(dst_id) = dst_type_idx.to_id(neighbor) {
let dst_id = dst_id.to_string();
if seen_dst_ids.insert(dst_id.clone()) {
src_indices.push(i as u32);
dst_id_list.push(dst_id);
}
}
if hop >= min_hops && seen_dst_dense.insert(neighbor) {
src_indices.push(i as u32);
dst_dense_list.push(neighbor);
}
}
}
@ -776,19 +773,20 @@ async fn execute_expand(
.filter(|f| ir_filter_to_sql(f, params).is_none())
.collect();
// Hydrate destination nodes from the snapshot (with pushed-down filters).
// Dedupe dst ids before the scan — the post-hydrate alignment HashMap fans
// them back out to the original (src, dst) pairs. For multi-hop on a small
// destination universe this collapses the IN-list from O(pairs) to
// O(unique_dst), often by 100500×.
let unique_dst_list: Vec<String> = {
let mut seen: HashSet<&str> = HashSet::with_capacity(dst_id_list.len());
dst_id_list
.iter()
.filter(|s| seen.insert(s.as_str()))
.cloned()
.collect()
};
// Dedup dst dense ids globally across source rows, then stringify once
// for the Lance IN-list. The post-hydrate alignment fans rows back out to
// the original (src, dst) pairs via a dense-indexed lookup below.
let mut unique_dst_list: Vec<String> = Vec::new();
{
let mut seen: HashSet<u32> = HashSet::with_capacity(dst_dense_list.len());
for &d in &dst_dense_list {
if seen.insert(d) {
if let Some(id) = dst_type_idx.to_id(d) {
unique_dst_list.push(id.to_string());
}
}
}
}
let dst_batch = hydrate_nodes(
snapshot,
catalog,
@ -798,24 +796,28 @@ async fn execute_expand(
)
.await?;
// Build a mapping from dst_id to row index in dst_batch
// Build dense → row-in-hydrated-batch via a direct-indexed array.
let dst_batch_id_col = dst_batch
.column_by_name("id")
.ok_or_else(|| OmniError::manifest("hydrated batch missing 'id' column".to_string()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest("hydrated 'id' column is not Utf8".to_string()))?;
let dst_id_to_row: HashMap<&str, usize> = (0..dst_batch_id_col.len())
.map(|i| (dst_batch_id_col.value(i), i))
.collect();
let mut dense_to_row: Vec<Option<u32>> = vec![None; dst_type_idx.len()];
for row in 0..dst_batch_id_col.len() {
let id_str = dst_batch_id_col.value(row);
if let Some(dense) = dst_type_idx.to_dense(id_str) {
dense_to_row[dense as usize] = Some(row as u32);
}
}
// Build aligned src/dst index arrays (only for IDs that exist in hydrated batch)
// Build aligned src/dst index arrays (only for ids that exist in hydrated batch)
let mut final_src_indices: Vec<u32> = Vec::new();
let mut dst_indices: Vec<u32> = Vec::new();
for (src_idx, dst_id) in src_indices.iter().zip(dst_id_list.iter()) {
if let Some(&dst_row) = dst_id_to_row.get(dst_id.as_str()) {
for (src_idx, dst_dense) in src_indices.iter().zip(dst_dense_list.iter()) {
if let Some(dst_row) = dense_to_row[*dst_dense as usize] {
final_src_indices.push(*src_idx);
dst_indices.push(dst_row as u32);
dst_indices.push(dst_row);
}
}