feat(engine): reindex in optimize to keep index coverage current

A scalar/FTS/vector index only covers the fragments it was built over. Rows
appended after the build (e.g. `ingest --mode merge`, whose commit does not
rebuild an existing index) are scanned unindexed, and `compact_files` rewrites
fragments out of coverage. Nothing folded them back in, so coverage decayed as
the graph grew — even the id/src/dst BTREEs that power traversal.

`optimize_one_table` now runs Lance `optimize_indices` after `compact_files`
(incremental merge, not retrain — the same compact->optimize_indices sequence
LanceDB's `optimize()` uses) and enters the publish path on compaction work OR
stale index coverage (new `TableStore::has_unindexed_fragments`, reusing the
fragment_bitmap logic). `optimize_indices` is a committing call with no
uncommitted variant in lance-6.0.1, so it is an inline-commit residual covered
by the existing `SidecarKind::Optimize` recovery sidecar spanning both ops.
Blob-bearing tables are still skipped (the Lance blob-compaction bug is
compaction-specific; reindex-for-blob deferred as a noted follow-up).

Tests: maintenance.rs asserts an appended fragment is uncovered before and
covered after optimize, and idempotency holds (second pass is a no-op).
lance_surface_guards pins the `optimize_indices` signature and its incremental-
coverage behavior. The existing optimize Phase-B recovery failpoint now also
exercises a crash after reindex. Docs: maintenance.md, writes.md, invariants.md,
lance.md, AGENTS.md.
This commit is contained in:
Ragnor Comerford 2026-06-13 18:47:41 +02:00
parent 481de860b2
commit 0edcf3ec59
No known key found for this signature in database
9 changed files with 259 additions and 22 deletions

View file

@ -32,6 +32,8 @@ use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{
CompactionMetrics, CompactionOptions, compact_files, plan_compaction,
};
use lance::index::DatasetIndexExt;
use lance_index::optimize::OptimizeOptions;
use super::*;
@ -361,16 +363,22 @@ async fn optimize_one_table(
}
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment). A
// steady-state already-compacted table yields an empty plan and is never
// pinned in a sidecar (a zero-commit pin would classify NoMovement on
// recovery and force an all-or-nothing rollback). Uncovered pre-existing
// drift is skipped above and must go through explicit repair.
// deletion materialization (which can rewrite even a single fragment).
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if plan.num_tasks() == 0 {
let will_compact = plan.num_tasks() > 0;
// Even when there is nothing to compact, the table may still have index
// work: rows appended since the index was built (e.g. via `ingest --mode
// merge`) are scanned unindexed until folded in. Either compaction or stale
// index coverage is enough to enter the publish path. If NEITHER, this
// table is a no-op and must NOT be pinned in a sidecar — a zero-commit pin
// classifies NoMovement on recovery and forces an all-or-nothing rollback
// of sibling tables' legitimate work. Uncovered pre-existing manifest/HEAD
// drift is skipped above and must go through explicit repair.
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
if !will_compact && !needs_reindex {
return Ok(TableOptimizeStats::compacted(
table_key,
&CompactionMetrics::default(),
@ -378,8 +386,9 @@ async fn optimize_one_table(
));
}
// Phase A: recovery sidecar BEFORE compaction advances the Lance HEAD, so a
// crash before the manifest publish rolls forward on next open.
// Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or
// index optimize), so a crash before the manifest publish rolls forward on
// next open.
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
@ -398,11 +407,26 @@ async fn optimize_one_table(
let handle =
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;
// Phase B: compaction (reserve-fragments + rewrite commits advance HEAD).
// Phase B: compaction (if any) then incremental index optimize — both
// advance Lance HEAD inside the sidecar window. `compact_files` rewrites
// fragments and drops them from existing index segments' coverage;
// `optimize_indices` folds the rewritten and any previously-unindexed
// fragments back in (Lance's incremental merge, not a full retrain). This
// is the same compact -> optimize_indices sequencing LanceDB's `optimize()`
// uses. `optimize_indices` is an inline-commit residual: lance-6.0.1
// exposes no uncommitted variant, so like `compact_files` it commits
// directly and relies on the sidecar for recovery.
let version_before = ds.version().version;
let metrics: CompactionMetrics = compact_files(&mut ds, options, None)
let metrics: CompactionMetrics = if will_compact {
compact_files(&mut ds, options, None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
} else {
CompactionMetrics::default()
};
ds.optimize_indices(&OptimizeOptions::default())
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
.map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?;
let version_after = ds.version().version;
let committed = version_after != version_before;

View file

@ -705,6 +705,36 @@ impl TableStore {
Ok(IndexCoverage::Indexed)
}
/// True if any non-system index on `ds` leaves at least one current
/// fragment uncovered, i.e. rows that the index does not yet account for
/// (appended after the index was built, or rewritten by compaction). Such
/// fragments are scanned unindexed until a reindex (`optimize_indices`)
/// folds them in. Returns false when every index covers every fragment, or
/// when the table has no (non-system) indices to optimize. A `None`
/// `fragment_bitmap` means Lance cannot report coverage for that index, so
/// we do not treat it as uncovered (mirrors `key_column_index_coverage`).
///
/// Used by `optimize` to decide whether an otherwise-already-compacted
/// table still has index work to do.
pub async fn has_unindexed_fragments(ds: &Dataset) -> Result<bool> {
let indices = ds
.load_indices()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
for index in indices.iter() {
if is_system_index(index) {
continue;
}
if let Some(bitmap) = index.fragment_bitmap.as_ref() {
if frag_ids.iter().any(|id| !bitmap.contains(*id)) {
return Ok(true);
}
}
}
Ok(false)
}
pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
ds.count_rows(filter)
.await

View file

@ -36,6 +36,7 @@ use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
use lance::index::DatasetIndexExt;
use lance_file::version::LanceFileVersion;
use lance_index::IndexType;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::ScalarIndexParams;
use lance_namespace::LanceNamespace;
use lance_table::io::commit::ManifestNamingScheme;
@ -541,3 +542,108 @@ async fn fragment_deletion_metadata_is_available() {
per-fragment deletions and would need to read the deletion vector.",
);
}
// --- Guard 14: Dataset::optimize_indices signature ----------------------------
//
// `db/omnigraph/optimize.rs::optimize_one_table` calls
// `ds.optimize_indices(&OptimizeOptions::default())` (via `DatasetIndexExt`) to
// fold appended/compacted fragments back into existing indexes. If Lance
// changes the receiver, the options type, or the return shape, this fails to
// compile. Compile-only.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_optimize_indices_signature() -> lance::Result<()> {
let mut ds: Dataset = unimplemented!();
let options = OptimizeOptions::default();
// `&mut self`, `&OptimizeOptions`, returns `Result<()>` (mutates in place
// and commits — there is no uncommitted variant in this Lance, which is why
// optimize treats it as an inline-commit residual under a recovery sidecar).
let _: () = ds.optimize_indices(&options).await?;
Ok(())
}
// --- Guard 15: optimize_indices extends fragment coverage ----------------------
//
// PR3's reindex assumes `optimize_indices` folds fragments appended AFTER an
// index was built into that index (incremental merge, not retrain). This pins
// that Lance behavior at the surface layer so a regression turns red here, the
// first smoke check on a Lance bump, before the slower engine suite.
#[tokio::test]
async fn optimize_indices_extends_fragment_coverage() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().join("guard_optimize_indices.lance");
let uri = uri.to_str().unwrap();
// Fragment 0: alice, bob. Build a BTREE over `value` covering only it.
let mut ds = fresh_dataset(uri).await;
ds.create_index_builder(&["value"], IndexType::BTree, &ScalarIndexParams::default())
.replace(true)
.await
.unwrap();
// Append a second fragment the existing index does not cover.
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["carol"])),
Arc::new(Int32Array::from(vec![3])),
],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
let params = WriteParams {
mode: WriteMode::Append,
enable_stable_row_ids: true,
data_storage_version: Some(LanceFileVersion::V2_2),
..Default::default()
};
Dataset::write(reader, uri, Some(params)).await.unwrap();
let mut ds = Dataset::open(uri).await.unwrap();
assert!(
value_index_uncovered_count(&ds).await > 0,
"appended fragment should be uncovered by the BTREE before optimize_indices"
);
ds.optimize_indices(&OptimizeOptions::default())
.await
.unwrap();
assert_eq!(
value_index_uncovered_count(&ds).await,
0,
"optimize_indices must fold the appended fragment into the existing index \
(incremental coverage); if this regresses, PR3's reindex no longer keeps \
coverage current revisit db/omnigraph/optimize.rs and docs/dev/lance.md."
);
}
/// Count current fragments not covered by the single-column `value` BTREE —
/// mirrors `TableStore::has_unindexed_fragments` (load_indices +
/// `fragment_bitmap.contains`), pinned by Guard 11.
async fn value_index_uncovered_count(ds: &Dataset) -> usize {
let indices = ds.load_indices().await.unwrap();
let frag_ids: Vec<u32> = ds.fragments().iter().map(|f| f.id as u32).collect();
let value_fid = ds.schema().field("value").unwrap().id;
for index in indices.iter() {
if index.fields.len() == 1 && index.fields[0] == value_fid {
if let Some(bitmap) = index.fragment_bitmap.as_ref() {
return frag_ids.iter().filter(|id| !bitmap.contains(**id)).count();
}
}
}
// No `value` index found — treat as fully uncovered so a missing index
// is never mistaken for full coverage.
frag_ids.len()
}

