diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index b2d2fd8..435fec0 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -777,16 +777,144 @@ fn expand_indexed_max_hops() -> u32 { .unwrap_or(DEFAULT_EXPAND_INDEXED_MAX_HOPS) } -/// Choose the indexed Expand path for selective traversals: a small frontier -/// relative to the graph, bounded hop count. The explicit override wins when -/// set. A future refinement can make the frontier bound edge-count-relative -/// (indexed even for moderate frontiers on very large graphs). -fn should_use_indexed_expand(frontier_rows: usize, min_hops: u32, max_hops: Option) -> bool { - if let Some(forced) = traversal_indexed_override() { - return forced; +/// The two Expand execution paths the chooser dispatches between. Extensible: +/// a future persisted-adjacency artifact would become a third variant here, and +/// `choose_expand_mode` would learn to prefer it when covered. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ExpandMode { + /// Per-hop neighbor lookup via the persisted src/dst BTREE. Work scales + /// with the frontier, not |E| — best for selective traversals. + IndexedScan, + /// Whole-graph in-memory CSR (built once, reused). Best for dense / deep / + /// large-frontier traversals, or when the index is degraded and a full + /// scan would be paid per hop anyway. + Csr, +} + +/// Building the in-memory CSR costs more than a bare edge scan: it scans every +/// edge AND allocates + groups the adjacency. This factor expresses that +/// overhead so a one-off degraded single-hop scan can still edge out a full CSR +/// build. The crossover is insensitive to its exact value. +const CSR_BUILD_FACTOR: f64 = 1.5; + +/// Cardinality inputs for the (pure, IO-free) traversal-mode cost model. Every +/// field is a cheap manifest-resident count or an already-in-hand value — the +/// chooser performs no scans. 
+#[derive(Debug, Clone)] +struct ExpandCostInputs { + /// Current frontier size (`wide.num_rows()`). + frontier_rows: usize, + /// |E| for the edge type (manifest `row_count`). + edge_count: u64, + /// |V_src| — node count of the keyed endpoint type (manifest `row_count`). + src_node_count: u64, + /// Effective max hop count for this Expand. + effective_max_hops: u32, + /// Hard ceiling above which the indexed path is never used (resolved + /// `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS`). + max_hops_cap: u32, + /// Hard ceiling above which the indexed path is never used (resolved + /// `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER`). + max_frontier_cap: usize, + /// Whether `scan_edges_by_endpoint`'s `key_col IN (...)` is served by the + /// BTREE (`Indexed`) or silently falls back to a full scan (`Degraded`). + coverage: crate::table_store::IndexCoverage, + /// Whether the cross-query CSR for this snapshot+edge-version is already + /// built (making the CSR path ≈ free). Conservatively `false` until the + /// cache-peek is wired (the plan's optional refinement). + csr_cached: bool, +} + +/// Pure cost-based traversal-mode chooser. Compares an estimate of the indexed +/// path's frontier-relative work against the cost of building (or reusing) the +/// whole-graph CSR, and picks the cheaper. Deterministic and IO-free so it is +/// unit-tested at the crossover; the caller supplies the manifest counts and the +/// (optionally degraded) index coverage. +/// +/// Under `Indexed` coverage and a cold CSR the decision reduces to a clean +/// selectivity ratio — indexed wins when `hops * frontier < BUILD_FACTOR * +/// |V_src|`, i.e. when the frontier is a small fraction of the source vertex +/// set — which is independent of |E| (the flat-in-|E| property PR #149 shipped). 
+fn choose_expand_mode(i: &ExpandCostInputs) -> ExpandMode { + // Hard ceilings: very deep or very large frontiers fan out toward + // whole-graph and are always better served by CSR, regardless of the cost + // estimate. These preserve the documented semantics of the two cap flags. + if i.effective_max_hops > i.max_hops_cap || i.frontier_rows > i.max_frontier_cap { + return ExpandMode::Csr; + } + + let hops = i.effective_max_hops.max(1) as f64; + let frontier = i.frontier_rows as f64; + let edges = i.edge_count as f64; + let src = i.src_node_count.max(1) as f64; + let fanout = edges / src; + + // Indexed work scales with the frontier when the BTREE serves the IN-list; + // a degraded scan is a full edge scan per hop instead (the C6 perf cliff). + let indexed_cost = match i.coverage { + crate::table_store::IndexCoverage::Indexed => hops * frontier * fanout, + crate::table_store::IndexCoverage::Degraded { .. } => hops * edges, + }; + // A warm CSR is ~free to reuse; a cold one costs a build over all edges. + let csr_cost = if i.csr_cached { + 0.0 + } else { + CSR_BUILD_FACTOR * edges + }; + + if indexed_cost < csr_cost { + ExpandMode::IndexedScan + } else { + ExpandMode::Csr + } +} + +/// Gather the cost-model inputs from cheap manifest counts. `None` when the +/// edge type, its source node type, or their manifest entries are absent (e.g. +/// a not-yet-materialized table) — the caller then falls back to the legacy +/// frontier/hop ceiling so the decision is always defined. +fn gather_cost_inputs( + snapshot: &Snapshot, + catalog: &Catalog, + edge_type: &str, + direction: Direction, + frontier_rows: usize, + effective_max_hops: u32, + coverage: crate::table_store::IndexCoverage, +) -> Option { + let edge_entry = snapshot.entry(&format!("edge:{}", edge_type))?; + let edge_def = catalog.edge_types.get(edge_type)?; + // The frontier source vertices are the keyed endpoint's type: `from` for an + // Out traversal (keyed on `src`), `to` for In (keyed on `dst`). 
+ let src_type = match direction { + Direction::Out => &edge_def.from_type, + Direction::In => &edge_def.to_type, + }; + let src_entry = snapshot.entry(&format!("node:{}", src_type))?; + Some(ExpandCostInputs { + frontier_rows, + edge_count: edge_entry.row_count, + src_node_count: src_entry.row_count, + effective_max_hops, + max_hops_cap: expand_indexed_max_hops(), + max_frontier_cap: expand_indexed_max_frontier(), + coverage, + csr_cached: false, + }) +} + +/// Coverage value to feed the cost decision. A failed coverage probe is treated +/// as `Degraded` (conservative: don't over-favor the indexed path when we can't +/// confirm the BTREE will serve the scan). +fn coverage_for_decision( + coverage: &Result, +) -> crate::table_store::IndexCoverage { + match coverage { + Ok(c) => c.clone(), + Err(_) => crate::table_store::IndexCoverage::Degraded { + reason: "coverage check failed".to_string(), + }, } - let effective_max = max_hops.unwrap_or(min_hops.max(1)); - frontier_rows <= expand_indexed_max_frontier() && effective_max <= expand_indexed_max_hops() } /// Surface the C6 silent scalar-index fallback (commit `5a7ab6d`): warn when the @@ -843,29 +971,111 @@ async fn execute_expand( dst_filters: &[IRFilter], params: &ParamMap, ) -> Result<()> { - if should_use_indexed_expand(wide.num_rows(), min_hops, max_hops) { - // Open the edge dataset once here and thread it into the indexed path, - // surfacing the C6 coverage fallback at the same point. 
- let (key_col, _) = endpoint_columns(direction); - let edge_ds = snapshot.open(&format!("edge:{}", edge_type)).await?; - let coverage = - crate::table_store::TableStore::key_column_index_coverage(&edge_ds, key_col).await; - warn_on_degraded_coverage(&coverage, key_col, edge_type); - execute_expand_indexed( - wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops, - max_hops, dst_filters, params, edge_ds, - ) - .await - } else { + let frontier_rows = wide.num_rows(); + let effective_max_hops = max_hops.unwrap_or(min_hops.max(1)); + let (key_col, _) = endpoint_columns(direction); + let edge_table_key = format!("edge:{}", edge_type); + + // Cardinality-first preliminary decision (no IO). The override wins; else the + // cost model decides under *optimistic* coverage. Optimistic is what lets us + // skip the dataset open on a clearly-CSR traversal: real coverage can only + // make the indexed path costlier, so if even a perfectly-indexed scan loses + // to CSR here, it loses for real. + let forced = traversal_indexed_override(); + let lean_indexed = match forced { + Some(v) => v, + None => match gather_cost_inputs( + snapshot, + catalog, + edge_type, + direction, + frontier_rows, + effective_max_hops, + crate::table_store::IndexCoverage::Indexed, + ) { + Some(inputs) => choose_expand_mode(&inputs) == ExpandMode::IndexedScan, + // Manifest counts absent (e.g. not-yet-materialized table): fall back + // to the legacy frontier/hop ceiling so the decision is defined. 
+ None => { + frontier_rows <= expand_indexed_max_frontier() + && effective_max_hops <= expand_indexed_max_hops() + } + }, + }; + + if !lean_indexed { + tracing::debug!( + target: "omnigraph::traverse", + edge = %edge_type, + frontier = frontier_rows, + hops = effective_max_hops, + mode = "csr", + "expand mode chosen", + ); let gi = graph_index.get().await?.ok_or_else(|| { OmniError::manifest("graph index required for CSR traversal".to_string()) })?; - execute_expand_csr( + return execute_expand_csr( wide, gi, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops, max_hops, dst_filters, params, ) - .await + .await; } + + // Leaning indexed: open the edge dataset once, confirm real coverage, and + // (unless forced) re-decide with it. The opened dataset is threaded into the + // indexed path so it is never opened twice. + let edge_ds = snapshot.open(&edge_table_key).await?; + let coverage = + crate::table_store::TableStore::key_column_index_coverage(&edge_ds, key_col).await; + + if forced.is_none() { + if let Some(inputs) = gather_cost_inputs( + snapshot, + catalog, + edge_type, + direction, + frontier_rows, + effective_max_hops, + coverage_for_decision(&coverage), + ) { + if choose_expand_mode(&inputs) == ExpandMode::Csr { + tracing::debug!( + target: "omnigraph::traverse", + edge = %edge_type, + frontier = frontier_rows, + hops = effective_max_hops, + mode = "csr", + reason = "index coverage degraded", + "expand mode chosen", + ); + let gi = graph_index.get().await?.ok_or_else(|| { + OmniError::manifest("graph index required for CSR traversal".to_string()) + })?; + return execute_expand_csr( + wide, gi, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, + min_hops, max_hops, dst_filters, params, + ) + .await; + } + } + } + + tracing::debug!( + target: "omnigraph::traverse", + edge = %edge_type, + frontier = frontier_rows, + hops = effective_max_hops, + mode = "indexed", + "expand mode chosen", + ); + // Surface the C6 
silent scalar-index fallback once, now that coverage is known. + warn_on_degraded_coverage(&coverage, key_col, edge_type); + execute_expand_indexed( + wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops, + max_hops, dst_filters, params, edge_ds, + ) + .await } /// BTREE-indexed graph traversal: per hop, batch the current frontier into one @@ -1849,3 +2059,110 @@ fn take_batch(batch: &RecordBatch, indices: &UInt32Array) -> Result .map_err(|e| OmniError::Lance(e.to_string()))?; RecordBatch::try_new(batch.schema(), columns).map_err(|e| OmniError::Lance(e.to_string())) } + +#[cfg(test)] +mod expand_chooser_tests { + use super::*; + use crate::table_store::IndexCoverage; + + /// Build cost inputs with generous hard caps, so the cost comparison (not a + /// ceiling) is what the assertions exercise unless a test sets one on purpose. + fn inputs( + frontier_rows: usize, + edge_count: u64, + src_node_count: u64, + effective_max_hops: u32, + coverage: IndexCoverage, + ) -> ExpandCostInputs { + ExpandCostInputs { + frontier_rows, + edge_count, + src_node_count, + effective_max_hops, + max_hops_cap: 6, + max_frontier_cap: 1024, + coverage, + csr_cached: false, + } + } + + #[test] + fn selective_frontier_on_large_graph_picks_indexed() { + // 50 source rows against 1M source vertices, one hop: tiny selectivity — + // the PR #149 win the chooser must preserve. + let m = choose_expand_mode(&inputs(50, 10_000_000, 1_000_000, 1, IndexCoverage::Indexed)); + assert_eq!(m, ExpandMode::IndexedScan); + } + + #[test] + fn flat_in_edge_count_same_selectivity_same_choice() { + // Same selectivity (frontier/|V_src|), 1000× difference in |E|. Indexed + // cost is independent of |E|, so the choice must not flip. 
+ let small = choose_expand_mode(&inputs(50, 100_000, 1_000_000, 1, IndexCoverage::Indexed)); + let huge = + choose_expand_mode(&inputs(50, 100_000_000, 1_000_000, 1, IndexCoverage::Indexed)); + assert_eq!(small, ExpandMode::IndexedScan); + assert_eq!(huge, ExpandMode::IndexedScan); + } + + #[test] + fn frontier_large_fraction_of_source_picks_csr() { + // hops*frontier (200) exceeds BUILD_FACTOR*|V_src| (1.5*100=150) → CSR, + // and 200 is below the frontier cap, so it is the cost model deciding. + let m = choose_expand_mode(&inputs(200, 1_000, 100, 1, IndexCoverage::Indexed)); + assert_eq!(m, ExpandMode::Csr); + } + + #[test] + fn frontier_over_hard_cap_picks_csr() { + // 2000 > 1024 ceiling, even though the selectivity is tiny. + let m = choose_expand_mode(&inputs(2000, 10_000_000, 1_000_000, 1, IndexCoverage::Indexed)); + assert_eq!(m, ExpandMode::Csr); + } + + #[test] + fn hops_over_hard_cap_picks_csr() { + let m = choose_expand_mode(&inputs(10, 10_000_000, 1_000_000, 8, IndexCoverage::Indexed)); + assert_eq!(m, ExpandMode::Csr); + } + + #[test] + fn degraded_single_hop_tiny_frontier_stays_indexed() { + // One full degraded scan (1*|E|) still edges out a full CSR build + // (1.5*|E|) for a one-off single hop. + let m = choose_expand_mode(&inputs( + 5, + 10_000, + 10_000, + 1, + IndexCoverage::Degraded { + reason: "no btree".into(), + }, + )); + assert_eq!(m, ExpandMode::IndexedScan); + } + + #[test] + fn degraded_multi_hop_picks_csr() { + // Two degraded scans (2*|E|) lose to one CSR build (1.5*|E|). + let m = choose_expand_mode(&inputs( + 5, + 10_000, + 10_000, + 2, + IndexCoverage::Degraded { + reason: "no btree".into(), + }, + )); + assert_eq!(m, ExpandMode::Csr); + } + + #[test] + fn warm_csr_is_always_reused() { + // A maximally selective traversal still prefers an already-built CSR + // (cost ~0) over re-scanning per hop. 
+ let mut i = inputs(1, 10_000_000, 1_000_000, 1, IndexCoverage::Indexed); + i.csr_cached = true; + assert_eq!(choose_expand_mode(&i), ExpandMode::Csr); + } +} diff --git a/crates/omnigraph/tests/traversal_indexed.rs b/crates/omnigraph/tests/traversal_indexed.rs index d7acb92..ecfab3d 100644 --- a/crates/omnigraph/tests/traversal_indexed.rs +++ b/crates/omnigraph/tests/traversal_indexed.rs @@ -47,18 +47,25 @@ async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &Pa v } -/// Run the same query under CSR then indexed mode; assert identical results and -/// return them. +/// Run the same query under CSR, indexed, and auto (cost-chooser) modes; assert +/// all three produce identical results and return them. The auto pass exercises +/// `choose_expand_mode` end to end: whichever path it selects, the rows must +/// match the forced paths (the chooser changes which path runs, never the result). async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec { set_mode("csr"); let csr = sorted_names(db, queries, name, params).await; set_mode("indexed"); let indexed = sorted_names(db, queries, name, params).await; clear_mode(); + let auto = sorted_names(db, queries, name, params).await; assert_eq!( indexed, csr, "indexed Expand must produce identical results to CSR for query '{name}'" ); + assert_eq!( + auto, csr, + "auto (cost-chooser) Expand must produce identical results to the forced paths for query '{name}'" + ); indexed } diff --git a/docs/user/constants.md b/docs/user/constants.md index 74ee7fb..8a45ca7 100644 --- a/docs/user/constants.md +++ b/docs/user/constants.md @@ -13,9 +13,10 @@ | Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` | | Lance blob compaction support | `LANCE_SUPPORTS_BLOB_COMPACTION = false` | `db/omnigraph/optimize.rs` | | Graph index cache size | `8` (LRU) | `runtime_cache.rs` | -| Expand indexed-path max frontier | 
`OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` | -| Expand indexed-path max hops | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` | -| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = auto) | `exec/query.rs` | +| Expand indexed-path frontier ceiling | `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` | +| Expand indexed-path hop ceiling | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` | +| Expand CSR-build cost factor | `CSR_BUILD_FACTOR = 1.5` | `exec/query.rs` | +| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = cost-based auto) | `exec/query.rs` | | Default body limit | `1 MB` | `omnigraph-server/lib.rs` | | Ingest body limit | `32 MB` | `omnigraph-server/lib.rs` | | Engine embed model | `gemini-embedding-2-preview` | `omnigraph/embedding.rs` | @@ -24,3 +25,14 @@ | Embed retries | `4` | both clients | | Embed retry backoff | `200 ms` | both clients | | LANCE memory pool default | `1 GB` (raised in v0.3.0) | runtime | + +**Expand traversal dispatch.** With `OMNIGRAPH_TRAVERSAL_MODE` unset, the engine +chooses between the indexed (per-hop BTREE) and CSR (whole-graph in-memory) paths +using a cost model over cheap inputs (frontier size, hops, and the manifest row +counts for |E| and the source-vertex set) plus the index-coverage signal. With +healthy coverage the indexed path wins when `hops × frontier` is below +`CSR_BUILD_FACTOR × |V_src|`; with degraded coverage every hop is a full edge +scan, so only a single-hop traversal stays indexed and multi-hop goes to CSR. +The two ceilings above are hard caps in auto mode — beyond either, CSR is always +used. The override flag forces a path regardless; query results are identical +either way, only the execution path differs. 
diff --git a/docs/user/query-language.md b/docs/user/query-language.md index 2bbfe03..9ed5686 100644 --- a/docs/user/query-language.md +++ b/docs/user/query-language.md @@ -79,7 +79,7 @@ Reason: under the staged-write rewire (MR-794), inserts and updates accumulate i Pipeline operations: - `NodeScan { variable, type_name, filters }` -- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed one of two ways, chosen per-expand by frontier size: selective traversals (small frontier) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop, cost ∝ frontier); dense / whole-graph traversals fall back to the in-memory CSR adjacency index. Both produce identical results. Tunable via `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS`, with `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forcing a mode (see [constants](constants.md)). +- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed in one of two ways, chosen per-expand by a cost model over cheap manifest counts (frontier size, |E|, source-vertex count, hops) plus index coverage: selective traversals (small frontier relative to the source set) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop, cost ∝ frontier); dense / deep / large-frontier traversals — or multi-hop traversals whose BTREE coverage is degraded, so a full scan would be paid on every hop — use the in-memory CSR adjacency index. Both produce identical results. Bounded by `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS` (hard ceilings, beyond which CSR is always used), with `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forcing a mode (see [constants](constants.md)). 
- `Filter { left, op, right }` - `AntiJoin { outer_var, inner: Vec }` — for `not { … }`