diff --git a/AGENTS.md b/AGENTS.md index 065e28a..427d976 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -247,10 +247,10 @@ omnigraph policy explain --actor act-alice --action change --branch main | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables | | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering | | Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. | -| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending); **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair` instead of interpreting it; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | +| Compaction (`compact_files`) + reindex (`optimize_indices`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; per table runs `compact_files` **then Lance `optimize_indices`** (folds appended/rewritten fragments back into existing indexes — incremental merge, not retrain) and **publishes the resulting version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe the work and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage spanning both ops; **commits even with no compaction work if index coverage is stale**; **refuses on an unrecovered graph**; **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair`; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent; reindex is skipped for them too today), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | | Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. | | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy | -| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches | +| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them per `@index`/`@key` column, dispatched by type via `node_prop_index_kind` (enum + orderable scalar → BTREE, free-text String → FTS, Vector → vector); idempotent; lazy across branches. Coverage of fragments appended after build is restored by `optimize`'s `optimize_indices` pass (see Compaction row). | | `merge_insert` upsert | ✅ | `LoadMode::Merge`, mutation `update`/`insert`/`delete` lowering | | Vector search | ✅ | `nearest()` query op; embedding pipeline (Gemini / OpenAI clients); `@embed` in schema | | Full-text search | ✅ | `search/fuzzy/match_text/bm25` query ops | diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 779a2e0..6c80117 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -16,7 +16,7 @@ use lance::dataset::scanner::ColumnOrdering; use lance::datatypes::BlobKind; use omnigraph_compiler::catalog::{Catalog, EdgeType, NodeType}; use omnigraph_compiler::schema::parser::parse_schema; -use omnigraph_compiler::types::ScalarType; +use omnigraph_compiler::types::{PropType, ScalarType}; use omnigraph_compiler::{ DropMode, SchemaIR, SchemaMigrationPlan, SchemaMigrationStep, SchemaTypeKind, build_catalog_from_ir, build_schema_ir, plan_schema_migration, diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 21629a8..9195256 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -32,6 +32,8 @@ use lance::dataset::cleanup::{CleanupPolicy, RemovalStats}; use lance::dataset::optimize::{ CompactionMetrics, CompactionOptions, compact_files, plan_compaction, }; +use lance::index::DatasetIndexExt; +use lance_index::optimize::OptimizeOptions; use super::*; @@ -361,16 +363,22 @@ async fn optimize_one_table( } // Precise "will it compact?" check — `plan_compaction` also accounts for - // deletion materialization (which can rewrite even a single fragment). A - // steady-state already-compacted table yields an empty plan and is never - // pinned in a sidecar (a zero-commit pin would classify NoMovement on - // recovery and force an all-or-nothing rollback). Uncovered pre-existing - // drift is skipped above and must go through explicit repair. + // deletion materialization (which can rewrite even a single fragment). let options = CompactionOptions::default(); let plan = plan_compaction(&ds, &options) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - if plan.num_tasks() == 0 { + let will_compact = plan.num_tasks() > 0; + // Even when there is nothing to compact, the table may still have index + // work: rows appended since the index was built (e.g. via `ingest --mode + // merge`) are scanned unindexed until folded in. Either compaction or stale + // index coverage is enough to enter the publish path. If NEITHER, this + // table is a no-op and must NOT be pinned in a sidecar — a zero-commit pin + // classifies NoMovement on recovery and forces an all-or-nothing rollback + // of sibling tables' legitimate work. Uncovered pre-existing manifest/HEAD + // drift is skipped above and must go through explicit repair. + let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?; + if !will_compact && !needs_reindex { return Ok(TableOptimizeStats::compacted( table_key, &CompactionMetrics::default(), @@ -378,8 +386,9 @@ async fn optimize_one_table( )); } - // Phase A: recovery sidecar BEFORE compaction advances the Lance HEAD, so a - // crash before the manifest publish rolls forward on next open. + // Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or + // index optimize), so a crash before the manifest publish rolls forward on + // next open. let sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::Optimize, None, @@ -398,11 +407,26 @@ async fn optimize_one_table( let handle = crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?; - // Phase B: compaction (reserve-fragments + rewrite commits advance HEAD). + // Phase B: compaction (if any) then incremental index optimize — both + // advance Lance HEAD inside the sidecar window. `compact_files` rewrites + // fragments and drops them from existing index segments' coverage; + // `optimize_indices` folds the rewritten and any previously-unindexed + // fragments back in (Lance's incremental merge, not a full retrain). This + // is the same compact -> optimize_indices sequencing LanceDB's `optimize()` + // uses. `optimize_indices` is an inline-commit residual: lance-6.0.1 + // exposes no uncommitted variant, so like `compact_files` it commits + // directly and relies on the sidecar for recovery. let version_before = ds.version().version; - let metrics: CompactionMetrics = compact_files(&mut ds, options, None) + let metrics: CompactionMetrics = if will_compact { + compact_files(&mut ds, options, None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? + } else { + CompactionMetrics::default() + }; + ds.optimize_indices(&OptimizeOptions::default()) .await - .map_err(|e| OmniError::Lance(e.to_string()))?; + .map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?; let version_after = ds.version().version; let committed = version_after != version_before; diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index f7a365a..3f40c1d 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -310,6 +310,48 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st Ok(()) } +/// The single scalar/vector index a node property receives from a one-column +/// `@index`/`@key` declaration, or `None` when the property type is not +/// indexable here (a list column or `Blob`). +/// +/// Shared by `build_indices_on_dataset_for_catalog` (which builds the index) +/// and `needs_index_work_node` (which checks coverage to decide recovery- +/// sidecar pinning) so the two cannot drift: an enum or orderable scalar the +/// builder gives a BTREE must also be reported as "needs work" until that +/// BTREE exists, or the HEAD-advancing build would run without sidecar cover. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +enum NodePropIndexKind { + Btree, + Fts, + Vector, +} + +fn node_prop_index_kind(prop_type: &PropType) -> Option { + if prop_type.list { + return None; + } + // Enums are physically `String` but filtered by equality, so they take a + // scalar BTREE, not an FTS inverted index (Lance never consults an inverted + // index for `=`/range). Free-text Strings keep FTS for + // `search()`/`match_text`/`bm25`. + let is_enum = prop_type.enum_values.is_some(); + match prop_type.scalar { + ScalarType::String if !is_enum => Some(NodePropIndexKind::Fts), + ScalarType::Vector(_) => Some(NodePropIndexKind::Vector), + ScalarType::String + | ScalarType::DateTime + | ScalarType::Date + | ScalarType::I32 + | ScalarType::I64 + | ScalarType::U32 + | ScalarType::U64 + | ScalarType::F32 + | ScalarType::F64 + | ScalarType::Bool => Some(NodePropIndexKind::Btree), + ScalarType::Blob => None, + } +} + /// Returns true if the node table is missing at least one declared /// scalar/vector index that `build_indices_on_dataset_for_catalog` would /// build AND has at least one row (the ensure_indices loop has @@ -318,11 +360,12 @@ pub(super) async fn ensure_indices_for_branch(db: &Omnigraph, branch: Option<&st /// would force `NoMovement` classification on recovery and trigger the /// all-or-nothing rollback of sibling tables' legitimate index work). /// -/// Per the actual `build_indices_on_dataset_for_catalog` implementation -/// (this file, ~line 419-491), nodes get BTree (id) + per-prop FTS -/// (@search String) + per-prop Vector indices; edges get BTree only -/// (id, src, dst). The two helpers mirror that asymmetry — see the -/// `needs_index_work_edge` doc comment. +/// Per `build_indices_on_dataset_for_catalog`, nodes get BTree (id) plus, for +/// each one-column `@index`/`@key` property, the index `node_prop_index_kind` +/// assigns: a scalar BTREE for enums and orderable scalars +/// (DateTime/Date/numeric/Bool), FTS for free-text Strings, or a Vector index. +/// Edges get BTree only (id, src, dst). This helper and the builder share +/// `node_prop_index_kind` so they cannot drift — see its doc comment. async fn needs_index_work_node( db: &Omnigraph, type_name: &str, @@ -359,14 +402,23 @@ async fn needs_index_work_node( let Some(prop_type) = node_type.properties.get(prop_name) else { continue; }; - if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { - if !db.storage().has_fts_index(&ds, prop_name).await? { - return Ok(true); + match node_prop_index_kind(prop_type) { + Some(NodePropIndexKind::Fts) => { + if !db.storage().has_fts_index(&ds, prop_name).await? { + return Ok(true); + } } - } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list { - if !db.storage().has_vector_index(&ds, prop_name).await? { - return Ok(true); + Some(NodePropIndexKind::Vector) => { + if !db.storage().has_vector_index(&ds, prop_name).await? { + return Ok(true); + } } + Some(NodePropIndexKind::Btree) => { + if !db.storage().has_btree_index(&ds, prop_name).await? { + return Ok(true); + } + } + None => {} } } Ok(false) @@ -615,30 +667,44 @@ pub(super) async fn build_indices_on_dataset_for_catalog( } let prop_name = &index_cols[0]; if let Some(prop_type) = node_type.properties.get(prop_name) { - if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list { - if !db.storage().has_fts_index(ds, prop_name).await? { - stage_and_commit_inverted(db, table_key, ds, prop_name.as_str()) - .await?; + match node_prop_index_kind(prop_type) { + Some(NodePropIndexKind::Fts) => { + if !db.storage().has_fts_index(ds, prop_name).await? { + stage_and_commit_inverted(db, table_key, ds, prop_name.as_str()) + .await?; + } } - } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list { - if !db.storage().has_vector_index(ds, prop_name).await? { - // Inline-commit residual: lance-6.0.1 does not - // expose `build_index_metadata_from_segments` as - // `pub`, so vector indices cannot be staged from - // outside the lance crate. Document at the call - // site; companion ticket to lance-format/lance#6658. - let new_snap = db - .storage_inline_residual() - .create_vector_index(ds.clone(), prop_name.as_str()) - .await - .map_err(|e| { - OmniError::Lance(format!( - "create Vector index on {}({}): {}", - table_key, prop_name, e - )) - })?; - *ds = new_snap; + Some(NodePropIndexKind::Vector) => { + if !db.storage().has_vector_index(ds, prop_name).await? { + // Inline-commit residual: lance-6.0.1 does not + // expose `build_index_metadata_from_segments` as + // `pub`, so vector indices cannot be staged from + // outside the lance crate. Document at the call + // site; companion ticket to lance-format/lance#6658. + let new_snap = db + .storage_inline_residual() + .create_vector_index(ds.clone(), prop_name.as_str()) + .await + .map_err(|e| { + OmniError::Lance(format!( + "create Vector index on {}({}): {}", + table_key, prop_name, e + )) + })?; + *ds = new_snap; + } } + // Enum + orderable scalars (DateTime/Date/numeric/Bool) + // get a BTREE so `=`, range, IN, and IS NULL are index- + // accelerated instead of degrading to a full scan. + Some(NodePropIndexKind::Btree) => { + if !db.storage().has_btree_index(ds, prop_name).await? { + stage_and_commit_btree(db, table_key, ds, &[prop_name.as_str()]) + .await?; + } + } + // List or Blob column: not indexable as a scalar here. + None => {} } } } diff --git a/crates/omnigraph/src/exec/projection.rs b/crates/omnigraph/src/exec/projection.rs index 7280ec5..bb6e665 100644 --- a/crates/omnigraph/src/exec/projection.rs +++ b/crates/omnigraph/src/exec/projection.rs @@ -72,7 +72,11 @@ fn evaluate_expr(batch: &RecordBatch, expr: &IRExpr, params: &ParamMap) -> Resul } /// Create a constant array from a literal value. -fn literal_to_array(lit: &Literal, num_rows: usize) -> Result { +/// +/// `pub(super)` so the pushdown arm (`query.rs::literal_to_typed_expr`) can build +/// a literal in the same natural Arrow type and cast it to the column type through +/// the identical `arrow_cast` path used here, keeping the two filter arms in sync. +pub(super) fn literal_to_array(lit: &Literal, num_rows: usize) -> Result { Ok(match lit { Literal::Null => arrow_array::new_null_array(&DataType::Utf8, num_rows), Literal::String(s) => Arc::new(StringArray::from(vec![s.as_str(); num_rows])) as ArrayRef, diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index ae2a824..4c1822f 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -1289,10 +1289,12 @@ async fn expand_hydrate_and_align( params: &ParamMap, ) -> Result<()> { // Pushable destination filters are applied by `hydrate_nodes`; the rest - // (`ir_filter_to_expr` → None) are applied in memory after hconcat. + // (`ir_filter_to_expr` → None) are applied in memory after hconcat. The + // schema arg only affects a pushable literal's TYPE, never Some-vs-None, so + // `None` here yields the same pushable/non-pushable split as `hydrate_nodes`. let non_pushable: Vec<&IRFilter> = dst_filters .iter() - .filter(|f| ir_filter_to_expr(f, params).is_none()) + .filter(|f| ir_filter_to_expr(f, params, None).is_none()) .collect(); // Unique destination ids (first-seen order) for one batched hydration. @@ -1506,7 +1508,8 @@ async fn hydrate_nodes( // `id IN (ids)` AND any pushable destination filters, as a structured Expr. let id_list: Vec = ids.iter().map(|id| lit(id.clone())).collect(); let mut filter_expr = col("id").in_list(id_list, false); - if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params) { + if let Some(dst_expr) = build_lance_filter_expr(dst_filters, params, Some(&node_type.arrow_schema)) + { filter_expr = filter_expr.and(dst_expr); } @@ -1747,21 +1750,23 @@ async fn execute_node_scan( let table_key = format!("node:{}", type_name); let ds = snapshot.open(&table_key).await?; + let node_type = &catalog.node_types[type_name]; + // Lower the IR filters to a DataFusion `Expr` and apply via // `Scanner::filter_expr` inside the configure closure. The string // pushdown path (`build_lance_filter` → `scanner.filter(&str)`) is // gone for node scans — structured Expr unlocks `CompOp::Contains` // pushdown (via `array_has`) and lets DF 53's optimizer rules // (vectorized IN-list, PhysicalExprSimplifier, CASE-NULL shortcut) - // reach our predicates. Other call sites that still take string SQL - // (hydrate_nodes for the Expand pushdown, count_rows, the mutation - // delete path) migrate in follow-up MRs. - let filter_expr = build_lance_filter_expr(filters, params); + // reach our predicates. Passing the node's `arrow_schema` lets the lowering + // coerce literals to each column's exact type so narrow-numeric BTREEs are + // used. Other call sites that still take string SQL (count_rows, the + // mutation delete path) migrate in follow-up MRs. + let filter_expr = build_lance_filter_expr(filters, params, Some(&node_type.arrow_schema)); // Blob columns must be excluded from scan when a filter is present // (Lance bug: BlobsDescriptions + filter triggers a projection assertion). // We exclude blob columns and add metadata post-scan via take_blobs_by_indices. - let node_type = &catalog.node_types[type_name]; let has_blobs = !node_type.blob_properties.is_empty(); let non_blob_cols: Vec<&str> = node_type .arrow_schema @@ -1990,13 +1995,14 @@ pub(super) fn literal_to_sql(lit: &Literal) -> String { pub(super) fn build_lance_filter_expr( filters: &[IRFilter], params: &ParamMap, + schema: Option<&Schema>, ) -> Option { use datafusion::logical_expr::Operator; use datafusion::prelude::Expr; let mut acc: Option = None; for f in filters { - let Some(e) = ir_filter_to_expr(f, params) else { + let Some(e) = ir_filter_to_expr(f, params, schema) else { continue; }; acc = Some(match acc { @@ -2017,6 +2023,7 @@ pub(super) fn build_lance_filter_expr( pub(super) fn ir_filter_to_expr( filter: &IRFilter, params: &ParamMap, + schema: Option<&Schema>, ) -> Option { use datafusion::functions_nested::expr_fn::array_has; @@ -2027,14 +2034,22 @@ pub(super) fn ir_filter_to_expr( // List-contains: `prop CONTAINS value` lowers to `array_has(prop, value)`. // This is the case the old SQL-string pushdown had to return None for // ("Can't pushdown list contains"); with structured Expr it pushes down fine. + // (Element-type coercion for the contained value is deferred — list columns + // are not scalar-indexed, so the index-eligibility concern below does not apply.) if matches!(filter.op, CompOp::Contains) { - let left = ir_expr_to_expr(&filter.left, params)?; - let right = ir_expr_to_expr(&filter.right, params)?; + let left = ir_expr_to_expr(&filter.left, params, None)?; + let right = ir_expr_to_expr(&filter.right, params, None)?; return Some(array_has(left, right)); } - let left = ir_expr_to_expr(&filter.left, params)?; - let right = ir_expr_to_expr(&filter.right, params)?; + // A literal/param operand is coerced to the OTHER operand's column type so + // the predicate stays a direct `col OP literal` and the scalar index is used. + // Without this, DataFusion widens a narrow column (`CAST(col AS Int64)`), + // which defeats the BTREE (validated by `probe_scalar_index_use_under_literal_type`). + let left_col_type = prop_data_type(&filter.left, schema); + let right_col_type = prop_data_type(&filter.right, schema); + let left = ir_expr_to_expr(&filter.left, params, right_col_type.as_ref())?; + let right = ir_expr_to_expr(&filter.right, params, left_col_type.as_ref())?; Some(match filter.op { CompOp::Eq => left.eq(right), CompOp::Ne => left.not_eq(right), @@ -2052,19 +2067,91 @@ pub(super) fn ir_filter_to_expr( pub(super) fn ir_expr_to_expr( expr: &IRExpr, params: &ParamMap, + target: Option<&arrow_schema::DataType>, ) -> Option { - use datafusion::prelude::{col, lit}; + use datafusion::prelude::col; match expr { IRExpr::PropAccess { property, .. } => Some(col(property)), - IRExpr::Literal(l) => literal_to_expr(l), - IRExpr::Param(name) => params.get(name).and_then(literal_to_expr), + IRExpr::Literal(l) => literal_to_expr_coerced(l, target), + IRExpr::Param(name) => params + .get(name) + .and_then(|l| literal_to_expr_coerced(l, target)), _ => None, } } -/// Convert a Literal to a DataFusion `Expr`. Returns `None` for List -/// (which the existing SQL path also can't pushdown — falls through to -/// post-scan in-memory application). +/// The Arrow type of a `PropAccess` operand, looked up in the scan's schema, or +/// `None` if the expr is not a column or the schema/field is unavailable. +fn prop_data_type(expr: &IRExpr, schema: Option<&Schema>) -> Option { + match expr { + IRExpr::PropAccess { property, .. } => schema? + .field_with_name(property) + .ok() + .map(|f| f.data_type().clone()), + _ => None, + } +} + +/// Lower a literal for pushdown, coercing it to `target` (the comparison +/// column's Arrow type) when known. Falls back to the natural-type +/// `literal_to_expr` on a missing target or any coercion failure, so a filter is +/// never demoted to `None` by coercion (a node scan has no in-memory fallback for +/// inline filters — see `execute_node_scan`). +fn literal_to_expr_coerced( + lit: &Literal, + target: Option<&arrow_schema::DataType>, +) -> Option { + if let Some(target) = target { + if let Some(e) = literal_to_typed_expr(lit, target) { + return Some(e); + } + } + literal_to_expr(lit) +} + +/// Build a literal as a typed Arrow scalar matching `target`, reusing the same +/// `literal_to_array` + `arrow_cast` path as the in-memory arm +/// (`projection.rs::evaluate_filter`) so the two arms agree. Returns `None` on +/// any failure (unbuildable literal, incompatible cast) — the caller then falls +/// back to the natural-type literal. +/// +/// Lossless-only for integer targets: typecheck permits numeric cross-type +/// comparisons (`types_compatible`), so a fractional float or out-of-range +/// integer can reach here. Casting those to a narrower integer would truncate +/// (`2.7 -> 2`) or overflow to null, silently changing which rows match. We +/// round-trip the cast and, on mismatch, return `None` so the caller keeps the +/// natural literal — correct via DataFusion coercion, the index just goes unused +/// for that out-of-domain predicate. Float targets are exempt: narrowing +/// `F64 -> F32` is the column's own precision domain, not a value error. +fn literal_to_typed_expr( + lit: &Literal, + target: &arrow_schema::DataType, +) -> Option { + use datafusion::prelude::lit as df_lit; + use datafusion::scalar::ScalarValue; + + let arr = super::projection::literal_to_array(lit, 1).ok()?; + if arr.data_type() == target { + return Some(df_lit(ScalarValue::try_from_array(&arr, 0).ok()?)); + } + let casted = arrow_cast::cast::cast(&arr, target).ok()?; + if target.is_integer() { + let back = arrow_cast::cast::cast(&casted, arr.data_type()).ok()?; + let original = ScalarValue::try_from_array(&arr, 0).ok()?; + let round_tripped = ScalarValue::try_from_array(&back, 0).ok()?; + if original != round_tripped { + return None; + } + } + Some(df_lit(ScalarValue::try_from_array(&casted, 0).ok()?)) +} + +/// Convert a Literal to a DataFusion `Expr` in its NATURAL Arrow type. This is +/// the fallback used when the comparison column's type is unknown (no schema) or +/// when coercion to it fails; the typed, column-matched coercion that keeps +/// scalar indexes usable lives in `literal_to_typed_expr`. Returns `None` for +/// List (the SQL path also could not pushdown it — falls through to post-scan +/// in-memory application). fn literal_to_expr(lit: &Literal) -> Option { use datafusion::prelude::lit as df_lit; Some(match lit { @@ -2073,9 +2160,12 @@ fn literal_to_expr(lit: &Literal) -> Option { Literal::Integer(n) => df_lit(*n), Literal::Float(f) => df_lit(*f), Literal::Bool(b) => df_lit(*b), - // Date/DateTime stored as strings; pass through as string literals - // — Lance/DataFusion handles the comparison against typed columns - // via implicit cast, matching the existing string-SQL behavior. + // Date/DateTime pass through as strings here. Against a typed Date + // column DataFusion casts the LITERAL (`CAST(Utf8 AS Date32)`), which is + // index-safe (proven by `scalar_index_use_requires_matched_literal_type`). + // At real pushdown sites the schema is known, so `literal_to_typed_expr` + // produces a typed Date32/Date64 anyway; this branch is only the + // no-schema fallback. Literal::Date(s) => df_lit(s.clone()), Literal::DateTime(s) => df_lit(s.clone()), Literal::List(_) => return None, @@ -2285,3 +2375,205 @@ mod expand_chooser_tests { assert_eq!(choose_expand_mode(&i), ExpandMode::Csr); } } + +#[cfg(test)] +mod literal_lowering_tests { + use super::*; + use datafusion::prelude::Expr; + use datafusion::scalar::ScalarValue; + + // With the column type known, the generic coercion types a date literal to + // the column's Date32/Date64 (the live pushdown path). Without a target it + // is the natural Utf8 fallback, which is still index-safe for dates because + // DataFusion casts the LITERAL, not the column (proven by + // `lance_surface_guards::scalar_index_use_requires_matched_literal_type`). + #[test] + fn date_literals_coerce_to_typed_arrow_scalars() { + use arrow_schema::DataType; + let dt = literal_to_expr_coerced( + &Literal::DateTime("2024-06-01T12:00:00Z".into()), + Some(&DataType::Date64), + ) + .unwrap(); + assert!( + matches!(dt, Expr::Literal(ScalarValue::Date64(Some(_)), ..)), + "DateTime vs Date64 column must coerce to a typed Date64, got {dt:?}" + ); + let d = literal_to_expr_coerced(&Literal::Date("2024-06-01".into()), Some(&DataType::Date32)) + .unwrap(); + assert!( + matches!(d, Expr::Literal(ScalarValue::Date32(Some(_)), ..)), + "Date vs Date32 column must coerce to a typed Date32, got {d:?}" + ); + let nat = literal_to_expr_coerced(&Literal::Date("2024-06-01".into()), None).unwrap(); + assert!( + matches!(nat, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)), + "no target should keep the natural Utf8 date literal, got {nat:?}" + ); + } + + // A malformed date string makes coercion fail, so it falls back to the + // natural Utf8 literal rather than dropping the predicate to None. + #[test] + fn malformed_date_literal_falls_back_to_string() { + use arrow_schema::DataType; + let bad = literal_to_expr_coerced( + &Literal::DateTime("not-a-date".into()), + Some(&DataType::Date64), + ) + .unwrap(); + assert!( + matches!(bad, Expr::Literal(ScalarValue::Utf8(Some(_)), ..)), + "malformed DateTime literal should fall back to a Utf8 literal, got {bad:?}" + ); + } + + // With a column target, a literal lowers to the column's EXACT Arrow type + // (not its natural width), so DataFusion does not widen and cast the column + // — keeping the scalar BTREE usable. See + // `lance_surface_guards::scalar_index_use_requires_matched_literal_type`. + #[test] + fn integer_literal_coerces_to_narrow_column_type() { + use arrow_schema::DataType; + let i32_lit = literal_to_expr_coerced(&Literal::Integer(5), Some(&DataType::Int32)).unwrap(); + assert!( + matches!(i32_lit, Expr::Literal(ScalarValue::Int32(Some(5)), ..)), + "integer literal vs Int32 column must lower to Int32, got {i32_lit:?}" + ); + let u32_lit = literal_to_expr_coerced(&Literal::Integer(7), Some(&DataType::UInt32)).unwrap(); + assert!( + matches!(u32_lit, Expr::Literal(ScalarValue::UInt32(Some(7)), ..)), + "integer literal vs UInt32 column must lower to UInt32, got {u32_lit:?}" + ); + } + + #[test] + fn float_literal_coerces_to_f32_column_type() { + use arrow_schema::DataType; + let f32_lit = + literal_to_expr_coerced(&Literal::Float(1.5), Some(&DataType::Float32)).unwrap(); + assert!( + matches!(f32_lit, Expr::Literal(ScalarValue::Float32(Some(_)), ..)), + "float literal vs Float32 column must lower to Float32, got {f32_lit:?}" + ); + } + + // Lossless guard: a fractional float against an integer column must NOT + // truncate (2.7 -> 2). Fall back to the natural Float64 so the comparison + // stays exact (no integer equals 2.7). + #[test] + fn fractional_float_vs_int_column_falls_back_not_truncate() { + use arrow_schema::DataType; + let e = literal_to_expr_coerced(&Literal::Float(2.7), Some(&DataType::Int32)).unwrap(); + assert!( + matches!(e, Expr::Literal(ScalarValue::Float64(Some(_)), ..)), + "fractional float vs Int32 must fall back to natural Float64, got {e:?}" + ); + } + + // A whole-number float IS lossless against an integer column, so it coerces. + #[test] + fn whole_float_vs_int_column_coerces() { + use arrow_schema::DataType; + let e = literal_to_expr_coerced(&Literal::Float(2.0), Some(&DataType::Int32)).unwrap(); + assert!( + matches!(e, Expr::Literal(ScalarValue::Int32(Some(2)), ..)), + "whole-number float vs Int32 is lossless and must coerce to Int32(2), got {e:?}" + ); + } + + // Lossless guard: an integer literal outside the column's range must NOT + // overflow to null; fall back to the natural Int64 (correct via DataFusion). + #[test] + fn out_of_range_int_vs_narrow_column_falls_back() { + use arrow_schema::DataType; + let e = literal_to_expr_coerced(&Literal::Integer(3_000_000_000), Some(&DataType::Int32)) + .unwrap(); + assert!( + matches!(e, Expr::Literal(ScalarValue::Int64(Some(3_000_000_000)), ..)), + "out-of-range integer vs Int32 must fall back to natural Int64, got {e:?}" + ); + } + + // Float targets are exempt from the lossless guard: narrowing to the column's + // own precision is the correct comparison domain, even when the value is not + // exactly representable in F32 (0.1). + #[test] + fn float_vs_f32_column_coerces_even_when_not_exactly_representable() { + use arrow_schema::DataType; + let e = literal_to_expr_coerced(&Literal::Float(0.1), Some(&DataType::Float32)).unwrap(); + assert!( + matches!(e, Expr::Literal(ScalarValue::Float32(Some(_)), ..)), + "float target must coerce 0.1 to Float32 (exempt from lossless guard), got {e:?}" + ); + } + + // No target (caller without a schema) keeps the natural width — the existing + // fallback, so behavior never regresses where the column type is unknown. + #[test] + fn literal_without_target_keeps_natural_width() { + let nat = literal_to_expr_coerced(&Literal::Integer(5), None).unwrap(); + assert!( + matches!(nat, Expr::Literal(ScalarValue::Int64(Some(5)), ..)), + "no target should keep the natural Int64 width, got {nat:?}" + ); + } + + // True if either operand of a binary comparison is an Int32 literal. + fn binary_has_int32_literal(e: &Expr) -> bool { + if let Expr::BinaryExpr(b) = e { + [b.left.as_ref(), b.right.as_ref()] + .iter() + .any(|side| matches!(side, Expr::Literal(ScalarValue::Int32(Some(_)), ..))) + } else { + false + } + } + + fn int32_schema() -> arrow_schema::Schema { + use arrow_schema::{DataType, Field}; + arrow_schema::Schema::new(vec![Field::new("count", DataType::Int32, true)]) + } + + fn count_prop() -> IRExpr { + IRExpr::PropAccess { + variable: "m".into(), + property: "count".into(), + } + } + + // Coercion is operator-independent: a range comparison's literal coerces to + // the column type just like equality does, so range filters on a narrow + // numeric column keep the BTREE. + #[test] + fn ir_filter_coerces_literal_for_range_op() { + let schema = int32_schema(); + let filter = IRFilter { + left: count_prop(), + op: CompOp::Ge, + right: IRExpr::Literal(Literal::Integer(2)), + }; + let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap(); + assert!( + binary_has_int32_literal(&expr), + "range-op literal must coerce to the Int32 column type, got {expr:?}" + ); + } + + // The column may be on either side; the literal coerces to the opposite + // operand's column type regardless of order (`5 < count`). + #[test] + fn ir_filter_coerces_literal_when_column_is_on_the_right() { + let schema = int32_schema(); + let filter = IRFilter { + left: IRExpr::Literal(Literal::Integer(2)), + op: CompOp::Lt, + right: count_prop(), + }; + let expr = ir_filter_to_expr(&filter, &ParamMap::new(), Some(&schema)).unwrap(); + assert!( + binary_has_int32_literal(&expr), + "reversed-operand literal must coerce to the Int32 column type, got {expr:?}" + ); + } +} diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 65123c0..b6e8c4d 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -705,6 +705,36 @@ impl TableStore { Ok(IndexCoverage::Indexed) } + /// True if any non-system index on `ds` leaves at least one current + /// fragment uncovered, i.e. rows that the index does not yet account for + /// (appended after the index was built, or rewritten by compaction). Such + /// fragments are scanned unindexed until a reindex (`optimize_indices`) + /// folds them in. Returns false when every index covers every fragment, or + /// when the table has no (non-system) indices to optimize. A `None` + /// `fragment_bitmap` means Lance cannot report coverage for that index, so + /// we do not treat it as uncovered (mirrors `key_column_index_coverage`). + /// + /// Used by `optimize` to decide whether an otherwise-already-compacted + /// table still has index work to do. + pub async fn has_unindexed_fragments(ds: &Dataset) -> Result { + let indices = ds + .load_indices() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let frag_ids: Vec = ds.fragments().iter().map(|f| f.id as u32).collect(); + for index in indices.iter() { + if is_system_index(index) { + continue; + } + if let Some(bitmap) = index.fragment_bitmap.as_ref() { + if frag_ids.iter().any(|id| !bitmap.contains(*id)) { + return Ok(true); + } + } + } + Ok(false) + } + pub async fn count_rows(&self, ds: &Dataset, filter: Option) -> Result { ds.count_rows(filter) .await diff --git a/crates/omnigraph/tests/lance_surface_guards.rs b/crates/omnigraph/tests/lance_surface_guards.rs index 370f9e7..fdb977c 100644 --- a/crates/omnigraph/tests/lance_surface_guards.rs +++ b/crates/omnigraph/tests/lance_surface_guards.rs @@ -36,6 +36,7 @@ use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, use lance::index::DatasetIndexExt; use lance_file::version::LanceFileVersion; use lance_index::IndexType; +use lance_index::optimize::OptimizeOptions; use lance_index::scalar::ScalarIndexParams; use lance_namespace::LanceNamespace; use lance_table::io::commit::ManifestNamingScheme; @@ -541,3 +542,199 @@ async fn fragment_deletion_metadata_is_available() { per-fragment deletions and would need to read the deletion vector.", ); } + +// --- Guard 14: Dataset::optimize_indices signature ---------------------------- +// +// `db/omnigraph/optimize.rs::optimize_one_table` calls +// `ds.optimize_indices(&OptimizeOptions::default())` (via `DatasetIndexExt`) to +// fold appended/compacted fragments back into existing indexes. If Lance +// changes the receiver, the options type, or the return shape, this fails to +// compile. Compile-only. + +#[allow( + dead_code, + unreachable_code, + unused_variables, + unused_mut, + clippy::diverging_sub_expression +)] +async fn _compile_optimize_indices_signature() -> lance::Result<()> { + let mut ds: Dataset = unimplemented!(); + let options = OptimizeOptions::default(); + // `&mut self`, `&OptimizeOptions`, returns `Result<()>` (mutates in place + // and commits — there is no uncommitted variant in this Lance, which is why + // optimize treats it as an inline-commit residual under a recovery sidecar). + let _: () = ds.optimize_indices(&options).await?; + Ok(()) +} + +// --- Guard 15: optimize_indices extends fragment coverage ---------------------- +// +// PR3's reindex assumes `optimize_indices` folds fragments appended AFTER an +// index was built into that index (incremental merge, not retrain). This pins +// that Lance behavior at the surface layer so a regression turns red here, the +// first smoke check on a Lance bump, before the slower engine suite. + +#[tokio::test] +async fn optimize_indices_extends_fragment_coverage() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().join("guard_optimize_indices.lance"); + let uri = uri.to_str().unwrap(); + + // Fragment 0: alice, bob. Build a BTREE over `value` covering only it. + let mut ds = fresh_dataset(uri).await; + ds.create_index_builder(&["value"], IndexType::BTree, &ScalarIndexParams::default()) + .replace(true) + .await + .unwrap(); + + // Append a second fragment the existing index does not cover. + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["carol"])), + Arc::new(Int32Array::from(vec![3])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let params = WriteParams { + mode: WriteMode::Append, + enable_stable_row_ids: true, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }; + Dataset::write(reader, uri, Some(params)).await.unwrap(); + + let mut ds = Dataset::open(uri).await.unwrap(); + assert!( + value_index_uncovered_count(&ds).await > 0, + "appended fragment should be uncovered by the BTREE before optimize_indices" + ); + + ds.optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + assert_eq!( + value_index_uncovered_count(&ds).await, + 0, + "optimize_indices must fold the appended fragment into the existing index \ + (incremental coverage); if this regresses, PR3's reindex no longer keeps \ + coverage current — revisit db/omnigraph/optimize.rs and docs/dev/lance.md." + ); +} + +/// Count current fragments not covered by the single-column `value` BTREE — +/// mirrors `TableStore::has_unindexed_fragments` (load_indices + +/// `fragment_bitmap.contains`), pinned by Guard 11. +async fn value_index_uncovered_count(ds: &Dataset) -> usize { + let indices = ds.load_indices().await.unwrap(); + let frag_ids: Vec = ds.fragments().iter().map(|f| f.id as u32).collect(); + let value_fid = ds.schema().field("value").unwrap().id; + for index in indices.iter() { + if index.fields.len() == 1 && index.fields[0] == value_fid { + if let Some(bitmap) = index.fragment_bitmap.as_ref() { + return frag_ids.iter().filter(|id| !bitmap.contains(**id)).count(); + } + } + } + // No `value` index found — treat as fully uncovered so a missing index + // is never mistaken for full coverage. + frag_ids.len() +} + +// --- Guard 16: scalar index use requires a literal matching the column type --- +// +// Pins the substrate behavior the pushdown literal-coercion fix relies on +// (`query.rs::literal_to_typed_expr`): Lance uses the BTREE only when the filter +// is `column OP literal` with a matching type. A width-mismatched literal makes +// DataFusion widen and cast the COLUMN (`CAST(n32 AS Int64)`), which drops the +// scalar index and full-scans. Temporal columns are immune (DataFusion casts the +// Utf8 LITERAL to the date type, not the column). If a Lance/DataFusion bump +// changes either coercion direction, this turns red — re-validate the fix. +#[tokio::test] +async fn scalar_index_use_requires_matched_literal_type() { + use datafusion::physical_plan::displayable; + use datafusion::prelude::{col, lit}; + use datafusion::scalar::ScalarValue; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().join("probe_literal_type.lance"); + let uri = uri.to_str().unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("n32", DataType::Int32, false), + Field::new("d32", DataType::Date32, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), + Arc::new(Int32Array::from(vec![1, 5, 9, 13])), + Arc::new(arrow_array::Date32Array::from(vec![19000, 19723, 20000, 20500])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let params = WriteParams { + mode: WriteMode::Create, + enable_stable_row_ids: true, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }; + let mut ds = Dataset::write(reader, uri, Some(params)).await.unwrap(); + for c in ["n32", "d32"] { + ds.create_index_builder(&[c], IndexType::BTree, &ScalarIndexParams::default()) + .replace(true) + .await + .unwrap(); + } + + async fn plan_str(ds: &Dataset, filter: datafusion::prelude::Expr) -> String { + let mut scanner = ds.scan(); + scanner.filter_expr(filter); + let plan = scanner.create_plan().await.unwrap(); + format!("{}", displayable(plan.as_ref()).indent(true)) + } + + // (label, filter, expect_index_used) + let cases = [ + ("n32 = 5i32 (matched Int32)", col("n32").eq(lit(5i32)), true), + ("n32 = 5i64 (widened Int64)", col("n32").eq(lit(5i64)), false), + ( + "d32 = Date32 (matched)", + col("d32").eq(lit(ScalarValue::Date32(Some(19723)))), + true, + ), + ( + "d32 = '2024-01-01' (Utf8 vs Date32)", + col("d32").eq(lit("2024-01-01")), + true, + ), + ]; + + for (label, filter, expect_index) in cases { + let s = plan_str(&ds, filter).await; + let uses_index = s.contains("ScalarIndexQuery"); + assert_eq!( + uses_index, expect_index, + "[{label}] expected scalar-index use = {expect_index}, got {uses_index}.\n\ + A change here means Lance/DataFusion shifted its coercion or index \ + pushdown; re-validate query.rs::literal_to_typed_expr.\nplan:\n{s}" + ); + } + + // The widened case must show the index-defeating column CAST (the precise + // shape the fix avoids by coercing the literal to the column type). + let widened = plan_str(&ds, col("n32").eq(lit(5i64))).await; + assert!( + widened.contains("CAST(n32 AS Int64)"), + "expected a column-side cast in the widened plan, got:\n{widened}" + ); +} diff --git a/crates/omnigraph/tests/literal_filters.rs b/crates/omnigraph/tests/literal_filters.rs index a0b2bd7..d486f28 100644 --- a/crates/omnigraph/tests/literal_filters.rs +++ b/crates/omnigraph/tests/literal_filters.rs @@ -19,6 +19,7 @@ node Metric { name: String @key score: F64? ratio: F32? + count: I32? active: Bool? born: Date? seen: DateTime? @@ -26,10 +27,10 @@ node Metric { "#; // Seeds partition every predicate, so a dropped filter returns all 4 rows. -const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}} -{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}} -{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}} -{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#; +const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"count":1,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}} +{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"count":2,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}} +{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"count":3,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}} +{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"count":4,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#; async fn metric_db(dir: &tempfile::TempDir) -> Omnigraph { let uri = dir.path().to_str().unwrap(); @@ -67,6 +68,50 @@ query inline() { match { $m: Metric { score: 3.0 } } return { $m.name } } assert_eq!(sorted_metric_names(&mut db, q, "inline").await, vec!["m3"]); } +// Inline-binding equality is the Lance-pushdown arm. With the literal coerced to +// the column's exact Arrow type, a narrow-numeric column (I32) and an F32 column +// must still select the right rows — the coercion changes the literal's type, not +// the result set. (The index-use win this enables is pinned at the Lance-surface +// layer by `lance_surface_guards::scalar_index_use_requires_matched_literal_type`.) +#[tokio::test] +async fn int_and_f32_literal_pushdown_coercion() { + let dir = tempfile::tempdir().unwrap(); + let mut db = metric_db(&dir).await; + let q = r#" +query count_eq() { match { $m: Metric { count: 2 } } return { $m.name } } +query ratio_eq() { match { $m: Metric { ratio: 0.25 } } return { $m.name } } +query count_ge() { match { $m: Metric $m.count >= 3 } return { $m.name } } +"#; + // I32 column, integer literal coerced Int64 -> Int32: count == 2 is m2 only. + assert_eq!(sorted_metric_names(&mut db, q, "count_eq").await, vec!["m2"]); + // F32 column, float literal coerced Float64 -> Float32: ratio == 0.25 is m2. + assert_eq!(sorted_metric_names(&mut db, q, "ratio_eq").await, vec!["m2"]); + // Range on the I32 column: count 3,4 >= 3 -> m3, m4 (coercion is op-independent). + assert_eq!( + sorted_metric_names(&mut db, q, "count_ge").await, + vec!["m3", "m4"] + ); +} + +// A fractional float against an integer column must not be truncated by the +// pushdown coercion (`2.7 -> 2` would wrongly match the count=2 row). The +// lossless guard falls back to the natural Float64 literal, so `count = 2.7` +// matches no integer and returns no rows. +#[tokio::test] +async fn fractional_float_equality_on_int_column_returns_no_rows() { + let dir = tempfile::tempdir().unwrap(); + let mut db = metric_db(&dir).await; + let q = r#" +query count_frac() { match { $m: Metric { count: 2.7 } } return { $m.name } } +"#; + assert!( + sorted_metric_names(&mut db, q, "count_frac") + .await + .is_empty(), + "count = 2.7 must match no integer rows (no truncation to count = 2)" + ); +} + #[tokio::test] async fn bool_literal_filters_execute() { let dir = tempfile::tempdir().unwrap(); @@ -88,9 +133,15 @@ async fn date_and_datetime_literal_filters_execute() { let q = r#" query born_ge() { match { $m: Metric $m.born >= date("2024-01-01") } return { $m.name } } query seen_lt() { match { $m: Metric $m.seen < datetime("2024-01-01T00:00:00Z") } return { $m.name } } +query born_eq() { match { $m: Metric { born: date("2024-06-01") } } return { $m.name } } +query seen_eq() { match { $m: Metric { seen: datetime("2024-06-01T12:00:00Z") } } return { $m.name } } "#; // born: m1 2024-06, m3 2025 >= 2024-01-01 assert_eq!(sorted_metric_names(&mut db, q, "born_ge").await, vec!["m1", "m3"]); // seen: m2 2023, m4 2022 < 2024-01-01 assert_eq!(sorted_metric_names(&mut db, q, "seen_lt").await, vec!["m2", "m4"]); + // Inline-binding equality exercises the Lance-pushdown arm with a typed + // Date32/Date64 literal: the epoch conversion must select exactly m1. + assert_eq!(sorted_metric_names(&mut db, q, "born_eq").await, vec!["m1"]); + assert_eq!(sorted_metric_names(&mut db, q, "seen_eq").await, vec!["m1"]); } diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs index 13c9de7..deb4d2d 100644 --- a/crates/omnigraph/tests/maintenance.rs +++ b/crates/omnigraph/tests/maintenance.rs @@ -14,9 +14,11 @@ use omnigraph::db::{ SkipReason, }; use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph::table_store::{IndexCoverage, TableStore}; use helpers::{ MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, count_rows, init_and_load, mixed_params, mutate_main, + snapshot_main, }; /// Filesystem URI of a node sub-table, mirroring the engine's layout @@ -131,6 +133,72 @@ async fn optimize_after_load_then_again_is_idempotent() { } } +// PR3 (Workstream B): an existing scalar index does not cover fragments +// appended after it was built (build_indices is existence-gated), so those +// rows are scanned unindexed. `optimize` must fold them back in via Lance's +// incremental `optimize_indices`, restoring full coverage. +#[tokio::test] +async fn optimize_reindexes_fragments_appended_after_index_build() { + const SCHEMA: &str = r#" +node Doc { + slug: String @key + rank: I32 @index +} +"#; + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap(); + + // First load builds the id + rank BTREEs over the initial fragment. + load_jsonl( + &mut db, + "{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"rank\":1}}\n\ + {\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"rank\":2}}", + LoadMode::Merge, + ) + .await + .unwrap(); + + // A second load with NEW keys appends a fragment the existing BTREEs do not + // cover (the existence gate skips re-building an index that already exists). + load_jsonl( + &mut db, + "{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"rank\":3}}\n\ + {\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"rank\":4}}", + LoadMode::Merge, + ) + .await + .unwrap(); + + // Precondition: the appended fragment is unindexed. + { + let snap = snapshot_main(&db).await.unwrap(); + let ds = snap.open("node:Doc").await.unwrap(); + assert!( + TableStore::has_unindexed_fragments(&ds).await.unwrap(), + "appended fragment should be unindexed before optimize" + ); + } + + db.optimize().await.unwrap(); + + // Postcondition: optimize_indices folded the appended fragment in, so every + // index covers every fragment and `rank` reports fully Indexed. + let snap = snapshot_main(&db).await.unwrap(); + let ds = snap.open("node:Doc").await.unwrap(); + assert!( + !TableStore::has_unindexed_fragments(&ds).await.unwrap(), + "optimize must extend index coverage to all fragments" + ); + assert_eq!( + TableStore::key_column_index_coverage(&ds, "rank") + .await + .unwrap(), + IndexCoverage::Indexed, + "rank BTREE must cover all fragments after optimize" + ); +} + // Regression: `optimize` must not crash on a graph that has a `Blob` table. // // Lance `compact_files` forces `BlobHandling::AllBinary`, which mis-decodes diff --git a/crates/omnigraph/tests/scalar_indexes.rs b/crates/omnigraph/tests/scalar_indexes.rs new file mode 100644 index 0000000..8d8a3f0 --- /dev/null +++ b/crates/omnigraph/tests/scalar_indexes.rs @@ -0,0 +1,74 @@ +//! Coverage for `build_indices_on_dataset_for_catalog`'s per-property index +//! dispatch: which scalar/vector index each `@index`/`@key` column gets. +//! +//! The observable signal is `TableStore::key_column_index_coverage`, which +//! reports `Indexed` only when a BTREE covers the column (the same helper the +//! traversal chooser uses). Enums and orderable scalars must get a BTREE so +//! `=`/range/IN/IS NULL are index-accelerated; free-text Strings keep FTS +//! (which `key_column_index_coverage` does not count as a BTREE, by design). + +mod helpers; + +use omnigraph::db::Omnigraph; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph::table_store::{IndexCoverage, TableStore}; + +use helpers::*; + +const SCHEMA: &str = r#" +node Item { + slug: String @key + status: enum(active, archived) @index + published: DateTime @index + rank: I32 @index + title: String @index + note: String? +} +"#; + +const DATA: &str = r#"{"type":"Item","data":{"slug":"a","status":"active","published":"2024-06-01T00:00:00Z","rank":1,"title":"alpha","note":"n1"}} +{"type":"Item","data":{"slug":"b","status":"archived","published":"2023-01-01T00:00:00Z","rank":2,"title":"beta","note":"n2"}} +{"type":"Item","data":{"slug":"c","status":"active","published":"2025-02-02T00:00:00Z","rank":3,"title":"gamma","note":"n3"}}"#; + +// Enums and orderable scalars (DateTime, numeric) get a BTREE from load's +// build-indices pass, so a `=`/range filter on them uses the index. Free-text +// String `@index` keeps FTS (no BTREE), and an un-annotated column has no +// scalar index — both report `Degraded`, which is the negative control that +// keeps this test from being vacuously green. +#[tokio::test] +async fn node_scalar_and_enum_index_columns_get_btree() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap(); + load_jsonl(&mut db, DATA, LoadMode::Overwrite).await.unwrap(); + + let snap = snapshot_main(&db).await.unwrap(); + let ds = snap.open("node:Item").await.unwrap(); + + for col in ["status", "published", "rank"] { + let cov = TableStore::key_column_index_coverage(&ds, col).await.unwrap(); + assert_eq!( + cov, + IndexCoverage::Indexed, + "column '{col}' (enum/DateTime/numeric @index) must get a BTREE, got {cov:?}" + ); + } + + // Free-text String @index -> FTS, which is not a BTREE -> Degraded. + let title_cov = TableStore::key_column_index_coverage(&ds, "title") + .await + .unwrap(); + assert!( + matches!(title_cov, IndexCoverage::Degraded { .. }), + "free-text String @index should keep FTS (no BTREE), got {title_cov:?}" + ); + + // No @index annotation -> no scalar index at all -> Degraded. + let note_cov = TableStore::key_column_index_coverage(&ds, "note") + .await + .unwrap(); + assert!( + matches!(note_cov, IndexCoverage::Degraded { .. }), + "un-annotated column should have no scalar index, got {note_cov:?}" + ); +} diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index a0bcc6d..c840309 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -105,7 +105,7 @@ Use it this way: | Schema validation | Type checks, required fields, defaults, edge endpoint checks, and edge cardinality are enforced on write paths | [schema-language.md](../user/schema/index.md), [execution.md](execution.md) | | Unique constraints | Intra-batch and write-path checks exist; intake and branch-merge derive the composite key through one shared function (`loader::composite_unique_key`, a separator-free `Vec` tuple) and fail loudly on an un-keyable column type rather than silently exempting it; full cross-version uniqueness against already-committed rows is still a gap | [schema-language.md](../user/schema/index.md) | | Storage trait | `TableStorage` (via `db.storage()`) is staged-only; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so §1 holds by construction; capability/stat surfaces are roadmap | [writes.md](writes.md), [architecture.md](architecture.md) | -| Index lifecycle | `ensure_indices` is explicit today; reconciler-based convergence is roadmap | [indexes.md](../user/search/indexes.md), [maintenance.md](../user/operations/maintenance.md) | +| Index lifecycle | Index *creation* per `@index`/`@key` property is dispatched by type (enum + orderable scalar → BTREE, free-text String → FTS, Vector → vector) via `node_prop_index_kind`; index *coverage maintenance* exists — `optimize` runs Lance `optimize_indices` after compaction to fold appended/rewritten fragments into existing indexes (still an explicit maintenance call, not yet a background reconciler) | [indexes.md](../user/search/indexes.md), [maintenance.md](../user/operations/maintenance.md) | | Traversal IDs | Runtime still builds `TypeIndex`; Lance stable row-id based graph IDs are roadmap | [architecture.md](architecture.md), [query-language.md](../user/queries/index.md) | | Auth | Bearer token hashing and server-side actor resolution are implemented at the HTTP boundary | [server.md](../user/operations/server.md), [policy.md](../user/operations/policy.md) | | Tests | Tempdir-backed Lance tests are the current substrate; the storage adapter has an in-memory backend for adapter-level contract tests, but Lance datasets bypass it | [testing.md](testing.md) | diff --git a/docs/dev/lance.md b/docs/dev/lance.md index a4e311f..9544e80 100644 --- a/docs/dev/lance.md +++ b/docs/dev/lance.md @@ -169,6 +169,7 @@ Migration from Lance 4.0.0 → 6.0.1 landed in this cycle (DataFusion 52 → 53, - **`Dataset::checkout_version(N).await?.restore().await?`**: `restore()` takes `&mut self` and returns `Result<()>` (mutates in place, does not consume + return a new dataset). The recovery rollback hammer at `db/manifest/recovery.rs:505-522` continues to work. Pinned by `lance_surface_guards.rs::_compile_checkout_version_then_restore_signature`. - **`DatasetBuilder::from_namespace(...).with_branch(...).with_version(...).load()`** surface preserved (the namespace builder chain at `db/manifest/namespace.rs:162-174`). Pinned by `lance_surface_guards.rs::_compile_dataset_builder_from_namespace_signature`. - **`compact_files(&mut ds, CompactionOptions::default(), None)`** signature stable. `CompactionOptions` still does not expose `data_storage_version`; `compact_files` builds its own `WriteParams { ..Default::default() }`. Note: `LanceFileVersion::default()` is now V2_1 in v6, so optimize-rewritten fragments come out at V2_1 by default (was V2_0 in v4). Existing explicit V2_2 pins on creates/appends still apply. +- **`Dataset::optimize_indices(&mut self, &lance_index::optimize::OptimizeOptions)`** (via `DatasetIndexExt`) is a depended-on surface as of the index-coverage work: `db/omnigraph/optimize.rs` calls it after `compact_files` to fold appended/rewritten fragments into existing indexes (incremental merge, not retrain). It is a **committing** call (mutates in place, advances HEAD; no uncommitted variant in v6.0.1), so optimize treats it as an inline-commit residual under the `SidecarKind::Optimize` recovery sidecar. Signature pinned by `lance_surface_guards.rs::_compile_optimize_indices_signature`; the incremental-coverage behavior pinned by `optimize_indices_extends_fragment_coverage` (appended fragment uncovered before, covered after). - **`Dataset::delete(predicate)` returns `DeleteResult { new_dataset: Arc, num_deleted_rows: u64 }`** — unchanged shape. Pinned by `lance_surface_guards.rs::_compile_delete_result_field_shape`. MR-A will repurpose this guard to the staged two-phase variant once `DeleteBuilder::execute_uncommitted` migration lands. - **File reader read methods now async** (Lance PR #6710, v6.0). No effect — omnigraph reaches Lance exclusively through `Dataset::scan` and the staged-write API. - **Tokenizer vendored as `lance-tokenizer`** (Lance PR #6512, v6.0). No effect — no direct tokenizer imports. @@ -178,6 +179,6 @@ Migration from Lance 4.0.0 → 6.0.1 landed in this cycle (DataFusion 52 → 53, - **`Dataset::force_delete_branch`** (`branches().delete(name, force=true)`, dataset.rs:524) tolerates a missing branch-*contents* ref (vs plain `delete_branch`'s `RefNotFound`), but on the local store still errors `NotFound` if the branch `tree/` directory is fully absent (`remove_dir_all`'s NotFound is not caught for Lance's native error variant, refs.rs:526-549). Both variants still refuse a branch with referencing descendants (`RefConflict`). `TableStore::force_delete_branch` wraps this to be fully idempotent (tolerates already-absent). The single-authority branch-delete redesign uses it for orphan reclamation (eager best-effort reclaim + cleanup reconciler). Pinned by `lance_surface_guards.rs::force_delete_branch_semantics`. Branch delete is "flip the ref atomically, then `remove_dir_all(tree/{branch})`"; branch-exclusive data lives under `tree/{branch}/` so a drop reclaims it immediately without touching `main`. - **Lance blob-v2 `compact_files` bug** (no public issue found as of 2026-06): `compact_files` disables binary-copy for blob datasets and forces `BlobHandling::AllBinary` on the read side; the v2.1+ structural decoder then mis-counts column infos for the blob-v2 struct and fails with `Invalid user input: there were more fields in the schema than provided column indices / infos` (`lance-encoding/src/decoder.rs::ColumnInfoIter::expect_next`). This fails even a pristine uniform-V2_2 multi-fragment blob table; vector/list/scalar/ragged columns and mixed file versions all compact fine. Reads/queries use descriptor handling (`BlobHandling::default()`) and are unaffected. `optimize` skips blob-bearing tables behind `LANCE_SUPPORTS_BLOB_COMPACTION = false` (`db/omnigraph/optimize.rs`), reporting `SkipReason::BlobColumnsUnsupportedByLance`. Pinned by `lance_surface_guards.rs::compact_files_still_fails_on_blob_columns`, which turns red when the bug is fixed → flip the gate, remove the skip branch + the `maintenance.rs::optimize_skips_blob_table_and_reports_skip` skip assertions. -Surface guards added: `crates/omnigraph/tests/lance_surface_guards.rs` (10 named guards; 5 runtime + 5 compile-only). Future Lance bumps re-run this file first as the smoke check. Two additional guards from the original plan deferred to follow-up (`manifest_cas_returns_row_level_contention_variant` needs full publisher-race harness; `table_version_metadata_byte_compatible_with_v4` needs `pub(crate)` reach extension). +Surface guards added: `crates/omnigraph/tests/lance_surface_guards.rs` (10 named guards; 5 runtime + 5 compile-only; plus the index-coverage work's `_compile_optimize_indices_signature` and `optimize_indices_extends_fragment_coverage`). Future Lance bumps re-run this file first as the smoke check. Two additional guards from the original plan deferred to follow-up (`manifest_cas_returns_row_level_contention_variant` needs full publisher-race harness; `table_version_metadata_byte_compatible_with_v4` needs `pub(crate)` reach extension). Bump this date stanza on the next alignment pass. diff --git a/docs/dev/writes.md b/docs/dev/writes.md index c3511e0..ccfd5bc 100644 --- a/docs/dev/writes.md +++ b/docs/dev/writes.md @@ -80,10 +80,17 @@ deferred to a follow-up cycle — tracked). Three writers have been migrated onto staged primitives: * **`ensure_indices`** (`db/omnigraph/table_ops.rs::build_indices_on_dataset_for_catalog`) - — scalar indices (BTree, Inverted) now use `stage_create_*_index` + - `commit_staged`. Vector indices stay inline (residual — Lance - `build_index_metadata_from_segments` is `pub(crate)` in 6.0.1; - companion ticket to lance-format/lance#6658 needed). + — scalar indices (BTree, Inverted) use `stage_create_*_index` + + `commit_staged`. Which index a `@index`/`@key` property gets is dispatched by + type via `node_prop_index_kind` (enum + orderable scalar → BTree, free-text + String → Inverted/FTS, Vector → vector). Vector indices stay inline (residual + — Lance `build_index_metadata_from_segments` is `pub(crate)` in 6.0.1; + companion ticket to lance-format/lance#6658 needed). This build is + existence-gated (it creates a *missing* index over current fragments); folding + fragments appended afterward into an *existing* index is `optimize`'s + `optimize_indices` pass — an inline-commit residual, not a staged write (Lance + exposes no uncommitted index-optimize), covered by the optimize recovery + sidecar (see [maintenance.md](../user/operations/maintenance.md)). * **`branch_merge::publish_rewritten_merge_table`** (`exec/merge.rs`) — merge_insert now uses `stage_merge_insert` + `commit_staged`. Deletes stay inline (Lance #6658 residual). diff --git a/docs/user/operations/maintenance.md b/docs/user/operations/maintenance.md index a804e31..4f065e5 100644 --- a/docs/user/operations/maintenance.md +++ b/docs/user/operations/maintenance.md @@ -4,14 +4,15 @@ ## `optimize` — non-destructive -- Compacts every node + edge table on `main`, then **publishes the compacted version to the `__manifest`** so the manifest's recorded version tracks the compacted state. Reads pin the manifest version, so without this publish compaction would be invisible to readers *and* would break the version precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually compacted. +- Compacts every node + edge table on `main`, then reindexes them, then **publishes the resulting version to the `__manifest`** so the manifest's recorded version tracks the compacted-and-reindexed state. Reads pin the manifest version, so without this publish the work would be invisible to readers *and* would break the version precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually changed. - Rewrites small fragments into fewer large ones; old fragments remain reachable via older versions until `cleanup` runs. -- Each table's compact→publish serializes with concurrent mutations on the same table. A crash mid-operation is recovered automatically on the next open (compaction is content-preserving, so roll-forward is always safe). +- **Reindex (index coverage maintenance).** A scalar/FTS/vector index only covers the fragments it was built over. Rows appended after the index was built (e.g. by `load --mode merge`, whose commit does not rebuild an already-existing index) are scanned unindexed, and compaction itself rewrites fragments out of an index's coverage. `optimize` runs Lance's incremental `optimize_indices` after compaction to fold those fragments back in (a delta merge, not a full retrain), restoring full coverage so equality/range/traversal predicates stay index-accelerated. This is why a table with **no compaction work but stale index coverage still commits** a new version under `optimize`. Run `optimize` on a cadence at least as frequent as your freshness window so recently-loaded rows do not linger in the unindexed flat-scan tail. +- Each table's compact→reindex→publish serializes with concurrent mutations on the same table. A crash mid-operation is recovered automatically on the next open (both compaction and reindex are content-preserving, so roll-forward is always safe). - **Requires a recovered graph.** `optimize` refuses (errors) when a pending crash-recovery operation is present — operating on an unrecovered graph could publish a partial write that recovery would roll back. Reopen the graph to run recovery, then re-run `optimize`. - **Uncovered drift is skipped, not interpreted.** If a table's underlying version is ahead of the version recorded in `__manifest` and no crash-recovery record covers that movement, `optimize` reports `skipped: DriftNeedsRepair` with the manifest/head versions and leaves the table untouched. Run `omnigraph repair` to classify and explicitly publish that drift. - Bounded by `OMNIGRAPH_MAINTENANCE_CONCURRENCY` (default 8). - Returns per-table stats: `table_key, fragments_removed, fragments_added, committed, skipped, manifest_version, lance_head_version`. -- **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: BlobColumnsUnsupportedByLance` (and logged) instead of compacted, and the rest of the sweep proceeds normally. **Reads and writes are unaffected** — only compaction is. Consequence: fragment count and deleted-row space on blob tables are not reclaimed; query results are never affected. +- **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: BlobColumnsUnsupportedByLance` (and logged) instead of compacted, and the rest of the sweep proceeds normally. **Reads and writes are unaffected** — only compaction is. Consequence: fragment count and deleted-row space on blob tables are not reclaimed; query results are never affected. A skipped blob table is also **not reindexed** in the same sweep (the skip happens before the reindex step), so its index coverage on appended rows is not refreshed by `optimize` today. ## `repair` — explicit diff --git a/docs/user/search/indexes.md b/docs/user/search/indexes.md index 84b968d..ebd69b1 100644 --- a/docs/user/search/indexes.md +++ b/docs/user/search/indexes.md @@ -4,10 +4,27 @@ | Index | Use | Notes | |---|---|---| -| **BTREE scalar** | range / equality on any scalar | created on `@key`, `@index(...)`, and on key columns by `ensure_indices()` | -| **Inverted (FTS)** | `search`, `fuzzy`, `match_text`, `bm25` | created on text columns referenced by FTS queries | +| **BTREE scalar** | `=` / range / `IN` / `IS NULL` on a scalar | always on the node `id` and edge `src`/`dst`; and on each one-column `@index`/`@key` property that is an **enum** or an **orderable scalar** (`DateTime`/`Date`/`I32`/`I64`/`U32`/`U64`/`F32`/`F64`/`Bool`) | +| **Inverted (FTS)** | `search`, `fuzzy`, `match_text`, `bm25` | created on **free-text** (non-enum) `String` `@index`/`@key` columns | | **Vector** | `nearest()` k-NN | Lance picks IVF_PQ vs HNSW family by configuration; OmniGraph stores as FixedSizeList(Float32, dim) | +The per-property index a column gets is decided by `node_prop_index_kind` (shared +by the builder and the sidecar-pinning coverage check so they cannot drift): +enums and orderable scalars → BTREE, free-text Strings → FTS, `Vector` → vector, +list/`Blob` columns → none. + +> **Free-text Strings are not equality-indexed.** A non-enum `String` column +> (including a `String @key` slug) gets an FTS inverted index, which Lance does +> **not** consult for `=`/range — only for `search`/`match_text`/`bm25`. So an +> equality filter on a free-text String falls back to a full scan. If you filter +> a String identifier by equality on a large table, model it so the value is the +> node id, or track it as a follow-up to also build a BTREE on such columns. + +> **Coverage and cost.** Each indexed column adds index files and build time, and +> an index only covers the fragments it was built over. Rows appended after the +> index was built (e.g. by `ingest --mode merge`) are scanned unindexed until a +> reindex extends coverage; see [maintenance](maintenance.md) → `optimize`. + ## L2 — OmniGraph orchestration - `ensure_indices()` / `ensure_indices_on(branch)` — idempotent build of BTREE + inverted indexes for the current head; safe to re-run.