fix(engine): preserve identifier case in filter pushdown (#283) (#285)

* test(engine): regression tests for #283 camelCase property filters

Red against current code. A query (or chained mutation) that filters on a
camelCase schema field lints and plans cleanly but fails at run time with
"No field named reponame" because the identifier's case is destroyed at the
engine->Lance boundary.

Coverage added:
- query.rs unit: ir_filter_to_expr on a camelCase property must emit an
  Expr::Column named `repoName`, not `reponame` (red); plus a green coercion
  guard that a camelCase int column still gets a coerced literal.
- mutation.rs unit: predicate_to_sql must emit the column UNQUOTED and
  case-preserved (green guard documenting the committed-scan contract).
- literal_filters.rs e2e: a camelCase @index field with an inline-binding
  pushdown filter returns the seeded row (red — read pushdown).
- writes.rs e2e: an update+delete on a camelCase predicate, and a chained
  update that re-reads the pending side of scan_with_pending by the same
  camelCase predicate (red — pending MemTable scan).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01FQ1Hf4eXLsJmeLUkTYBEw7

* fix(engine): preserve identifier case in filter pushdown (#283)

Two engine->Lance boundaries lowercased camelCase column identifiers,
breaking any filter on a camelCase schema field even though the IR,
compiler, projection, and in-memory filtering all preserve case.

Read pushdown (exec/query.rs, ir_expr_to_expr): build the column reference
with datafusion::prelude::ident() instead of col(). col() routes through SQL
identifier normalization and lowercases an unquoted identifier
(`repoName` -> `reponame`); ident() builds an unqualified, case-preserved
Column. Property refs here are always bare column names, so there is no
qualified-name handling to lose. No-op for the lowercase columns that work
today.

Pending mutation scan (table_store.rs, scan_pending_batches): the
committed-scan consumer (Lance Scanner::filter(&str)) preserves an unquoted
identifier's case but treats a double-quoted "col" as a string literal, so
predicate_to_sql must keep the column unquoted. The pending side splices that
same unquoted predicate into a DataFusion `SELECT ... WHERE`, which would
lowercase it. Make that path case-preserving by disabling
sql_parser.enable_ident_normalization on its SessionContext rather than
quoting (quoting would match zero committed rows). predicate_to_sql gains
only a clarifying comment; its emitted string is unchanged.

Full engine suite green (579 tests).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01FQ1Hf4eXLsJmeLUkTYBEw7

* docs(dev): case study for #283 camelCase filter bug

Record the root cause, the two-boundary fix (read pushdown col→ident; pending
mutation scan ident-normalization off), and why the obvious symmetric
"quote the column" fix is wrong (Lance reads a double-quoted column as a string
literal and silently matches zero committed rows). Linked from a new
"Case Studies" section in the dev index so the link check passes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01FQ1Hf4eXLsJmeLUkTYBEw7

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-06-19 18:42:56 +03:00 committed by GitHub
parent 3feb23af05
commit 57348cf7fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 424 additions and 3 deletions

View file

@ -477,6 +477,12 @@ fn predicate_to_sql(
}
};
// #283: emit the column UNQUOTED. Lance's `Scanner::filter(&str)` (the
// committed-scan consumer) preserves an unquoted identifier's case but
// treats a double-quoted `"col"` as a string literal, so quoting here
// would silently match zero committed rows. The pending-batch MemTable
// query is instead made case-preserving by disabling DataFusion identifier
// normalization on its `SessionContext` (see `scan_pending_batches`).
Ok(format!("{} {} {}", column, op, value_sql))
}
@ -1477,3 +1483,29 @@ fn enrich_mutation_params(params: &ParamMap) -> Result<ParamMap> {
}
Ok(resolved)
}
#[cfg(test)]
mod predicate_sql_tests {
use super::*;
// #283: a camelCase column in a mutation predicate must be emitted
// UNQUOTED and case-preserved. The committed-scan consumer, Lance's
// `Scanner::filter(&str)`, preserves an unquoted identifier's case but
// treats a double-quoted `"col"` as a string literal (which silently
// matches zero rows), so the predicate string must not quote the column.
// The pending MemTable path stays case-preserving by disabling DataFusion
// identifier normalization on its context, not by quoting here.
#[test]
fn predicate_to_sql_preserves_camelcase_column_unquoted() {
let predicate = IRMutationPredicate {
property: "repoName".to_string(),
op: CompOp::Eq,
value: IRExpr::Literal(Literal::String("acme".into())),
};
let sql = predicate_to_sql(&predicate, &ParamMap::new(), false).unwrap();
assert_eq!(
sql, "repoName = 'acme'",
"column must be unquoted and case-preserved, got {sql}"
);
}
}

