From 7d5fba3a45b7c862bb64713868ebb3d4889e19de Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 8 Jun 2026 10:26:00 +0200 Subject: [PATCH] feat(engine): add TableStore::scan_edges_by_endpoint for indexed neighbor lookup Static helper returning edge rows that match a set of endpoint keys on src/dst, projected to [key_col, opposite_col], via a structured `key_col IN (keys)` filter_expr. Lance routes it through the persisted BTREE on the endpoint column (index-search -> take), so cost scales with the frontier size rather than |E|. Unused until execute_expand's indexed mode lands; isolated in its own commit so the storage-layer primitive is reviewable on its own. --- crates/omnigraph/src/table_store.rs | 46 +++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 10123b0..19f1ddf 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -582,6 +582,52 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } + /// Indexed neighbor lookup for graph traversal. Given an edge dataset and a + /// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for + /// in-traversal), return the matching edge rows projected to + /// `[key_col, opposite_col]`. + /// + /// The `key_col IN (keys)` predicate is built as a structured DataFusion + /// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through + /// the persisted BTREE on `key_col` (index-search → take). Cost scales with + /// the frontier size, not |E| — the basis for serving selective traversals + /// without building the whole in-memory CSR. Empty `keys` returns empty + /// without scanning. + /// + /// Note: like any indexed scan, this observes only fragments the BTREE + /// covers plus an unindexed-fragment scan fallback; it reads the committed + /// snapshot `ds` was opened at. + pub async fn scan_edges_by_endpoint( + ds: &Dataset, + key_col: &str, + opposite_col: &str, + keys: &[String], + ) -> Result> { + use datafusion::prelude::{col, lit}; + + if keys.is_empty() { + return Ok(Vec::new()); + } + let key_list: Vec = + keys.iter().map(|k| lit(k.clone())).collect(); + let filter_expr = col(key_col).in_list(key_list, false); + Self::scan_stream_with( + ds, + Some(&[key_col, opposite_col]), + None, + None, + false, + |scanner| { + scanner.filter_expr(filter_expr); + Ok(()) + }, + ) + .await? + .try_collect() + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + pub async fn count_rows(&self, ds: &Dataset, filter: Option) -> Result { ds.count_rows(filter) .await