mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Dedupe dst ids before hydrating nodes in execute_expand (#45)
The BFS in execute_expand emits one (src_idx, dst_id) pair per edge, so
dst_id_list contains heavy duplication when multi-hop traversals revisit
the same destination nodes. hydrate_nodes then built an
"id IN ('a', 'b', ...)" filter from the full list, passing it verbatim
to Lance. On a 30k-node Person graph, a 3-hop query produced a 15.4M-
entry IN-list against a 30k-row target — 512x more entries than unique
ids.
Deduplicate before the Lance scan; the post-hydrate alignment HashMap
already fans results back out to the original (src, dst) pairs, so
output is bit-identical.
Bench numbers (crates/omnigraph/examples/bench_expand.rs, min of 2-3
runs, release build):
query before after speedup
1k hop3 460 ms 28 ms 16x
10k hop2 4.21 s 188 ms 22x
10k hop3 40.59 s 1.30 s 31x
30k hop2 11.71 s 678 ms 17x
30k hop3 197.38 s 4.86 s 41x
All existing omnigraph-engine tests pass (72/72, 0 failures).
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
a1b00e2d06
commit
d8e0bfeb22
2 changed files with 294 additions and 2 deletions
273
crates/omnigraph/examples/bench_expand.rs
Normal file
273
crates/omnigraph/examples/bench_expand.rs
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
//! Benchmark expand/traversal path and microbenchmark the per-row dedup overhead.
|
||||
//!
|
||||
//! Run: `cargo run --release --example bench_expand`
|
||||
//!
|
||||
//! Produces end-to-end query latency for 1/2/3-hop Match queries against a
|
||||
//! synthetic Person/Knows graph, plus a microbench comparing
|
||||
//! `HashSet<String>` vs `HashSet<u32>` dedup inside the BFS inner loop.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::hash::BuildHasherDefault;
|
||||
use std::time::Instant;
|
||||
|
||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
use omnigraph_compiler::ir::ParamMap;
|
||||
|
||||
const SCHEMA: &str = r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
age: I32?
|
||||
}
|
||||
edge Knows: Person -> Person
|
||||
"#;
|
||||
|
||||
const QUERIES: &str = r#"
|
||||
query hop1() {
|
||||
match {
|
||||
$a: Person
|
||||
$b: Person
|
||||
$a knows $b
|
||||
}
|
||||
return { $a.name, $b.name }
|
||||
}
|
||||
|
||||
query hop2() {
|
||||
match {
|
||||
$a: Person
|
||||
$b: Person
|
||||
$c: Person
|
||||
$a knows $b
|
||||
$b knows $c
|
||||
}
|
||||
return { $a.name, $c.name }
|
||||
}
|
||||
|
||||
query hop3() {
|
||||
match {
|
||||
$a: Person
|
||||
$b: Person
|
||||
$c: Person
|
||||
$d: Person
|
||||
$a knows $b
|
||||
$b knows $c
|
||||
$c knows $d
|
||||
}
|
||||
return { $a.name, $d.name }
|
||||
}
|
||||
"#;
|
||||
|
||||
/// Generate a synthetic graph: `n` persons, each with `avg_degree` outgoing Knows
|
||||
/// edges to random other persons (seeded, deterministic).
|
||||
fn generate_jsonl(n: usize, avg_degree: usize, seed: u64) -> String {
|
||||
let mut s = String::with_capacity(n * 80);
|
||||
for i in 0..n {
|
||||
s.push_str(&format!(
|
||||
r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#,
|
||||
i,
|
||||
(i % 80) as i32 + 18
|
||||
));
|
||||
s.push('\n');
|
||||
}
|
||||
// Simple xorshift-ish PRNG to avoid adding a dep.
|
||||
let mut state: u64 = seed.wrapping_mul(0x9E3779B97F4A7C15).wrapping_add(1);
|
||||
let mut next = |state: &mut u64| {
|
||||
*state ^= *state << 13;
|
||||
*state ^= *state >> 7;
|
||||
*state ^= *state << 17;
|
||||
*state
|
||||
};
|
||||
for i in 0..n {
|
||||
for _ in 0..avg_degree {
|
||||
let j = (next(&mut state) as usize) % n;
|
||||
if j == i {
|
||||
continue;
|
||||
}
|
||||
s.push_str(&format!(
|
||||
r#"{{"edge":"Knows","from":"p{}","to":"p{}"}}"#,
|
||||
i, j
|
||||
));
|
||||
s.push('\n');
|
||||
}
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
async fn time_query(
|
||||
db: &Omnigraph,
|
||||
name: &str,
|
||||
runs: usize,
|
||||
) -> (std::time::Duration, std::time::Duration, usize) {
|
||||
let params = ParamMap::new();
|
||||
// Warm up once (builds graph index lazily, caches lance datasets).
|
||||
let warmup = db
|
||||
.query(ReadTarget::branch("main"), QUERIES, name, ¶ms)
|
||||
.await
|
||||
.expect("warmup query");
|
||||
let row_count: usize = warmup.num_rows();
|
||||
|
||||
let mut total = std::time::Duration::ZERO;
|
||||
let mut min = std::time::Duration::MAX;
|
||||
for _ in 0..runs {
|
||||
let t = Instant::now();
|
||||
let r = db
|
||||
.query(ReadTarget::branch("main"), QUERIES, name, ¶ms)
|
||||
.await
|
||||
.expect("query");
|
||||
let d = t.elapsed();
|
||||
std::hint::black_box(r);
|
||||
total += d;
|
||||
if d < min {
|
||||
min = d;
|
||||
}
|
||||
}
|
||||
(total / runs as u32, min, row_count)
|
||||
}
|
||||
|
||||
fn microbench_dedup() {
|
||||
// Simulate the per-source BFS dedup inner loop.
|
||||
// Inputs: 10k "sources" each expanding to 512 neighbors (3-hop-ish footprint).
|
||||
const SOURCES: usize = 10_000;
|
||||
const NEIGHBORS_PER_SRC: usize = 512;
|
||||
const UNIVERSE: usize = 10_000;
|
||||
|
||||
// Pre-generate dense ids and corresponding strings.
|
||||
let mut dense_streams: Vec<Vec<u32>> = Vec::with_capacity(SOURCES);
|
||||
let mut string_streams: Vec<Vec<String>> = Vec::with_capacity(SOURCES);
|
||||
let mut state: u64 = 0xdeadbeef;
|
||||
for _ in 0..SOURCES {
|
||||
let mut ds = Vec::with_capacity(NEIGHBORS_PER_SRC);
|
||||
for _ in 0..NEIGHBORS_PER_SRC {
|
||||
state ^= state << 13;
|
||||
state ^= state >> 7;
|
||||
state ^= state << 17;
|
||||
ds.push((state as usize % UNIVERSE) as u32);
|
||||
}
|
||||
let ss: Vec<String> = ds.iter().map(|d| format!("p{}", d)).collect();
|
||||
dense_streams.push(ds);
|
||||
string_streams.push(ss);
|
||||
}
|
||||
|
||||
// Variant A: HashSet<String> (current behavior)
|
||||
let t = Instant::now();
|
||||
let mut emitted_a: usize = 0;
|
||||
for ss in &string_streams {
|
||||
let mut seen: HashSet<String> = HashSet::new();
|
||||
for s in ss {
|
||||
if seen.insert(s.clone()) {
|
||||
emitted_a += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
let dur_string = t.elapsed();
|
||||
|
||||
// Variant B: HashSet<u32> (default hasher)
|
||||
let t = Instant::now();
|
||||
let mut emitted_b: usize = 0;
|
||||
for ds in &dense_streams {
|
||||
let mut seen: HashSet<u32> = HashSet::new();
|
||||
for &d in ds {
|
||||
if seen.insert(d) {
|
||||
emitted_b += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
let dur_u32 = t.elapsed();
|
||||
|
||||
// Variant C: HashSet<u32> with ahash (std's BuildHasherDefault<DefaultHasher> vs foldhash).
|
||||
// Use std::collections::HashSet with FxHasher-equivalent via BuildHasherDefault<DefaultHasher>
|
||||
// (skip if we don't want a dep). We'll compare against a bitmap instead.
|
||||
// Variant D: Vec<bool> bitmap
|
||||
let t = Instant::now();
|
||||
let mut emitted_d: usize = 0;
|
||||
let mut bitmap = vec![false; UNIVERSE];
|
||||
for ds in &dense_streams {
|
||||
// Reset only the bits we touched
|
||||
let mut touched: Vec<u32> = Vec::with_capacity(NEIGHBORS_PER_SRC);
|
||||
for &d in ds {
|
||||
let idx = d as usize;
|
||||
if !bitmap[idx] {
|
||||
bitmap[idx] = true;
|
||||
touched.push(d);
|
||||
emitted_d += 1;
|
||||
}
|
||||
}
|
||||
for &d in &touched {
|
||||
bitmap[d as usize] = false;
|
||||
}
|
||||
}
|
||||
let dur_bitmap = t.elapsed();
|
||||
|
||||
// Variant E: HashSet<u32> with FxHash-like (BuildHasherDefault<std::collections::hash_map::DefaultHasher> is default; skip).
|
||||
// Use ahash if available — fall back: just report three variants.
|
||||
|
||||
println!("\n── Microbench: per-source dedup inner loop ──");
|
||||
println!(
|
||||
" {} sources × {} neighbors (universe {})",
|
||||
SOURCES, NEIGHBORS_PER_SRC, UNIVERSE
|
||||
);
|
||||
println!(
|
||||
" HashSet<String> {:>9.2?} emitted={} → {:>6.1} ns/op",
|
||||
dur_string,
|
||||
emitted_a,
|
||||
dur_string.as_nanos() as f64 / (SOURCES * NEIGHBORS_PER_SRC) as f64
|
||||
);
|
||||
println!(
|
||||
" HashSet<u32> {:>9.2?} emitted={} → {:>6.1} ns/op (speedup {:.1}×)",
|
||||
dur_u32,
|
||||
emitted_b,
|
||||
dur_u32.as_nanos() as f64 / (SOURCES * NEIGHBORS_PER_SRC) as f64,
|
||||
dur_string.as_secs_f64() / dur_u32.as_secs_f64()
|
||||
);
|
||||
println!(
|
||||
" Vec<bool> bitmap {:>9.2?} emitted={} → {:>6.1} ns/op (speedup {:.1}×)",
|
||||
dur_bitmap,
|
||||
emitted_d,
|
||||
dur_bitmap.as_nanos() as f64 / (SOURCES * NEIGHBORS_PER_SRC) as f64,
|
||||
dur_string.as_secs_f64() / dur_bitmap.as_secs_f64()
|
||||
);
|
||||
// Silence unused hasher typedef warning
|
||||
let _ = std::marker::PhantomData::<BuildHasherDefault<std::collections::hash_map::DefaultHasher>>;
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() {
|
||||
println!("── End-to-end query latency ──");
|
||||
|
||||
for &(n, avg_deg, label) in &[
|
||||
(1_000usize, 4usize, "small (1k nodes, avg_deg=4)"),
|
||||
(10_000, 8, "medium (10k nodes, avg_deg=8)"),
|
||||
(30_000, 8, "large (30k nodes, avg_deg=8)"),
|
||||
] {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
let t = Instant::now();
|
||||
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
|
||||
let init_elapsed = t.elapsed();
|
||||
|
||||
let jsonl = generate_jsonl(n, avg_deg, 42);
|
||||
let t = Instant::now();
|
||||
load_jsonl(&mut db, &jsonl, LoadMode::Overwrite).await.unwrap();
|
||||
let load_elapsed = t.elapsed();
|
||||
|
||||
println!(
|
||||
"\n[{}] init={:.2?} load={:.2?} jsonl_bytes={}",
|
||||
label,
|
||||
init_elapsed,
|
||||
load_elapsed,
|
||||
jsonl.len()
|
||||
);
|
||||
|
||||
for q in &["hop1", "hop2", "hop3"] {
|
||||
let runs = if *q == "hop3" && n >= 10_000 { 2 } else { 3 };
|
||||
let (mean, min, rows) = time_query(&db, q, runs).await;
|
||||
println!(
|
||||
" {:<6} mean={:>9.2?} min={:>9.2?} rows={:>7} runs={}",
|
||||
q, mean, min, rows, runs
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
microbench_dedup();
|
||||
}
|
||||
|
|
@ -776,8 +776,27 @@ async fn execute_expand(
|
|||
.filter(|f| ir_filter_to_sql(f, params).is_none())
|
||||
.collect();
|
||||
|
||||
// Hydrate destination nodes from the snapshot (with pushed-down filters)
|
||||
let dst_batch = hydrate_nodes(snapshot, catalog, dst_type, &dst_id_list, pushdown_sql.as_deref()).await?;
|
||||
// Hydrate destination nodes from the snapshot (with pushed-down filters).
|
||||
// Dedupe dst ids before the scan — the post-hydrate alignment HashMap fans
|
||||
// them back out to the original (src, dst) pairs. For multi-hop on a small
|
||||
// destination universe this collapses the IN-list from O(pairs) to
|
||||
// O(unique_dst), often by 100–500×.
|
||||
let unique_dst_list: Vec<String> = {
|
||||
let mut seen: HashSet<&str> = HashSet::with_capacity(dst_id_list.len());
|
||||
dst_id_list
|
||||
.iter()
|
||||
.filter(|s| seen.insert(s.as_str()))
|
||||
.cloned()
|
||||
.collect()
|
||||
};
|
||||
let dst_batch = hydrate_nodes(
|
||||
snapshot,
|
||||
catalog,
|
||||
dst_type,
|
||||
&unique_dst_list,
|
||||
pushdown_sql.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Build a mapping from dst_id to row index in dst_batch
|
||||
let dst_batch_id_col = dst_batch
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue