From c594aeb86809c3f352d2b4c3f7c869cc8d828013 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 8 Jun 2026 11:32:06 +0200 Subject: [PATCH] feat(engine): frontier-size Expand dispatcher + lazy CSR build MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the env-only mode switch with an auto policy: Expand uses the BTREE-indexed path when the source frontier is small and the hop count bounded (OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024, OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6), else the in-memory CSR. OMNIGRAPH_TRAVERSAL_MODE=indexed|csr still forces a mode. Make the CSR index lazy: thread a GraphIndexHandle (memoizing OnceCell over a Cached/Direct/None builder) through execute_query/execute_pipeline/ execute_rrf_query/execute_anti_join instead of a pre-built Option<&GraphIndex>. A query served entirely by the indexed path with no AntiJoin never pays the O(|E|) CSR build — the perf win of Tier 3. AntiJoin still realizes the index (its negation uses CSR has_neighbors). Net effect: selective traversals (the common case) skip the whole-graph CSR build and resolve neighbors from the persisted, incrementally-maintained src/dst BTREE. Existing traversal/aggregation/end_to_end/search suites now run the indexed path by default and stay green. Docs: constants.md (new env knobs), query-language.md (Expand dual path), indexes.md (graph index is lazy + the indexed alternative). --- crates/omnigraph/src/exec/query.rs | 152 +++++++++++++++++++++++------ docs/user/constants.md | 3 + docs/user/indexes.md | 4 +- docs/user/query-language.md | 2 +- 4 files changed, 127 insertions(+), 34 deletions(-) diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 01bef2b..63343e7 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -24,20 +24,14 @@ impl Omnigraph { .pipeline .iter() .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); + // Lazy: an index-served query with no AntiJoin never builds the CSR. let graph_index = if needs_graph { - Some(self.graph_index_for_resolved(&resolved).await?) + GraphIndexHandle::cached(self, &resolved) } else { - None + GraphIndexHandle::none() }; - execute_query( - &ir, - params, - &resolved.snapshot, - graph_index.as_deref(), - &catalog, - ) - .await + execute_query(&ir, params, &resolved.snapshot, &graph_index, &catalog).await } /// Run a named query against the graph as it existed at a prior manifest version. @@ -64,18 +58,21 @@ impl Omnigraph { .pipeline .iter() .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); + // Lazy build against this historical snapshot (not the RuntimeCache, + // which is keyed to live branch targets); only a CSR-path Expand or an + // AntiJoin triggers it. let graph_index = if needs_graph { let edge_types = catalog .edge_types .iter() .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) .collect(); - Some(Arc::new(GraphIndex::build(&snapshot, &edge_types).await?)) + GraphIndexHandle::direct(&snapshot, edge_types) } else { - None + GraphIndexHandle::none() }; - execute_query(&ir, params, &snapshot, graph_index.as_deref(), &catalog).await + execute_query(&ir, params, &snapshot, &graph_index, &catalog).await } } @@ -342,7 +339,7 @@ pub async fn execute_query( ir: &QueryIR, params: &ParamMap, snapshot: &Snapshot, - graph_index: Option<&GraphIndex>, + graph_index: &GraphIndexHandle<'_>, catalog: &Catalog, ) -> Result { let search_mode = extract_search_mode(ir, params, catalog).await?; @@ -400,7 +397,7 @@ async fn execute_rrf_query( ir: &QueryIR, params: &ParamMap, snapshot: &Snapshot, - graph_index: Option<&GraphIndex>, + graph_index: &GraphIndexHandle<'_>, catalog: &Catalog, rrf: &RrfMode, ) -> Result { @@ -583,7 +580,7 @@ fn execute_pipeline<'a>( pipeline: &'a [IROp], params: &'a ParamMap, snapshot: &'a Snapshot, - graph_index: Option<&'a GraphIndex>, + graph_index: &'a GraphIndexHandle<'a>, catalog: &'a Catalog, wide: &'a mut Option, search_mode: &'a SearchMode, @@ -685,12 +682,70 @@ fn execute_pipeline<'a>( }) } -/// Traversal mode override for `execute_expand`. `OMNIGRAPH_TRAVERSAL_MODE`: -/// `indexed` forces the BTREE-backed path, `csr` forces the in-memory CSR path. -/// Unset (the default) currently selects CSR; the frontier-size auto policy and -/// lazy CSR build land with the dispatcher work. Read per call so tests can -/// force a mode — both modes are semantically identical, so a racing read only -/// changes which path runs, never the result. +/// Lazily provides the in-memory CSR graph index, building it on first use and +/// memoizing for the rest of the query. Indexed-mode Expand never asks for it, +/// so a query that is entirely index-served and has no AntiJoin never pays the +/// O(|E|) CSR build (the whole point of the indexed path). The `Cached` builder +/// also reuses the cross-query `RuntimeCache` entry; `Direct` builds against an +/// arbitrary snapshot (time-travel reads); `None` is for queries with no +/// traversal at all. +pub struct GraphIndexHandle<'a> { + cell: tokio::sync::OnceCell>>, + builder: GraphIndexBuilder<'a>, +} + +enum GraphIndexBuilder<'a> { + None, + Cached(&'a Omnigraph, &'a crate::db::ResolvedTarget), + Direct(&'a Snapshot, HashMap), +} + +impl<'a> GraphIndexHandle<'a> { + fn none() -> Self { + Self { + cell: tokio::sync::OnceCell::new(), + builder: GraphIndexBuilder::None, + } + } + + fn cached(db: &'a Omnigraph, resolved: &'a crate::db::ResolvedTarget) -> Self { + Self { + cell: tokio::sync::OnceCell::new(), + builder: GraphIndexBuilder::Cached(db, resolved), + } + } + + fn direct(snapshot: &'a Snapshot, edge_types: HashMap) -> Self { + Self { + cell: tokio::sync::OnceCell::new(), + builder: GraphIndexBuilder::Direct(snapshot, edge_types), + } + } + + /// The CSR index, built on first call. `None` only when the query needs no + /// traversal (the `None` builder). + async fn get(&self) -> Result> { + let built = self + .cell + .get_or_try_init(|| async { + match &self.builder { + GraphIndexBuilder::None => Ok::>, OmniError>(None), + GraphIndexBuilder::Cached(db, resolved) => { + Ok(Some(db.graph_index_for_resolved(resolved).await?)) + } + GraphIndexBuilder::Direct(snapshot, edge_types) => { + Ok(Some(Arc::new(GraphIndex::build(snapshot, edge_types).await?))) + } + } + }) + .await?; + Ok(built.as_deref()) + } +} + +/// Explicit traversal-mode override. `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` +/// forces the path (ops escape hatch + test hook). Both modes are semantically +/// identical, so the override only changes which path runs, never the result. fn traversal_indexed_override() -> Option { match std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok().as_deref() { Some("indexed") => Some(true), @@ -699,13 +754,48 @@ fn traversal_indexed_override() -> Option { } } +/// Max source-row frontier for which Expand uses the BTREE-indexed path. +/// Larger frontiers fall back to the in-memory CSR (dense / whole-graph). See +/// `docs/user/constants.md`. +const DEFAULT_EXPAND_INDEXED_MAX_FRONTIER: usize = 1024; +/// Max hop count for the indexed path (each hop is one indexed scan; very deep +/// traversals fan out toward whole-graph and are better served by CSR). +const DEFAULT_EXPAND_INDEXED_MAX_HOPS: u32 = 6; + +fn expand_indexed_max_frontier() -> usize { + std::env::var("OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_EXPAND_INDEXED_MAX_FRONTIER) +} + +fn expand_indexed_max_hops() -> u32 { + std::env::var("OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|&v| v > 0) + .unwrap_or(DEFAULT_EXPAND_INDEXED_MAX_HOPS) +} + +/// Choose the indexed Expand path for selective traversals: a small frontier +/// relative to the graph, bounded hop count. The explicit override wins when +/// set. A future refinement can make the frontier bound edge-count-relative +/// (indexed even for moderate frontiers on very large graphs). +fn should_use_indexed_expand(frontier_rows: usize, min_hops: u32, max_hops: Option) -> bool { + if let Some(forced) = traversal_indexed_override() { + return forced; + } + let effective_max = max_hops.unwrap_or(min_hops.max(1)); + frontier_rows <= expand_indexed_max_frontier() && effective_max <= expand_indexed_max_hops() +} + /// Execute a graph traversal (Expand). Dispatches to the BTREE-indexed path /// (selective traversals — neighbor lookups via the persisted src/dst index) or /// the in-memory CSR path (dense / whole-graph traversals). The CSR index is -/// required only by the CSR path. +/// built lazily and only the CSR path requests it. async fn execute_expand( wide: &mut RecordBatch, - graph_index: Option<&GraphIndex>, + graph_index: &GraphIndexHandle<'_>, snapshot: &Snapshot, catalog: &Catalog, src_var: &str, @@ -718,15 +808,14 @@ async fn execute_expand( dst_filters: &[IRFilter], params: &ParamMap, ) -> Result<()> { - let use_indexed = traversal_indexed_override().unwrap_or(false); - if use_indexed { + if should_use_indexed_expand(wide.num_rows(), min_hops, max_hops) { execute_expand_indexed( wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops, max_hops, dst_filters, params, ) .await } else { - let gi = graph_index.ok_or_else(|| { + let gi = graph_index.get().await?.ok_or_else(|| { OmniError::manifest("graph index required for CSR traversal".to_string()) })?; execute_expand_csr( @@ -1217,14 +1306,15 @@ async fn execute_anti_join( inner_pipeline: &[IROp], params: &ParamMap, snapshot: &Snapshot, - graph_index: Option<&GraphIndex>, + graph_index: &GraphIndexHandle<'_>, catalog: &Catalog, outer_var: &str, ) -> Result<()> { + // AntiJoin negation is served by the CSR `has_neighbors` check, so build the + // index up front (this is the one place the lazy handle is always realized). + let gi = graph_index.get().await?; // Fast path: bulk CSR existence check (O(N), zero Lance I/O) - if let Some(mask) = - try_bulk_anti_join_mask(wide, inner_pipeline, graph_index, catalog, outer_var) - { + if let Some(mask) = try_bulk_anti_join_mask(wide, inner_pipeline, gi, catalog, outer_var) { *wide = arrow_select::filter::filter_record_batch(wide, &mask) .map_err(|e| OmniError::Lance(e.to_string()))?; return Ok(()); diff --git a/docs/user/constants.md b/docs/user/constants.md index 210155e..74ee7fb 100644 --- a/docs/user/constants.md +++ b/docs/user/constants.md @@ -13,6 +13,9 @@ | Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` | | Lance blob compaction support | `LANCE_SUPPORTS_BLOB_COMPACTION = false` | `db/omnigraph/optimize.rs` | | Graph index cache size | `8` (LRU) | `runtime_cache.rs` | +| Expand indexed-path max frontier | `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` | +| Expand indexed-path max hops | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` | +| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = auto) | `exec/query.rs` | | Default body limit | `1 MB` | `omnigraph-server/lib.rs` | | Ingest body limit | `32 MB` | `omnigraph-server/lib.rs` | | Engine embed model | `gemini-embedding-2-preview` | `omnigraph/embedding.rs` | diff --git a/docs/user/indexes.md b/docs/user/indexes.md index ce6c728..df898c4 100644 --- a/docs/user/indexes.md +++ b/docs/user/indexes.md @@ -21,6 +21,6 @@ This is OmniGraph-specific (not Lance): - `TypeIndex`: dense `u32 ↔ String id` mapping per node type. - `CsrIndex`: Compressed Sparse Row representation of edges per edge type — `offsets[i]..offsets[i+1]` slices into `targets`. -- `GraphIndex { type_indices, csr (out), csc (in) }` — built on demand from a snapshot's edge tables. +- `GraphIndex { type_indices, csr (out), csc (in) }` — built on demand from a snapshot's edge tables, **lazily**: only when an `Expand` the planner routes to the CSR path (dense / large frontier) or an `AntiJoin` actually needs it. - Cached in `RuntimeCache::graph_indices` (LRU, max 8 entries, keyed by snapshot id + edge table versions). -- Built only when an `Expand` or `AntiJoin` IR op is present in the lowered query, so pure scans skip it. +- Selective `Expand`s resolve neighbors from the persisted `src`/`dst` BTREE instead (one indexed scan per hop) and never trigger the CSR build; see [query-language](query-language.md) → Expand. Pure scans, and queries served entirely by the indexed traversal path, skip it. diff --git a/docs/user/query-language.md b/docs/user/query-language.md index 6c7516f..2bbfe03 100644 --- a/docs/user/query-language.md +++ b/docs/user/query-language.md @@ -79,7 +79,7 @@ Reason: under the staged-write rewire (MR-794), inserts and updates accumulate i Pipeline operations: - `NodeScan { variable, type_name, filters }` -- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. +- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed one of two ways, chosen per-expand by frontier size: selective traversals (small frontier) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop, cost ∝ frontier); dense / whole-graph traversals fall back to the in-memory CSR adjacency index. Both produce identical results. Tunable via `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS`, with `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forcing a mode (see [constants](constants.md)). - `Filter { left, op, right }` - `AntiJoin { outer_var, inner: Vec }` — for `not { … }`