feat(engine): cost-based Expand dispatch chooser (C5)

Replace the fixed frontier<=1024 && hops<=6 dispatch threshold with a pure,
IO-free cost model. choose_expand_mode compares the indexed path's
frontier-relative work (hops * frontier * fanout, or hops * |E| when BTREE
coverage is degraded) against the cost of building the whole-graph CSR
(BUILD_FACTOR * |E|), from cheap manifest row counts. Under good coverage this
reduces to a selectivity ratio independent of |E|, preserving the flat-in-|E|
indexed win for selective traversals while routing dense / deep / high-fanout
or degraded-and-expensive traversals to CSR.
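
As a rough illustration of the comparison described above, here is a self-contained sketch. The function names (`indexed_cost`, `csr_cost`, `prefers_indexed`) and the `degraded` boolean are hypothetical simplifications, not the names used in this change; only the formulas and the 1.5 factor come from the commit.

```rust
/// Assumed to mirror the commit's CSR_BUILD_FACTOR constant.
const BUILD_FACTOR: f64 = 1.5;

/// Illustrative cost of the indexed path: hops * frontier * fanout under good
/// coverage, hops * |E| when the BTREE lookup degrades to a full scan.
fn indexed_cost(hops: u32, frontier: usize, edges: u64, src_nodes: u64, degraded: bool) -> f64 {
    let hops = hops.max(1) as f64;
    let edges = edges as f64;
    if degraded {
        hops * edges
    } else {
        let fanout = edges / src_nodes.max(1) as f64;
        hops * frontier as f64 * fanout
    }
}

/// Illustrative cost of a cold whole-graph CSR build.
fn csr_cost(edges: u64) -> f64 {
    BUILD_FACTOR * edges as f64
}

/// True when the indexed path is estimated to be cheaper than the CSR build.
fn prefers_indexed(hops: u32, frontier: usize, edges: u64, src_nodes: u64, degraded: bool) -> bool {
    indexed_cost(hops, frontier, edges, src_nodes, degraded) < csr_cost(edges)
}

fn main() {
    // Same selectivity (50 of 1M source vertices), 1000x apart in |E|:
    // the verdict does not move because |E| appears on both sides.
    assert!(prefers_indexed(1, 50, 100_000, 1_000_000, false));
    assert!(prefers_indexed(1, 50, 100_000_000, 1_000_000, false));
    // hops * frontier = 200 exceeds 1.5 * 100 = 150, so CSR wins.
    assert!(!prefers_indexed(1, 200, 1_000, 100, false));
    // Degraded coverage: one full scan beats a build, two do not.
    assert!(prefers_indexed(1, 5, 10_000, 10_000, true));
    assert!(!prefers_indexed(2, 5, 10_000, 10_000, true));
}
```

The inputs in `main` are the same shapes the inline unit tests exercise, so the sketch can be checked against them by hand.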

execute_expand decides cardinality-first and only opens the edge dataset to
confirm coverage when it leans indexed (no open on a clearly-CSR traversal).
The two env knobs become hard ceilings layered on the model; the
OMNIGRAPH_TRAVERSAL_MODE override still forces a path; the chosen mode is
traced. Results are unchanged across modes — only the path differs.
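
The two-phase flow can be sketched in isolation as below. This is a simplified model under stated assumptions: `Mode`, `Coverage`, `choose`, and `decide` are hypothetical stand-ins, the hard ceilings are omitted, and the `probe_coverage` closure stands in for opening the edge dataset and checking index coverage.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode { Indexed, Csr }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Coverage { Indexed, Degraded }

const BUILD_FACTOR: f64 = 1.5;

/// Pure cost comparison (hard ceilings left out for brevity).
fn choose(hops: u32, frontier: usize, edges: u64, src_nodes: u64, coverage: Coverage) -> Mode {
    let hops = hops.max(1) as f64;
    let edges = edges as f64;
    let indexed = match coverage {
        Coverage::Indexed => hops * frontier as f64 * (edges / src_nodes.max(1) as f64),
        Coverage::Degraded => hops * edges,
    };
    if indexed < BUILD_FACTOR * edges { Mode::Indexed } else { Mode::Csr }
}

/// Two-phase decision: `probe_coverage` is only called when phase one leans indexed.
fn decide(
    forced: Option<Mode>,
    hops: u32,
    frontier: usize,
    edges: u64,
    src_nodes: u64,
    probe_coverage: &mut dyn FnMut() -> Coverage,
) -> Mode {
    // Phase 1: no IO. The override wins; otherwise assume perfect coverage.
    let lean = forced.unwrap_or_else(|| choose(hops, frontier, edges, src_nodes, Coverage::Indexed));
    if lean == Mode::Csr {
        return Mode::Csr;
    }
    // Phase 2: pay for the probe. It runs even when indexed is forced, because
    // the indexed path needs the opened dataset anyway.
    let real = probe_coverage();
    match forced {
        Some(mode) => mode,
        None => choose(hops, frontier, edges, src_nodes, real),
    }
}

fn main() {
    let mut probes = 0;
    let mut probe = || { probes += 1; Coverage::Degraded };
    // Dense traversal: clearly CSR, so the probe never runs.
    assert_eq!(decide(None, 1, 200, 1_000, 100, &mut probe), Mode::Csr);
    // Selective two-hop traversal leans indexed, but degraded coverage flips it.
    assert_eq!(decide(None, 2, 5, 10_000, 10_000, &mut probe), Mode::Csr);
    // Forcing indexed skips the re-decision.
    assert_eq!(decide(Some(Mode::Indexed), 2, 5, 10_000, 10_000, &mut probe), Mode::Indexed);
    drop(probe);
    assert_eq!(probes, 2);
}
```

Counting probe calls makes the intended saving visible: of the three calls, only the two that lean indexed pay for the open.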

Adds inline crossover unit tests and extends the traversal_indexed both_modes
harness with an auto pass asserting the chooser is result-preserving across
every traversal shape. Documents the new flag semantics in
docs/user/{constants,query-language}.md.
Ragnor Comerford 2026-06-09 09:12:00 +02:00
parent 236e66c789
commit 787d41ec5f
4 changed files with 367 additions and 31 deletions

@ -777,16 +777,144 @@ fn expand_indexed_max_hops() -> u32 {
.unwrap_or(DEFAULT_EXPAND_INDEXED_MAX_HOPS)
}
/// Choose the indexed Expand path for selective traversals: a small frontier
/// relative to the graph, bounded hop count. The explicit override wins when
/// set. A future refinement can make the frontier bound edge-count-relative
/// (indexed even for moderate frontiers on very large graphs).
fn should_use_indexed_expand(frontier_rows: usize, min_hops: u32, max_hops: Option<u32>) -> bool {
if let Some(forced) = traversal_indexed_override() {
return forced;
/// The two Expand execution paths the chooser dispatches between. Extensible:
/// a future persisted-adjacency artifact would become a third variant here, and
/// `choose_expand_mode` would learn to prefer it when covered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExpandMode {
/// Per-hop neighbor lookup via the persisted src/dst BTREE. Work scales
/// with the frontier, not |E| — best for selective traversals.
IndexedScan,
/// Whole-graph in-memory CSR (built once, reused). Best for dense / deep /
/// large-frontier traversals, or when the index is degraded and a full
/// scan would be paid per hop anyway.
Csr,
}
/// Building the in-memory CSR costs more than a bare edge scan: it scans every
/// edge AND allocates + groups the adjacency. This factor expresses that
/// overhead so a one-off degraded single-hop scan can still edge out a full CSR
/// build. Under degraded coverage any value in (1, 2] gives the same decisions
/// (one hop stays indexed, two or more go to CSR).
const CSR_BUILD_FACTOR: f64 = 1.5;
/// Cardinality inputs for the (pure, IO-free) traversal-mode cost model. Every
/// field is a cheap manifest-resident count or an already-in-hand value — the
/// chooser performs no scans.
#[derive(Debug, Clone)]
struct ExpandCostInputs {
/// Current frontier size (`wide.num_rows()`).
frontier_rows: usize,
/// |E| for the edge type (manifest `row_count`).
edge_count: u64,
/// |V_src| — node count of the keyed endpoint type (manifest `row_count`).
src_node_count: u64,
/// Effective max hop count for this Expand.
effective_max_hops: u32,
/// Hard ceiling above which the indexed path is never used (resolved
/// `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS`).
max_hops_cap: u32,
/// Hard ceiling above which the indexed path is never used (resolved
/// `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER`).
max_frontier_cap: usize,
/// Whether `scan_edges_by_endpoint`'s `key_col IN (...)` is served by the
/// BTREE (`Indexed`) or silently falls back to a full scan (`Degraded`).
coverage: crate::table_store::IndexCoverage,
/// Whether the cross-query CSR for this snapshot+edge-version is already
/// built (making the CSR path ≈ free). Conservatively `false` until the
/// cache-peek is wired (the plan's optional refinement).
csr_cached: bool,
}
/// Pure cost-based traversal-mode chooser. Compares an estimate of the indexed
/// path's frontier-relative work against the cost of building (or reusing) the
/// whole-graph CSR, and picks the cheaper. Deterministic and IO-free so it is
/// unit-tested at the crossover; the caller supplies the manifest counts and the
/// (optionally degraded) index coverage.
///
/// Under `Indexed` coverage and a cold CSR the decision reduces to a clean
/// selectivity ratio — indexed wins when `hops * frontier < BUILD_FACTOR *
/// |V_src|`, i.e. when the frontier is a small fraction of the source vertex
/// set — which is independent of |E| (the flat-in-|E| property PR #149 shipped).
fn choose_expand_mode(i: &ExpandCostInputs) -> ExpandMode {
// Hard ceilings: very deep or very large frontiers fan out toward
// whole-graph and are always better served by CSR, regardless of the cost
// estimate. These preserve the documented semantics of the two cap flags.
if i.effective_max_hops > i.max_hops_cap || i.frontier_rows > i.max_frontier_cap {
return ExpandMode::Csr;
}
let hops = i.effective_max_hops.max(1) as f64;
let frontier = i.frontier_rows as f64;
let edges = i.edge_count as f64;
let src = i.src_node_count.max(1) as f64;
let fanout = edges / src;
// Indexed work scales with the frontier when the BTREE serves the IN-list;
// a degraded scan is a full edge scan per hop instead (the C6 perf cliff).
let indexed_cost = match i.coverage {
crate::table_store::IndexCoverage::Indexed => hops * frontier * fanout,
crate::table_store::IndexCoverage::Degraded { .. } => hops * edges,
};
// A warm CSR is ~free to reuse; a cold one costs a build over all edges.
let csr_cost = if i.csr_cached {
0.0
} else {
CSR_BUILD_FACTOR * edges
};
if indexed_cost < csr_cost {
ExpandMode::IndexedScan
} else {
ExpandMode::Csr
}
}
/// Gather the cost-model inputs from cheap manifest counts. `None` when the
/// edge type, its source node type, or their manifest entries are absent (e.g.
/// a not-yet-materialized table) — the caller then falls back to the legacy
/// frontier/hop ceiling so the decision is always defined.
fn gather_cost_inputs(
snapshot: &Snapshot,
catalog: &Catalog,
edge_type: &str,
direction: Direction,
frontier_rows: usize,
effective_max_hops: u32,
coverage: crate::table_store::IndexCoverage,
) -> Option<ExpandCostInputs> {
let edge_entry = snapshot.entry(&format!("edge:{}", edge_type))?;
let edge_def = catalog.edge_types.get(edge_type)?;
// The frontier source vertices are the keyed endpoint's type: `from` for an
// Out traversal (keyed on `src`), `to` for In (keyed on `dst`).
let src_type = match direction {
Direction::Out => &edge_def.from_type,
Direction::In => &edge_def.to_type,
};
let src_entry = snapshot.entry(&format!("node:{}", src_type))?;
Some(ExpandCostInputs {
frontier_rows,
edge_count: edge_entry.row_count,
src_node_count: src_entry.row_count,
effective_max_hops,
max_hops_cap: expand_indexed_max_hops(),
max_frontier_cap: expand_indexed_max_frontier(),
coverage,
csr_cached: false,
})
}
/// Coverage value to feed the cost decision. A failed coverage probe is treated
/// as `Degraded` (conservative: don't over-favor the indexed path when we can't
/// confirm the BTREE will serve the scan).
fn coverage_for_decision(
coverage: &Result<crate::table_store::IndexCoverage>,
) -> crate::table_store::IndexCoverage {
match coverage {
Ok(c) => c.clone(),
Err(_) => crate::table_store::IndexCoverage::Degraded {
reason: "coverage check failed".to_string(),
},
}
let effective_max = max_hops.unwrap_or(min_hops.max(1));
frontier_rows <= expand_indexed_max_frontier() && effective_max <= expand_indexed_max_hops()
}
/// Surface the C6 silent scalar-index fallback (commit `5a7ab6d`): warn when the
@ -843,29 +971,111 @@ async fn execute_expand(
dst_filters: &[IRFilter],
params: &ParamMap,
) -> Result<()> {
if should_use_indexed_expand(wide.num_rows(), min_hops, max_hops) {
// Open the edge dataset once here and thread it into the indexed path,
// surfacing the C6 coverage fallback at the same point.
let (key_col, _) = endpoint_columns(direction);
let edge_ds = snapshot.open(&format!("edge:{}", edge_type)).await?;
let coverage =
crate::table_store::TableStore::key_column_index_coverage(&edge_ds, key_col).await;
warn_on_degraded_coverage(&coverage, key_col, edge_type);
execute_expand_indexed(
wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops,
max_hops, dst_filters, params, edge_ds,
)
.await
} else {
let frontier_rows = wide.num_rows();
let effective_max_hops = max_hops.unwrap_or(min_hops.max(1));
let (key_col, _) = endpoint_columns(direction);
let edge_table_key = format!("edge:{}", edge_type);
// Cardinality-first preliminary decision (no IO). The override wins; else the
// cost model decides under *optimistic* coverage. Optimistic is what lets us
// skip the dataset open on a clearly-CSR traversal: as long as the frontier is
// no larger than |V_src|, real coverage can only make the indexed path
// costlier, so if even a perfectly-indexed scan loses to CSR here, it loses
// for real.
let forced = traversal_indexed_override();
let lean_indexed = match forced {
Some(v) => v,
None => match gather_cost_inputs(
snapshot,
catalog,
edge_type,
direction,
frontier_rows,
effective_max_hops,
crate::table_store::IndexCoverage::Indexed,
) {
Some(inputs) => choose_expand_mode(&inputs) == ExpandMode::IndexedScan,
// Manifest counts absent (e.g. not-yet-materialized table): fall back
// to the legacy frontier/hop ceiling so the decision is defined.
None => {
frontier_rows <= expand_indexed_max_frontier()
&& effective_max_hops <= expand_indexed_max_hops()
}
},
};
if !lean_indexed {
tracing::debug!(
target: "omnigraph::traverse",
edge = %edge_type,
frontier = frontier_rows,
hops = effective_max_hops,
mode = "csr",
"expand mode chosen",
);
let gi = graph_index.get().await?.ok_or_else(|| {
OmniError::manifest("graph index required for CSR traversal".to_string())
})?;
execute_expand_csr(
return execute_expand_csr(
wide, gi, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type,
min_hops, max_hops, dst_filters, params,
)
.await
.await;
}
// Leaning indexed: open the edge dataset once, confirm real coverage, and
// (unless forced) re-decide with it. The opened dataset is threaded into the
// indexed path so it is never opened twice.
let edge_ds = snapshot.open(&edge_table_key).await?;
let coverage =
crate::table_store::TableStore::key_column_index_coverage(&edge_ds, key_col).await;
if forced.is_none() {
if let Some(inputs) = gather_cost_inputs(
snapshot,
catalog,
edge_type,
direction,
frontier_rows,
effective_max_hops,
coverage_for_decision(&coverage),
) {
if choose_expand_mode(&inputs) == ExpandMode::Csr {
tracing::debug!(
target: "omnigraph::traverse",
edge = %edge_type,
frontier = frontier_rows,
hops = effective_max_hops,
mode = "csr",
reason = "index coverage degraded",
"expand mode chosen",
);
let gi = graph_index.get().await?.ok_or_else(|| {
OmniError::manifest("graph index required for CSR traversal".to_string())
})?;
return execute_expand_csr(
wide, gi, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type,
min_hops, max_hops, dst_filters, params,
)
.await;
}
}
}
tracing::debug!(
target: "omnigraph::traverse",
edge = %edge_type,
frontier = frontier_rows,
hops = effective_max_hops,
mode = "indexed",
"expand mode chosen",
);
// Surface the C6 silent scalar-index fallback once, now that coverage is known.
warn_on_degraded_coverage(&coverage, key_col, edge_type);
execute_expand_indexed(
wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops,
max_hops, dst_filters, params, edge_ds,
)
.await
}
/// BTREE-indexed graph traversal: per hop, batch the current frontier into one
@ -1849,3 +2059,110 @@ fn take_batch(batch: &RecordBatch, indices: &UInt32Array) -> Result<RecordBatch>
.map_err(|e| OmniError::Lance(e.to_string()))?;
RecordBatch::try_new(batch.schema(), columns).map_err(|e| OmniError::Lance(e.to_string()))
}
#[cfg(test)]
mod expand_chooser_tests {
use super::*;
use crate::table_store::IndexCoverage;
/// Build cost inputs with generous hard caps, so the cost comparison (not a
/// ceiling) is what the assertions exercise unless a test sets one on purpose.
fn inputs(
frontier_rows: usize,
edge_count: u64,
src_node_count: u64,
effective_max_hops: u32,
coverage: IndexCoverage,
) -> ExpandCostInputs {
ExpandCostInputs {
frontier_rows,
edge_count,
src_node_count,
effective_max_hops,
max_hops_cap: 6,
max_frontier_cap: 1024,
coverage,
csr_cached: false,
}
}
#[test]
fn selective_frontier_on_large_graph_picks_indexed() {
// 50 source rows against 1M source vertices, one hop: tiny selectivity —
// the PR #149 win the chooser must preserve.
let m = choose_expand_mode(&inputs(50, 10_000_000, 1_000_000, 1, IndexCoverage::Indexed));
assert_eq!(m, ExpandMode::IndexedScan);
}
#[test]
fn flat_in_edge_count_same_selectivity_same_choice() {
// Same selectivity (frontier/|V_src|), 1000× difference in |E|. |E| cancels
// out of the indexed-vs-CSR comparison, so the choice must not flip.
let small = choose_expand_mode(&inputs(50, 100_000, 1_000_000, 1, IndexCoverage::Indexed));
let huge =
choose_expand_mode(&inputs(50, 100_000_000, 1_000_000, 1, IndexCoverage::Indexed));
assert_eq!(small, ExpandMode::IndexedScan);
assert_eq!(huge, ExpandMode::IndexedScan);
}
#[test]
fn frontier_large_fraction_of_source_picks_csr() {
// hops*frontier (200) exceeds BUILD_FACTOR*|V_src| (1.5*100=150) → CSR,
// and 200 is below the frontier cap, so it is the cost model deciding.
let m = choose_expand_mode(&inputs(200, 1_000, 100, 1, IndexCoverage::Indexed));
assert_eq!(m, ExpandMode::Csr);
}
#[test]
fn frontier_over_hard_cap_picks_csr() {
// 2000 > 1024 ceiling, even though the selectivity is tiny.
let m = choose_expand_mode(&inputs(2000, 10_000_000, 1_000_000, 1, IndexCoverage::Indexed));
assert_eq!(m, ExpandMode::Csr);
}
#[test]
fn hops_over_hard_cap_picks_csr() {
let m = choose_expand_mode(&inputs(10, 10_000_000, 1_000_000, 8, IndexCoverage::Indexed));
assert_eq!(m, ExpandMode::Csr);
}
#[test]
fn degraded_single_hop_tiny_frontier_stays_indexed() {
// One full degraded scan (1*|E|) still edges out a full CSR build
// (1.5*|E|) for a one-off single hop.
let m = choose_expand_mode(&inputs(
5,
10_000,
10_000,
1,
IndexCoverage::Degraded {
reason: "no btree".into(),
},
));
assert_eq!(m, ExpandMode::IndexedScan);
}
#[test]
fn degraded_multi_hop_picks_csr() {
// Two degraded scans (2*|E|) lose to one CSR build (1.5*|E|).
let m = choose_expand_mode(&inputs(
5,
10_000,
10_000,
2,
IndexCoverage::Degraded {
reason: "no btree".into(),
},
));
assert_eq!(m, ExpandMode::Csr);
}
#[test]
fn warm_csr_is_always_reused() {
// A maximally selective traversal still prefers an already-built CSR
// (cost ~0) over re-scanning per hop.
let mut i = inputs(1, 10_000_000, 1_000_000, 1, IndexCoverage::Indexed);
i.csr_cached = true;
assert_eq!(choose_expand_mode(&i), ExpandMode::Csr);
}
}

@ -47,18 +47,25 @@ async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &Pa
v
}
/// Run the same query under CSR then indexed mode; assert identical results and
/// return them.
/// Run the same query under CSR, indexed, and auto (cost-chooser) modes; assert
/// all three produce identical results and return them. The auto pass exercises
/// `choose_expand_mode` end to end: whichever path it selects, the rows must
/// match the forced paths (the chooser changes which path runs, never the result).
async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec<String> {
set_mode("csr");
let csr = sorted_names(db, queries, name, params).await;
set_mode("indexed");
let indexed = sorted_names(db, queries, name, params).await;
clear_mode();
let auto = sorted_names(db, queries, name, params).await;
assert_eq!(
indexed, csr,
"indexed Expand must produce identical results to CSR for query '{name}'"
);
assert_eq!(
auto, csr,
"auto (cost-chooser) Expand must produce identical results to the forced paths for query '{name}'"
);
indexed
}

@ -13,9 +13,10 @@
| Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` |
| Lance blob compaction support | `LANCE_SUPPORTS_BLOB_COMPACTION = false` | `db/omnigraph/optimize.rs` |
| Graph index cache size | `8` (LRU) | `runtime_cache.rs` |
| Expand indexed-path max frontier | `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` |
| Expand indexed-path max hops | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` |
| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = auto) | `exec/query.rs` |
| Expand indexed-path frontier ceiling | `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` |
| Expand indexed-path hop ceiling | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` |
| Expand CSR-build cost factor | `CSR_BUILD_FACTOR = 1.5` | `exec/query.rs` |
| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = cost-based auto) | `exec/query.rs` |
| Default body limit | `1 MB` | `omnigraph-server/lib.rs` |
| Ingest body limit | `32 MB` | `omnigraph-server/lib.rs` |
| Engine embed model | `gemini-embedding-2-preview` | `omnigraph/embedding.rs` |
@ -24,3 +25,14 @@
| Embed retries | `4` | both clients |
| Embed retry backoff | `200 ms` | both clients |
| LANCE memory pool default | `1 GB` (raised in v0.3.0) | runtime |
**Expand traversal dispatch.** With `OMNIGRAPH_TRAVERSAL_MODE` unset, the engine
chooses the indexed (per-hop BTREE) vs CSR (whole-graph in-memory) path with a
cost model over cheap manifest counts (frontier size, |E|, source-vertex count,
hops) plus the index-coverage signal: the indexed path is preferred when its
frontier-relative work beats building the CSR (≈ when `hops × frontier` is a
small fraction of the source-vertex set), and CSR is preferred for dense/deep
traversals or when the BTREE coverage is degraded and a full scan would be paid
per hop. The two ceilings above are hard caps (beyond them CSR is always used),
and the override flag forces a path. Query results are identical in every mode;
only the execution path differs.
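
The "small fraction of the source-vertex set" rule of thumb can be derived from the two cost estimates. The following is a sketch assuming good (`Indexed`) coverage, a cold CSR, and $|E| > 0$; the symbols $h$ (hops), $f$ (frontier size), and $\beta$ (`CSR_BUILD_FACTOR`) are shorthand introduced here for illustration.

```latex
\begin{aligned}
C_{\text{indexed}} &= h \cdot f \cdot \frac{|E|}{|V_{\text{src}}|},
\qquad
C_{\text{csr}} = \beta \cdot |E| \\[4pt]
C_{\text{indexed}} < C_{\text{csr}}
  &\iff h \cdot f < \beta \cdot |V_{\text{src}}|
  \iff \frac{f}{|V_{\text{src}}|} < \frac{\beta}{h} \\[4pt]
\text{Degraded coverage:}\quad
h \cdot |E| < \beta \cdot |E| &\iff h < \beta
\end{aligned}
```

With $\beta = 1.5$, a one-hop traversal from 50 rows over $10^6$ source vertices satisfies $50 < 1.5 \times 10^6$ and stays indexed, while 200 rows over 100 source vertices gives $200 > 150$ and goes to CSR. Under degraded coverage only a single hop satisfies $h < 1.5$.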

@ -79,7 +79,7 @@ Reason: under the staged-write rewire (MR-794), inserts and updates accumulate i
Pipeline operations:
- `NodeScan { variable, type_name, filters }`
- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed one of two ways, chosen per-expand by frontier size: selective traversals (small frontier) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop, cost ∝ frontier); dense / whole-graph traversals fall back to the in-memory CSR adjacency index. Both produce identical results. Tunable via `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS`, with `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forcing a mode (see [constants](constants.md)).
- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed one of two ways, chosen per-expand by a cost model over cheap manifest counts (frontier size, |E|, source-vertex count, hops) plus index coverage: selective traversals (small frontier relative to the source set) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop, cost ∝ frontier); dense / deep / large-frontier traversals — or those whose BTREE coverage is degraded so a full scan would be paid per hop — use the in-memory CSR adjacency index. Both produce identical results. Bounded by `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS` (hard ceilings, beyond which CSR is always used), with `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forcing a mode (see [constants](constants.md)).
- `Filter { left, op, right }`
- `AntiJoin { outer_var, inner: Vec<IROp> }` — for `not { … }`