feat(engine): add TableStore::scan_edges_by_endpoint for indexed neighbor lookup

Static helper returning edge rows that match a set of endpoint keys on src/dst,
projected to [key_col, opposite_col], via a structured `key_col IN (keys)`
filter_expr. Lance routes it through the persisted BTREE on the endpoint column
(index-search -> take), so cost scales with the frontier size rather than |E|.

Unused until execute_expand's indexed mode lands; isolated in its own commit so
the storage-layer primitive is reviewable on its own.
This commit is contained in:
Ragnor Comerford 2026-06-08 10:26:00 +02:00
parent aec72ee089
commit 7d5fba3a45
No known key found for this signature in database

View file

@ -582,6 +582,52 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Indexed neighbor lookup for graph traversal. Given an edge dataset and a
/// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for
/// in-traversal), return the matching edge rows projected to
/// `[key_col, opposite_col]`.
///
/// The `key_col IN (keys)` predicate is built as a structured DataFusion
/// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through
/// the persisted BTREE on `key_col` (index-search → take). Cost scales with
/// the frontier size, not |E| — the basis for serving selective traversals
/// without building the whole in-memory CSR. Empty `keys` returns empty
/// without scanning.
///
/// Note: like any indexed scan, this observes only fragments the BTREE
/// covers plus an unindexed-fragment scan fallback; it reads the committed
/// snapshot `ds` was opened at.
pub async fn scan_edges_by_endpoint(
ds: &Dataset,
key_col: &str,
opposite_col: &str,
keys: &[String],
) -> Result<Vec<RecordBatch>> {
use datafusion::prelude::{col, lit};
if keys.is_empty() {
return Ok(Vec::new());
}
let key_list: Vec<datafusion::prelude::Expr> =
keys.iter().map(|k| lit(k.clone())).collect();
let filter_expr = col(key_col).in_list(key_list, false);
Self::scan_stream_with(
ds,
Some(&[key_col, opposite_col]),
None,
None,
false,
|scanner| {
scanner.filter_expr(filter_expr);
Ok(())
},
)
.await?
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
ds.count_rows(filter)
.await