feat(engine): frontier-size Expand dispatcher + lazy CSR build

Replace the env-only mode switch with an auto policy: Expand uses the
BTREE-indexed path when the source frontier is small and the hop count bounded
(OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024, OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6),
else the in-memory CSR. OMNIGRAPH_TRAVERSAL_MODE=indexed|csr still forces a mode.

Make the CSR index lazy: thread a GraphIndexHandle (memoizing OnceCell over a
Cached/Direct/None builder) through execute_query/execute_pipeline/
execute_rrf_query/execute_anti_join instead of a pre-built Option<&GraphIndex>.
A query served entirely by the indexed path with no AntiJoin never pays the
O(|E|) CSR build — the perf win of Tier 3. AntiJoin still realizes the index
(its negation uses CSR has_neighbors).

Net effect: selective traversals (the common case) skip the whole-graph CSR
build and resolve neighbors from the persisted, incrementally-maintained
src/dst BTREE. Existing traversal/aggregation/end_to_end/search suites now run
the indexed path by default and stay green.

Docs: constants.md (new env knobs), query-language.md (Expand dual path),
indexes.md (graph index is lazy + the indexed alternative).
This commit is contained in:
Ragnor Comerford 2026-06-08 11:32:06 +02:00
parent 7337be60c8
commit c594aeb868
No known key found for this signature in database
4 changed files with 127 additions and 34 deletions

View file

