diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 7590512..5b5f303 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -785,11 +785,13 @@ async fn execute_expand( } } - // Split dst_filters: SQL-pushable go to Lance, the rest applied post-hconcat - let pushdown_sql = build_lance_filter(dst_filters, params); + // Destination-binding filters: pushable ones lower to a DataFusion `Expr` + // and are applied by `hydrate_nodes` (AND'd with the id IN-list, routed + // through the BTREE); the rest (`ir_filter_to_expr` → None) are applied in + // memory post-hconcat. let non_pushable: Vec<&IRFilter> = dst_filters .iter() - .filter(|f| ir_filter_to_sql(f, params).is_none()) + .filter(|f| ir_filter_to_expr(f, params).is_none()) .collect(); // Dedup dst dense ids globally across source rows, then stringify once @@ -811,7 +813,8 @@ async fn execute_expand( catalog, dst_type, &unique_dst_list, - pushdown_sql.as_deref(), + dst_filters, + params, ) .await?; @@ -857,16 +860,24 @@ async fn execute_expand( /// Load full node rows for a set of IDs from a snapshot. /// -/// When `extra_filter_sql` is provided (from deferred destination-binding -/// filters), it is ANDed with the `id IN (...)` clause so that Lance can -/// skip non-matching rows at the storage level. +/// The `id IN (...)` predicate is built as a structured DataFusion `Expr` and +/// AND'd with any pushable `dst_filters` (destination-binding filters), then +/// applied via `Scanner::filter_expr`. The structured form routes the id +/// IN-list through the `id` BTREE scalar index (index-search → take) rather +/// than evaluating a string filter via DataFusion `InListEval`, which is +/// O(N×M) and was measured at 72× the indexed cost on a 100k-node hop +/// (MR-376). Non-pushable `dst_filters` (`ir_filter_to_expr` → None) are +/// applied in memory by the caller after hydration. async fn hydrate_nodes( snapshot: &Snapshot, catalog: &Catalog, type_name: &str, ids: &[String], - extra_filter_sql: Option<&str>, + dst_filters: &[IRFilter], + params: &ParamMap, ) -> Result { + use datafusion::prelude::{col, lit}; + let node_type = catalog .node_types .get(type_name) @@ -879,15 +890,13 @@ async fn hydrate_nodes( let table_key = format!("node:{}", type_name); let ds = snapshot.open(&table_key).await?; - // Build filter: id IN ('a', 'b', 'c') - let escaped: Vec = ids - .iter() - .map(|id| format!("'{}'", id.replace('\'', "''"))) - .collect(); - let mut filter_sql = format!("id IN ({})", escaped.join(", ")); - if let Some(extra) = extra_filter_sql { - filter_sql = format!("({}) AND ({})", filter_sql, extra); + // `id IN (ids)` AND any pushable destination filters, as a structured Expr. + let id_list: Vec = ids.iter().map(|id| lit(id.clone())).collect(); + let mut filter_expr = col("id").in_list(id_list, false); + if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params) { + filter_expr = filter_expr.and(dst_expr); } + let has_blobs = !node_type.blob_properties.is_empty(); let non_blob_cols: Vec<&str> = node_type .arrow_schema @@ -897,12 +906,16 @@ async fn hydrate_nodes( .map(|f| f.name().as_str()) .collect(); let projection = has_blobs.then_some(non_blob_cols.as_slice()); - let batches = crate::table_store::TableStore::scan_stream( + let batches = crate::table_store::TableStore::scan_stream_with( &ds, projection, - Some(&filter_sql), + None, None, false, + |scanner| { + scanner.filter_expr(filter_expr); + Ok(()) + }, ) .await? .try_collect::>() @@ -1186,45 +1199,6 @@ fn add_null_blob_columns( .map_err(|e| OmniError::Lance(e.to_string())) } -/// Convert IR filters to a Lance SQL filter string. -fn build_lance_filter(filters: &[IRFilter], params: &ParamMap) -> Option { - if filters.is_empty() { - return None; - } - - let parts: Vec = filters - .iter() - .filter_map(|f| ir_filter_to_sql(f, params)) - .collect(); - - if parts.is_empty() { - return None; - } - - Some(parts.join(" AND ")) -} - -fn ir_filter_to_sql(filter: &IRFilter, params: &ParamMap) -> Option { - // Search predicates (search/fuzzy/match_text = true) are NOT converted to SQL. - // They are handled via scanner.full_text_search() in execute_node_scan. - if is_search_filter(filter) { - return None; - } - - let left = ir_expr_to_sql(&filter.left, params)?; - let right = ir_expr_to_sql(&filter.right, params)?; - let op = match filter.op { - CompOp::Eq => "=", - CompOp::Ne => "!=", - CompOp::Gt => ">", - CompOp::Lt => "<", - CompOp::Ge => ">=", - CompOp::Le => "<=", - CompOp::Contains => return None, // Can't pushdown list contains - }; - Some(format!("{} {} {}", left, op, right)) -} - /// Build a FullTextSearchQuery from a search IR expression. fn build_fts_query( expr: &IRExpr, @@ -1297,15 +1271,6 @@ fn resolve_to_int(expr: &IRExpr, params: &ParamMap) -> Option { } } -fn ir_expr_to_sql(expr: &IRExpr, params: &ParamMap) -> Option { - match expr { - IRExpr::PropAccess { property, .. } => Some(property.clone()), - IRExpr::Literal(lit) => Some(literal_to_sql(lit)), - IRExpr::Param(name) => params.get(name).map(literal_to_sql), - _ => None, - } -} - pub(super) fn literal_to_sql(lit: &Literal) -> String { match lit { Literal::Null => "NULL".to_string(), @@ -1336,10 +1301,10 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String { // // Search predicates (`is_search_filter`) are still handled separately via // `scanner.full_text_search(...)`, not via filter_expr — they stay None -// here just like in `ir_filter_to_sql`. The `literal_to_sql` path remains -// because the mutation/update layer (`exec/mutation.rs`) still produces -// SQL strings for `Dataset::delete(&str)`; that migration is MR-A's -// territory (Lance #6658 + delete two-phase). +// here (search predicates are never lowered to a scalar filter). The +// `literal_to_sql` path remains because the mutation/update layer +// (`exec/mutation.rs`) still produces SQL strings for `Dataset::delete(&str)`; +// that migration is MR-A's territory (Lance #6658 + delete two-phase). /// Convert IR filters to a single DataFusion `Expr` (AND-joined), or /// `None` if no filter is pushable. @@ -1381,8 +1346,8 @@ pub(super) fn ir_filter_to_expr( } // List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`. - // This is the case `ir_filter_to_sql` had to return None for ("Can't - // pushdown list contains"); with structured Expr it pushes down fine. + // This is the case the old SQL-string pushdown had to return None for + // ("Can't pushdown list contains"); with structured Expr it pushes down fine. if matches!(filter.op, CompOp::Contains) { let left = ir_expr_to_expr(&filter.left, params)?; let right = ir_expr_to_expr(&filter.right, params)?;