mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-15 01:55:13 +02:00
fix(engine): coerce pushdown filter literals to the column type
Filter literals were pushed to Lance in their natural Arrow type (every integer
Int64, every float Float64). Against a narrower indexed column DataFusion widens
to the literal's type and casts the COLUMN (`CAST(n32 AS Int64)`), which defeats
the scalar BTREE and degrades to a full filtered read. A physical-plan probe
confirms it: an Int32 column filtered by an i32 literal uses `ScalarIndexQuery`;
by an i64 literal it does not.
Thread the scan's `arrow_schema` through `build_lance_filter_expr` ->
`ir_filter_to_expr` and coerce each literal operand to the opposite column's
exact Arrow type, reusing `projection::literal_to_array` + `arrow_cast` (the same
path the in-memory arm uses, so the two arms agree). Coercion never demotes a
filter to None: on failure it falls back to the natural literal, because a node
scan has no in-memory fallback for inline filters.
Supersedes the date-specific change in e4ef67b (PR1): the probe shows dates were
never index-defeated — temporal coercion casts the LITERAL, not the column — so
PR1's index-use rationale was wrong though harmless. The generic coercion
subsumes it; `literal_to_expr`'s date arms revert to the natural Utf8 fallback,
and its unit tests now assert the live coerced path.
Tests: surface guard `scalar_index_use_requires_matched_literal_type` pins the
substrate behavior (matched -> index, widened -> column-cast full scan); unit
tests cover Int32/UInt32/Float32 coercion, range op, reversed operand order, and
the natural fallback; `literal_filters` adds an I32 column with equality + range
and an F32 pushdown case.
This commit is contained in:
parent
e978a55e2e
commit
f0641210a3
4 changed files with 351 additions and 57 deletions
|
|
@ -72,7 +72,11 @@ fn evaluate_expr(batch: &RecordBatch, expr: &IRExpr, params: &ParamMap) -> Resul
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a constant array from a literal value.
|
/// Create a constant array from a literal value.
|
||||||
fn literal_to_array(lit: &Literal, num_rows: usize) -> Result<ArrayRef> {
|
///
|
||||||
|
/// `pub(super)` so the pushdown arm (`query.rs::literal_to_typed_expr`) can build
|
||||||
|
/// a literal in the same natural Arrow type and cast it to the column type through
|
||||||
|
/// the identical `arrow_cast` path used here, keeping the two filter arms in sync.
|
||||||
|
pub(super) fn literal_to_array(lit: &Literal, num_rows: usize) -> Result<ArrayRef> {
|
||||||
Ok(match lit {
|
Ok(match lit {
|
||||||
Literal::Null => arrow_array::new_null_array(&DataType::Utf8, num_rows),
|
Literal::Null => arrow_array::new_null_array(&DataType::Utf8, num_rows),
|
||||||
Literal::String(s) => Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef,
|
Literal::String(s) => Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef,
|
||||||
|
|
|
||||||
|
|
@ -1289,10 +1289,12 @@ async fn expand_hydrate_and_align(
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// Pushable destination filters are applied by `hydrate_nodes`; the rest
|
// Pushable destination filters are applied by `hydrate_nodes`; the rest
|
||||||
// (`ir_filter_to_expr` → None) are applied in memory after hconcat.
|
// (`ir_filter_to_expr` → None) are applied in memory after hconcat. The
|
||||||
|
// schema arg only affects a pushable literal's TYPE, never Some-vs-None, so
|
||||||
|
// `None` here yields the same pushable/non-pushable split as `hydrate_nodes`.
|
||||||
let non_pushable: Vec<&IRFilter> = dst_filters
|
let non_pushable: Vec<&IRFilter> = dst_filters
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|f| ir_filter_to_expr(f, params).is_none())
|
.filter(|f| ir_filter_to_expr(f, params, None).is_none())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Unique destination ids (first-seen order) for one batched hydration.
|
// Unique destination ids (first-seen order) for one batched hydration.
|
||||||
|
|
@ -1506,7 +1508,8 @@ async fn hydrate_nodes(
|
||||||
// `id IN (ids)` AND any pushable destination filters, as a structured Expr.
|
// `id IN (ids)` AND any pushable destination filters, as a structured Expr.
|
||||||
let id_list: Vec<datafusion::prelude::Expr> = ids.iter().map(|id| lit(id.clone())).collect();
|
let id_list: Vec<datafusion::prelude::Expr> = ids.iter().map(|id| lit(id.clone())).collect();
|
||||||
let mut filter_expr = col("id").in_list(id_list, false);
|
let mut filter_expr = col("id").in_list(id_list, false);
|
||||||
if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params) {
|
if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params, Some(&node_type.arrow_schema))
|
||||||
|
{
|
||||||
filter_expr = filter_expr.and(dst_expr);
|
filter_expr = filter_expr.and(dst_expr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1747,21 +1750,23 @@ async fn execute_node_scan(
|
||||||
let table_key = format!("node:{}", type_name);
|
let table_key = format!("node:{}", type_name);
|
||||||
let ds = snapshot.open(&table_key).await?;
|
let ds = snapshot.open(&table_key).await?;
|
||||||
|
|
||||||
|
let node_type = &catalog.node_types[type_name];
|
||||||
|
|
||||||
// Lower the IR filters to a DataFusion `Expr` and apply via
|
// Lower the IR filters to a DataFusion `Expr` and apply via
|
||||||
// `Scanner::filter_expr` inside the configure closure. The string
|
// `Scanner::filter_expr` inside the configure closure. The string
|
||||||
// pushdown path (`build_lance_filter` → `scanner.filter(&str)`) is
|
// pushdown path (`build_lance_filter` → `scanner.filter(&str)`) is
|
||||||
// gone for node scans — structured Expr unlocks `CompOp::Contains`
|
// gone for node scans — structured Expr unlocks `CompOp::Contains`
|
||||||
// pushdown (via `array_has`) and lets DF 53's optimizer rules
|
// pushdown (via `array_has`) and lets DF 53's optimizer rules
|
||||||
// (vectorized IN-list, PhysicalExprSimplifier, CASE-NULL shortcut)
|
// (vectorized IN-list, PhysicalExprSimplifier, CASE-NULL shortcut)
|
||||||
// reach our predicates. Other call sites that still take string SQL
|
// reach our predicates. Passing the node's `arrow_schema` lets the lowering
|
||||||
// (hydrate_nodes for the Expand pushdown, count_rows, the mutation
|
// coerce literals to each column's exact type so narrow-numeric BTREEs are
|
||||||
// delete path) migrate in follow-up MRs.
|
// used. Other call sites that still take string SQL (count_rows, the
|
||||||
let filter_expr = build_lance_filter_expr(filters, params);
|
// mutation delete path) migrate in follow-up MRs.
|
||||||
|
let filter_expr = build_lance_filter_expr(filters, params, Some(&node_type.arrow_schema));
|
||||||
|
|
||||||
// Blob columns must be excluded from scan when a filter is present
|
// Blob columns must be excluded from scan when a filter is present
|
||||||
// (Lance bug: BlobsDescriptions + filter triggers a projection assertion).
|
// (Lance bug: BlobsDescriptions + filter triggers a projection assertion).
|
||||||
// We exclude blob columns and add metadata post-scan via take_blobs_by_indices.
|
// We exclude blob columns and add metadata post-scan via take_blobs_by_indices.
|
||||||
let node_type = &catalog.node_types[type_name];
|
|
||||||
let has_blobs = !node_type.blob_properties.is_empty();
|
let has_blobs = !node_type.blob_properties.is_empty();
|
||||||
let non_blob_cols: Vec<&str> = node_type
|
let non_blob_cols: Vec<&str> = node_type
|
||||||
.arrow_schema
|
.arrow_schema
|
||||||
|
|
@ -1990,13 +1995,14 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String {
|
||||||
pub(super) fn build_lance_filter_expr(
|
pub(super) fn build_lance_filter_expr(
|
||||||
filters: &[IRFilter],
|
filters: &[IRFilter],
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
|
schema: Option<&Schema>,
|
||||||
) -> Option<datafusion::prelude::Expr> {
|
) -> Option<datafusion::prelude::Expr> {
|
||||||
use datafusion::logical_expr::Operator;
|
use datafusion::logical_expr::Operator;
|
||||||
use datafusion::prelude::Expr;
|
use datafusion::prelude::Expr;
|
||||||
|
|
||||||
let mut acc: Option<Expr> = None;
|
let mut acc: Option<Expr> = None;
|
||||||
for f in filters {
|
for f in filters {
|
||||||
let Some(e) = ir_filter_to_expr(f, params) else {
|
let Some(e) = ir_filter_to_expr(f, params, schema) else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
acc = Some(match acc {
|
acc = Some(match acc {
|
||||||
|
|
@ -2017,6 +2023,7 @@ pub(super) fn build_lance_filter_expr(
|
||||||
pub(super) fn ir_filter_to_expr(
|
pub(super) fn ir_filter_to_expr(
|
||||||
filter: &IRFilter,
|
filter: &IRFilter,
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
|
schema: Option<&Schema>,
|
||||||
) -> Option<datafusion::prelude::Expr> {
|
) -> Option<datafusion::prelude::Expr> {
|
||||||
use datafusion::functions_nested::expr_fn::array_has;
|
use datafusion::functions_nested::expr_fn::array_has;
|
||||||
|
|
||||||
|
|
@ -2027,14 +2034,22 @@ pub(super) fn ir_filter_to_expr(
|
||||||
// List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`.
|
// List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`.
|
||||||
// This is the case the old SQL-string pushdown had to return None for
|
// This is the case the old SQL-string pushdown had to return None for
|
||||||
// ("Can't pushdown list contains"); with structured Expr it pushes down fine.
|
// ("Can't pushdown list contains"); with structured Expr it pushes down fine.
|
||||||
|
// (Element-type coercion for the contained value is deferred — list columns
|
||||||
|
// are not scalar-indexed, so the index-eligibility concern below does not apply.)
|
||||||
if matches!(filter.op, CompOp::Contains) {
|
if matches!(filter.op, CompOp::Contains) {
|
||||||
let left = ir_expr_to_expr(&filter.left, params)?;
|
let left = ir_expr_to_expr(&filter.left, params, None)?;
|
||||||
let right = ir_expr_to_expr(&filter.right, params)?;
|
let right = ir_expr_to_expr(&filter.right, params, None)?;
|
||||||
return Some(array_has(left, right));
|
return Some(array_has(left, right));
|
||||||
}
|
}
|
||||||
|
|
||||||
let left = ir_expr_to_expr(&filter.left, params)?;
|
// A literal/param operand is coerced to the OTHER operand's column type so
|
||||||
let right = ir_expr_to_expr(&filter.right, params)?;
|
// the predicate stays a direct `col OP literal` and the scalar index is used.
|
||||||
|
// Without this, DataFusion widens a narrow column (`CAST(col AS Int64)`),
|
||||||
|
// which defeats the BTREE (validated by `probe_scalar_index_use_under_literal_type`).
|
||||||
|
let left_col_type = prop_data_type(&filter.left, schema);
|
||||||
|
let right_col_type = prop_data_type(&filter.right, schema);
|
||||||
|
let left = ir_expr_to_expr(&filter.left, params, right_col_type.as_ref())?;
|
||||||
|
let right = ir_expr_to_expr(&filter.right, params, left_col_type.as_ref())?;
|
||||||
Some(match filter.op {
|
Some(match filter.op {
|
||||||
CompOp::Eq => left.eq(right),
|
CompOp::Eq => left.eq(right),
|
||||||
CompOp::Ne => left.not_eq(right),
|
CompOp::Ne => left.not_eq(right),
|
||||||
|
|
@ -2052,19 +2067,74 @@ pub(super) fn ir_filter_to_expr(
|
||||||
pub(super) fn ir_expr_to_expr(
|
pub(super) fn ir_expr_to_expr(
|
||||||
expr: &IRExpr,
|
expr: &IRExpr,
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
|
target: Option<&arrow_schema::DataType>,
|
||||||
) -> Option<datafusion::prelude::Expr> {
|
) -> Option<datafusion::prelude::Expr> {
|
||||||
use datafusion::prelude::{col, lit};
|
use datafusion::prelude::col;
|
||||||
match expr {
|
match expr {
|
||||||
IRExpr::PropAccess { property, .. } => Some(col(property)),
|
IRExpr::PropAccess { property, .. } => Some(col(property)),
|
||||||
IRExpr::Literal(l) => literal_to_expr(l),
|
IRExpr::Literal(l) => literal_to_expr_coerced(l, target),
|
||||||
IRExpr::Param(name) => params.get(name).and_then(literal_to_expr),
|
IRExpr::Param(name) => params
|
||||||
|
.get(name)
|
||||||
|
.and_then(|l| literal_to_expr_coerced(l, target)),
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convert a Literal to a DataFusion `Expr`. Returns `None` for List
|
/// The Arrow type of a `PropAccess` operand, looked up in the scan's schema, or
|
||||||
/// (which the existing SQL path also can't pushdown — falls through to
|
/// `None` if the expr is not a column or the schema/field is unavailable.
|
||||||
/// post-scan in-memory application).
|
fn prop_data_type(expr: &IRExpr, schema: Option<&Schema>) -> Option<arrow_schema::DataType> {
|
||||||
|
match expr {
|
||||||
|
IRExpr::PropAccess { property, .. } => schema?
|
||||||
|
.field_with_name(property)
|
||||||
|
.ok()
|
||||||
|
.map(|f| f.data_type().clone()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lower a literal for pushdown, coercing it to `target` (the comparison
|
||||||
|
/// column's Arrow type) when known. Falls back to the natural-type
|
||||||
|
/// `literal_to_expr` on a missing target or any coercion failure, so a filter is
|
||||||
|
/// never demoted to `None` by coercion (a node scan has no in-memory fallback for
|
||||||
|
/// inline filters — see `execute_node_scan`).
|
||||||
|
fn literal_to_expr_coerced(
|
||||||
|
lit: &Literal,
|
||||||
|
target: Option<&arrow_schema::DataType>,
|
||||||
|
) -> Option<datafusion::prelude::Expr> {
|
||||||
|
if let Some(target) = target {
|
||||||
|
if let Some(e) = literal_to_typed_expr(lit, target) {
|
||||||
|
return Some(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
literal_to_expr(lit)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a literal as a typed Arrow scalar matching `target`, reusing the same
|
||||||
|
/// `literal_to_array` + `arrow_cast` path as the in-memory arm
|
||||||
|
/// (`projection.rs::evaluate_filter`) so the two arms agree. Returns `None` on
|
||||||
|
/// any failure (unbuildable literal, incompatible cast) — the caller then falls
|
||||||
|
/// back to the natural-type literal.
|
||||||
|
fn literal_to_typed_expr(
|
||||||
|
lit: &Literal,
|
||||||
|
target: &arrow_schema::DataType,
|
||||||
|
) -> Option<datafusion::prelude::Expr> {
|
||||||
|
use datafusion::prelude::lit as df_lit;
|
||||||
|
let arr = super::projection::literal_to_array(lit, 1).ok()?;
|
||||||
|
let casted = if arr.data_type() == target {
|
||||||
|
arr
|
||||||
|
} else {
|
||||||
|
arrow_cast::cast::cast(&arr, target).ok()?
|
||||||
|
};
|
||||||
|
let scalar = datafusion::scalar::ScalarValue::try_from_array(&casted, 0).ok()?;
|
||||||
|
Some(df_lit(scalar))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a Literal to a DataFusion `Expr` in its NATURAL Arrow type. This is
|
||||||
|
/// the fallback used when the comparison column's type is unknown (no schema) or
|
||||||
|
/// when coercion to it fails; the typed, column-matched coercion that keeps
|
||||||
|
/// scalar indexes usable lives in `literal_to_typed_expr`. Returns `None` for
|
||||||
|
/// List (the SQL path also could not pushdown it — falls through to post-scan
|
||||||
|
/// in-memory application).
|
||||||
fn literal_to_expr(lit: &Literal) -> Option<datafusion::prelude::Expr> {
|
fn literal_to_expr(lit: &Literal) -> Option<datafusion::prelude::Expr> {
|
||||||
use datafusion::prelude::lit as df_lit;
|
use datafusion::prelude::lit as df_lit;
|
||||||
Some(match lit {
|
Some(match lit {
|
||||||
|
|
@ -2073,24 +2143,14 @@ fn literal_to_expr(lit: &Literal) -> Option<datafusion::prelude::Expr> {
|
||||||
Literal::Integer(n) => df_lit(*n),
|
Literal::Integer(n) => df_lit(*n),
|
||||||
Literal::Float(f) => df_lit(*f),
|
Literal::Float(f) => df_lit(*f),
|
||||||
Literal::Bool(b) => df_lit(*b),
|
Literal::Bool(b) => df_lit(*b),
|
||||||
// Date/DateTime columns are physically Date32/Date64 (see the loader's
|
// Date/DateTime pass through as strings here. Against a typed Date
|
||||||
// `to_arrow`). Lower the literal to the matching TYPED Arrow scalar so
|
// column DataFusion casts the LITERAL (`CAST(Utf8 AS Date32)`), which is
|
||||||
// the predicate stays a direct column comparison and the persisted
|
// index-safe (proven by `scalar_index_use_requires_matched_literal_type`).
|
||||||
// BTREE is used. A Utf8 literal would force DataFusion to coerce one
|
// At real pushdown sites the schema is known, so `literal_to_typed_expr`
|
||||||
// side; if it casts the column (`CAST(col AS Utf8)`) the scalar index
|
// produces a typed Date32/Date64 anyway; this branch is only the
|
||||||
// is defeated and the scan degrades to a full filtered read. This
|
// no-schema fallback.
|
||||||
// matches the already-typed in-memory comparison arm in
|
Literal::Date(s) => df_lit(s.clone()),
|
||||||
// `projection.rs::literal_to_array`. On a malformed literal, fall back
|
Literal::DateTime(s) => df_lit(s.clone()),
|
||||||
// to the Utf8 string so pushdown behavior never regresses (the
|
|
||||||
// in-memory path surfaces the parse error if it is load-bearing).
|
|
||||||
Literal::Date(s) => match crate::loader::parse_date32_literal(s) {
|
|
||||||
Ok(days) => df_lit(datafusion::scalar::ScalarValue::Date32(Some(days))),
|
|
||||||
Err(_) => df_lit(s.clone()),
|
|
||||||
},
|
|
||||||
Literal::DateTime(s) => match crate::loader::parse_date64_literal(s) {
|
|
||||||
Ok(ms) => df_lit(datafusion::scalar::ScalarValue::Date64(Some(ms))),
|
|
||||||
Err(_) => df_lit(s.clone()),
|
|
||||||
},
|
|
||||||
Literal::List(_) => return None,
|
Literal::List(_) => return None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
@ -2305,35 +2365,148 @@ mod literal_lowering_tests {
|
||||||
use datafusion::prelude::Expr;
|
use datafusion::prelude::Expr;
|
||||||
use datafusion::scalar::ScalarValue;
|
use datafusion::scalar::ScalarValue;
|
||||||
|
|
||||||
// Date/DateTime filter literals must lower to TYPED Arrow scalars
|
// With the column type known, the generic coercion types a date literal to
|
||||||
// (Date32 / Date64), not Utf8 strings. A Utf8 literal against a typed
|
// the column's Date32/Date64 (the live pushdown path). Without a target it
|
||||||
// Date column forces DataFusion to coerce one side; if it casts the
|
// is the natural Utf8 fallback, which is still index-safe for dates because
|
||||||
// column (`CAST(col AS Utf8)`) the persisted BTREE is defeated and the
|
// DataFusion casts the LITERAL, not the column (proven by
|
||||||
// scan falls back to a full filtered read. A typed literal keeps the
|
// `lance_surface_guards::scalar_index_use_requires_matched_literal_type`).
|
||||||
// predicate a direct column comparison so the scalar index is used.
|
|
||||||
#[test]
|
#[test]
|
||||||
fn date_literals_lower_to_typed_arrow_scalars() {
|
fn date_literals_coerce_to_typed_arrow_scalars() {
|
||||||
let dt = literal_to_expr(&Literal::DateTime("2024-06-01T12:00:00Z".into())).unwrap();
|
use arrow_schema::DataType;
|
||||||
|
let dt = literal_to_expr_coerced(
|
||||||
|
&Literal::DateTime("2024-06-01T12:00:00Z".into()),
|
||||||
|
Some(&DataType::Date64),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
matches!(dt, Expr::Literal(ScalarValue::Date64(Some(_)), ..)),
|
matches!(dt, Expr::Literal(ScalarValue::Date64(Some(_)), ..)),
|
||||||
"DateTime literal must lower to a typed Date64 scalar, got {dt:?}"
|
"DateTime vs Date64 column must coerce to a typed Date64, got {dt:?}"
|
||||||
);
|
);
|
||||||
let d = literal_to_expr(&Literal::Date("2024-06-01".into())).unwrap();
|
let d = literal_to_expr_coerced(&Literal::Date("2024-06-01".into()), Some(&DataType::Date32))
|
||||||
|
.unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
matches!(d, Expr::Literal(ScalarValue::Date32(Some(_)), ..)),
|
matches!(d, Expr::Literal(ScalarValue::Date32(Some(_)), ..)),
|
||||||
"Date literal must lower to a typed Date32 scalar, got {d:?}"
|
"Date vs Date32 column must coerce to a typed Date32, got {d:?}"
|
||||||
|
);
|
||||||
|
let nat = literal_to_expr_coerced(&Literal::Date("2024-06-01".into()), None).unwrap();
|
||||||
|
assert!(
|
||||||
|
matches!(nat, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)),
|
||||||
|
"no target should keep the natural Utf8 date literal, got {nat:?}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// A malformed date string must not panic or error in the (infallible)
|
// A malformed date string makes coercion fail, so it falls back to the
|
||||||
// lowering: it falls back to the Utf8 literal so pushdown behavior never
|
// natural Utf8 literal rather than dropping the predicate to None.
|
||||||
// regresses (the in-memory path surfaces the parse error if it matters).
|
|
||||||
#[test]
|
#[test]
|
||||||
fn malformed_date_literal_falls_back_to_string() {
|
fn malformed_date_literal_falls_back_to_string() {
|
||||||
let bad = literal_to_expr(&Literal::DateTime("not-a-date".into())).unwrap();
|
use arrow_schema::DataType;
|
||||||
|
let bad = literal_to_expr_coerced(
|
||||||
|
&Literal::DateTime("not-a-date".into()),
|
||||||
|
Some(&DataType::Date64),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
matches!(bad, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)),
|
matches!(bad, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)),
|
||||||
"malformed DateTime literal should fall back to a Utf8 literal, got {bad:?}"
|
"malformed DateTime literal should fall back to a Utf8 literal, got {bad:?}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// With a column target, a literal lowers to the column's EXACT Arrow type
|
||||||
|
// (not its natural width), so DataFusion does not widen and cast the column
|
||||||
|
// — keeping the scalar BTREE usable. See
|
||||||
|
// `lance_surface_guards::scalar_index_use_requires_matched_literal_type`.
|
||||||
|
#[test]
|
||||||
|
fn integer_literal_coerces_to_narrow_column_type() {
|
||||||
|
use arrow_schema::DataType;
|
||||||
|
let i32_lit = literal_to_expr_coerced(&Literal::Integer(5), Some(&DataType::Int32)).unwrap();
|
||||||
|
assert!(
|
||||||
|
matches!(i32_lit, Expr::Literal(ScalarValue::Int32(Some(5)), ..)),
|
||||||
|
"integer literal vs Int32 column must lower to Int32, got {i32_lit:?}"
|
||||||
|
);
|
||||||
|
let u32_lit = literal_to_expr_coerced(&Literal::Integer(7), Some(&DataType::UInt32)).unwrap();
|
||||||
|
assert!(
|
||||||
|
matches!(u32_lit, Expr::Literal(ScalarValue::UInt32(Some(7)), ..)),
|
||||||
|
"integer literal vs UInt32 column must lower to UInt32, got {u32_lit:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn float_literal_coerces_to_f32_column_type() {
|
||||||
|
use arrow_schema::DataType;
|
||||||
|
let f32_lit =
|
||||||
|
literal_to_expr_coerced(&Literal::Float(1.5), Some(&DataType::Float32)).unwrap();
|
||||||
|
assert!(
|
||||||
|
matches!(f32_lit, Expr::Literal(ScalarValue::Float32(Some(_)), ..)),
|
||||||
|
"float literal vs Float32 column must lower to Float32, got {f32_lit:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// No target (caller without a schema) keeps the natural width — the existing
|
||||||
|
// fallback, so behavior never regresses where the column type is unknown.
|
||||||
|
#[test]
|
||||||
|
fn literal_without_target_keeps_natural_width() {
|
||||||
|
let nat = literal_to_expr_coerced(&Literal::Integer(5), None).unwrap();
|
||||||
|
assert!(
|
||||||
|
matches!(nat, Expr::Literal(ScalarValue::Int64(Some(5)), ..)),
|
||||||
|
"no target should keep the natural Int64 width, got {nat:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// True if either operand of a binary comparison is an Int32 literal.
|
||||||
|
fn binary_has_int32_literal(e: &Expr) -> bool {
|
||||||
|
if let Expr::BinaryExpr(b) = e {
|
||||||
|
[b.left.as_ref(), b.right.as_ref()]
|
||||||
|
.iter()
|
||||||
|
.any(|side| matches!(side, Expr::Literal(ScalarValue::Int32(Some(_)), ..)))
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn int32_schema() -> arrow_schema::Schema {
|
||||||
|
use arrow_schema::{DataType, Field};
|
||||||
|
arrow_schema::Schema::new(vec![Field::new("count", DataType::Int32, true)])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn count_prop() -> IRExpr {
|
||||||
|
IRExpr::PropAccess {
|
||||||
|
variable: "m".into(),
|
||||||
|
property: "count".into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Coercion is operator-independent: a range comparison's literal coerces to
|
||||||
|
// the column type just like equality does, so range filters on a narrow
|
||||||
|
// numeric column keep the BTREE.
|
||||||
|
#[test]
|
||||||
|
fn ir_filter_coerces_literal_for_range_op() {
|
||||||
|
let schema = int32_schema();
|
||||||
|
let filter = IRFilter {
|
||||||
|
left: count_prop(),
|
||||||
|
op: CompOp::Ge,
|
||||||
|
right: IRExpr::Literal(Literal::Integer(2)),
|
||||||
|
};
|
||||||
|
let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap();
|
||||||
|
assert!(
|
||||||
|
binary_has_int32_literal(&expr),
|
||||||
|
"range-op literal must coerce to the Int32 column type, got {expr:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The column may be on either side; the literal coerces to the opposite
|
||||||
|
// operand's column type regardless of order (`5 < count`).
|
||||||
|
#[test]
|
||||||
|
fn ir_filter_coerces_literal_when_column_is_on_the_right() {
|
||||||
|
let schema = int32_schema();
|
||||||
|
let filter = IRFilter {
|
||||||
|
left: IRExpr::Literal(Literal::Integer(2)),
|
||||||
|
op: CompOp::Lt,
|
||||||
|
right: count_prop(),
|
||||||
|
};
|
||||||
|
let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap();
|
||||||
|
assert!(
|
||||||
|
binary_has_int32_literal(&expr),
|
||||||
|
"reversed-operand literal must coerce to the Int32 column type, got {expr:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -647,3 +647,94 @@ async fn value_index_uncovered_count(ds: &Dataset) -> usize {
|
||||||
// is never mistaken for full coverage.
|
// is never mistaken for full coverage.
|
||||||
frag_ids.len()
|
frag_ids.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Guard 16: scalar index use requires a literal matching the column type ---
|
||||||
|
//
|
||||||
|
// Pins the substrate behavior the pushdown literal-coercion fix relies on
|
||||||
|
// (`query.rs::literal_to_typed_expr`): Lance uses the BTREE only when the filter
|
||||||
|
// is `column OP literal` with a matching type. A width-mismatched literal makes
|
||||||
|
// DataFusion widen and cast the COLUMN (`CAST(n32 AS Int64)`), which drops the
|
||||||
|
// scalar index and full-scans. Temporal columns are immune (DataFusion casts the
|
||||||
|
// Utf8 LITERAL to the date type, not the column). If a Lance/DataFusion bump
|
||||||
|
// changes either coercion direction, this turns red — re-validate the fix.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn scalar_index_use_requires_matched_literal_type() {
|
||||||
|
use datafusion::physical_plan::displayable;
|
||||||
|
use datafusion::prelude::{col, lit};
|
||||||
|
use datafusion::scalar::ScalarValue;
|
||||||
|
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().join("probe_literal_type.lance");
|
||||||
|
let uri = uri.to_str().unwrap();
|
||||||
|
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Utf8, false),
|
||||||
|
Field::new("n32", DataType::Int32, false),
|
||||||
|
Field::new("d32", DataType::Date32, false),
|
||||||
|
]));
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
schema.clone(),
|
||||||
|
vec![
|
||||||
|
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
|
||||||
|
Arc::new(Int32Array::from(vec![1, 5, 9, 13])),
|
||||||
|
Arc::new(arrow_array::Date32Array::from(vec![19000, 19723, 20000, 20500])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||||
|
let params = WriteParams {
|
||||||
|
mode: WriteMode::Create,
|
||||||
|
enable_stable_row_ids: true,
|
||||||
|
data_storage_version: Some(LanceFileVersion::V2_2),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let mut ds = Dataset::write(reader, uri, Some(params)).await.unwrap();
|
||||||
|
for c in ["n32", "d32"] {
|
||||||
|
ds.create_index_builder(&[c], IndexType::BTree, &ScalarIndexParams::default())
|
||||||
|
.replace(true)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn plan_str(ds: &Dataset, filter: datafusion::prelude::Expr) -> String {
|
||||||
|
let mut scanner = ds.scan();
|
||||||
|
scanner.filter_expr(filter);
|
||||||
|
let plan = scanner.create_plan().await.unwrap();
|
||||||
|
format!("{}", displayable(plan.as_ref()).indent(true))
|
||||||
|
}
|
||||||
|
|
||||||
|
// (label, filter, expect_index_used)
|
||||||
|
let cases = [
|
||||||
|
("n32 = 5i32 (matched Int32)", col("n32").eq(lit(5i32)), true),
|
||||||
|
("n32 = 5i64 (widened Int64)", col("n32").eq(lit(5i64)), false),
|
||||||
|
(
|
||||||
|
"d32 = Date32 (matched)",
|
||||||
|
col("d32").eq(lit(ScalarValue::Date32(Some(19723)))),
|
||||||
|
true,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"d32 = '2024-01-01' (Utf8 vs Date32)",
|
||||||
|
col("d32").eq(lit("2024-01-01")),
|
||||||
|
true,
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (label, filter, expect_index) in cases {
|
||||||
|
let s = plan_str(&ds, filter).await;
|
||||||
|
let uses_index = s.contains("ScalarIndexQuery");
|
||||||
|
assert_eq!(
|
||||||
|
uses_index, expect_index,
|
||||||
|
"[{label}] expected scalar-index use = {expect_index}, got {uses_index}.\n\
|
||||||
|
A change here means Lance/DataFusion shifted its coercion or index \
|
||||||
|
pushdown; re-validate query.rs::literal_to_typed_expr.\nplan:\n{s}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The widened case must show the index-defeating column CAST (the precise
|
||||||
|
// shape the fix avoids by coercing the literal to the column type).
|
||||||
|
let widened = plan_str(&ds, col("n32").eq(lit(5i64))).await;
|
||||||
|
assert!(
|
||||||
|
widened.contains("CAST(n32 AS Int64)"),
|
||||||
|
"expected a column-side cast in the widened plan, got:\n{widened}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ node Metric {
|
||||||
name: String @key
|
name: String @key
|
||||||
score: F64?
|
score: F64?
|
||||||
ratio: F32?
|
ratio: F32?
|
||||||
|
count: I32?
|
||||||
active: Bool?
|
active: Bool?
|
||||||
born: Date?
|
born: Date?
|
||||||
seen: DateTime?
|
seen: DateTime?
|
||||||
|
|
@ -26,10 +27,10 @@ node Metric {
|
||||||
"#;
|
"#;
|
||||||
|
|
||||||
// Seeds partition every predicate, so a dropped filter returns all 4 rows.
|
// Seeds partition every predicate, so a dropped filter returns all 4 rows.
|
||||||
const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}}
|
const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"count":1,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}}
|
||||||
{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}}
|
{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"count":2,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}}
|
||||||
{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}}
|
{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"count":3,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}}
|
||||||
{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#;
|
{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"count":4,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#;
|
||||||
|
|
||||||
async fn metric_db(dir: &tempfile::TempDir) -> Omnigraph {
|
async fn metric_db(dir: &tempfile::TempDir) -> Omnigraph {
|
||||||
let uri = dir.path().to_str().unwrap();
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
|
@ -67,6 +68,31 @@ query inline() { match { $m: Metric { score: 3.0 } } return { $m.name } }
|
||||||
assert_eq!(sorted_metric_names(&mut db, q, "inline").await, vec!["m3"]);
|
assert_eq!(sorted_metric_names(&mut db, q, "inline").await, vec!["m3"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inline-binding equality is the Lance-pushdown arm. With the literal coerced to
|
||||||
|
// the column's exact Arrow type, a narrow-numeric column (I32) and an F32 column
|
||||||
|
// must still select the right rows — the coercion changes the literal's type, not
|
||||||
|
// the result set. (The index-use win this enables is pinned at the Lance-surface
|
||||||
|
// layer by `lance_surface_guards::scalar_index_use_requires_matched_literal_type`.)
|
||||||
|
#[tokio::test]
|
||||||
|
async fn int_and_f32_literal_pushdown_coercion() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let mut db = metric_db(&dir).await;
|
||||||
|
let q = r#"
|
||||||
|
query count_eq() { match { $m: Metric { count: 2 } } return { $m.name } }
|
||||||
|
query ratio_eq() { match { $m: Metric { ratio: 0.25 } } return { $m.name } }
|
||||||
|
query count_ge() { match { $m: Metric $m.count >= 3 } return { $m.name } }
|
||||||
|
"#;
|
||||||
|
// I32 column, integer literal coerced Int64 -> Int32: count == 2 is m2 only.
|
||||||
|
assert_eq!(sorted_metric_names(&mut db, q, "count_eq").await, vec!["m2"]);
|
||||||
|
// F32 column, float literal coerced Float64 -> Float32: ratio == 0.25 is m2.
|
||||||
|
assert_eq!(sorted_metric_names(&mut db, q, "ratio_eq").await, vec!["m2"]);
|
||||||
|
// Range on the I32 column: count 3,4 >= 3 -> m3, m4 (coercion is op-independent).
|
||||||
|
assert_eq!(
|
||||||
|
sorted_metric_names(&mut db, q, "count_ge").await,
|
||||||
|
vec!["m3", "m4"]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn bool_literal_filters_execute() {
|
async fn bool_literal_filters_execute() {
|
||||||
let dir = tempfile::tempdir().unwrap();
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue