diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 10123b0..19f1ddf 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -582,6 +582,52 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } + /// Indexed neighbor lookup for graph traversal. Given an edge dataset and a + /// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for + /// in-traversal), return the matching edge rows projected to + /// `[key_col, opposite_col]`. + /// + /// The `key_col IN (keys)` predicate is built as a structured DataFusion + /// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through + /// the persisted BTREE on `key_col` (index-search → take). Cost scales with + /// the frontier size, not |E| — the basis for serving selective traversals + /// without building the whole in-memory CSR. Empty `keys` returns empty + /// without scanning. + /// + /// Note: like any indexed scan, this observes only fragments the BTREE + /// covers plus an unindexed-fragment scan fallback; it reads the committed + /// snapshot `ds` was opened at. + pub async fn scan_edges_by_endpoint( + ds: &Dataset, + key_col: &str, + opposite_col: &str, + keys: &[String], + ) -> Result> { + use datafusion::prelude::{col, lit}; + + if keys.is_empty() { + return Ok(Vec::new()); + } + let key_list: Vec = + keys.iter().map(|k| lit(k.clone())).collect(); + let filter_expr = col(key_col).in_list(key_list, false); + Self::scan_stream_with( + ds, + Some(&[key_col, opposite_col]), + None, + None, + false, + |scanner| { + scanner.filter_expr(filter_expr); + Ok(()) + }, + ) + .await? + .try_collect() + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + pub async fn count_rows(&self, ds: &Dataset, filter: Option) -> Result { ds.count_rows(filter) .await