mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-15 01:55:13 +02:00
fix(engine): scalar index coverage + filter literal coercion (query latency) (#216)
* fix(engine): lower date/datetime filter literals as typed Arrow scalars `literal_to_expr` lowered `Date`/`DateTime` query literals as Utf8 strings, relying on DataFusion implicit casts. Against a physical `Date32`/`Date64` column that can coerce the column side (`CAST(col AS Utf8)`), which defeats a scalar BTREE and degrades the scan to a full filtered read. Lower to typed `Date32`/`Date64` scalars instead (reusing the loader's `parse_date32_literal`/`parse_date64_literal`, already used by the in-memory comparison arm), so the predicate stays a direct column comparison and the index is used. Malformed literals fall back to the Utf8 string so pushdown behavior never regresses. Tests: unit goldens asserting the lowered literal is typed (red before, green after) + inline-binding pushdown equality in literal_filters confirming the epoch conversion selects the right rows. * fix(engine): build scalar BTREE for enum and orderable-scalar @index columns `build_indices_on_dataset_for_catalog` only handled `String` (-> FTS) and `Vector` (-> vector). Enums are physically `String`, so an enum `@index` column (e.g. `status`) got an FTS inverted index, which Lance never consults for `=`; and `DateTime`/`Date`/numeric/`Bool` `@index` columns fell through and built nothing. Both meant equality/range filters degraded to full scans with `indices_loaded=0`. Dispatch index kind by property type via a shared `node_prop_index_kind`: enum + orderable scalar -> BTREE, free-text String -> FTS, Vector -> vector, list/Blob -> none. The helper is shared by the builder and `needs_index_work_node` so they cannot drift — the latter decides recovery- sidecar pinning, and under-reporting would leave a HEAD-advancing index build uncovered (invariant 5). Tests: scalar_indexes.rs asserts enum/DateTime/numeric @index columns report `IndexCoverage::Indexed` while free-text String/un-annotated columns stay `Degraded` (negative control). Docs: docs/user/indexes.md. * feat(engine): reindex in optimize to keep index coverage current A scalar/FTS/vector index only covers the fragments it was built over. Rows appended after the build (e.g. `ingest --mode merge`, whose commit does not rebuild an existing index) are scanned unindexed, and `compact_files` rewrites fragments out of coverage. Nothing folded them back in, so coverage decayed as the graph grew — even the id/src/dst BTREEs that power traversal. `optimize_one_table` now runs Lance `optimize_indices` after `compact_files` (incremental merge, not retrain — the same compact->optimize_indices sequence LanceDB's `optimize()` uses) and enters the publish path on compaction work OR stale index coverage (new `TableStore::has_unindexed_fragments`, reusing the fragment_bitmap logic). `optimize_indices` is a committing call with no uncommitted variant in lance-6.0.1, so it is an inline-commit residual covered by the existing `SidecarKind::Optimize` recovery sidecar spanning both ops. Blob-bearing tables are still skipped (the Lance blob-compaction bug is compaction-specific; reindex-for-blob deferred as a noted follow-up). Tests: maintenance.rs asserts an appended fragment is uncovered before and covered after optimize, and idempotency holds (second pass is a no-op). lance_surface_guards pins the `optimize_indices` signature and its incremental- coverage behavior. The existing optimize Phase-B recovery failpoint now also exercises a crash after reindex. Docs: maintenance.md, writes.md, invariants.md, lance.md, AGENTS.md. * fix(engine): coerce pushdown filter literals to the column type Filter literals were pushed to Lance in their natural Arrow type (every integer Int64, every float Float64). Against a narrower indexed column DataFusion widens to the literal's type and casts the COLUMN (`CAST(n32 AS Int64)`), which defeats the scalar BTREE and degrades to a full filtered read. A physical-plan probe confirms it: an Int32 column filtered by an i32 literal uses `ScalarIndexQuery`; by an i64 literal it does not. Thread the scan's `arrow_schema` through `build_lance_filter_expr` -> `ir_filter_to_expr` and coerce each literal operand to the opposite column's exact Arrow type, reusing `projection::literal_to_array` + `arrow_cast` (the same path the in-memory arm uses, so the two arms agree). Coercion never demotes a filter to None: on failure it falls back to the natural literal, because a node scan has no in-memory fallback for inline filters. Supersedes the date-specific change ine4ef67b(PR1): the probe shows dates were never index-defeated — temporal coercion casts the LITERAL, not the column — so PR1's index-use rationale was wrong though harmless. The generic coercion subsumes it; `literal_to_expr`'s date arms revert to the natural Utf8 fallback, and its unit tests now assert the live coerced path. Tests: surface guard `scalar_index_use_requires_matched_literal_type` pins the substrate behavior (matched -> index, widened -> column-cast full scan); unit tests cover Int32/UInt32/Float32 coercion, range op, reversed operand order, and the natural fallback; `literal_filters` adds an I32 column with equality + range and an F32 pushdown case. * fix(engine): only coerce filter literals when the cast is lossless The literal coercion inf064121narrowed unconditionally. typecheck permits numeric cross-type comparisons (`types_compatible`), so an out-of-domain literal reaches `literal_to_typed_expr` and casts lossily: a fractional float vs an integer column truncates (`{ count: 2.7 }` -> `count = 2`, wrongly matching the count=2 row) and an out-of-range integer overflows to null (`count < 3e9` on I32 -> `count < NULL` -> empty). Both silently change results, and a node scan has no in-memory fallback for inline filters. Add a lossless guard for integer targets: round-trip the cast back to the natural type and, on mismatch, return None so the caller keeps the natural literal (correct via DataFusion coercion; the index is just unused for that out-of-domain predicate). Float targets stay coerced -- narrowing F64 -> F32 is the column's own precision domain, not a value error. Resolves the two valid review findings on PR #216 (Codex float truncation, Greptile out-of-range). Tests: unit cases for fractional/out-of-range fallback vs whole-float/in-range coerce vs F32 exemption; e2e `{ count: 2.7 }` returns no rows.
This commit is contained in:
parent
77dffdae92
commit
1bed998052
16 changed files with 917 additions and 85 deletions
|
|
@ -247,10 +247,10 @@ omnigraph policy explain --actor act-alice --action change --branch main
|
|||
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
|
||||
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
|
||||
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. |
|
||||
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending); **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair` instead of interpreting it; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
|
||||
| Compaction (`compact_files`) + reindex (`optimize_indices`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; per table runs `compact_files` **then Lance `optimize_indices`** (folds appended/rewritten fragments back into existing indexes — incremental merge, not retrain) and **publishes the resulting version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe the work and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage spanning both ops; **commits even with no compaction work if index coverage is stale**; **refuses on an unrecovered graph**; **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair`; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent; reindex is skipped for them too today), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
|
||||
| Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. |
|
||||
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
|
||||
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |
|
||||
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them per `@index`/`@key` column, dispatched by type via `node_prop_index_kind` (enum + orderable scalar → BTREE, free-text String → FTS, Vector → vector); idempotent; lazy across branches. Coverage of fragments appended after build is restored by `optimize`'s `optimize_indices` pass (see Compaction row). |
|
||||
| `merge_insert` upsert | ✅ | `LoadMode::Merge`, mutation `update`/`insert`/`delete` lowering |
|
||||
| Vector search | ✅ | `nearest()` query op; embedding pipeline (Gemini / OpenAI clients); `@embed` in schema |
|
||||
| Full-text search | ✅ | `search/fuzzy/match_text/bm25` query ops |
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ use lance::dataset::scanner::ColumnOrdering;
|
|||
use lance::datatypes::BlobKind;
|
||||
use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType};
|
||||
use omnigraph_compiler::schema::parser::parse_schema;
|
||||
use omnigraph_compiler::types::ScalarType;
|
||||
use omnigraph_compiler::types::{PropType, ScalarType};
|
||||
use omnigraph_compiler::{
|
||||
DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind,
|
||||
build_catalog_from_ir, build_schema_ir, plan_schema_migration,
|
||||
|
|
|
|||
|
|
@ -32,6 +32,8 @@ use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
|
|||
use lance::dataset::optimize::{
|
||||
CompactionMetrics, CompactionOptions, compact_files, plan_compaction,
|
||||
};
|
||||
use lance::index::DatasetIndexExt;
|
||||
use lance_index::optimize::OptimizeOptions;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
|
@ -361,16 +363,22 @@ async fn optimize_one_table(
|
|||
}
|
||||
|
||||
// Precise "will it compact?" check — `plan_compaction` also accounts for
|
||||
// deletion materialization (which can rewrite even a single fragment). A
|
||||
// steady-state already-compacted table yields an empty plan and is never
|
||||
// pinned in a sidecar (a zero-commit pin would classify NoMovement on
|
||||
// recovery and force an all-or-nothing rollback). Uncovered pre-existing
|
||||
// drift is skipped above and must go through explicit repair.
|
||||
// deletion materialization (which can rewrite even a single fragment).
|
||||
let options = CompactionOptions::default();
|
||||
let plan = plan_compaction(&ds, &options)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
if plan.num_tasks() == 0 {
|
||||
let will_compact = plan.num_tasks() > 0;
|
||||
// Even when there is nothing to compact, the table may still have index
|
||||
// work: rows appended since the index was built (e.g. via `ingest --mode
|
||||
// merge`) are scanned unindexed until folded in. Either compaction or stale
|
||||
// index coverage is enough to enter the publish path. If NEITHER, this
|
||||
// table is a no-op and must NOT be pinned in a sidecar — a zero-commit pin
|
||||
// classifies NoMovement on recovery and forces an all-or-nothing rollback
|
||||
// of sibling tables' legitimate work. Uncovered pre-existing manifest/HEAD
|
||||
// drift is skipped above and must go through explicit repair.
|
||||
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
|
||||
if !will_compact && !needs_reindex {
|
||||
return Ok(TableOptimizeStats::compacted(
|
||||
table_key,
|
||||
&CompactionMetrics::default(),
|
||||
|
|
@ -378,8 +386,9 @@ async fn optimize_one_table(
|
|||
));
|
||||
}
|
||||
|
||||
// Phase A: recovery sidecar BEFORE compaction advances the Lance HEAD, so a
|
||||
// crash before the manifest publish rolls forward on next open.
|
||||
// Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or
|
||||
// index optimize), so a crash before the manifest publish rolls forward on
|
||||
// next open.
|
||||
let sidecar = crate::db::manifest::new_sidecar(
|
||||
crate::db::manifest::SidecarKind::Optimize,
|
||||
None,
|
||||
|
|
@ -398,11 +407,26 @@ async fn optimize_one_table(
|
|||
let handle =
|
||||
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;
|
||||
|
||||
// Phase B: compaction (reserve-fragments + rewrite commits advance HEAD).
|
||||
// Phase B: compaction (if any) then incremental index optimize — both
|
||||
// advance Lance HEAD inside the sidecar window. `compact_files` rewrites
|
||||
// fragments and drops them from existing index segments' coverage;
|
||||
// `optimize_indices` folds the rewritten and any previously-unindexed
|
||||
// fragments back in (Lance's incremental merge, not a full retrain). This
|
||||
// is the same compact -> optimize_indices sequencing LanceDB's `optimize()`
|
||||
// uses. `optimize_indices` is an inline-commit residual: lance-6.0.1
|
||||
// exposes no uncommitted variant, so like `compact_files` it commits
|
||||
// directly and relies on the sidecar for recovery.
|
||||
let version_before = ds.version().version;
|
||||
let metrics: CompactionMetrics = compact_files(&mut ds, options, None)
|
||||
let metrics: CompactionMetrics = if will_compact {
|
||||
compact_files(&mut ds, options, None)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
} else {
|
||||
CompactionMetrics::default()
|
||||
};
|
||||
ds.optimize_indices(&OptimizeOptions::default())
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
.map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?;
|
||||
let version_after = ds.version().version;
|
||||
let committed = version_after != version_before;
|
||||
|
||||
|
|
|
|||
|
|
@ -310,6 +310,48 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// The single scalar/vector index a node property receives from a one-column
|
||||
/// `@index`/`@key` declaration, or `None` when the property type is not
|
||||
/// indexable here (a list column or `Blob`).
|
||||
///
|
||||
/// Shared by `build_indices_on_dataset_for_catalog` (which builds the index)
|
||||
/// and `needs_index_work_node` (which checks coverage to decide recovery-
|
||||
/// sidecar pinning) so the two cannot drift: an enum or orderable scalar the
|
||||
/// builder gives a BTREE must also be reported as "needs work" until that
|
||||
/// BTREE exists, or the HEAD-advancing build would run without sidecar cover.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
enum NodePropIndexKind {
|
||||
Btree,
|
||||
Fts,
|
||||
Vector,
|
||||
}
|
||||
|
||||
fn node_prop_index_kind(prop_type: &PropType) -> Option<NodePropIndexKind> {
|
||||
if prop_type.list {
|
||||
return None;
|
||||
}
|
||||
// Enums are physically `String` but filtered by equality, so they take a
|
||||
// scalar BTREE, not an FTS inverted index (Lance never consults an inverted
|
||||
// index for `=`/range). Free-text Strings keep FTS for
|
||||
// `search()`/`match_text`/`bm25`.
|
||||
let is_enum = prop_type.enum_values.is_some();
|
||||
match prop_type.scalar {
|
||||
ScalarType::String if !is_enum => Some(NodePropIndexKind::Fts),
|
||||
ScalarType::Vector(_) => Some(NodePropIndexKind::Vector),
|
||||
ScalarType::String
|
||||
| ScalarType::DateTime
|
||||
| ScalarType::Date
|
||||
| ScalarType::I32
|
||||
| ScalarType::I64
|
||||
| ScalarType::U32
|
||||
| ScalarType::U64
|
||||
| ScalarType::F32
|
||||
| ScalarType::F64
|
||||
| ScalarType::Bool => Some(NodePropIndexKind::Btree),
|
||||
ScalarType::Blob => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the node table is missing at least one declared
|
||||
/// scalar/vector index that `build_indices_on_dataset_for_catalog` would
|
||||
/// build AND has at least one row (the ensure_indices loop has
|
||||
|
|
@ -318,11 +360,12 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st
|
|||
/// would force `NoMovement` classification on recovery and trigger the
|
||||
/// all-or-nothing rollback of sibling tables' legitimate index work).
|
||||
///
|
||||
/// Per the actual `build_indices_on_dataset_for_catalog` implementation
|
||||
/// (this file, ~line 419-491), nodes get BTree (id) + per-prop FTS
|
||||
/// (@search String) + per-prop Vector indices; edges get BTree only
|
||||
/// (id, src, dst). The two helpers mirror that asymmetry — see the
|
||||
/// `needs_index_work_edge` doc comment.
|
||||
/// Per `build_indices_on_dataset_for_catalog`, nodes get BTree (id) plus, for
|
||||
/// each one-column `@index`/`@key` property, the index `node_prop_index_kind`
|
||||
/// assigns: a scalar BTREE for enums and orderable scalars
|
||||
/// (DateTime/Date/numeric/Bool), FTS for free-text Strings, or a Vector index.
|
||||
/// Edges get BTree only (id, src, dst). This helper and the builder share
|
||||
/// `node_prop_index_kind` so they cannot drift — see its doc comment.
|
||||
async fn needs_index_work_node(
|
||||
db: &Omnigraph,
|
||||
type_name: &str,
|
||||
|
|
@ -359,14 +402,23 @@ async fn needs_index_work_node(
|
|||
let Some(prop_type) = node_type.properties.get(prop_name) else {
|
||||
continue;
|
||||
};
|
||||
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
|
||||
if !db.storage().has_fts_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
match node_prop_index_kind(prop_type) {
|
||||
Some(NodePropIndexKind::Fts) => {
|
||||
if !db.storage().has_fts_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
|
||||
if !db.storage().has_vector_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
Some(NodePropIndexKind::Vector) => {
|
||||
if !db.storage().has_vector_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Some(NodePropIndexKind::Btree) => {
|
||||
if !db.storage().has_btree_index(&ds, prop_name).await? {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
|
|
@ -615,30 +667,44 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
|
|||
}
|
||||
let prop_name = &index_cols[0];
|
||||
if let Some(prop_type) = node_type.properties.get(prop_name) {
|
||||
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
|
||||
if !db.storage().has_fts_index(ds, prop_name).await? {
|
||||
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
|
||||
.await?;
|
||||
match node_prop_index_kind(prop_type) {
|
||||
Some(NodePropIndexKind::Fts) => {
|
||||
if !db.storage().has_fts_index(ds, prop_name).await? {
|
||||
stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
|
||||
if !db.storage().has_vector_index(ds, prop_name).await? {
|
||||
// Inline-commit residual: lance-6.0.1 does not
|
||||
// expose `build_index_metadata_from_segments` as
|
||||
// `pub`, so vector indices cannot be staged from
|
||||
// outside the lance crate. Document at the call
|
||||
// site; companion ticket to lance-format/lance#6658.
|
||||
let new_snap = db
|
||||
.storage_inline_residual()
|
||||
.create_vector_index(ds.clone(), prop_name.as_str())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"create Vector index on {}({}): {}",
|
||||
table_key, prop_name, e
|
||||
))
|
||||
})?;
|
||||
*ds = new_snap;
|
||||
Some(NodePropIndexKind::Vector) => {
|
||||
if !db.storage().has_vector_index(ds, prop_name).await? {
|
||||
// Inline-commit residual: lance-6.0.1 does not
|
||||
// expose `build_index_metadata_from_segments` as
|
||||
// `pub`, so vector indices cannot be staged from
|
||||
// outside the lance crate. Document at the call
|
||||
// site; companion ticket to lance-format/lance#6658.
|
||||
let new_snap = db
|
||||
.storage_inline_residual()
|
||||
.create_vector_index(ds.clone(), prop_name.as_str())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
OmniError::Lance(format!(
|
||||
"create Vector index on {}({}): {}",
|
||||
table_key, prop_name, e
|
||||
))
|
||||
})?;
|
||||
*ds = new_snap;
|
||||
}
|
||||
}
|
||||
// Enum + orderable scalars (DateTime/Date/numeric/Bool)
|
||||
// get a BTREE so `=`, range, IN, and IS NULL are index-
|
||||
// accelerated instead of degrading to a full scan.
|
||||
Some(NodePropIndexKind::Btree) => {
|
||||
if !db.storage().has_btree_index(ds, prop_name).await? {
|
||||
stage_and_commit_btree(db, table_key, ds, &[prop_name.as_str()])
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
// List or Blob column: not indexable as a scalar here.
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,11 @@ fn evaluate_expr(batch: &RecordBatch, expr: &IRExpr, params: &ParamMap) -> Resul
|
|||
}
|
||||
|
||||
/// Create a constant array from a literal value.
|
||||
fn literal_to_array(lit: &Literal, num_rows: usize) -> Result<ArrayRef> {
|
||||
///
|
||||
/// `pub(super)` so the pushdown arm (`query.rs::literal_to_typed_expr`) can build
|
||||
/// a literal in the same natural Arrow type and cast it to the column type through
|
||||
/// the identical `arrow_cast` path used here, keeping the two filter arms in sync.
|
||||
pub(super) fn literal_to_array(lit: &Literal, num_rows: usize) -> Result<ArrayRef> {
|
||||
Ok(match lit {
|
||||
Literal::Null => arrow_array::new_null_array(&DataType::Utf8, num_rows),
|
||||
Literal::String(s) => Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef,
|
||||
|
|
|
|||
|
|
@ -1289,10 +1289,12 @@ async fn expand_hydrate_and_align(
|
|||
params: &ParamMap,
|
||||
) -> Result<()> {
|
||||
// Pushable destination filters are applied by `hydrate_nodes`; the rest
|
||||
// (`ir_filter_to_expr` → None) are applied in memory after hconcat.
|
||||
// (`ir_filter_to_expr` → None) are applied in memory after hconcat. The
|
||||
// schema arg only affects a pushable literal's TYPE, never Some-vs-None, so
|
||||
// `None` here yields the same pushable/non-pushable split as `hydrate_nodes`.
|
||||
let non_pushable: Vec<&IRFilter> = dst_filters
|
||||
.iter()
|
||||
.filter(|f| ir_filter_to_expr(f, params).is_none())
|
||||
.filter(|f| ir_filter_to_expr(f, params, None).is_none())
|
||||
.collect();
|
||||
|
||||
// Unique destination ids (first-seen order) for one batched hydration.
|
||||
|
|
@ -1506,7 +1508,8 @@ async fn hydrate_nodes(
|
|||
// `id IN (ids)` AND any pushable destination filters, as a structured Expr.
|
||||
let id_list: Vec<datafusion::prelude::Expr> = ids.iter().map(|id| lit(id.clone())).collect();
|
||||
let mut filter_expr = col("id").in_list(id_list, false);
|
||||
if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params) {
|
||||
if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params, Some(&node_type.arrow_schema))
|
||||
{
|
||||
filter_expr = filter_expr.and(dst_expr);
|
||||
}
|
||||
|
||||
|
|
@ -1747,21 +1750,23 @@ async fn execute_node_scan(
|
|||
let table_key = format!("node:{}", type_name);
|
||||
let ds = snapshot.open(&table_key).await?;
|
||||
|
||||
let node_type = &catalog.node_types[type_name];
|
||||
|
||||
// Lower the IR filters to a DataFusion `Expr` and apply via
|
||||
// `Scanner::filter_expr` inside the configure closure. The string
|
||||
// pushdown path (`build_lance_filter` → `scanner.filter(&str)`) is
|
||||
// gone for node scans — structured Expr unlocks `CompOp::Contains`
|
||||
// pushdown (via `array_has`) and lets DF 53's optimizer rules
|
||||
// (vectorized IN-list, PhysicalExprSimplifier, CASE-NULL shortcut)
|
||||
// reach our predicates. Other call sites that still take string SQL
|
||||
// (hydrate_nodes for the Expand pushdown, count_rows, the mutation
|
||||
// delete path) migrate in follow-up MRs.
|
||||
let filter_expr = build_lance_filter_expr(filters, params);
|
||||
// reach our predicates. Passing the node's `arrow_schema` lets the lowering
|
||||
// coerce literals to each column's exact type so narrow-numeric BTREEs are
|
||||
// used. Other call sites that still take string SQL (count_rows, the
|
||||
// mutation delete path) migrate in follow-up MRs.
|
||||
let filter_expr = build_lance_filter_expr(filters, params, Some(&node_type.arrow_schema));
|
||||
|
||||
// Blob columns must be excluded from scan when a filter is present
|
||||
// (Lance bug: BlobsDescriptions + filter triggers a projection assertion).
|
||||
// We exclude blob columns and add metadata post-scan via take_blobs_by_indices.
|
||||
let node_type = &catalog.node_types[type_name];
|
||||
let has_blobs = !node_type.blob_properties.is_empty();
|
||||
let non_blob_cols: Vec<&str> = node_type
|
||||
.arrow_schema
|
||||
|
|
@ -1990,13 +1995,14 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String {
|
|||
pub(super) fn build_lance_filter_expr(
|
||||
filters: &[IRFilter],
|
||||
params: &ParamMap,
|
||||
schema: Option<&Schema>,
|
||||
) -> Option<datafusion::prelude::Expr> {
|
||||
use datafusion::logical_expr::Operator;
|
||||
use datafusion::prelude::Expr;
|
||||
|
||||
let mut acc: Option<Expr> = None;
|
||||
for f in filters {
|
||||
let Some(e) = ir_filter_to_expr(f, params) else {
|
||||
let Some(e) = ir_filter_to_expr(f, params, schema) else {
|
||||
continue;
|
||||
};
|
||||
acc = Some(match acc {
|
||||
|
|
@ -2017,6 +2023,7 @@ pub(super) fn build_lance_filter_expr(
|
|||
pub(super) fn ir_filter_to_expr(
|
||||
filter: &IRFilter,
|
||||
params: &ParamMap,
|
||||
schema: Option<&Schema>,
|
||||
) -> Option<datafusion::prelude::Expr> {
|
||||
use datafusion::functions_nested::expr_fn::array_has;
|
||||
|
||||
|
|
@ -2027,14 +2034,22 @@ pub(super) fn ir_filter_to_expr(
|
|||
// List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`.
|
||||
// This is the case the old SQL-string pushdown had to return None for
|
||||
// ("Can't pushdown list contains"); with structured Expr it pushes down fine.
|
||||
// (Element-type coercion for the contained value is deferred — list columns
|
||||
// are not scalar-indexed, so the index-eligibility concern below does not apply.)
|
||||
if matches!(filter.op, CompOp::Contains) {
|
||||
let left = ir_expr_to_expr(&filter.left, params)?;
|
||||
let right = ir_expr_to_expr(&filter.right, params)?;
|
||||
let left = ir_expr_to_expr(&filter.left, params, None)?;
|
||||
let right = ir_expr_to_expr(&filter.right, params, None)?;
|
||||
return Some(array_has(left, right));
|
||||
}
|
||||
|
||||
let left = ir_expr_to_expr(&filter.left, params)?;
|
||||
let right = ir_expr_to_expr(&filter.right, params)?;
|
||||
// A literal/param operand is coerced to the OTHER operand's column type so
|
||||
// the predicate stays a direct `col OP literal` and the scalar index is used.
|
||||
// Without this, DataFusion widens a narrow column (`CAST(col AS Int64)`),
|
||||
// which defeats the BTREE (validated by `probe_scalar_index_use_under_literal_type`).
|
||||
let left_col_type = prop_data_type(&filter.left, schema);
|
||||
let right_col_type = prop_data_type(&filter.right, schema);
|
||||
let left = ir_expr_to_expr(&filter.left, params, right_col_type.as_ref())?;
|
||||
let right = ir_expr_to_expr(&filter.right, params, left_col_type.as_ref())?;
|
||||
Some(match filter.op {
|
||||
CompOp::Eq => left.eq(right),
|
||||
CompOp::Ne => left.not_eq(right),
|
||||
|
|
@ -2052,19 +2067,91 @@ pub(super) fn ir_filter_to_expr(
|
|||
pub(super) fn ir_expr_to_expr(
|
||||
expr: &IRExpr,
|
||||
params: &ParamMap,
|
||||
target: Option<&arrow_schema::DataType>,
|
||||
) -> Option<datafusion::prelude::Expr> {
|
||||
use datafusion::prelude::{col, lit};
|
||||
use datafusion::prelude::col;
|
||||
match expr {
|
||||
IRExpr::PropAccess { property, .. } => Some(col(property)),
|
||||
IRExpr::Literal(l) => literal_to_expr(l),
|
||||
IRExpr::Param(name) => params.get(name).and_then(literal_to_expr),
|
||||
IRExpr::Literal(l) => literal_to_expr_coerced(l, target),
|
||||
IRExpr::Param(name) => params
|
||||
.get(name)
|
||||
.and_then(|l| literal_to_expr_coerced(l, target)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a Literal to a DataFusion `Expr`. Returns `None` for List
|
||||
/// (which the existing SQL path also can't pushdown — falls through to
|
||||
/// post-scan in-memory application).
|
||||
/// The Arrow type of a `PropAccess` operand, looked up in the scan's schema, or
|
||||
/// `None` if the expr is not a column or the schema/field is unavailable.
|
||||
fn prop_data_type(expr: &IRExpr, schema: Option<&Schema>) -> Option<arrow_schema::DataType> {
|
||||
match expr {
|
||||
IRExpr::PropAccess { property, .. } => schema?
|
||||
.field_with_name(property)
|
||||
.ok()
|
||||
.map(|f| f.data_type().clone()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Lower a literal for pushdown, coercing it to `target` (the comparison
|
||||
/// column's Arrow type) when known. Falls back to the natural-type
|
||||
/// `literal_to_expr` on a missing target or any coercion failure, so a filter is
|
||||
/// never demoted to `None` by coercion (a node scan has no in-memory fallback for
|
||||
/// inline filters — see `execute_node_scan`).
|
||||
fn literal_to_expr_coerced(
|
||||
lit: &Literal,
|
||||
target: Option<&arrow_schema::DataType>,
|
||||
) -> Option<datafusion::prelude::Expr> {
|
||||
if let Some(target) = target {
|
||||
if let Some(e) = literal_to_typed_expr(lit, target) {
|
||||
return Some(e);
|
||||
}
|
||||
}
|
||||
literal_to_expr(lit)
|
||||
}
|
||||
|
||||
/// Build a literal as a typed Arrow scalar matching `target`, reusing the same
|
||||
/// `literal_to_array` + `arrow_cast` path as the in-memory arm
|
||||
/// (`projection.rs::evaluate_filter`) so the two arms agree. Returns `None` on
|
||||
/// any failure (unbuildable literal, incompatible cast) — the caller then falls
|
||||
/// back to the natural-type literal.
|
||||
///
|
||||
/// Lossless-only for integer targets: typecheck permits numeric cross-type
|
||||
/// comparisons (`types_compatible`), so a fractional float or out-of-range
|
||||
/// integer can reach here. Casting those to a narrower integer would truncate
|
||||
/// (`2.7 -> 2`) or overflow to null, silently changing which rows match. We
|
||||
/// round-trip the cast and, on mismatch, return `None` so the caller keeps the
|
||||
/// natural literal — correct via DataFusion coercion, the index just goes unused
|
||||
/// for that out-of-domain predicate. Float targets are exempt: narrowing
|
||||
/// `F64 -> F32` is the column's own precision domain, not a value error.
|
||||
fn literal_to_typed_expr(
|
||||
lit: &Literal,
|
||||
target: &arrow_schema::DataType,
|
||||
) -> Option<datafusion::prelude::Expr> {
|
||||
use datafusion::prelude::lit as df_lit;
|
||||
use datafusion::scalar::ScalarValue;
|
||||
|
||||
let arr = super::projection::literal_to_array(lit, 1).ok()?;
|
||||
if arr.data_type() == target {
|
||||
return Some(df_lit(ScalarValue::try_from_array(&arr, 0).ok()?));
|
||||
}
|
||||
let casted = arrow_cast::cast::cast(&arr, target).ok()?;
|
||||
if target.is_integer() {
|
||||
let back = arrow_cast::cast::cast(&casted, arr.data_type()).ok()?;
|
||||
let original = ScalarValue::try_from_array(&arr, 0).ok()?;
|
||||
let round_tripped = ScalarValue::try_from_array(&back, 0).ok()?;
|
||||
if original != round_tripped {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
Some(df_lit(ScalarValue::try_from_array(&casted, 0).ok()?))
|
||||
}
|
||||
|
||||
/// Convert a Literal to a DataFusion `Expr` in its NATURAL Arrow type. This is
|
||||
/// the fallback used when the comparison column's type is unknown (no schema) or
|
||||
/// when coercion to it fails; the typed, column-matched coercion that keeps
|
||||
/// scalar indexes usable lives in `literal_to_typed_expr`. Returns `None` for
|
||||
/// List (the SQL path also could not pushdown it — falls through to post-scan
|
||||
/// in-memory application).
|
||||
fn literal_to_expr(lit: &Literal) -> Option<datafusion::prelude::Expr> {
|
||||
use datafusion::prelude::lit as df_lit;
|
||||
Some(match lit {
|
||||
|
|
@ -2073,9 +2160,12 @@ fn literal_to_expr(lit: &Literal) -> Option<datafusion::prelude::Expr> {
|
|||
Literal::Integer(n) => df_lit(*n),
|
||||
Literal::Float(f) => df_lit(*f),
|
||||
Literal::Bool(b) => df_lit(*b),
|
||||
// Date/DateTime stored as strings; pass through as string literals
|
||||
// — Lance/DataFusion handles the comparison against typed columns
|
||||
// via implicit cast, matching the existing string-SQL behavior.
|
||||
// Date/DateTime pass through as strings here. Against a typed Date
|
||||
// column DataFusion casts the LITERAL (`CAST(Utf8 AS Date32)`), which is
|
||||
// index-safe (proven by `scalar_index_use_requires_matched_literal_type`).
|
||||
// At real pushdown sites the schema is known, so `literal_to_typed_expr`
|
||||
// produces a typed Date32/Date64 anyway; this branch is only the
|
||||
// no-schema fallback.
|
||||
Literal::Date(s) => df_lit(s.clone()),
|
||||
Literal::DateTime(s) => df_lit(s.clone()),
|
||||
Literal::List(_) => return None,
|
||||
|
|
@ -2285,3 +2375,205 @@ mod expand_chooser_tests {
|
|||
assert_eq!(choose_expand_mode(&i), ExpandMode::Csr);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod literal_lowering_tests {
|
||||
use super::*;
|
||||
use datafusion::prelude::Expr;
|
||||
use datafusion::scalar::ScalarValue;
|
||||
|
||||
// With the column type known, the generic coercion types a date literal to
|
||||
// the column's Date32/Date64 (the live pushdown path). Without a target it
|
||||
// is the natural Utf8 fallback, which is still index-safe for dates because
|
||||
// DataFusion casts the LITERAL, not the column (proven by
|
||||
// `lance_surface_guards::scalar_index_use_requires_matched_literal_type`).
|
||||
#[test]
|
||||
fn date_literals_coerce_to_typed_arrow_scalars() {
|
||||
use arrow_schema::DataType;
|
||||
let dt = literal_to_expr_coerced(
|
||||
&Literal::DateTime("2024-06-01T12:00:00Z".into()),
|
||||
Some(&DataType::Date64),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(dt, Expr::Literal(ScalarValue::Date64(Some(_)), ..)),
|
||||
"DateTime vs Date64 column must coerce to a typed Date64, got {dt:?}"
|
||||
);
|
||||
let d = literal_to_expr_coerced(&Literal::Date("2024-06-01".into()), Some(&DataType::Date32))
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(d, Expr::Literal(ScalarValue::Date32(Some(_)), ..)),
|
||||
"Date vs Date32 column must coerce to a typed Date32, got {d:?}"
|
||||
);
|
||||
let nat = literal_to_expr_coerced(&Literal::Date("2024-06-01".into()), None).unwrap();
|
||||
assert!(
|
||||
matches!(nat, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)),
|
||||
"no target should keep the natural Utf8 date literal, got {nat:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// A malformed date string makes coercion fail, so it falls back to the
|
||||
// natural Utf8 literal rather than dropping the predicate to None.
|
||||
#[test]
|
||||
fn malformed_date_literal_falls_back_to_string() {
|
||||
use arrow_schema::DataType;
|
||||
let bad = literal_to_expr_coerced(
|
||||
&Literal::DateTime("not-a-date".into()),
|
||||
Some(&DataType::Date64),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(bad, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)),
|
||||
"malformed DateTime literal should fall back to a Utf8 literal, got {bad:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// With a column target, a literal lowers to the column's EXACT Arrow type
|
||||
// (not its natural width), so DataFusion does not widen and cast the column
|
||||
// — keeping the scalar BTREE usable. See
|
||||
// `lance_surface_guards::scalar_index_use_requires_matched_literal_type`.
|
||||
#[test]
|
||||
fn integer_literal_coerces_to_narrow_column_type() {
|
||||
use arrow_schema::DataType;
|
||||
let i32_lit = literal_to_expr_coerced(&Literal::Integer(5), Some(&DataType::Int32)).unwrap();
|
||||
assert!(
|
||||
matches!(i32_lit, Expr::Literal(ScalarValue::Int32(Some(5)), ..)),
|
||||
"integer literal vs Int32 column must lower to Int32, got {i32_lit:?}"
|
||||
);
|
||||
let u32_lit = literal_to_expr_coerced(&Literal::Integer(7), Some(&DataType::UInt32)).unwrap();
|
||||
assert!(
|
||||
matches!(u32_lit, Expr::Literal(ScalarValue::UInt32(Some(7)), ..)),
|
||||
"integer literal vs UInt32 column must lower to UInt32, got {u32_lit:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn float_literal_coerces_to_f32_column_type() {
|
||||
use arrow_schema::DataType;
|
||||
let f32_lit =
|
||||
literal_to_expr_coerced(&Literal::Float(1.5), Some(&DataType::Float32)).unwrap();
|
||||
assert!(
|
||||
matches!(f32_lit, Expr::Literal(ScalarValue::Float32(Some(_)), ..)),
|
||||
"float literal vs Float32 column must lower to Float32, got {f32_lit:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Lossless guard: a fractional float against an integer column must NOT
|
||||
// truncate (2.7 -> 2). Fall back to the natural Float64 so the comparison
|
||||
// stays exact (no integer equals 2.7).
|
||||
#[test]
|
||||
fn fractional_float_vs_int_column_falls_back_not_truncate() {
|
||||
use arrow_schema::DataType;
|
||||
let e = literal_to_expr_coerced(&Literal::Float(2.7), Some(&DataType::Int32)).unwrap();
|
||||
assert!(
|
||||
matches!(e, Expr::Literal(ScalarValue::Float64(Some(_)), ..)),
|
||||
"fractional float vs Int32 must fall back to natural Float64, got {e:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// A whole-number float IS lossless against an integer column, so it coerces.
|
||||
#[test]
|
||||
fn whole_float_vs_int_column_coerces() {
|
||||
use arrow_schema::DataType;
|
||||
let e = literal_to_expr_coerced(&Literal::Float(2.0), Some(&DataType::Int32)).unwrap();
|
||||
assert!(
|
||||
matches!(e, Expr::Literal(ScalarValue::Int32(Some(2)), ..)),
|
||||
"whole-number float vs Int32 is lossless and must coerce to Int32(2), got {e:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Lossless guard: an integer literal outside the column's range must NOT
|
||||
// overflow to null; fall back to the natural Int64 (correct via DataFusion).
|
||||
#[test]
|
||||
fn out_of_range_int_vs_narrow_column_falls_back() {
|
||||
use arrow_schema::DataType;
|
||||
let e = literal_to_expr_coerced(&Literal::Integer(3_000_000_000), Some(&DataType::Int32))
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(e, Expr::Literal(ScalarValue::Int64(Some(3_000_000_000)), ..)),
|
||||
"out-of-range integer vs Int32 must fall back to natural Int64, got {e:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Float targets are exempt from the lossless guard: narrowing to the column's
|
||||
// own precision is the correct comparison domain, even when the value is not
|
||||
// exactly representable in F32 (0.1).
|
||||
#[test]
|
||||
fn float_vs_f32_column_coerces_even_when_not_exactly_representable() {
|
||||
use arrow_schema::DataType;
|
||||
let e = literal_to_expr_coerced(&Literal::Float(0.1), Some(&DataType::Float32)).unwrap();
|
||||
assert!(
|
||||
matches!(e, Expr::Literal(ScalarValue::Float32(Some(_)), ..)),
|
||||
"float target must coerce 0.1 to Float32 (exempt from lossless guard), got {e:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// No target (caller without a schema) keeps the natural width — the existing
|
||||
// fallback, so behavior never regresses where the column type is unknown.
|
||||
#[test]
|
||||
fn literal_without_target_keeps_natural_width() {
|
||||
let nat = literal_to_expr_coerced(&Literal::Integer(5), None).unwrap();
|
||||
assert!(
|
||||
matches!(nat, Expr::Literal(ScalarValue::Int64(Some(5)), ..)),
|
||||
"no target should keep the natural Int64 width, got {nat:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// True if either operand of a binary comparison is an Int32 literal.
|
||||
fn binary_has_int32_literal(e: &Expr) -> bool {
|
||||
if let Expr::BinaryExpr(b) = e {
|
||||
[b.left.as_ref(), b.right.as_ref()]
|
||||
.iter()
|
||||
.any(|side| matches!(side, Expr::Literal(ScalarValue::Int32(Some(_)), ..)))
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn int32_schema() -> arrow_schema::Schema {
|
||||
use arrow_schema::{DataType, Field};
|
||||
arrow_schema::Schema::new(vec![Field::new("count", DataType::Int32, true)])
|
||||
}
|
||||
|
||||
fn count_prop() -> IRExpr {
|
||||
IRExpr::PropAccess {
|
||||
variable: "m".into(),
|
||||
property: "count".into(),
|
||||
}
|
||||
}
|
||||
|
||||
// Coercion is operator-independent: a range comparison's literal coerces to
|
||||
// the column type just like equality does, so range filters on a narrow
|
||||
// numeric column keep the BTREE.
|
||||
#[test]
|
||||
fn ir_filter_coerces_literal_for_range_op() {
|
||||
let schema = int32_schema();
|
||||
let filter = IRFilter {
|
||||
left: count_prop(),
|
||||
op: CompOp::Ge,
|
||||
right: IRExpr::Literal(Literal::Integer(2)),
|
||||
};
|
||||
let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap();
|
||||
assert!(
|
||||
binary_has_int32_literal(&expr),
|
||||
"range-op literal must coerce to the Int32 column type, got {expr:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// The column may be on either side; the literal coerces to the opposite
|
||||
// operand's column type regardless of order (`5 < count`).
|
||||
#[test]
|
||||
fn ir_filter_coerces_literal_when_column_is_on_the_right() {
|
||||
let schema = int32_schema();
|
||||
let filter = IRFilter {
|
||||
left: IRExpr::Literal(Literal::Integer(2)),
|
||||
op: CompOp::Lt,
|
||||
right: count_prop(),
|
||||
};
|
||||
let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap();
|
||||
assert!(
|
||||
binary_has_int32_literal(&expr),
|
||||
"reversed-operand literal must coerce to the Int32 column type, got {expr:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -705,6 +705,36 @@ impl TableStore {
|
|||
Ok(IndexCoverage::Indexed)
|
||||
}
|
||||
|
||||
/// True if any non-system index on `ds` leaves at least one current
|
||||
/// fragment uncovered, i.e. rows that the index does not yet account for
|
||||
/// (appended after the index was built, or rewritten by compaction). Such
|
||||
/// fragments are scanned unindexed until a reindex (`optimize_indices`)
|
||||
/// folds them in. Returns false when every index covers every fragment, or
|
||||
/// when the table has no (non-system) indices to optimize. A `None`
|
||||
/// `fragment_bitmap` means Lance cannot report coverage for that index, so
|
||||
/// we do not treat it as uncovered (mirrors `key_column_index_coverage`).
|
||||
///
|
||||
/// Used by `optimize` to decide whether an otherwise-already-compacted
|
||||
/// table still has index work to do.
|
||||
pub async fn has_unindexed_fragments(ds: &Dataset) -> Result<bool> {
|
||||
let indices = ds
|
||||
.load_indices()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
|
||||
for index in indices.iter() {
|
||||
if is_system_index(index) {
|
||||
continue;
|
||||
}
|
||||
if let Some(bitmap) = index.fragment_bitmap.as_ref() {
|
||||
if frag_ids.iter().any(|id| !bitmap.contains(*id)) {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
|
||||
ds.count_rows(filter)
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
|
|||
use lance::index::DatasetIndexExt;
|
||||
use lance_file::version::LanceFileVersion;
|
||||
use lance_index::IndexType;
|
||||
use lance_index::optimize::OptimizeOptions;
|
||||
use lance_index::scalar::ScalarIndexParams;
|
||||
use lance_namespace::LanceNamespace;
|
||||
use lance_table::io::commit::ManifestNamingScheme;
|
||||
|
|
@ -541,3 +542,199 @@ async fn fragment_deletion_metadata_is_available() {
|
|||
per-fragment deletions and would need to read the deletion vector.",
|
||||
);
|
||||
}
|
||||
|
||||
// --- Guard 14: Dataset::optimize_indices signature ----------------------------
|
||||
//
|
||||
// `db/omnigraph/optimize.rs::optimize_one_table` calls
|
||||
// `ds.optimize_indices(&OptimizeOptions::default())` (via `DatasetIndexExt`) to
|
||||
// fold appended/compacted fragments back into existing indexes. If Lance
|
||||
// changes the receiver, the options type, or the return shape, this fails to
|
||||
// compile. Compile-only.
|
||||
|
||||
#[allow(
|
||||
dead_code,
|
||||
unreachable_code,
|
||||
unused_variables,
|
||||
unused_mut,
|
||||
clippy::diverging_sub_expression
|
||||
)]
|
||||
async fn _compile_optimize_indices_signature() -> lance::Result<()> {
|
||||
let mut ds: Dataset = unimplemented!();
|
||||
let options = OptimizeOptions::default();
|
||||
// `&mut self`, `&OptimizeOptions`, returns `Result<()>` (mutates in place
|
||||
// and commits — there is no uncommitted variant in this Lance, which is why
|
||||
// optimize treats it as an inline-commit residual under a recovery sidecar).
|
||||
let _: () = ds.optimize_indices(&options).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Guard 15: optimize_indices extends fragment coverage ----------------------
|
||||
//
|
||||
// PR3's reindex assumes `optimize_indices` folds fragments appended AFTER an
|
||||
// index was built into that index (incremental merge, not retrain). This pins
|
||||
// that Lance behavior at the surface layer so a regression turns red here, the
|
||||
// first smoke check on a Lance bump, before the slower engine suite.
|
||||
|
||||
#[tokio::test]
|
||||
async fn optimize_indices_extends_fragment_coverage() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().join("guard_optimize_indices.lance");
|
||||
let uri = uri.to_str().unwrap();
|
||||
|
||||
// Fragment 0: alice, bob. Build a BTREE over `value` covering only it.
|
||||
let mut ds = fresh_dataset(uri).await;
|
||||
ds.create_index_builder(&["value"], IndexType::BTree, &ScalarIndexParams::default())
|
||||
.replace(true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Append a second fragment the existing index does not cover.
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Utf8, false),
|
||||
Field::new("value", DataType::Int32, false),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["carol"])),
|
||||
Arc::new(Int32Array::from(vec![3])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Append,
|
||||
enable_stable_row_ids: true,
|
||||
data_storage_version: Some(LanceFileVersion::V2_2),
|
||||
..Default::default()
|
||||
};
|
||||
Dataset::write(reader, uri, Some(params)).await.unwrap();
|
||||
|
||||
let mut ds = Dataset::open(uri).await.unwrap();
|
||||
assert!(
|
||||
value_index_uncovered_count(&ds).await > 0,
|
||||
"appended fragment should be uncovered by the BTREE before optimize_indices"
|
||||
);
|
||||
|
||||
ds.optimize_indices(&OptimizeOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
value_index_uncovered_count(&ds).await,
|
||||
0,
|
||||
"optimize_indices must fold the appended fragment into the existing index \
|
||||
(incremental coverage); if this regresses, PR3's reindex no longer keeps \
|
||||
coverage current — revisit db/omnigraph/optimize.rs and docs/dev/lance.md."
|
||||
);
|
||||
}
|
||||
|
||||
/// Count current fragments not covered by the single-column `value` BTREE —
|
||||
/// mirrors `TableStore::has_unindexed_fragments` (load_indices +
|
||||
/// `fragment_bitmap.contains`), pinned by Guard 11.
|
||||
async fn value_index_uncovered_count(ds: &Dataset) -> usize {
|
||||
let indices = ds.load_indices().await.unwrap();
|
||||
let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
|
||||
let value_fid = ds.schema().field("value").unwrap().id;
|
||||
for index in indices.iter() {
|
||||
if index.fields.len() == 1 && index.fields[0] == value_fid {
|
||||
if let Some(bitmap) = index.fragment_bitmap.as_ref() {
|
||||
return frag_ids.iter().filter(|id| !bitmap.contains(**id)).count();
|
||||
}
|
||||
}
|
||||
}
|
||||
// No `value` index found — treat as fully uncovered so a missing index
|
||||
// is never mistaken for full coverage.
|
||||
frag_ids.len()
|
||||
}
|
||||
|
||||
// --- Guard 16: scalar index use requires a literal matching the column type ---
|
||||
//
|
||||
// Pins the substrate behavior the pushdown literal-coercion fix relies on
|
||||
// (`query.rs::literal_to_typed_expr`): Lance uses the BTREE only when the filter
|
||||
// is `column OP literal` with a matching type. A width-mismatched literal makes
|
||||
// DataFusion widen and cast the COLUMN (`CAST(n32 AS Int64)`), which drops the
|
||||
// scalar index and full-scans. Temporal columns are immune (DataFusion casts the
|
||||
// Utf8 LITERAL to the date type, not the column). If a Lance/DataFusion bump
|
||||
// changes either coercion direction, this turns red — re-validate the fix.
|
||||
#[tokio::test]
|
||||
async fn scalar_index_use_requires_matched_literal_type() {
|
||||
use datafusion::physical_plan::displayable;
|
||||
use datafusion::prelude::{col, lit};
|
||||
use datafusion::scalar::ScalarValue;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().join("probe_literal_type.lance");
|
||||
let uri = uri.to_str().unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Utf8, false),
|
||||
Field::new("n32", DataType::Int32, false),
|
||||
Field::new("d32", DataType::Date32, false),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
|
||||
Arc::new(Int32Array::from(vec![1, 5, 9, 13])),
|
||||
Arc::new(arrow_array::Date32Array::from(vec![19000, 19723, 20000, 20500])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
|
||||
let params = WriteParams {
|
||||
mode: WriteMode::Create,
|
||||
enable_stable_row_ids: true,
|
||||
data_storage_version: Some(LanceFileVersion::V2_2),
|
||||
..Default::default()
|
||||
};
|
||||
let mut ds = Dataset::write(reader, uri, Some(params)).await.unwrap();
|
||||
for c in ["n32", "d32"] {
|
||||
ds.create_index_builder(&[c], IndexType::BTree, &ScalarIndexParams::default())
|
||||
.replace(true)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn plan_str(ds: &Dataset, filter: datafusion::prelude::Expr) -> String {
|
||||
let mut scanner = ds.scan();
|
||||
scanner.filter_expr(filter);
|
||||
let plan = scanner.create_plan().await.unwrap();
|
||||
format!("{}", displayable(plan.as_ref()).indent(true))
|
||||
}
|
||||
|
||||
// (label, filter, expect_index_used)
|
||||
let cases = [
|
||||
("n32 = 5i32 (matched Int32)", col("n32").eq(lit(5i32)), true),
|
||||
("n32 = 5i64 (widened Int64)", col("n32").eq(lit(5i64)), false),
|
||||
(
|
||||
"d32 = Date32 (matched)",
|
||||
col("d32").eq(lit(ScalarValue::Date32(Some(19723)))),
|
||||
true,
|
||||
),
|
||||
(
|
||||
"d32 = '2024-01-01' (Utf8 vs Date32)",
|
||||
col("d32").eq(lit("2024-01-01")),
|
||||
true,
|
||||
),
|
||||
];
|
||||
|
||||
for (label, filter, expect_index) in cases {
|
||||
let s = plan_str(&ds, filter).await;
|
||||
let uses_index = s.contains("ScalarIndexQuery");
|
||||
assert_eq!(
|
||||
uses_index, expect_index,
|
||||
"[{label}] expected scalar-index use = {expect_index}, got {uses_index}.\n\
|
||||
A change here means Lance/DataFusion shifted its coercion or index \
|
||||
pushdown; re-validate query.rs::literal_to_typed_expr.\nplan:\n{s}"
|
||||
);
|
||||
}
|
||||
|
||||
// The widened case must show the index-defeating column CAST (the precise
|
||||
// shape the fix avoids by coercing the literal to the column type).
|
||||
let widened = plan_str(&ds, col("n32").eq(lit(5i64))).await;
|
||||
assert!(
|
||||
widened.contains("CAST(n32 AS Int64)"),
|
||||
"expected a column-side cast in the widened plan, got:\n{widened}"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ node Metric {
|
|||
name: String @key
|
||||
score: F64?
|
||||
ratio: F32?
|
||||
count: I32?
|
||||
active: Bool?
|
||||
born: Date?
|
||||
seen: DateTime?
|
||||
|
|
@ -26,10 +27,10 @@ node Metric {
|
|||
"#;
|
||||
|
||||
// Seeds partition every predicate, so a dropped filter returns all 4 rows.
|
||||
const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}}
|
||||
{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}}
|
||||
{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}}
|
||||
{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#;
|
||||
const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"count":1,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}}
|
||||
{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"count":2,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}}
|
||||
{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"count":3,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}}
|
||||
{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"count":4,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#;
|
||||
|
||||
async fn metric_db(dir: &tempfile::TempDir) -> Omnigraph {
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
|
@ -67,6 +68,50 @@ query inline() { match { $m: Metric { score: 3.0 } } return { $m.name } }
|
|||
assert_eq!(sorted_metric_names(&mut db, q, "inline").await, vec!["m3"]);
|
||||
}
|
||||
|
||||
// Inline-binding equality is the Lance-pushdown arm. With the literal coerced to
|
||||
// the column's exact Arrow type, a narrow-numeric column (I32) and an F32 column
|
||||
// must still select the right rows — the coercion changes the literal's type, not
|
||||
// the result set. (The index-use win this enables is pinned at the Lance-surface
|
||||
// layer by `lance_surface_guards::scalar_index_use_requires_matched_literal_type`.)
|
||||
#[tokio::test]
|
||||
async fn int_and_f32_literal_pushdown_coercion() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = metric_db(&dir).await;
|
||||
let q = r#"
|
||||
query count_eq() { match { $m: Metric { count: 2 } } return { $m.name } }
|
||||
query ratio_eq() { match { $m: Metric { ratio: 0.25 } } return { $m.name } }
|
||||
query count_ge() { match { $m: Metric $m.count >= 3 } return { $m.name } }
|
||||
"#;
|
||||
// I32 column, integer literal coerced Int64 -> Int32: count == 2 is m2 only.
|
||||
assert_eq!(sorted_metric_names(&mut db, q, "count_eq").await, vec!["m2"]);
|
||||
// F32 column, float literal coerced Float64 -> Float32: ratio == 0.25 is m2.
|
||||
assert_eq!(sorted_metric_names(&mut db, q, "ratio_eq").await, vec!["m2"]);
|
||||
// Range on the I32 column: count 3,4 >= 3 -> m3, m4 (coercion is op-independent).
|
||||
assert_eq!(
|
||||
sorted_metric_names(&mut db, q, "count_ge").await,
|
||||
vec!["m3", "m4"]
|
||||
);
|
||||
}
|
||||
|
||||
// A fractional float against an integer column must not be truncated by the
|
||||
// pushdown coercion (`2.7 -> 2` would wrongly match the count=2 row). The
|
||||
// lossless guard falls back to the natural Float64 literal, so `count = 2.7`
|
||||
// matches no integer and returns no rows.
|
||||
#[tokio::test]
|
||||
async fn fractional_float_equality_on_int_column_returns_no_rows() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = metric_db(&dir).await;
|
||||
let q = r#"
|
||||
query count_frac() { match { $m: Metric { count: 2.7 } } return { $m.name } }
|
||||
"#;
|
||||
assert!(
|
||||
sorted_metric_names(&mut db, q, "count_frac")
|
||||
.await
|
||||
.is_empty(),
|
||||
"count = 2.7 must match no integer rows (no truncation to count = 2)"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn bool_literal_filters_execute() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
@ -88,9 +133,15 @@ async fn date_and_datetime_literal_filters_execute() {
|
|||
let q = r#"
|
||||
query born_ge() { match { $m: Metric $m.born >= date("2024-01-01") } return { $m.name } }
|
||||
query seen_lt() { match { $m: Metric $m.seen < datetime("2024-01-01T00:00:00Z") } return { $m.name } }
|
||||
query born_eq() { match { $m: Metric { born: date("2024-06-01") } } return { $m.name } }
|
||||
query seen_eq() { match { $m: Metric { seen: datetime("2024-06-01T12:00:00Z") } } return { $m.name } }
|
||||
"#;
|
||||
// born: m1 2024-06, m3 2025 >= 2024-01-01
|
||||
assert_eq!(sorted_metric_names(&mut db, q, "born_ge").await, vec!["m1", "m3"]);
|
||||
// seen: m2 2023, m4 2022 < 2024-01-01
|
||||
assert_eq!(sorted_metric_names(&mut db, q, "seen_lt").await, vec!["m2", "m4"]);
|
||||
// Inline-binding equality exercises the Lance-pushdown arm with a typed
|
||||
// Date32/Date64 literal: the epoch conversion must select exactly m1.
|
||||
assert_eq!(sorted_metric_names(&mut db, q, "born_eq").await, vec!["m1"]);
|
||||
assert_eq!(sorted_metric_names(&mut db, q, "seen_eq").await, vec!["m1"]);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,9 +14,11 @@ use omnigraph::db::{
|
|||
SkipReason,
|
||||
};
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
use omnigraph::table_store::{IndexCoverage, TableStore};
|
||||
|
||||
use helpers::{
|
||||
MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, count_rows, init_and_load, mixed_params, mutate_main,
|
||||
snapshot_main,
|
||||
};
|
||||
|
||||
/// Filesystem URI of a node sub-table, mirroring the engine's layout
|
||||
|
|
@ -131,6 +133,72 @@ async fn optimize_after_load_then_again_is_idempotent() {
|
|||
}
|
||||
}
|
||||
|
||||
// PR3 (Workstream B): an existing scalar index does not cover fragments
|
||||
// appended after it was built (build_indices is existence-gated), so those
|
||||
// rows are scanned unindexed. `optimize` must fold them back in via Lance's
|
||||
// incremental `optimize_indices`, restoring full coverage.
|
||||
#[tokio::test]
|
||||
async fn optimize_reindexes_fragments_appended_after_index_build() {
|
||||
const SCHEMA: &str = r#"
|
||||
node Doc {
|
||||
slug: String @key
|
||||
rank: I32 @index
|
||||
}
|
||||
"#;
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
|
||||
|
||||
// First load builds the id + rank BTREEs over the initial fragment.
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"rank\":1}}\n\
|
||||
{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"rank\":2}}",
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// A second load with NEW keys appends a fragment the existing BTREEs do not
|
||||
// cover (the existence gate skips re-building an index that already exists).
|
||||
load_jsonl(
|
||||
&mut db,
|
||||
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"rank\":3}}\n\
|
||||
{\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"rank\":4}}",
|
||||
LoadMode::Merge,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Precondition: the appended fragment is unindexed.
|
||||
{
|
||||
let snap = snapshot_main(&db).await.unwrap();
|
||||
let ds = snap.open("node:Doc").await.unwrap();
|
||||
assert!(
|
||||
TableStore::has_unindexed_fragments(&ds).await.unwrap(),
|
||||
"appended fragment should be unindexed before optimize"
|
||||
);
|
||||
}
|
||||
|
||||
db.optimize().await.unwrap();
|
||||
|
||||
// Postcondition: optimize_indices folded the appended fragment in, so every
|
||||
// index covers every fragment and `rank` reports fully Indexed.
|
||||
let snap = snapshot_main(&db).await.unwrap();
|
||||
let ds = snap.open("node:Doc").await.unwrap();
|
||||
assert!(
|
||||
!TableStore::has_unindexed_fragments(&ds).await.unwrap(),
|
||||
"optimize must extend index coverage to all fragments"
|
||||
);
|
||||
assert_eq!(
|
||||
TableStore::key_column_index_coverage(&ds, "rank")
|
||||
.await
|
||||
.unwrap(),
|
||||
IndexCoverage::Indexed,
|
||||
"rank BTREE must cover all fragments after optimize"
|
||||
);
|
||||
}
|
||||
|
||||
// Regression: `optimize` must not crash on a graph that has a `Blob` table.
|
||||
//
|
||||
// Lance `compact_files` forces `BlobHandling::AllBinary`, which mis-decodes
|
||||
|
|
|
|||
74
crates/omnigraph/tests/scalar_indexes.rs
Normal file
74
crates/omnigraph/tests/scalar_indexes.rs
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
//! Coverage for `build_indices_on_dataset_for_catalog`'s per-property index
|
||||
//! dispatch: which scalar/vector index each `@index`/`@key` column gets.
|
||||
//!
|
||||
//! The observable signal is `TableStore::key_column_index_coverage`, which
|
||||
//! reports `Indexed` only when a BTREE covers the column (the same helper the
|
||||
//! traversal chooser uses). Enums and orderable scalars must get a BTREE so
|
||||
//! `=`/range/IN/IS NULL are index-accelerated; free-text Strings keep FTS
|
||||
//! (which `key_column_index_coverage` does not count as a BTREE, by design).
|
||||
|
||||
mod helpers;
|
||||
|
||||
use omnigraph::db::Omnigraph;
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
use omnigraph::table_store::{IndexCoverage, TableStore};
|
||||
|
||||
use helpers::*;
|
||||
|
||||
const SCHEMA: &str = r#"
|
||||
node Item {
|
||||
slug: String @key
|
||||
status: enum(active, archived) @index
|
||||
published: DateTime @index
|
||||
rank: I32 @index
|
||||
title: String @index
|
||||
note: String?
|
||||
}
|
||||
"#;
|
||||
|
||||
const DATA: &str = r#"{"type":"Item","data":{"slug":"a","status":"active","published":"2024-06-01T00:00:00Z","rank":1,"title":"alpha","note":"n1"}}
|
||||
{"type":"Item","data":{"slug":"b","status":"archived","published":"2023-01-01T00:00:00Z","rank":2,"title":"beta","note":"n2"}}
|
||||
{"type":"Item","data":{"slug":"c","status":"active","published":"2025-02-02T00:00:00Z","rank":3,"title":"gamma","note":"n3"}}"#;
|
||||
|
||||
// Enums and orderable scalars (DateTime, numeric) get a BTREE from load's
|
||||
// build-indices pass, so a `=`/range filter on them uses the index. Free-text
|
||||
// String `@index` keeps FTS (no BTREE), and an un-annotated column has no
|
||||
// scalar index — both report `Degraded`, which is the negative control that
|
||||
// keeps this test from being vacuously green.
|
||||
#[tokio::test]
|
||||
async fn node_scalar_and_enum_index_columns_get_btree() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
|
||||
load_jsonl(&mut db, DATA, LoadMode::Overwrite).await.unwrap();
|
||||
|
||||
let snap = snapshot_main(&db).await.unwrap();
|
||||
let ds = snap.open("node:Item").await.unwrap();
|
||||
|
||||
for col in ["status", "published", "rank"] {
|
||||
let cov = TableStore::key_column_index_coverage(&ds, col).await.unwrap();
|
||||
assert_eq!(
|
||||
cov,
|
||||
IndexCoverage::Indexed,
|
||||
"column '{col}' (enum/DateTime/numeric @index) must get a BTREE, got {cov:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Free-text String @index -> FTS, which is not a BTREE -> Degraded.
|
||||
let title_cov = TableStore::key_column_index_coverage(&ds, "title")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(title_cov, IndexCoverage::Degraded { .. }),
|
||||
"free-text String @index should keep FTS (no BTREE), got {title_cov:?}"
|
||||
);
|
||||
|
||||
// No @index annotation -> no scalar index at all -> Degraded.
|
||||
let note_cov = TableStore::key_column_index_coverage(&ds, "note")
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(
|
||||
matches!(note_cov, IndexCoverage::Degraded { .. }),
|
||||
"un-annotated column should have no scalar index, got {note_cov:?}"
|
||||
);
|
||||
}
|
||||
|
|
@ -105,7 +105,7 @@ Use it this way:
|
|||
| Schema validation | Type checks, required fields, defaults, edge endpoint checks, and edge cardinality are enforced on write paths | [schema-language.md](../user/schema/index.md), [execution.md](execution.md) |
|
||||
| Unique constraints | Intra-batch and write-path checks exist; intake and branch-merge derive the composite key through one shared function (`loader::composite_unique_key`, a separator-free `Vec<String>` tuple) and fail loudly on an un-keyable column type rather than silently exempting it; full cross-version uniqueness against already-committed rows is still a gap | [schema-language.md](../user/schema/index.md) |
|
||||
| Storage trait | `TableStorage` (via `db.storage()`) is staged-only; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so §1 holds by construction; capability/stat surfaces are roadmap | [writes.md](writes.md), [architecture.md](architecture.md) |
|
||||
| Index lifecycle | `ensure_indices` is explicit today; reconciler-based convergence is roadmap | [indexes.md](../user/search/indexes.md), [maintenance.md](../user/operations/maintenance.md) |
|
||||
| Index lifecycle | Index *creation* per `@index`/`@key` property is dispatched by type (enum + orderable scalar → BTREE, free-text String → FTS, Vector → vector) via `node_prop_index_kind`; index *coverage maintenance* exists — `optimize` runs Lance `optimize_indices` after compaction to fold appended/rewritten fragments into existing indexes (still an explicit maintenance call, not yet a background reconciler) | [indexes.md](../user/search/indexes.md), [maintenance.md](../user/operations/maintenance.md) |
|
||||
| Traversal IDs | Runtime still builds `TypeIndex`; Lance stable row-id based graph IDs are roadmap | [architecture.md](architecture.md), [query-language.md](../user/queries/index.md) |
|
||||
| Auth | Bearer token hashing and server-side actor resolution are implemented at the HTTP boundary | [server.md](../user/operations/server.md), [policy.md](../user/operations/policy.md) |
|
||||
| Tests | Tempdir-backed Lance tests are the current substrate; the storage adapter has an in-memory backend for adapter-level contract tests, but Lance datasets bypass it | [testing.md](testing.md) |
|
||||
|
|
|
|||
|
|
@ -169,6 +169,7 @@ Migration from Lance 4.0.0 → 6.0.1 landed in this cycle (DataFusion 52 → 53,
|
|||
- **`Dataset::checkout_version(N).await?.restore().await?`**: `restore()` takes `&mut self` and returns `Result<()>` (mutates in place, does not consume + return a new dataset). The recovery rollback hammer at `db/manifest/recovery.rs:505-522` continues to work. Pinned by `lance_surface_guards.rs::_compile_checkout_version_then_restore_signature`.
|
||||
- **`DatasetBuilder::from_namespace(...).with_branch(...).with_version(...).load()`** surface preserved (the namespace builder chain at `db/manifest/namespace.rs:162-174`). Pinned by `lance_surface_guards.rs::_compile_dataset_builder_from_namespace_signature`.
|
||||
- **`compact_files(&mut ds, CompactionOptions::default(), None)`** signature stable. `CompactionOptions` still does not expose `data_storage_version`; `compact_files` builds its own `WriteParams { ..Default::default() }`. Note: `LanceFileVersion::default()` is now V2_1 in v6, so optimize-rewritten fragments come out at V2_1 by default (was V2_0 in v4). Existing explicit V2_2 pins on creates/appends still apply.
|
||||
- **`Dataset::optimize_indices(&mut self, &lance_index::optimize::OptimizeOptions)`** (via `DatasetIndexExt`) is a depended-on surface as of the index-coverage work: `db/omnigraph/optimize.rs` calls it after `compact_files` to fold appended/rewritten fragments into existing indexes (incremental merge, not retrain). It is a **committing** call (mutates in place, advances HEAD; no uncommitted variant in v6.0.1), so optimize treats it as an inline-commit residual under the `SidecarKind::Optimize` recovery sidecar. Signature pinned by `lance_surface_guards.rs::_compile_optimize_indices_signature`; the incremental-coverage behavior pinned by `optimize_indices_extends_fragment_coverage` (appended fragment uncovered before, covered after).
|
||||
- **`Dataset::delete(predicate)` returns `DeleteResult { new_dataset: Arc<Dataset>, num_deleted_rows: u64 }`** — unchanged shape. Pinned by `lance_surface_guards.rs::_compile_delete_result_field_shape`. MR-A will repurpose this guard to the staged two-phase variant once `DeleteBuilder::execute_uncommitted` migration lands.
|
||||
- **File reader read methods now async** (Lance PR #6710, v6.0). No effect — omnigraph reaches Lance exclusively through `Dataset::scan` and the staged-write API.
|
||||
- **Tokenizer vendored as `lance-tokenizer`** (Lance PR #6512, v6.0). No effect — no direct tokenizer imports.
|
||||
|
|
@ -178,6 +179,6 @@ Migration from Lance 4.0.0 → 6.0.1 landed in this cycle (DataFusion 52 → 53,
|
|||
- **`Dataset::force_delete_branch`** (`branches().delete(name, force=true)`, dataset.rs:524) tolerates a missing branch-*contents* ref (vs plain `delete_branch`'s `RefNotFound`), but on the local store still errors `NotFound` if the branch `tree/` directory is fully absent (`remove_dir_all`'s NotFound is not caught for Lance's native error variant, refs.rs:526-549). Both variants still refuse a branch with referencing descendants (`RefConflict`). `TableStore::force_delete_branch` wraps this to be fully idempotent (tolerates already-absent). The single-authority branch-delete redesign uses it for orphan reclamation (eager best-effort reclaim + cleanup reconciler). Pinned by `lance_surface_guards.rs::force_delete_branch_semantics`. Branch delete is "flip the ref atomically, then `remove_dir_all(tree/{branch})`"; branch-exclusive data lives under `tree/{branch}/` so a drop reclaims it immediately without touching `main`.
|
||||
- **Lance blob-v2 `compact_files` bug** (no public issue found as of 2026-06): `compact_files` disables binary-copy for blob datasets and forces `BlobHandling::AllBinary` on the read side; the v2.1+ structural decoder then mis-counts column infos for the blob-v2 struct and fails with `Invalid user input: there were more fields in the schema than provided column indices / infos` (`lance-encoding/src/decoder.rs::ColumnInfoIter::expect_next`). This fails even a pristine uniform-V2_2 multi-fragment blob table; vector/list/scalar/ragged columns and mixed file versions all compact fine. Reads/queries use descriptor handling (`BlobHandling::default()`) and are unaffected. `optimize` skips blob-bearing tables behind `LANCE_SUPPORTS_BLOB_COMPACTION = false` (`db/omnigraph/optimize.rs`), reporting `SkipReason::BlobColumnsUnsupportedByLance`. Pinned by `lance_surface_guards.rs::compact_files_still_fails_on_blob_columns`, which turns red when the bug is fixed → flip the gate, remove the skip branch + the `maintenance.rs::optimize_skips_blob_table_and_reports_skip` skip assertions.
|
||||
|
||||
Surface guards added: `crates/omnigraph/tests/lance_surface_guards.rs` (10 named guards; 5 runtime + 5 compile-only). Future Lance bumps re-run this file first as the smoke check. Two additional guards from the original plan deferred to follow-up (`manifest_cas_returns_row_level_contention_variant` needs full publisher-race harness; `table_version_metadata_byte_compatible_with_v4` needs `pub(crate)` reach extension).
|
||||
Surface guards added: `crates/omnigraph/tests/lance_surface_guards.rs` (10 named guards; 5 runtime + 5 compile-only; plus the index-coverage work's `_compile_optimize_indices_signature` and `optimize_indices_extends_fragment_coverage`). Future Lance bumps re-run this file first as the smoke check. Two additional guards from the original plan deferred to follow-up (`manifest_cas_returns_row_level_contention_variant` needs full publisher-race harness; `table_version_metadata_byte_compatible_with_v4` needs `pub(crate)` reach extension).
|
||||
|
||||
Bump this date stanza on the next alignment pass.
|
||||
|
|
|
|||
|
|
@ -80,10 +80,17 @@ deferred to a follow-up cycle — tracked).
|
|||
Three writers have been migrated onto staged primitives:
|
||||
|
||||
* **`ensure_indices`** (`db/omnigraph/table_ops.rs::build_indices_on_dataset_for_catalog`)
|
||||
— scalar indices (BTree, Inverted) now use `stage_create_*_index` +
|
||||
`commit_staged`. Vector indices stay inline (residual — Lance
|
||||
`build_index_metadata_from_segments` is `pub(crate)` in 6.0.1;
|
||||
companion ticket to lance-format/lance#6658 needed).
|
||||
— scalar indices (BTree, Inverted) use `stage_create_*_index` +
|
||||
`commit_staged`. Which index a `@index`/`@key` property gets is dispatched by
|
||||
type via `node_prop_index_kind` (enum + orderable scalar → BTree, free-text
|
||||
String → Inverted/FTS, Vector → vector). Vector indices stay inline (residual
|
||||
— Lance `build_index_metadata_from_segments` is `pub(crate)` in 6.0.1;
|
||||
companion ticket to lance-format/lance#6658 needed). This build is
|
||||
existence-gated (it creates a *missing* index over current fragments); folding
|
||||
fragments appended afterward into an *existing* index is `optimize`'s
|
||||
`optimize_indices` pass — an inline-commit residual, not a staged write (Lance
|
||||
exposes no uncommitted index-optimize), covered by the optimize recovery
|
||||
sidecar (see [maintenance.md](../user/operations/maintenance.md)).
|
||||
* **`branch_merge::publish_rewritten_merge_table`**
|
||||
(`exec/merge.rs`) — merge_insert now uses `stage_merge_insert` +
|
||||
`commit_staged`. Deletes stay inline (Lance #6658 residual).
|
||||
|
|
|
|||
|
|
@ -4,14 +4,15 @@
|
|||
|
||||
## `optimize` — non-destructive
|
||||
|
||||
- Compacts every node + edge table on `main`, then **publishes the compacted version to the `__manifest`** so the manifest's recorded version tracks the compacted state. Reads pin the manifest version, so without this publish compaction would be invisible to readers *and* would break the version precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually compacted.
|
||||
- Compacts every node + edge table on `main`, then reindexes them, then **publishes the resulting version to the `__manifest`** so the manifest's recorded version tracks the compacted-and-reindexed state. Reads pin the manifest version, so without this publish the work would be invisible to readers *and* would break the version precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually changed.
|
||||
- Rewrites small fragments into fewer large ones; old fragments remain reachable via older versions until `cleanup` runs.
|
||||
- Each table's compact→publish serializes with concurrent mutations on the same table. A crash mid-operation is recovered automatically on the next open (compaction is content-preserving, so roll-forward is always safe).
|
||||
- **Reindex (index coverage maintenance).** A scalar/FTS/vector index only covers the fragments it was built over. Rows appended after the index was built (e.g. by `load --mode merge`, whose commit does not rebuild an already-existing index) are scanned unindexed, and compaction itself rewrites fragments out of an index's coverage. `optimize` runs Lance's incremental `optimize_indices` after compaction to fold those fragments back in (a delta merge, not a full retrain), restoring full coverage so equality/range/traversal predicates stay index-accelerated. This is why a table with **no compaction work but stale index coverage still commits** a new version under `optimize`. Run `optimize` on a cadence at least as frequent as your freshness window so recently-loaded rows do not linger in the unindexed flat-scan tail.
|
||||
- Each table's compact→reindex→publish serializes with concurrent mutations on the same table. A crash mid-operation is recovered automatically on the next open (both compaction and reindex are content-preserving, so roll-forward is always safe).
|
||||
- **Requires a recovered graph.** `optimize` refuses (errors) when a pending crash-recovery operation is present — operating on an unrecovered graph could publish a partial write that recovery would roll back. Reopen the graph to run recovery, then re-run `optimize`.
|
||||
- **Uncovered drift is skipped, not interpreted.** If a table's underlying version is ahead of the version recorded in `__manifest` and no crash-recovery record covers that movement, `optimize` reports `skipped: DriftNeedsRepair` with the manifest/head versions and leaves the table untouched. Run `omnigraph repair` to classify and explicitly publish that drift.
|
||||
- Bounded by `OMNIGRAPH_MAINTENANCE_CONCURRENCY` (default 8).
|
||||
- Returns per-table stats: `table_key, fragments_removed, fragments_added, committed, skipped, manifest_version, lance_head_version`.
|
||||
- **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: BlobColumnsUnsupportedByLance` (and logged) instead of compacted, and the rest of the sweep proceeds normally. **Reads and writes are unaffected** — only compaction is. Consequence: fragment count and deleted-row space on blob tables are not reclaimed; query results are never affected.
|
||||
- **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: BlobColumnsUnsupportedByLance` (and logged) instead of compacted, and the rest of the sweep proceeds normally. **Reads and writes are unaffected** — only compaction is. Consequence: fragment count and deleted-row space on blob tables are not reclaimed; query results are never affected. A skipped blob table is also **not reindexed** in the same sweep (the skip happens before the reindex step), so its index coverage on appended rows is not refreshed by `optimize` today.
|
||||
|
||||
## `repair` — explicit
|
||||
|
||||
|
|
|
|||
|
|
@ -4,10 +4,27 @@
|
|||
|
||||
| Index | Use | Notes |
|
||||
|---|---|---|
|
||||
| **BTREE scalar** | range / equality on any scalar | created on `@key`, `@index(...)`, and on key columns by `ensure_indices()` |
|
||||
| **Inverted (FTS)** | `search`, `fuzzy`, `match_text`, `bm25` | created on text columns referenced by FTS queries |
|
||||
| **BTREE scalar** | `=` / range / `IN` / `IS NULL` on a scalar | always on the node `id` and edge `src`/`dst`; and on each one-column `@index`/`@key` property that is an **enum** or an **orderable scalar** (`DateTime`/`Date`/`I32`/`I64`/`U32`/`U64`/`F32`/`F64`/`Bool`) |
|
||||
| **Inverted (FTS)** | `search`, `fuzzy`, `match_text`, `bm25` | created on **free-text** (non-enum) `String` `@index`/`@key` columns |
|
||||
| **Vector** | `nearest()` k-NN | Lance picks IVF_PQ vs HNSW family by configuration; OmniGraph stores as FixedSizeList(Float32, dim) |
|
||||
|
||||
The per-property index a column gets is decided by `node_prop_index_kind` (shared
|
||||
by the builder and the sidecar-pinning coverage check so they cannot drift):
|
||||
enums and orderable scalars → BTREE, free-text Strings → FTS, `Vector` → vector,
|
||||
list/`Blob` columns → none.
|
||||
|
||||
> **Free-text Strings are not equality-indexed.** A non-enum `String` column
|
||||
> (including a `String @key` slug) gets an FTS inverted index, which Lance does
|
||||
> **not** consult for `=`/range — only for `search`/`match_text`/`bm25`. So an
|
||||
> equality filter on a free-text String falls back to a full scan. If you filter
|
||||
> a String identifier by equality on a large table, model it so the value is the
|
||||
> node id, or track it as a follow-up to also build a BTREE on such columns.
|
||||
|
||||
> **Coverage and cost.** Each indexed column adds index files and build time, and
|
||||
> an index only covers the fragments it was built over. Rows appended after the
|
||||
> index was built (e.g. by `ingest --mode merge`) are scanned unindexed until a
|
||||
> reindex extends coverage; see [maintenance](maintenance.md) → `optimize`.
|
||||
|
||||
## L2 — OmniGraph orchestration
|
||||
|
||||
- `ensure_indices()` / `ensure_indices_on(branch)` — idempotent build of BTREE + inverted indexes for the current head; safe to re-run.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue