From cb80fa40f1474aedb35ebf940861c9317d93a525 Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Sat, 23 May 2026 12:47:33 +0100 Subject: [PATCH] exec/query: structured Expr pushdown via Scanner::filter_expr (unblocks CompOp::Contains) (#113) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * exec/query: pushdown IR filters via DataFusion Expr (Scanner::filter_expr) Switches `execute_node_scan` from string-flattened Lance SQL pushdown (`build_lance_filter` + `scanner.filter(&str)`) to structured DataFusion Expr pushdown (`build_lance_filter_expr` + `scanner.filter_expr(Expr)`). ## What this enables 1. **`CompOp::Contains` now pushes down.** `ir_filter_to_sql` returned `None` for list-contains (the comment said *"Can't pushdown list contains"*) because string SQL can't easily express it. With Expr, it lowers to DataFusion's `array_has(col, value)` builtin via the `nested_expressions` feature, and pushes down to Lance's scan layer the same way Eq/Lt/etc. do. Pinned by the new regression test `end_to_end::ir_filter_with_list_contains_pushes_down`. 2. **DataFusion 53's optimizer rules now reach our predicates.** Once the Expr lands at the Lance scanner, DF's planner runs: - `IN`-list vectorized eq kernel (DF #20528) - `PhysicalExprSimplifier` (DF #20111) - CASE WHEN x THEN y ELSE NULL shortcut (DF #20097) - Push limit into hash join (DF #20228) None of these were applicable before because the string SQL path short-circuited the optimizer. ## Scope This is one of three string-flattened pushdown sites; the other two (`hydrate_nodes`/Expand pushdown at query.rs:771-796 and the mutation delete path in `exec/mutation.rs::predicate_to_sql`) stay on the SQL string path for now: - The Expand pushdown still serializes through `hydrate_nodes`'s `extra_filter_sql: Option<&str>` parameter. Migrating it changes the `TableStorage` trait surface (`scan_stream(filter: Option<&str>)` → `Option`) and the cascading call sites — out of scope for this MR. - The mutation delete predicate still goes through `Dataset::delete(&str)` in Lance 6.0.1. MR-A (delete two-phase via Lance #6658, gated on the Lance v7 bump per issue #112) will migrate that path to `DeleteBuilder::execute_uncommitted` taking an Expr. The existing `ir_filter_to_sql` / `ir_expr_to_sql` / `literal_to_sql` helpers stay in place to serve the remaining string-SQL consumers (mutation predicates). They get retired when the other call sites migrate. ## Cargo Enables the `nested_expressions` feature on the `datafusion` workspace dep. Lance already pulls in `datafusion-functions-nested` transitively (it's listed in their feature set), so this just exposes the `datafusion::functions_nested::expr_fn::array_has` re-export. No transitive dep change (Cargo.lock unchanged). ## Tests - New: `ir_filter_with_list_contains_pushes_down` — pins the case that was previously impossible (`ir_filter_to_sql` returning `None`). - 906/906 workspace tests still pass. - 417/417 engine integration tests pass (was 416 + the new one). - 19/19 failpoints (recovery canary). Co-Authored-By: Claude Opus 4.7 (1M context) * ci: pin rustfs/rustfs to 1.0.0-beta.3 (last known-good before creds-policy break) The RustFS S3 Integration job started failing 2026-05-23 with all 3 tests panicking on the first PUT: HTTP error: error sending request The "Dump RustFS logs on failure" step revealed the container was dying at startup: [FATAL] Server encountered an error and is shutting down: Default root credentials are not allowed on non-loopback listeners; set RUSTFS_ACCESS_KEY and RUSTFS_SECRET_KEY to non-default values, bind to loopback, or set RUSTFS_ALLOW_INSECURE_DEFAULT_CREDENTIALS=true for local development only `rustfs/rustfs:latest` was updated 2026-05-21 (1.0.0-beta.4) with a credentials-policy check that rejects `rustfsadmin`/`rustfsadmin` as "default" values. PR #111 passed yesterday because it ran against beta.3; today's runs against beta.4 fail at container startup. This is unrelated to PR #113's Expr-pushdown refactor — the bump just happened to hit the same week. Pin to 1.0.0-beta.3 (2026-05-14, last tag before the change). The right long-term fix is one of: - Rotate the CI creds to less-default values (less coupling to RustFS's "default" set definition) - Set `RUSTFS_ALLOW_INSECURE_DEFAULT_CREDENTIALS=true` per the error message - Use a workflow service container with controlled lifecycle Deferred — pinning is the minimal restore. Also incidentally documents *which* version we tested against, which `:latest` never did. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 10 +- Cargo.toml | 2 +- crates/omnigraph/src/exec/query.rs | 138 ++++++++++++++++++++++++++- crates/omnigraph/tests/end_to_end.rs | 62 ++++++++++++ 4 files changed, 207 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f36b31e..3dc2e80 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -291,6 +291,14 @@ jobs: . -> target - name: Start RustFS + # Pinned to 1.0.0-beta.3 (2026-05-14) — the last known-good tag. + # `rustfs/rustfs:latest` (1.0.0-beta.4, 2026-05-21) added a + # credentials-policy check that refuses to start when + # AWS_ACCESS_KEY_ID/SECRET_ACCESS_KEY are values it considers + # "default" (rustfsadmin/rustfsadmin in our case). Bumping to + # beta.4+ requires either rotating those creds to less-default + # values or setting RUSTFS_ALLOW_INSECURE_DEFAULT_CREDENTIALS=true + # — deliberate work, not an emergency. Pin first; upgrade later. run: | docker rm -f rustfs >/dev/null 2>&1 || true docker run -d \ @@ -299,7 +307,7 @@ jobs: -p 9001:9001 \ -e RUSTFS_ACCESS_KEY="${AWS_ACCESS_KEY_ID}" \ -e RUSTFS_SECRET_KEY="${AWS_SECRET_ACCESS_KEY}" \ - rustfs/rustfs:latest \ + rustfs/rustfs:1.0.0-beta.3 \ /data - name: Install AWS CLI diff --git a/Cargo.toml b/Cargo.toml index 1e647d3..66bfc01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ arrow-select = "58" arrow-cast = { version = "58", features = ["prettyprint"] } arrow-ord = "58" -datafusion = { version = "53", default-features = false } +datafusion = { version = "53", default-features = false, features = ["nested_expressions"] } datafusion-physical-plan = "53" datafusion-physical-expr = "53" datafusion-execution = "53" diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 88865d8..24a8722 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -1037,8 +1037,16 @@ async fn execute_node_scan( let table_key = format!("node:{}", type_name); let ds = snapshot.open(&table_key).await?; - // Build Lance SQL filter string from non-search IR filters - let filter_sql = build_lance_filter(filters, params); + // Lower the IR filters to a DataFusion `Expr` and apply via + // `Scanner::filter_expr` inside the configure closure. The string + // pushdown path (`build_lance_filter` → `scanner.filter(&str)`) is + // gone for node scans — structured Expr unlocks `CompOp::Contains` + // pushdown (via `array_has`) and lets DF 53's optimizer rules + // (vectorized IN-list, PhysicalExprSimplifier, CASE-NULL shortcut) + // reach our predicates. Other call sites that still take string SQL + // (hydrate_nodes for the Expand pushdown, count_rows, the mutation + // delete path) migrate in follow-up MRs. + let filter_expr = build_lance_filter_expr(filters, params); // Blob columns must be excluded from scan when a filter is present // (Lance bug: BlobsDescriptions + filter triggers a projection assertion). @@ -1056,10 +1064,15 @@ async fn execute_node_scan( let batches = crate::table_store::TableStore::scan_stream_with( &ds, projection, - filter_sql.as_deref(), + None, None, false, |scanner| { + // Apply the structured IR filter via Lance's Expr pushdown. + if let Some(ref expr) = filter_expr { + scanner.filter_expr(expr.clone()); + } + // Apply FTS queries from hoisted search filters (search/fuzzy/match_text in match clause) for filter in filters { if is_search_filter(filter) { @@ -1288,6 +1301,125 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String { } } +// --------------------------------------------------------------------------- +// Structured DataFusion-Expr pushdown +// +// Parallel to the `ir_*_to_sql` family above, these helpers lower the same +// IR filter shapes to `datafusion::prelude::Expr` so we can call +// `Scanner::filter_expr(Expr)` instead of `Scanner::filter(&str)`. The +// structured form unlocks two things the string path could not express: +// +// 1. `CompOp::Contains` against list-typed columns (lowered to +// `array_has(col, value)` — requires the `nested_expressions` +// feature on the `datafusion` crate, enabled in the workspace). +// 2. Optimizer rules in DataFusion 53 that act on `Expr` shapes +// (vectorized `IN`-list eq kernel, `PhysicalExprSimplifier`, the +// `CASE WHEN x THEN y ELSE NULL` shortcut, etc.). +// +// Search predicates (`is_search_filter`) are still handled separately via +// `scanner.full_text_search(...)`, not via filter_expr — they stay None +// here just like in `ir_filter_to_sql`. The `literal_to_sql` path remains +// because the mutation/update layer (`exec/mutation.rs`) still produces +// SQL strings for `Dataset::delete(&str)`; that migration is MR-A's +// territory (Lance #6658 + delete two-phase). + +/// Convert IR filters to a single DataFusion `Expr` (AND-joined), or +/// `None` if no filter is pushable. +pub(super) fn build_lance_filter_expr( + filters: &[IRFilter], + params: &ParamMap, +) -> Option { + use datafusion::logical_expr::Operator; + use datafusion::prelude::Expr; + + let mut acc: Option = None; + for f in filters { + let Some(e) = ir_filter_to_expr(f, params) else { + continue; + }; + acc = Some(match acc { + None => e, + Some(prev) => Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr::new( + Box::new(prev), + Operator::And, + Box::new(e), + )), + }); + } + acc +} + +/// Convert a single IR filter to a DataFusion `Expr`. Returns `None` for +/// search-mode filters (handled via `scanner.full_text_search`) or any +/// expression shape we can't pushdown. +pub(super) fn ir_filter_to_expr( + filter: &IRFilter, + params: &ParamMap, +) -> Option { + use datafusion::functions_nested::expr_fn::array_has; + + if is_search_filter(filter) { + return None; + } + + // List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`. + // This is the case `ir_filter_to_sql` had to return None for ("Can't + // pushdown list contains"); with structured Expr it pushes down fine. + if matches!(filter.op, CompOp::Contains) { + let left = ir_expr_to_expr(&filter.left, params)?; + let right = ir_expr_to_expr(&filter.right, params)?; + return Some(array_has(left, right)); + } + + let left = ir_expr_to_expr(&filter.left, params)?; + let right = ir_expr_to_expr(&filter.right, params)?; + Some(match filter.op { + CompOp::Eq => left.eq(right), + CompOp::Ne => left.not_eq(right), + CompOp::Gt => left.gt(right), + CompOp::Lt => left.lt(right), + CompOp::Ge => left.gt_eq(right), + CompOp::Le => left.lt_eq(right), + CompOp::Contains => unreachable!("handled above"), + }) +} + +/// Convert an IR expression to a DataFusion `Expr`. Returns `None` for +/// shapes we don't support in pushdown (search funcs, RRF, aggregates, +/// variable refs that aren't a property access). +pub(super) fn ir_expr_to_expr( + expr: &IRExpr, + params: &ParamMap, +) -> Option { + use datafusion::prelude::{col, lit}; + match expr { + IRExpr::PropAccess { property, .. } => Some(col(property)), + IRExpr::Literal(l) => literal_to_expr(l), + IRExpr::Param(name) => params.get(name).and_then(literal_to_expr), + _ => None, + } +} + +/// Convert a Literal to a DataFusion `Expr`. Returns `None` for List +/// (which the existing SQL path also can't pushdown — falls through to +/// post-scan in-memory application). +fn literal_to_expr(lit: &Literal) -> Option { + use datafusion::prelude::lit as df_lit; + Some(match lit { + Literal::Null => df_lit(datafusion::scalar::ScalarValue::Null), + Literal::String(s) => df_lit(s.clone()), + Literal::Integer(n) => df_lit(*n), + Literal::Float(f) => df_lit(*f), + Literal::Bool(b) => df_lit(*b), + // Date/DateTime stored as strings; pass through as string literals + // — Lance/DataFusion handles the comparison against typed columns + // via implicit cast, matching the existing string-SQL behavior. + Literal::Date(s) => df_lit(s.clone()), + Literal::DateTime(s) => df_lit(s.clone()), + Literal::List(_) => return None, + }) +} + fn prefix_batch(batch: &RecordBatch, variable: &str) -> Result { let fields: Vec = batch.schema().fields().iter().map(|f| { Field::new(format!("{}.{}", variable, f.name()), f.data_type().clone(), f.is_nullable()) diff --git a/crates/omnigraph/tests/end_to_end.rs b/crates/omnigraph/tests/end_to_end.rs index 6a41830..0d9e58e 100644 --- a/crates/omnigraph/tests/end_to_end.rs +++ b/crates/omnigraph/tests/end_to_end.rs @@ -1866,3 +1866,65 @@ async fn ensure_indices_does_not_error_on_repeated_call() { let ds = snap.open("node:Person").await.unwrap(); assert_eq!(ds.count_rows(None).await.unwrap(), 4); } + +// ─── DataFusion-Expr filter pushdown (Tier-1 follow-up to the Lance v6 bump) ── + +/// Regression for `CompOp::Contains` pushdown via `array_has` in +/// `ir_filter_to_expr`. Before the Expr-pushdown refactor, the +/// `ir_filter_to_sql` family returned `None` for list-contains (the +/// comment said *"Can't pushdown list contains"*) and the predicate was +/// applied post-scan in memory. With `Scanner::filter_expr(Expr)` and +/// DF's `array_has` builtin, the contains predicate now pushes down to +/// Lance — the test confirms results are correct AND the pushdown path +/// is exercised (a regression on the pushdown would land all rows in +/// the scan, then be filtered post-hoc; that still produces the right +/// count so this test pins correctness, while `lance_surface_guards.rs` +/// is the structural pin for the surface itself). +#[tokio::test] +async fn ir_filter_with_list_contains_pushes_down() { + let schema = r#" +node Doc { + slug: String @key + tags: [String] +} +"#; + let data = r#"{"type":"Doc","data":{"slug":"alpha","tags":["red","blue"]}} +{"type":"Doc","data":{"slug":"bravo","tags":["green"]}} +{"type":"Doc","data":{"slug":"charlie","tags":["red","green"]}} +{"type":"Doc","data":{"slug":"delta","tags":[]}}"#; + + let dir = tempfile::tempdir().unwrap(); + let mut db = Omnigraph::init(dir.path().to_str().unwrap(), schema) + .await + .unwrap(); + load_jsonl(&mut db, data, LoadMode::Overwrite) + .await + .unwrap(); + + let queries = r#" +query docs_with_tag($tag: String) { + match { + $d: Doc + $d.tags contains $tag + } + return { $d.slug } +} +"#; + let result = query_main(&mut db, queries, "docs_with_tag", ¶ms(&[("$tag", "red")])) + .await + .unwrap(); + + let batch = result.concat_batches().unwrap(); + let slugs = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut got: Vec<&str> = (0..slugs.len()).map(|i| slugs.value(i)).collect(); + got.sort(); + assert_eq!( + got, + vec!["alpha", "charlie"], + "contains-pushdown should return exactly the rows whose tags list contains 'red'" + ); +}