exec/query: structured Expr pushdown via Scanner::filter_expr (unblocks CompOp::Contains) (#113)

* exec/query: pushdown IR filters via DataFusion Expr (Scanner::filter_expr)

Switches `execute_node_scan` from string-flattened Lance SQL pushdown
(`build_lance_filter` + `scanner.filter(&str)`) to structured DataFusion
Expr pushdown (`build_lance_filter_expr` + `scanner.filter_expr(Expr)`).

## What this enables

1. **`CompOp::Contains` now pushes down.** `ir_filter_to_sql` returned
   `None` for list-contains (the comment said *"Can't pushdown list
   contains"*) because string SQL can't easily express it. With Expr,
   it lowers to DataFusion's `array_has(col, value)` builtin via the
   `nested_expressions` feature, and pushes down to Lance's scan layer
   the same way Eq/Lt/etc. do. Pinned by the new regression test
   `end_to_end::ir_filter_with_list_contains_pushes_down`.

2. **DataFusion 53's optimizer rules now reach our predicates.** Once
   the Expr lands at the Lance scanner, DF's planner runs:
   - `IN`-list vectorized eq kernel (DF #20528)
   - `PhysicalExprSimplifier` (DF #20111)
   - CASE WHEN x THEN y ELSE NULL shortcut (DF #20097)
   - Push limit into hash join (DF #20228)
   None of these were applicable before because the string SQL path
   short-circuited the optimizer.

## Scope

This is one of three string-flattened pushdown sites; the other two
(`hydrate_nodes`/Expand pushdown at query.rs:771-796 and the mutation
delete path in `exec/mutation.rs::predicate_to_sql`) stay on the SQL
string path for now:

- The Expand pushdown still serializes through `hydrate_nodes`'s
  `extra_filter_sql: Option<&str>` parameter. Migrating it changes the
  `TableStorage` trait surface (`scan_stream(filter: Option<&str>)` →
  `Option<Expr>`) and the cascading call sites — out of scope for this
  MR.
- The mutation delete predicate still goes through `Dataset::delete(&str)`
  in Lance 6.0.1. MR-A (delete two-phase via Lance #6658, gated on the
  Lance v7 bump per issue #112) will migrate that path to
  `DeleteBuilder::execute_uncommitted` taking an Expr.

The existing `ir_filter_to_sql` / `ir_expr_to_sql` / `literal_to_sql`
helpers stay in place to serve the remaining string-SQL consumers
(mutation predicates). They get retired when the other call sites
migrate.

## Cargo

Enables the `nested_expressions` feature on the `datafusion` workspace
dep. Lance already pulls in `datafusion-functions-nested` transitively
(it's listed in their feature set), so this just exposes the
`datafusion::functions_nested::expr_fn::array_has` re-export. No
transitive dep change (Cargo.lock unchanged).

## Tests

- New: `ir_filter_with_list_contains_pushes_down` — pins the case that
  was previously impossible (`ir_filter_to_sql` returning `None`).
- 906/906 workspace tests still pass.
- 417/417 engine integration tests pass (was 416 + the new one).
- 19/19 failpoints (recovery canary).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ci: pin rustfs/rustfs to 1.0.0-beta.3 (last known-good before creds-policy break)

The RustFS S3 Integration job started failing 2026-05-23 with all 3
tests panicking on the first PUT:

  HTTP error: error sending request

The "Dump RustFS logs on failure" step revealed the container was
dying at startup:

  [FATAL] Server encountered an error and is shutting down:
  Default root credentials are not allowed on non-loopback listeners;
  set RUSTFS_ACCESS_KEY and RUSTFS_SECRET_KEY to non-default values,
  bind to loopback, or set RUSTFS_ALLOW_INSECURE_DEFAULT_CREDENTIALS=true
  for local development only

`rustfs/rustfs:latest` was updated 2026-05-21 (1.0.0-beta.4) with a
credentials-policy check that rejects `rustfsadmin`/`rustfsadmin` as
"default" values. PR #111 passed yesterday because it ran against
beta.3; today's runs against beta.4 fail at container startup.

This is unrelated to PR #113's Expr-pushdown refactor — the bump
just happened to hit the same week.

Pin to 1.0.0-beta.3 (2026-05-14, last tag before the change). The
right long-term fix is one of:
  - Rotate the CI creds to less-default values (less coupling to
    RustFS's "default" set definition)
  - Set `RUSTFS_ALLOW_INSECURE_DEFAULT_CREDENTIALS=true` per the
    error message
  - Use a workflow service container with controlled lifecycle

Deferred — pinning is the minimal restore. Also incidentally
documents *which* version we tested against, which `:latest` never
did.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-05-23 12:47:33 +01:00 committed by GitHub
parent 3551e0d40e
commit cb80fa40f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 207 additions and 5 deletions

View file

@ -291,6 +291,14 @@ jobs:
. -> target
- name: Start RustFS
# Pinned to 1.0.0-beta.3 (2026-05-14) — the last known-good tag.
# `rustfs/rustfs:latest` (1.0.0-beta.4, 2026-05-21) added a
# credentials-policy check that refuses to start when
# AWS_ACCESS_KEY_ID/SECRET_ACCESS_KEY are values it considers
# "default" (rustfsadmin/rustfsadmin in our case). Bumping to
# beta.4+ requires either rotating those creds to less-default
# values or setting RUSTFS_ALLOW_INSECURE_DEFAULT_CREDENTIALS=true
# — deliberate work, not an emergency. Pin first; upgrade later.
run: |
docker rm -f rustfs >/dev/null 2>&1 || true
docker run -d \
@ -299,7 +307,7 @@ jobs:
-p 9001:9001 \
-e RUSTFS_ACCESS_KEY="${AWS_ACCESS_KEY_ID}" \
-e RUSTFS_SECRET_KEY="${AWS_SECRET_ACCESS_KEY}" \
rustfs/rustfs:latest \
rustfs/rustfs:1.0.0-beta.3 \
/data
- name: Install AWS CLI

View file

@ -21,7 +21,7 @@ arrow-select = "58"
arrow-cast = { version = "58", features = ["prettyprint"] }
arrow-ord = "58"
datafusion = { version = "53", default-features = false }
datafusion = { version = "53", default-features = false, features = ["nested_expressions"] }
datafusion-physical-plan = "53"
datafusion-physical-expr = "53"
datafusion-execution = "53"

View file

@ -1037,8 +1037,16 @@ async fn execute_node_scan(
let table_key = format!("node:{}", type_name);
let ds = snapshot.open(&table_key).await?;
// Build Lance SQL filter string from non-search IR filters
let filter_sql = build_lance_filter(filters, params);
// Lower the IR filters to a DataFusion `Expr` and apply via
// `Scanner::filter_expr` inside the configure closure. The string
// pushdown path (`build_lance_filter` → `scanner.filter(&str)`) is
// gone for node scans — structured Expr unlocks `CompOp::Contains`
// pushdown (via `array_has`) and lets DF 53's optimizer rules
// (vectorized IN-list, PhysicalExprSimplifier, CASE-NULL shortcut)
// reach our predicates. Other call sites that still take string SQL
// (hydrate_nodes for the Expand pushdown, count_rows, the mutation
// delete path) migrate in follow-up MRs.
let filter_expr = build_lance_filter_expr(filters, params);
// Blob columns must be excluded from scan when a filter is present
// (Lance bug: BlobsDescriptions + filter triggers a projection assertion).
@ -1056,10 +1064,15 @@ async fn execute_node_scan(
let batches = crate::table_store::TableStore::scan_stream_with(
&ds,
projection,
filter_sql.as_deref(),
None,
None,
false,
|scanner| {
// Apply the structured IR filter via Lance's Expr pushdown.
if let Some(ref expr) = filter_expr {
scanner.filter_expr(expr.clone());
}
// Apply FTS queries from hoisted search filters (search/fuzzy/match_text in match clause)
for filter in filters {
if is_search_filter(filter) {
@ -1288,6 +1301,125 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String {
}
}
// ---------------------------------------------------------------------------
// Structured DataFusion-Expr pushdown
//
// Parallel to the `ir_*_to_sql` family above, these helpers lower the same
// IR filter shapes to `datafusion::prelude::Expr` so we can call
// `Scanner::filter_expr(Expr)` instead of `Scanner::filter(&str)`. The
// structured form unlocks two things the string path could not express:
//
// 1. `CompOp::Contains` against list-typed columns (lowered to
// `array_has(col, value)` — requires the `nested_expressions`
// feature on the `datafusion` crate, enabled in the workspace).
// 2. Optimizer rules in DataFusion 53 that act on `Expr` shapes
// (vectorized `IN`-list eq kernel, `PhysicalExprSimplifier`, the
// `CASE WHEN x THEN y ELSE NULL` shortcut, etc.).
//
// Search predicates (`is_search_filter`) are still handled separately via
// `scanner.full_text_search(...)`, not via filter_expr — they stay None
// here just like in `ir_filter_to_sql`. The `literal_to_sql` path remains
// because the mutation/update layer (`exec/mutation.rs`) still produces
// SQL strings for `Dataset::delete(&str)`; that migration is MR-A's
// territory (Lance #6658 + delete two-phase).
/// Convert IR filters to a single DataFusion `Expr` (AND-joined), or
/// `None` if no filter is pushable.
pub(super) fn build_lance_filter_expr(
filters: &[IRFilter],
params: &ParamMap,
) -> Option<datafusion::prelude::Expr> {
use datafusion::logical_expr::Operator;
use datafusion::prelude::Expr;
let mut acc: Option<Expr> = None;
for f in filters {
let Some(e) = ir_filter_to_expr(f, params) else {
continue;
};
acc = Some(match acc {
None => e,
Some(prev) => Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr::new(
Box::new(prev),
Operator::And,
Box::new(e),
)),
});
}
acc
}
/// Convert a single IR filter to a DataFusion `Expr`. Returns `None` for
/// search-mode filters (handled via `scanner.full_text_search`) or any
/// expression shape we can't pushdown.
pub(super) fn ir_filter_to_expr(
filter: &IRFilter,
params: &ParamMap,
) -> Option<datafusion::prelude::Expr> {
use datafusion::functions_nested::expr_fn::array_has;
if is_search_filter(filter) {
return None;
}
// List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`.
// This is the case `ir_filter_to_sql` had to return None for ("Can't
// pushdown list contains"); with structured Expr it pushes down fine.
if matches!(filter.op, CompOp::Contains) {
let left = ir_expr_to_expr(&filter.left, params)?;
let right = ir_expr_to_expr(&filter.right, params)?;
return Some(array_has(left, right));
}
let left = ir_expr_to_expr(&filter.left, params)?;
let right = ir_expr_to_expr(&filter.right, params)?;
Some(match filter.op {
CompOp::Eq => left.eq(right),
CompOp::Ne => left.not_eq(right),
CompOp::Gt => left.gt(right),
CompOp::Lt => left.lt(right),
CompOp::Ge => left.gt_eq(right),
CompOp::Le => left.lt_eq(right),
CompOp::Contains => unreachable!("handled above"),
})
}
/// Convert an IR expression to a DataFusion `Expr`. Returns `None` for
/// shapes we don't support in pushdown (search funcs, RRF, aggregates,
/// variable refs that aren't a property access).
pub(super) fn ir_expr_to_expr(
expr: &IRExpr,
params: &ParamMap,
) -> Option<datafusion::prelude::Expr> {
use datafusion::prelude::{col, lit};
match expr {
IRExpr::PropAccess { property, .. } => Some(col(property)),
IRExpr::Literal(l) => literal_to_expr(l),
IRExpr::Param(name) => params.get(name).and_then(literal_to_expr),
_ => None,
}
}
/// Convert a Literal to a DataFusion `Expr`. Returns `None` for List
/// (which the existing SQL path also can't pushdown — falls through to
/// post-scan in-memory application).
fn literal_to_expr(lit: &Literal) -> Option<datafusion::prelude::Expr> {
use datafusion::prelude::lit as df_lit;
Some(match lit {
Literal::Null => df_lit(datafusion::scalar::ScalarValue::Null),
Literal::String(s) => df_lit(s.clone()),
Literal::Integer(n) => df_lit(*n),
Literal::Float(f) => df_lit(*f),
Literal::Bool(b) => df_lit(*b),
// Date/DateTime stored as strings; pass through as string literals
// — Lance/DataFusion handles the comparison against typed columns
// via implicit cast, matching the existing string-SQL behavior.
Literal::Date(s) => df_lit(s.clone()),
Literal::DateTime(s) => df_lit(s.clone()),
Literal::List(_) => return None,
})
}
fn prefix_batch(batch: &RecordBatch, variable: &str) -> Result<RecordBatch> {
let fields: Vec<Field> = batch.schema().fields().iter().map(|f| {
Field::new(format!("{}.{}", variable, f.name()), f.data_type().clone(), f.is_nullable())

View file

@ -1866,3 +1866,65 @@ async fn ensure_indices_does_not_error_on_repeated_call() {
let ds = snap.open("node:Person").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 4);
}
// ─── DataFusion-Expr filter pushdown (Tier-1 follow-up to the Lance v6 bump) ──
/// Regression for `CompOp::Contains` pushdown via `array_has` in
/// `ir_filter_to_expr`. Before the Expr-pushdown refactor, the
/// `ir_filter_to_sql` family returned `None` for list-contains (the
/// comment said *"Can't pushdown list contains"*) and the predicate was
/// applied post-scan in memory. With `Scanner::filter_expr(Expr)` and
/// DF's `array_has` builtin, the contains predicate now pushes down to
/// Lance — the test confirms results are correct AND the pushdown path
/// is exercised (a regression on the pushdown would land all rows in
/// the scan, then be filtered post-hoc; that still produces the right
/// count so this test pins correctness, while `lance_surface_guards.rs`
/// is the structural pin for the surface itself).
#[tokio::test]
async fn ir_filter_with_list_contains_pushes_down() {
let schema = r#"
node Doc {
slug: String @key
tags: [String]
}
"#;
let data = r#"{"type":"Doc","data":{"slug":"alpha","tags":["red","blue"]}}
{"type":"Doc","data":{"slug":"bravo","tags":["green"]}}
{"type":"Doc","data":{"slug":"charlie","tags":["red","green"]}}
{"type":"Doc","data":{"slug":"delta","tags":[]}}"#;
let dir = tempfile::tempdir().unwrap();
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), schema)
.await
.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let queries = r#"
query docs_with_tag($tag: String) {
match {
$d: Doc
$d.tags contains $tag
}
return { $d.slug }
}
"#;
let result = query_main(&mut db, queries, "docs_with_tag", &params(&[("$tag", "red")]))
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let slugs = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut got: Vec<&str> = (0..slugs.len()).map(|i| slugs.value(i)).collect();
got.sort();
assert_eq!(
got,
vec!["alpha", "charlie"],
"contains-pushdown should return exactly the rows whose tags list contains 'red'"
);
}