perf(engine): route Expand node hydration through the id BTREE via structured filter

hydrate_nodes built an `id IN (...)` SQL string applied via Scanner::filter,
which DataFusion evaluates with InListEval (O(N×M)) rather than using the id
BTREE scalar index — measured at 72× the indexed cost on a 100k-node hop
(MR-376). Build the id IN-list as a structured DataFusion Expr, AND it with
the pushable destination filters, and apply via Scanner::filter_expr (the same
path execute_node_scan already uses); Lance then compiles it to
scalar-index-search -> take.

Destination-filter pushability is now decided by ir_filter_to_expr (structured)
instead of ir_filter_to_sql, so list-contains (array_has) pushes down too.
Removes the now-dead string-filter helpers build_lance_filter, ir_filter_to_sql,
and ir_expr_to_sql; literal_to_sql stays (still used by the mutation delete path).
This commit is contained in:
Ragnor Comerford 2026-06-08 10:13:11 +02:00
parent 4a66d6e071
commit aec72ee089
No known key found for this signature in database

View file

@ -785,11 +785,13 @@ async fn execute_expand(
} }
} }
// Split dst_filters: SQL-pushable go to Lance, the rest applied post-hconcat // Destination-binding filters: pushable ones lower to a DataFusion `Expr`
let pushdown_sql = build_lance_filter(dst_filters, params); // and are applied by `hydrate_nodes` (AND'd with the id IN-list, routed
// through the BTREE); the rest (`ir_filter_to_expr` → None) are applied in
// memory post-hconcat.
let non_pushable: Vec<&IRFilter> = dst_filters let non_pushable: Vec<&IRFilter> = dst_filters
.iter() .iter()
.filter(|f| ir_filter_to_sql(f, params).is_none()) .filter(|f| ir_filter_to_expr(f, params).is_none())
.collect(); .collect();
// Dedup dst dense ids globally across source rows, then stringify once // Dedup dst dense ids globally across source rows, then stringify once
@ -811,7 +813,8 @@ async fn execute_expand(
catalog, catalog,
dst_type, dst_type,
&unique_dst_list, &unique_dst_list,
pushdown_sql.as_deref(), dst_filters,
params,
) )
.await?; .await?;
@ -857,16 +860,24 @@ async fn execute_expand(
/// Load full node rows for a set of IDs from a snapshot. /// Load full node rows for a set of IDs from a snapshot.
/// ///
/// When `extra_filter_sql` is provided (from deferred destination-binding /// The `id IN (...)` predicate is built as a structured DataFusion `Expr` and
/// filters), it is ANDed with the `id IN (...)` clause so that Lance can /// AND'd with any pushable `dst_filters` (destination-binding filters), then
/// skip non-matching rows at the storage level. /// applied via `Scanner::filter_expr`. The structured form routes the id
/// IN-list through the `id` BTREE scalar index (index-search → take) rather
/// than evaluating a string filter via DataFusion `InListEval`, which is
/// O(N×M) and was measured at 72× the indexed cost on a 100k-node hop
/// (MR-376). Non-pushable `dst_filters` (`ir_filter_to_expr` → None) are
/// applied in memory by the caller after hydration.
async fn hydrate_nodes( async fn hydrate_nodes(
snapshot: &Snapshot, snapshot: &Snapshot,
catalog: &Catalog, catalog: &Catalog,
type_name: &str, type_name: &str,
ids: &[String], ids: &[String],
extra_filter_sql: Option<&str>, dst_filters: &[IRFilter],
params: &ParamMap,
) -> Result<RecordBatch> { ) -> Result<RecordBatch> {
use datafusion::prelude::{col, lit};
let node_type = catalog let node_type = catalog
.node_types .node_types
.get(type_name) .get(type_name)
@ -879,15 +890,13 @@ async fn hydrate_nodes(
let table_key = format!("node:{}", type_name); let table_key = format!("node:{}", type_name);
let ds = snapshot.open(&table_key).await?; let ds = snapshot.open(&table_key).await?;
// Build filter: id IN ('a', 'b', 'c') // `id IN (ids)` AND any pushable destination filters, as a structured Expr.
let escaped: Vec<String> = ids let id_list: Vec<datafusion::prelude::Expr> = ids.iter().map(|id| lit(id.clone())).collect();
.iter() let mut filter_expr = col("id").in_list(id_list, false);
.map(|id| format!("'{}'", id.replace('\'', "''"))) if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params) {
.collect(); filter_expr = filter_expr.and(dst_expr);
let mut filter_sql = format!("id IN ({})", escaped.join(", "));
if let Some(extra) = extra_filter_sql {
filter_sql = format!("({}) AND ({})", filter_sql, extra);
} }
let has_blobs = !node_type.blob_properties.is_empty(); let has_blobs = !node_type.blob_properties.is_empty();
let non_blob_cols: Vec<&str> = node_type let non_blob_cols: Vec<&str> = node_type
.arrow_schema .arrow_schema
@ -897,12 +906,16 @@ async fn hydrate_nodes(
.map(|f| f.name().as_str()) .map(|f| f.name().as_str())
.collect(); .collect();
let projection = has_blobs.then_some(non_blob_cols.as_slice()); let projection = has_blobs.then_some(non_blob_cols.as_slice());
let batches = crate::table_store::TableStore::scan_stream( let batches = crate::table_store::TableStore::scan_stream_with(
&ds, &ds,
projection, projection,
Some(&filter_sql), None,
None, None,
false, false,
|scanner| {
scanner.filter_expr(filter_expr);
Ok(())
},
) )
.await? .await?
.try_collect::<Vec<RecordBatch>>() .try_collect::<Vec<RecordBatch>>()
@ -1186,45 +1199,6 @@ fn add_null_blob_columns(
.map_err(|e| OmniError::Lance(e.to_string())) .map_err(|e| OmniError::Lance(e.to_string()))
} }
/// Convert IR filters to a Lance SQL filter string.
fn build_lance_filter(filters: &[IRFilter], params: &ParamMap) -> Option<String> {
if filters.is_empty() {
return None;
}
let parts: Vec<String> = filters
.iter()
.filter_map(|f| ir_filter_to_sql(f, params))
.collect();
if parts.is_empty() {
return None;
}
Some(parts.join(" AND "))
}
fn ir_filter_to_sql(filter: &IRFilter, params: &ParamMap) -> Option<String> {
// Search predicates (search/fuzzy/match_text = true) are NOT converted to SQL.
// They are handled via scanner.full_text_search() in execute_node_scan.
if is_search_filter(filter) {
return None;
}
let left = ir_expr_to_sql(&filter.left, params)?;
let right = ir_expr_to_sql(&filter.right, params)?;
let op = match filter.op {
CompOp::Eq => "=",
CompOp::Ne => "!=",
CompOp::Gt => ">",
CompOp::Lt => "<",
CompOp::Ge => ">=",
CompOp::Le => "<=",
CompOp::Contains => return None, // Can't pushdown list contains
};
Some(format!("{} {} {}", left, op, right))
}
/// Build a FullTextSearchQuery from a search IR expression. /// Build a FullTextSearchQuery from a search IR expression.
fn build_fts_query( fn build_fts_query(
expr: &IRExpr, expr: &IRExpr,
@ -1297,15 +1271,6 @@ fn resolve_to_int(expr: &IRExpr, params: &ParamMap) -> Option<i64> {
} }
} }
fn ir_expr_to_sql(expr: &IRExpr, params: &ParamMap) -> Option<String> {
match expr {
IRExpr::PropAccess { property, .. } => Some(property.clone()),
IRExpr::Literal(lit) => Some(literal_to_sql(lit)),
IRExpr::Param(name) => params.get(name).map(literal_to_sql),
_ => None,
}
}
pub(super) fn literal_to_sql(lit: &Literal) -> String { pub(super) fn literal_to_sql(lit: &Literal) -> String {
match lit { match lit {
Literal::Null => "NULL".to_string(), Literal::Null => "NULL".to_string(),
@ -1336,10 +1301,10 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String {
// //
// Search predicates (`is_search_filter`) are still handled separately via // Search predicates (`is_search_filter`) are still handled separately via
// `scanner.full_text_search(...)`, not via filter_expr — they stay None // `scanner.full_text_search(...)`, not via filter_expr — they stay None
// here just like in `ir_filter_to_sql`. The `literal_to_sql` path remains // here (search predicates are never lowered to a scalar filter). The
// because the mutation/update layer (`exec/mutation.rs`) still produces // `literal_to_sql` path remains because the mutation/update layer
// SQL strings for `Dataset::delete(&str)`; that migration is MR-A's // (`exec/mutation.rs`) still produces SQL strings for `Dataset::delete(&str)`;
// territory (Lance #6658 + delete two-phase). // that migration is MR-A's territory (Lance #6658 + delete two-phase).
/// Convert IR filters to a single DataFusion `Expr` (AND-joined), or /// Convert IR filters to a single DataFusion `Expr` (AND-joined), or
/// `None` if no filter is pushable. /// `None` if no filter is pushable.
@ -1381,8 +1346,8 @@ pub(super) fn ir_filter_to_expr(
} }
// List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`. // List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`.
// This is the case `ir_filter_to_sql` had to return None for ("Can't // This is the case the old SQL-string pushdown had to return None for
// pushdown list contains"); with structured Expr it pushes down fine. // ("Can't pushdown list contains"); with structured Expr it pushes down fine.
if matches!(filter.op, CompOp::Contains) { if matches!(filter.op, CompOp::Contains) {
let left = ir_expr_to_expr(&filter.left, params)?; let left = ir_expr_to_expr(&filter.left, params)?;
let right = ir_expr_to_expr(&filter.right, params)?; let right = ir_expr_to_expr(&filter.right, params)?;