refactor(engine): thread the opened edge dataset into indexed Expand

Hoist the edge-dataset open and the C6 index-coverage warning out of
execute_expand_indexed into execute_expand, threading the opened dataset in
as a parameter so it is opened exactly once. Extract the endpoint-column
mapping (endpoint_columns) and the coverage warning (warn_on_degraded_coverage)
as helpers.

Behavior-preserving: same dataset, same warning, same dispatch decision. This
only relocates the open so the upcoming cost-based chooser can consult index
coverage before dispatch without opening the dataset twice.
This commit is contained in:
Ragnor Comerford 2026-06-09 09:10:02 +02:00
parent 6d6ed3894f
commit 236e66c789
No known key found for this signature in database

View file

@ -789,6 +789,41 @@ fn should_use_indexed_expand(frontier_rows: usize, min_hops: u32, max_hops: Opti
frontier_rows <= expand_indexed_max_frontier() && effective_max <= expand_indexed_max_hops()
}
/// Surface the C6 silent scalar-index fallback (commit `5a7ab6d`): warn when the
/// per-hop `key_col IN (...)` won't route through the BTREE. Detection-only;
/// never fails the query. Behavior-identical to the inline check it replaced.
fn warn_on_degraded_coverage(
coverage: &Result<crate::table_store::IndexCoverage>,
key_col: &str,
edge_type: &str,
) {
match coverage {
Ok(crate::table_store::IndexCoverage::Degraded { reason }) => tracing::warn!(
target: "omnigraph::traverse",
edge = %edge_type,
key_col = key_col,
reason = %reason,
"indexed traversal falls back to a full edge scan (results correct, perf degraded)"
),
Ok(crate::table_store::IndexCoverage::Indexed) => {}
Err(e) => tracing::debug!(
target: "omnigraph::traverse",
error = %e,
"index-coverage check failed; proceeding with traversal"
),
}
}
/// The (key, opposite) endpoint columns for a traversal direction. Out follows
/// src -> dst (key on src); In follows the reverse. The persisted BTREE exists
/// on both columns.
fn endpoint_columns(direction: Direction) -> (&'static str, &'static str) {
match direction {
Direction::Out => ("src", "dst"),
Direction::In => ("dst", "src"),
}
}
/// Execute a graph traversal (Expand). Dispatches to the BTREE-indexed path
/// (selective traversals — neighbor lookups via the persisted src/dst index) or
/// the in-memory CSR path (dense / whole-graph traversals). The CSR index is
@ -809,9 +844,16 @@ async fn execute_expand(
params: &ParamMap,
) -> Result<()> {
if should_use_indexed_expand(wide.num_rows(), min_hops, max_hops) {
// Open the edge dataset once here and thread it into the indexed path,
// surfacing the C6 coverage fallback at the same point.
let (key_col, _) = endpoint_columns(direction);
let edge_ds = snapshot.open(&format!("edge:{}", edge_type)).await?;
let coverage =
crate::table_store::TableStore::key_column_index_coverage(&edge_ds, key_col).await;
warn_on_degraded_coverage(&coverage, key_col, edge_type);
execute_expand_indexed(
wide, snapshot, catalog, src_var, dst_var, edge_type, direction, dst_type, min_hops,
max_hops, dst_filters, params,
max_hops, dst_filters, params, edge_ds,
)
.await
} else {
@ -845,6 +887,7 @@ async fn execute_expand_indexed(
max_hops: Option<u32>,
dst_filters: &[IRFilter],
params: &ParamMap,
edge_ds: Dataset,
) -> Result<()> {
let src_id_col_name = format!("{}.id", src_var);
let src_ids = wide
@ -862,34 +905,10 @@ async fn execute_expand_indexed(
.get(edge_type)
.ok_or_else(|| OmniError::manifest(format!("unknown edge type '{}'", edge_type)))?;
let same_type = edge_def.from_type == edge_def.to_type;
// Out-traversal follows src -> dst (key on src); In-traversal follows the
// reverse (key on dst). The persisted BTREE exists on both columns.
let (key_col, opp_col) = match direction {
Direction::Out => ("src", "dst"),
Direction::In => ("dst", "src"),
};
let edge_table_key = format!("edge:{}", edge_type);
let edge_ds = snapshot.open(&edge_table_key).await?;
// C6 guard: surface the silent scalar-index fallback. If Lance won't route
// the per-hop `key_col IN (...)` through the BTREE (no index, or a fragment
// missing physical_rows), the scan is a correct but O(|E|) full scan.
// Detection-only (metadata, no IO); never fails the query.
match crate::table_store::TableStore::key_column_index_coverage(&edge_ds, key_col).await {
Ok(crate::table_store::IndexCoverage::Degraded { reason }) => tracing::warn!(
target: "omnigraph::traverse",
edge = %edge_type,
key_col = key_col,
reason = %reason,
"indexed traversal falls back to a full edge scan (results correct, perf degraded)"
),
Ok(crate::table_store::IndexCoverage::Indexed) => {}
Err(e) => tracing::debug!(
target: "omnigraph::traverse",
error = %e,
"index-coverage check failed; proceeding with traversal"
),
}
// The keyed/opposite endpoint columns for this direction. The edge dataset
// and the C6 coverage warn are owned by the caller (`execute_expand`), which
// opens the dataset once and threads it in.
let (key_col, opp_col) = endpoint_columns(direction);
let max = max_hops.unwrap_or(min_hops.max(1));