View file

@ -2149,9 +2149,13 @@ pub(super) fn ir_expr_to_expr(
params: &ParamMap,
target: Option<&arrow_schema::DataType>,
) -> Option<datafusion::prelude::Expr> {
use datafusion::prelude::col;
use datafusion::prelude::ident;
match expr {
IRExpr::PropAccess { property, .. } => Some(col(property)),
// #283: `ident()` preserves the identifier's case. `col()` would route
// through SQL identifier normalization and lowercase an unquoted
// camelCase column (`repoName` → `reponame`), which then fails to
// resolve against the case-sensitive Lance/Arrow schema.
IRExpr::PropAccess { property, .. } => Some(ident(property)),
IRExpr::Literal(l) => literal_to_expr_coerced(l, target),
IRExpr::Param(name) => params
.get(name)
@ -2656,4 +2660,61 @@ mod literal_lowering_tests {
"reversed-operand literal must coerce to the Int32 column type, got {expr:?}"
);
}
// Name of the left operand's column in a binary comparison `col OP lit`.
fn binary_left_column_name(e: &Expr) -> Option<String> {
match e {
Expr::BinaryExpr(b) => match b.left.as_ref() {
Expr::Column(c) => Some(c.name.clone()),
_ => None,
},
_ => None,
}
}
// #283: a camelCase property must reach the scan as its exact column name,
// not a SQL-normalized (lowercased) one. `col()` lowercases unquoted
// identifiers; the pushed-down column ref must stay `repoName`.
#[test]
fn ir_filter_preserves_camelcase_column_name() {
use arrow_schema::{DataType, Field};
let schema = arrow_schema::Schema::new(vec![Field::new("repoName", DataType::Utf8, true)]);
let filter = IRFilter {
left: IRExpr::PropAccess {
variable: "d".into(),
property: "repoName".into(),
},
op: CompOp::Eq,
right: IRExpr::Literal(Literal::String("acme".into())),
};
let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap();
assert_eq!(
binary_left_column_name(&expr).as_deref(),
Some("repoName"),
"camelCase column must be preserved (not lowercased to `reponame`), got {expr:?}"
);
}
// Index preservation: a camelCase numeric column still coerces its literal
// (so the scalar BTREE stays eligible) — the col→ident fix must not disturb
// the coercion path (which resolves the column type via field_with_name).
#[test]
fn ir_filter_coerces_literal_for_camelcase_int_column() {
use arrow_schema::{DataType, Field};
let schema =
arrow_schema::Schema::new(vec![Field::new("itemCount", DataType::Int32, true)]);
let filter = IRFilter {
left: IRExpr::PropAccess {
variable: "m".into(),
property: "itemCount".into(),
},
op: CompOp::Eq,
right: IRExpr::Literal(Literal::Integer(2)),
};
let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap();
assert!(
binary_has_int32_literal(&expr),
"camelCase int column must keep its coerced Int32 literal (BTREE-eligible), got {expr:?}"
);
}
}

View file

