diff --git a/crates/omnigraph/examples/bench_expand.rs b/crates/omnigraph/examples/bench_expand.rs new file mode 100644 index 0000000..ed4ae5d --- /dev/null +++ b/crates/omnigraph/examples/bench_expand.rs @@ -0,0 +1,273 @@ +//! Benchmark expand/traversal path and microbenchmark the per-row dedup overhead. +//! +//! Run: `cargo run --release --example bench_expand` +//! +//! Produces end-to-end query latency for 1/2/3-hop Match queries against a +//! synthetic Person/Knows graph, plus a microbench comparing +//! `HashSet` vs `HashSet` dedup inside the BFS inner loop. + +use std::collections::HashSet; +use std::hash::BuildHasherDefault; +use std::time::Instant; + +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +const SCHEMA: &str = r#" +node Person { + name: String @key + age: I32? +} +edge Knows: Person -> Person +"#; + +const QUERIES: &str = r#" +query hop1() { + match { + $a: Person + $b: Person + $a knows $b + } + return { $a.name, $b.name } +} + +query hop2() { + match { + $a: Person + $b: Person + $c: Person + $a knows $b + $b knows $c + } + return { $a.name, $c.name } +} + +query hop3() { + match { + $a: Person + $b: Person + $c: Person + $d: Person + $a knows $b + $b knows $c + $c knows $d + } + return { $a.name, $d.name } +} +"#; + +/// Generate a synthetic graph: `n` persons, each with `avg_degree` outgoing Knows +/// edges to random other persons (seeded, deterministic). +fn generate_jsonl(n: usize, avg_degree: usize, seed: u64) -> String { + let mut s = String::with_capacity(n * 80); + for i in 0..n { + s.push_str(&format!( + r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#, + i, + (i % 80) as i32 + 18 + )); + s.push('\n'); + } + // Simple xorshift-ish PRNG to avoid adding a dep. + let mut state: u64 = seed.wrapping_mul(0x9E3779B97F4A7C15).wrapping_add(1); + let mut next = |state: &mut u64| { + *state ^= *state << 13; + *state ^= *state >> 7; + *state ^= *state << 17; + *state + }; + for i in 0..n { + for _ in 0..avg_degree { + let j = (next(&mut state) as usize) % n; + if j == i { + continue; + } + s.push_str(&format!( + r#"{{"edge":"Knows","from":"p{}","to":"p{}"}}"#, + i, j + )); + s.push('\n'); + } + } + s +} + +async fn time_query( + db: &Omnigraph, + name: &str, + runs: usize, +) -> (std::time::Duration, std::time::Duration, usize) { + let params = ParamMap::new(); + // Warm up once (builds graph index lazily, caches lance datasets). + let warmup = db + .query(ReadTarget::branch("main"), QUERIES, name, ¶ms) + .await + .expect("warmup query"); + let row_count: usize = warmup.num_rows(); + + let mut total = std::time::Duration::ZERO; + let mut min = std::time::Duration::MAX; + for _ in 0..runs { + let t = Instant::now(); + let r = db + .query(ReadTarget::branch("main"), QUERIES, name, ¶ms) + .await + .expect("query"); + let d = t.elapsed(); + std::hint::black_box(r); + total += d; + if d < min { + min = d; + } + } + (total / runs as u32, min, row_count) +} + +fn microbench_dedup() { + // Simulate the per-source BFS dedup inner loop. + // Inputs: 10k "sources" each expanding to 512 neighbors (3-hop-ish footprint). + const SOURCES: usize = 10_000; + const NEIGHBORS_PER_SRC: usize = 512; + const UNIVERSE: usize = 10_000; + + // Pre-generate dense ids and corresponding strings. + let mut dense_streams: Vec> = Vec::with_capacity(SOURCES); + let mut string_streams: Vec> = Vec::with_capacity(SOURCES); + let mut state: u64 = 0xdeadbeef; + for _ in 0..SOURCES { + let mut ds = Vec::with_capacity(NEIGHBORS_PER_SRC); + for _ in 0..NEIGHBORS_PER_SRC { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + ds.push((state as usize % UNIVERSE) as u32); + } + let ss: Vec = ds.iter().map(|d| format!("p{}", d)).collect(); + dense_streams.push(ds); + string_streams.push(ss); + } + + // Variant A: HashSet (current behavior) + let t = Instant::now(); + let mut emitted_a: usize = 0; + for ss in &string_streams { + let mut seen: HashSet = HashSet::new(); + for s in ss { + if seen.insert(s.clone()) { + emitted_a += 1; + } + } + } + let dur_string = t.elapsed(); + + // Variant B: HashSet (default hasher) + let t = Instant::now(); + let mut emitted_b: usize = 0; + for ds in &dense_streams { + let mut seen: HashSet = HashSet::new(); + for &d in ds { + if seen.insert(d) { + emitted_b += 1; + } + } + } + let dur_u32 = t.elapsed(); + + // Variant C: HashSet with ahash (std's BuildHasherDefault vs foldhash). + // Use std::collections::HashSet with FxHasher-equivalent via BuildHasherDefault + // (skip if we don't want a dep). We'll compare against a bitmap instead. + // Variant D: Vec bitmap + let t = Instant::now(); + let mut emitted_d: usize = 0; + let mut bitmap = vec![false; UNIVERSE]; + for ds in &dense_streams { + // Reset only the bits we touched + let mut touched: Vec = Vec::with_capacity(NEIGHBORS_PER_SRC); + for &d in ds { + let idx = d as usize; + if !bitmap[idx] { + bitmap[idx] = true; + touched.push(d); + emitted_d += 1; + } + } + for &d in &touched { + bitmap[d as usize] = false; + } + } + let dur_bitmap = t.elapsed(); + + // Variant E: HashSet with FxHash-like (BuildHasherDefault is default; skip). + // Use ahash if available — fall back: just report three variants. + + println!("\n── Microbench: per-source dedup inner loop ──"); + println!( + " {} sources × {} neighbors (universe {})", + SOURCES, NEIGHBORS_PER_SRC, UNIVERSE + ); + println!( + " HashSet {:>9.2?} emitted={} → {:>6.1} ns/op", + dur_string, + emitted_a, + dur_string.as_nanos() as f64 / (SOURCES * NEIGHBORS_PER_SRC) as f64 + ); + println!( + " HashSet {:>9.2?} emitted={} → {:>6.1} ns/op (speedup {:.1}×)", + dur_u32, + emitted_b, + dur_u32.as_nanos() as f64 / (SOURCES * NEIGHBORS_PER_SRC) as f64, + dur_string.as_secs_f64() / dur_u32.as_secs_f64() + ); + println!( + " Vec bitmap {:>9.2?} emitted={} → {:>6.1} ns/op (speedup {:.1}×)", + dur_bitmap, + emitted_d, + dur_bitmap.as_nanos() as f64 / (SOURCES * NEIGHBORS_PER_SRC) as f64, + dur_string.as_secs_f64() / dur_bitmap.as_secs_f64() + ); + // Silence unused hasher typedef warning + let _ = std::marker::PhantomData::>; +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() { + println!("── End-to-end query latency ──"); + + for &(n, avg_deg, label) in &[ + (1_000usize, 4usize, "small (1k nodes, avg_deg=4)"), + (10_000, 8, "medium (10k nodes, avg_deg=8)"), + (30_000, 8, "large (30k nodes, avg_deg=8)"), + ] { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let t = Instant::now(); + let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap(); + let init_elapsed = t.elapsed(); + + let jsonl = generate_jsonl(n, avg_deg, 42); + let t = Instant::now(); + load_jsonl(&mut db, &jsonl, LoadMode::Overwrite).await.unwrap(); + let load_elapsed = t.elapsed(); + + println!( + "\n[{}] init={:.2?} load={:.2?} jsonl_bytes={}", + label, + init_elapsed, + load_elapsed, + jsonl.len() + ); + + for q in &["hop1", "hop2", "hop3"] { + let runs = if *q == "hop3" && n >= 10_000 { 2 } else { 3 }; + let (mean, min, rows) = time_query(&db, q, runs).await; + println!( + " {:<6} mean={:>9.2?} min={:>9.2?} rows={:>7} runs={}", + q, mean, min, rows, runs + ); + } + } + + microbench_dedup(); +} diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 9a69381..defabf5 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -776,8 +776,27 @@ async fn execute_expand( .filter(|f| ir_filter_to_sql(f, params).is_none()) .collect(); - // Hydrate destination nodes from the snapshot (with pushed-down filters) - let dst_batch = hydrate_nodes(snapshot, catalog, dst_type, &dst_id_list, pushdown_sql.as_deref()).await?; + // Hydrate destination nodes from the snapshot (with pushed-down filters). + // Dedupe dst ids before the scan — the post-hydrate alignment HashMap fans + // them back out to the original (src, dst) pairs. For multi-hop on a small + // destination universe this collapses the IN-list from O(pairs) to + // O(unique_dst), often by 100–500×. + let unique_dst_list: Vec = { + let mut seen: HashSet<&str> = HashSet::with_capacity(dst_id_list.len()); + dst_id_list + .iter() + .filter(|s| seen.insert(s.as_str())) + .cloned() + .collect() + }; + let dst_batch = hydrate_nodes( + snapshot, + catalog, + dst_type, + &unique_dst_list, + pushdown_sql.as_deref(), + ) + .await?; // Build a mapping from dst_id to row index in dst_batch let dst_batch_id_col = dst_batch