@ -24,20 +24,14 @@ impl Omnigraph {
.pipeline
.iter()
.any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. }));
// Lazy: an index-served query with no AntiJoin never builds the CSR.
let graph_index = if needs_graph {
Some(self.graph_index_for_resolved(&resolved).await?)
GraphIndexHandle::cached(self, &resolved)
} else {
None
GraphIndexHandle::none()
};
execute_query(
&ir,
params,
&resolved.snapshot,
graph_index.as_deref(),
&catalog,
)
.await
execute_query(&ir, params, &resolved.snapshot, &graph_index, &catalog).await
}
/// Run a named query against the graph as it existed at a prior manifest version.
@ -64,18 +58,21 @@ impl Omnigraph {
.pipeline
.iter()
.any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. }));
// Lazy build against this historical snapshot (not the RuntimeCache,
// which is keyed to live branch targets); only a CSR-path Expand or an
// AntiJoin triggers it.
let graph_index = if needs_graph {
let edge_types = catalog
.edge_types
.iter()
.map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
.collect();
Some(Arc::new(GraphIndex::build(&snapshot, &edge_types).await?))
GraphIndexHandle::direct(&snapshot, edge_types)
} else {
None
GraphIndexHandle::none()
};
execute_query(&ir, params, &snapshot, graph_index.as_deref(), &catalog).await
execute_query(&ir, params, &snapshot, &graph_index, &catalog).await
}
}
@ -342,7 +339,7 @@ pub async fn execute_query(
ir: &QueryIR,
params: &ParamMap,
snapshot: &Snapshot,
graph_index: Option<&GraphIndex>,
graph_index: &GraphIndexHandle<'_>,
catalog: &Catalog,
) -> Result<QueryResult> {
let search_mode = extract_search_mode(ir, params, catalog).await?;
@ -400,7 +397,7 @@ async fn execute_rrf_query(
ir: &QueryIR,
params: &ParamMap,
snapshot: &Snapshot,
graph_index: Option<&GraphIndex>,
graph_index: &GraphIndexHandle<'_>,
catalog: &Catalog,
rrf: &RrfMode,
) -> Result<QueryResult> {
@ -583,7 +580,7 @@ fn execute_pipeline<'a>(
pipeline: &'a [IROp],
params: &'a ParamMap,
snapshot: &'a Snapshot,
graph_index: Option<&'a GraphIndex>,
graph_index: &'a GraphIndexHandle<'a>,
catalog: &'a Catalog,
wide: &'a mut Option<RecordBatch>,
search_mode: &'a SearchMode,
@ -685,12 +682,70 @@ fn execute_pipeline<'a>(
})
}
/// Traversal mode override for `execute_expand`. `OMNIGRAPH_TRAVERSAL_MODE`:
/// `indexed` forces the BTREE-backed path, `csr` forces the in-memory CSR path.
/// Unset (the default) currently selects CSR; the frontier-size auto policy and
/// lazy CSR build land with the dispatcher work. Read per call so tests can
/// force a mode — both modes are semantically identical, so a racing read only
/// changes which path runs, never the result.
/// Lazily provides the in-memory CSR graph index, building it on first use and
/// memoizing for the rest of the query. Indexed-mode Expand never asks for it,
/// so a query that is entirely index-served and has no AntiJoin never pays the
/// O(|E|) CSR build (the whole point of the indexed path). The `Cached` builder
/// also reuses the cross-query `RuntimeCache` entry; `Direct` builds against an
/// arbitrary snapshot (time-travel reads); `None` is for queries with no
/// traversal at all.
pub struct GraphIndexHandle<'a> {
cell: tokio::sync::OnceCell<Option<Arc<GraphIndex>>>,
builder: GraphIndexBuilder<'a>,
}
enum GraphIndexBuilder<'a> {
None,
Cached(&'a Omnigraph, &'a crate::db::ResolvedTarget),
Direct(&'a Snapshot, HashMap<String, (String, String)>),
}
impl<'a> GraphIndexHandle<'a> {
fn none() -> Self {
Self {
cell: tokio::sync::OnceCell::new(),
builder: GraphIndexBuilder::None,
}
}
fn cached(db: &'a Omnigraph, resolved: &'a crate::db::ResolvedTarget) -> Self {
Self {
cell: tokio::sync::OnceCell::new(),
builder: GraphIndexBuilder::Cached(db, resolved),
}
}
fn direct(snapshot: &'a Snapshot, edge_types: HashMap<String, (String, String)>) -> Self {
Self {
cell: tokio::sync::OnceCell::new(),
builder: GraphIndexBuilder::Direct(snapshot, edge_types),
}
}
/// The CSR index, built on first call. `None` only when the query needs no
/// traversal (the `None` builder).
async fn get(&self) -> Result<Option<&GraphIndex>> {
let built = self
.cell
.get_or_try_init(|| async {
match &self.builder {
GraphIndexBuilder::None => Ok::<Option<Arc<GraphIndex>>, OmniError>(None),
GraphIndexBuilder::Cached(db, resolved) => {
Ok(Some(db.graph_index_for_resolved(resolved).await?))
}
GraphIndexBuilder::Direct(snapshot, edge_types) => {
Ok(Some(Arc::new(GraphIndex::build(snapshot, edge_types).await?)))
}
}
})
.await?;
Ok(built.as_deref())
}
}
/// Explicit traversal-mode override. `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr`
/// forces the path (ops escape hatch + test hook). Both modes are semantically
/// identical, so the override only changes which path runs, never the result.
fn traversal_indexed_override() -> Option<bool> {
match std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok().as_deref() {
Some("indexed") => Some(true),
@ -699,13 +754,48 @@ fn traversal_indexed_override() -> Option<bool> {
}
}
/// Max source-row frontier for which Expand uses the BTREE-indexed path.
/// Larger frontiers fall back to the in-memory CSR (dense / whole-graph). See
/// `docs/user/constants.md`.
const DEFAULT_EXPAND_INDEXED_MAX_FRONTIER: usize = 1024;
/// Max hop count for the indexed path (each hop is one indexed scan; very deep
/// traversals fan out toward whole-graph and are better served by CSR).
const DEFAULT_EXPAND_INDEXED_MAX_HOPS: u32 = 6;
fn expand_indexed_max_frontier() -> usize {
std::env::var("OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(DEFAULT_EXPAND_INDEXED_MAX_FRONTIER)
}
fn expand_indexed_max_hops() -> u32 {
std::env::var("OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS")
.ok()
.and_then(|v| v.parse::<u32>().ok())
.filter(|&v| v > 0)
.unwrap_or(DEFAULT_EXPAND_INDEXED_MAX_HOPS)
}
/// Choose the indexed Expand path for selective traversals: a small frontier
/// relative to the graph, bounded hop count. The explicit override wins when
/// set. A future refinement can make the frontier bound edge-count-relative
/// (indexed even for moderate frontiers on very large graphs).
fn should_use_indexed_expand(frontier_rows: usize, min_hops: u32, max_hops: Option<u32>) -> bool {
if let Some(forced) = traversal_indexed_override() {
return forced;
}
let effective_max = max_hops.unwrap_or(min_hops.max(1));
frontier_rows <= expand_indexed_max_frontier() && effective_max <= expand_indexed_max_hops()
}
/// Execute a graph traversal (Expand). Dispatches to the BTREE-indexed path
/// (selective traversals — neighbor lookups via the persisted src/dst index) or
/// the in-memory CSR path (dense / whole-graph traversals). The CSR index is
/// required only by the CSR path.
/// built lazily and only the CSR path requests it.
async fn execute_expand(
wide: &mut RecordBatch,
graph_index: Option<&GraphIndex>,
graph_index: &GraphIndexHandle<'_>,
snapshot: &Snapshot,
catalog: &Catalog,
src_var: &str,
@ -718,15 +808,14 @@ async fn execute_expand(
dst_filters: &[IRFilter],
params: &ParamMap,
) -> Result<()> {
let use_indexed = traversal_indexed_override().unwrap_or(false);
if use_indexed {
if should_use_indexed_expand(wide.num_rows(), min_hops, max_hops) {
execute_expand_indexed(
wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops,
max_hops, dst_filters, params,
)
.await
} else {
let gi = graph_index.ok_or_else(|| {
let gi = graph_index.get().await?.ok_or_else(|| {
OmniError::manifest("graph index required for CSR traversal".to_string())
})?;
execute_expand_csr(
@ -1217,14 +1306,15 @@ async fn execute_anti_join(
inner_pipeline: &[IROp],
params: &ParamMap,
snapshot: &Snapshot,
graph_index: Option<&GraphIndex>,
graph_index: &GraphIndexHandle<'_>,
catalog: &Catalog,
outer_var: &str,
) -> Result<()> {
// AntiJoin negation is served by the CSR `has_neighbors` check, so build the
// index up front (this is the one place the lazy handle is always realized).
let gi = graph_index.get().await?;
// Fast path: bulk CSR existence check (O(N), zero Lance I/O)
if let Some(mask) =
try_bulk_anti_join_mask(wide, inner_pipeline, graph_index, catalog, outer_var)
{
if let Some(mask) = try_bulk_anti_join_mask(wide, inner_pipeline, gi, catalog, outer_var) {
*wide = arrow_select::filter::filter_record_batch(wide, &mask)
.map_err(|e| OmniError::Lance(e.to_string()))?;
return Ok(());

View file

@ -13,6 +13,9 @@
| Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` |
| Lance blob compaction support | `LANCE_SUPPORTS_BLOB_COMPACTION = false` | `db/omnigraph/optimize.rs` |
| Graph index cache size | `8` (LRU) | `runtime_cache.rs` |
| Expand indexed-path max frontier | `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` |
| Expand indexed-path max hops | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` |
| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = auto) | `exec/query.rs` |
| Default body limit | `1 MB` | `omnigraph-server/lib.rs` |
| Ingest body limit | `32 MB` | `omnigraph-server/lib.rs` |
| Engine embed model | `gemini-embedding-2-preview` | `omnigraph/embedding.rs` |

View file

@ -21,6 +21,6 @@ This is OmniGraph-specific (not Lance):
- `TypeIndex`: dense `u32 ↔ String id` mapping per node type.
- `CsrIndex`: Compressed Sparse Row representation of edges per edge type — `offsets[i]..offsets[i+1]` slices into `targets`.
- `GraphIndex { type_indices, csr (out), csc (in) }` — built on demand from a snapshot's edge tables.
- `GraphIndex { type_indices, csr (out), csc (in) }` — built on demand from a snapshot's edge tables, **lazily**: only when an `Expand` the planner routes to the CSR path (dense / large frontier) or an `AntiJoin` actually needs it.
- Cached in `RuntimeCache::graph_indices` (LRU, max 8 entries, keyed by snapshot id + edge table versions).
- Built only when an `Expand` or `AntiJoin` IR op is present in the lowered query, so pure scans skip it.
- Selective `Expand`s resolve neighbors from the persisted `src`/`dst` BTREE instead (one indexed scan per hop) and never trigger the CSR build; see [query-language](query-language.md) → Expand. Pure scans, and queries served entirely by the indexed traversal path, skip it.

View file

@ -79,7 +79,7 @@ Reason: under the staged-write rewire (MR-794), inserts and updates accumulate i
Pipeline operations:
- `NodeScan { variable, type_name, filters }`
- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune.
- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed one of two ways, chosen per-expand by frontier size: selective traversals (small frontier) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop, cost ∝ frontier); dense / whole-graph traversals fall back to the in-memory CSR adjacency index. Both produce identical results. Tunable via `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS`, with `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forcing a mode (see [constants](constants.md)).
- `Filter { left, op, right }`
- `AntiJoin { outer_var, inner: Vec<IROp> }` — for `not { … }`