@ -1883,7 +1883,15 @@ async fn scan_pending_batches(
filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
let ctx = datafusion::execution::context::SessionContext::new();
// #283: disable SQL identifier normalization so an unquoted camelCase
// column in `filter` (e.g. `repoName = 'acme'`, emitted unquoted by
// `predicate_to_sql` because the committed Lance scan needs it unquoted)
// is matched case-preserving against the case-sensitive MemTable schema.
// Without this, DataFusion lowercases `repoName` → `reponame` and fails to
// resolve. Quoted identifiers (the projection list below) are unaffected.
let mut config = datafusion::execution::context::SessionConfig::new();
config.options_mut().sql_parser.enable_ident_normalization = false;
let ctx = datafusion::execution::context::SessionContext::new_with_config(config);
let mem = datafusion::datasource::MemTable::try_new(schema, vec![pending_batches.to_vec()])
.map_err(|e| OmniError::Lance(e.to_string()))?;
ctx.register_table("pending", Arc::new(mem))

View file

@ -145,3 +145,29 @@ query seen_eq() { match { $m: Metric { seen: datetime("2024-06-01T12:00:00Z") }
assert_eq!(sorted_metric_names(&mut db, q, "born_eq").await, vec!["m1"]);
assert_eq!(sorted_metric_names(&mut db, q, "seen_eq").await, vec!["m1"]);
}
// #283: a property-match on a camelCase `@index` field must execute, not fail
// with "No field named reponame" at the Lance scan. Exercises the pushdown arm
// (inline binding `Doc { repoName: $r }`) end-to-end.
const CC_SCHEMA: &str = r#"
node Doc {
slug: String @key
repoName: String @index
}
"#;
const CC_DATA: &str = r#"{"type":"Doc","data":{"slug":"d1","repoName":"acme"}}
{"type":"Doc","data":{"slug":"d2","repoName":"globex"}}"#;
#[tokio::test]
async fn camelcase_property_filter_executes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CC_SCHEMA).await.unwrap();
load_jsonl(&mut db, CC_DATA, LoadMode::Overwrite).await.unwrap();
let q = r#"query by_repo($r: String) { match { $d: Doc { repoName: $r } } return { $d.slug } }"#;
let r = query_main(&mut db, q, "by_repo", &params(&[("$r", "acme")]))
.await
.expect("camelCase property filter must execute, not fail at the Lance scan");
assert_eq!(r.num_rows(), 1, "expected exactly the d1 row for repoName=acme");
}

View file

@ -1646,3 +1646,70 @@ async fn branch_cascade_delete_forks_node_and_edges_under_held_queues() {
"main must be untouched by the branch delete"
);
}
// #283: a mutation predicate (`where camelField = ...`) on a camelCase column
// must execute, not fail at the Lance scan with "No field named ...". Covers
// both `update` (committed scan via scan_with_pending) and `delete`
// (delete_where), which share the same emitted SQL filter string.
const CC_SCHEMA: &str = r#"
node Doc {
slug: String @key
repoName: String @index
status: String?
}
"#;
const CC_DATA: &str = r#"{"type":"Doc","data":{"slug":"d1","repoName":"acme","status":"open"}}
{"type":"Doc","data":{"slug":"d2","repoName":"globex","status":"open"}}"#;
#[tokio::test]
async fn camelcase_mutation_predicate_updates_and_deletes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CC_SCHEMA).await.unwrap();
load_jsonl(&mut db, CC_DATA, LoadMode::Overwrite).await.unwrap();
let m = r#"
query set_status($repo: String, $st: String) { update Doc set { status: $st } where repoName = $repo }
query del($repo: String) { delete Doc where repoName = $repo }
"#;
let upd = db
.mutate("main", m, "set_status", &params(&[("$repo", "acme"), ("$st", "closed")]))
.await
.expect("update with a camelCase predicate must execute");
assert_eq!(upd.affected_nodes, 1, "exactly the acme Doc should update");
let del = db
.mutate("main", m, "del", &params(&[("$repo", "globex")]))
.await
.expect("delete with a camelCase predicate must execute");
assert_eq!(del.affected_nodes, 1, "exactly the globex Doc should delete");
assert_eq!(count_rows(&db, "node:Doc").await, 1, "one Doc (acme) should remain");
}
// #283 (pending side): a chained mutation whose 2nd op filters a camelCase
// column must read op-1's staged rows through the pending DataFusion `MemTable`
// (`SELECT … WHERE {filter}` via ctx.sql), which lowercases unquoted idents.
// This is the path the single update/delete above does NOT exercise.
#[tokio::test]
async fn camelcase_chained_mutation_reads_pending_by_camelcase() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CC_SCHEMA).await.unwrap();
load_jsonl(&mut db, CC_DATA, LoadMode::Overwrite).await.unwrap();
// op-1 stages a status change to the acme Doc; op-2 re-filters the same
// camelCase column, so it must match op-1's pending row.
let m = r#"
query chain($repo: String) {
update Doc set { status: "stage1" } where repoName = $repo
update Doc set { status: "stage2" } where repoName = $repo
}
"#;
let r = db
.mutate("main", m, "chain", &params(&[("$repo", "acme")]))
.await
.expect("chained camelCase mutation must read the pending row, not fail at the MemTable SELECT");
assert_eq!(r.affected_nodes, 2, "both ops should touch the acme Doc (read-your-writes)");
}