View file

@ -14,9 +14,11 @@ use omnigraph::db::{
SkipReason,
};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::{IndexCoverage, TableStore};
use helpers::{
MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, count_rows, init_and_load, mixed_params, mutate_main,
snapshot_main,
};
/// Filesystem URI of a node sub-table, mirroring the engine's layout
@ -131,6 +133,72 @@ async fn optimize_after_load_then_again_is_idempotent() {
}
}
// PR3 (Workstream B): an existing scalar index does not cover fragments
// appended after it was built (build_indices is existence-gated), so those
// rows are scanned unindexed. `optimize` must fold them back in via Lance's
// incremental `optimize_indices`, restoring full coverage.
#[tokio::test]
async fn optimize_reindexes_fragments_appended_after_index_build() {
const SCHEMA: &str = r#"
node Doc {
slug: String @key
rank: I32 @index
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
// First load builds the id + rank BTREEs over the initial fragment.
load_jsonl(
&mut db,
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"rank\":1}}\n\
{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"rank\":2}}",
LoadMode::Merge,
)
.await
.unwrap();
// A second load with NEW keys appends a fragment the existing BTREEs do not
// cover (the existence gate skips re-building an index that already exists).
load_jsonl(
&mut db,
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"rank\":3}}\n\
{\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"rank\":4}}",
LoadMode::Merge,
)
.await
.unwrap();
// Precondition: the appended fragment is unindexed.
{
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Doc").await.unwrap();
assert!(
TableStore::has_unindexed_fragments(&ds).await.unwrap(),
"appended fragment should be unindexed before optimize"
);
}
db.optimize().await.unwrap();
// Postcondition: optimize_indices folded the appended fragment in, so every
// index covers every fragment and `rank` reports fully Indexed.
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Doc").await.unwrap();
assert!(
!TableStore::has_unindexed_fragments(&ds).await.unwrap(),
"optimize must extend index coverage to all fragments"
);
assert_eq!(
TableStore::key_column_index_coverage(&ds, "rank")
.await
.unwrap(),
IndexCoverage::Indexed,
"rank BTREE must cover all fragments after optimize"
);
}
// Regression: `optimize` must not crash on a graph that has a `Blob` table.
//
// Lance `compact_files` forces `BlobHandling::AllBinary`, which mis-decodes