diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 5509b43..08400ec 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -893,16 +893,22 @@ async fn execute_expand_indexed( let max = max_hops.unwrap_or(min_hops.max(1)); - // Per-source BFS state in string-id space (no dense interning needed). + // Per-source BFS state in DENSE id space: intern node ids to u32 once via a + // per-traversal interner so visited/seen/frontier/neighbor-map avoid string + // hashing + cloning in the hot loop (mirrors the CSR path's TypeIndex). The + // GraphIndex/CSR is NOT built — only a local id↔u32 dictionary. Strings + // survive at the substrate edges only: the per-hop IN-list to Lance, and the + // emitted dst ids handed to the string-keyed hydrate+align tail. + let mut interner = crate::graph_index::TypeIndex::new(); let n = src_ids.len(); - let mut frontiers: Vec> = Vec::with_capacity(n); - let mut visited: Vec> = Vec::with_capacity(n); - let mut seen_dst: Vec> = Vec::with_capacity(n); + let mut frontiers: Vec> = Vec::with_capacity(n); + let mut visited: Vec> = Vec::with_capacity(n); + let mut seen_dst: Vec> = Vec::with_capacity(n); for i in 0..n { - let sid = src_ids.value(i).to_string(); + let sid = interner.get_or_insert(src_ids.value(i)); let mut v = HashSet::new(); if same_type { - v.insert(sid.clone()); + v.insert(sid); } frontiers.push(vec![sid]); visited.push(v); @@ -910,32 +916,41 @@ async fn execute_expand_indexed( } let mut src_indices: Vec = Vec::new(); - let mut dst_ids: Vec = Vec::new(); + let mut dst_dense: Vec = Vec::new(); for hop in 1..=max { - // One batched indexed scan per hop over the union of all live frontiers. - let mut union: Vec = Vec::new(); + // Union of all live frontiers (dense), de-interned once for the IN-list. + let mut union_dense: Vec = Vec::new(); { - let mut seen: HashSet<&str> = HashSet::new(); + let mut seen: HashSet = HashSet::new(); for f in &frontiers { - for node in f { - if seen.insert(node.as_str()) { - union.push(node.clone()); + for &node in f { + if seen.insert(node) { + union_dense.push(node); } } } } - if union.is_empty() { + if union_dense.is_empty() { break; } + let union_keys: Vec = union_dense + .iter() + .map(|&u| { + interner + .to_id(u) + .expect("interned frontier id must resolve") + .to_string() + }) + .collect(); let batches = crate::table_store::TableStore::scan_edges_by_endpoint( - &edge_ds, key_col, opp_col, &union, + &edge_ds, key_col, opp_col, &union_keys, ) .await?; - // key -> neighbors (scan order; duplicates preserved, like CSR multi-edges). - let mut neighbor_map: HashMap> = HashMap::new(); + // dense key -> dense neighbors (scan order; duplicates preserved, like CSR multi-edges). + let mut neighbor_map: HashMap> = HashMap::new(); for batch in &batches { let keys = batch .column_by_name(key_col) @@ -950,27 +965,26 @@ async fn execute_expand_indexed( .downcast_ref::() .ok_or_else(|| OmniError::manifest(format!("edge '{}' is not Utf8", opp_col)))?; for r in 0..batch.num_rows() { - neighbor_map - .entry(keys.value(r).to_string()) - .or_default() - .push(opps.value(r).to_string()); + let k = interner.get_or_insert(keys.value(r)); + let o = interner.get_or_insert(opps.value(r)); + neighbor_map.entry(k).or_default().push(o); } } - // Advance each source row's frontier independently. + // Advance each source row's frontier independently (dense ids). for i in 0..n { let cur = std::mem::take(&mut frontiers[i]); - let mut next: Vec = Vec::new(); - for node in &cur { - let Some(neighbors) = neighbor_map.get(node) else { + let mut next: Vec = Vec::new(); + for &node in &cur { + let Some(neighbors) = neighbor_map.get(&node) else { continue; }; - for neighbor in neighbors { - if !same_type || visited[i].insert(neighbor.clone()) { - next.push(neighbor.clone()); - if hop >= min_hops && seen_dst[i].insert(neighbor.clone()) { + for &neighbor in neighbors { + if !same_type || visited[i].insert(neighbor) { + next.push(neighbor); + if hop >= min_hops && seen_dst[i].insert(neighbor) { src_indices.push(i as u32); - dst_ids.push(neighbor.clone()); + dst_dense.push(neighbor); } } } @@ -979,6 +993,18 @@ async fn execute_expand_indexed( } } + // De-intern emitted destination ids (parallel to src_indices) for the + // string-keyed hydrate+align tail, exactly as the CSR path does. + let dst_ids: Vec = dst_dense + .iter() + .map(|&d| { + interner + .to_id(d) + .expect("interned dst id must resolve") + .to_string() + }) + .collect(); + expand_hydrate_and_align( wide, src_indices, dst_ids, snapshot, catalog, dst_type, dst_var, dst_filters, params, )