fix: optimize publishes compaction; recovery roll-back converges manifest (#141)

* test(optimize): cover manifest publish + HEAD-drift reconcile

Red against the pre-fix optimize, which ran compact_files without
publishing the compacted version to __manifest:

- maintenance: optimize must publish so the manifest table_version
  tracks the compacted Lance HEAD and a later schema apply succeeds;
  and must reconcile a pre-existing manifest-behind-HEAD drift (forged
  via raw Lance compaction) so strict writes commit again.
- end_to_end + composite_flow: post-optimize query / strict update /
  reopen in the full lifecycle (the canonical flow previously omitted
  post-optimize writes as a documented "known limitation").
- failpoints: a crash between compaction and the manifest publish rolls
  forward on next open.

* fix(optimize): publish compaction to manifest and reconcile HEAD drift

optimize ran Lance compact_files without publishing the new version to
__manifest, so the manifest table_version lagged the Lance HEAD: reads
stayed pinned to the pre-compaction version, and the next schema apply or
strict update/delete failed its HEAD-vs-manifest precondition with
"stale view ... refresh and retry" (open-time recovery rollback inflated
the gap on retry).

optimize now publishes each compacted table's version under the
per-(table, main) write queue, guarded by a manifest CAS and a
SidecarKind::Optimize recovery sidecar (loose-match; roll-forward is safe
because compaction is content-preserving). When a table has nothing left
to compact but its Lance HEAD is already ahead of the manifest pin
(pre-fix drift, or a recovery restore commit), optimize reconciles the
manifest forward to HEAD (metadata-only, no sidecar). Caches and the
CSR/CSC graph index are invalidated after a publish.

Docs updated (maintenance, storage, branches-commits, writes, testing).

* test(recovery): rollback convergence + optimize-defer regressions

Red against the current code, landed before the fix:
- recovery: after the open-time sweep rolls a sidecar back, the manifest
  must track Lance HEAD (no residual drift) so a follow-up schema apply
  succeeds — the original "+1 per retry" loop. Today roll-back restores
  without publishing, so the manifest lags HEAD and the apply fails its
  HEAD-vs-manifest precondition.
- maintenance: optimize must refuse while a recovery sidecar is pending —
  operating on an unrecovered graph could publish a partial write the
  sweep would roll back.

Also removes optimize_reconciles_preexisting_manifest_head_drift: the
ad-hoc drift reconcile it covered is replaced by recovery-side convergence.

* fix(recovery): converge manifest on roll-back; optimize defers on pending recovery

Root of PR #141's review findings and the original "+1 per retry" loop:
a Lance HEAD ahead of the manifest was ambiguous (benign content-preserving
drift vs. a partial write a sidecar will roll back), and optimize's reconcile
guessed it benign. Close the class instead of guessing:

- Recovery roll-back now PUBLISHES the restored version (via a
  push_table_update_at_head helper shared with roll-forward), so the manifest
  tracks the Lance HEAD after recovery — symmetric with roll-forward. This
  fixes the +1 loop (after one roll-back the retry's HEAD-vs-manifest
  precondition passes) and removes the only remaining source of orphaned
  drift. The audit still records the logical rolled-back-to version; the
  manifest is published at the restore commit (identical content).
- optimize drops the ad-hoc drift reconcile and instead REFUSES when a
  __recovery sidecar is pending, so it only ever operates on a recovered
  graph (manifest == HEAD); its compaction publish can no longer commit a
  partial write. With the reconcile gone, the blob-skip-vs-reconcile gap is
  moot.

Updates the rollback recovery-test helper (manifest == HEAD after roll-back),
the failpoints assertions, and the user/dev docs.

* test(recovery): fix rollback assertion for manifest convergence

The roll-back-publishes change makes the manifest version advance after a
SchemaApply roll-back (to the old-schema content), so the
schema_apply_without_schema_staging_rolls_back_on_next_open assertion must
be `version > pre`, not `version == pre`. This update was dropped during
the commit churn and surfaced as a CI Test Workspace failure; the
old-schema-preserved intent stays covered by count_rows + _schema.pg + the
RolledBack convergence invariant.
This commit is contained in:
Ragnor Comerford 2026-06-08 01:50:12 +02:00 committed by GitHub
parent 4a66d6e071
commit e62d9166fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 816 additions and 137 deletions

View file

@ -236,8 +236,8 @@ omnigraph policy explain --actor act-alice --action change --branch main
| Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore`, and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and the migration of every call site completes. |
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and the migration of every call site completes. |
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending — recovery may roll back a partial write, so optimize requires `manifest == HEAD` going in); **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |
| `merge_insert` upsert | ✅ | `LoadMode::Merge`, mutation `update`/`insert`/`delete` lowering |

View file

@ -36,7 +36,7 @@ use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar,
new_sidecar, recover_manifest_drift, write_sidecar,
list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar,
};
pub use state::SubTableEntry;
#[cfg(test)]

View file

@ -106,6 +106,12 @@ pub(crate) enum SidecarKind {
BranchMerge,
/// `ensure_indices_for_branch` — index lifecycle commits.
EnsureIndices,
/// `optimize_all_tables` — Lance `compact_files` (reserve-fragments +
/// rewrite commits) followed by a manifest publish of the compacted
/// version. Loose-match like the other multi-commit writers; roll-forward
/// is always safe because compaction is content-preserving (Lance
/// `Operation::Rewrite` "reorganizes data without semantic modification").
Optimize,
}
/// One table's contribution to a sidecar's intended commit. The classifier
@ -412,11 +418,13 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySid
/// - **Strict** (`Mutation`, `Load`): exactly one `commit_staged` per
/// table, so `lance_head == manifest_pinned + 1` AND
/// `post_commit_pin == lance_head` is required.
/// - **Loose** (`SchemaApply`, `EnsureIndices`, `BranchMerge`): the
/// writer may run N ≥ 1 `commit_staged` calls per table (one per
/// index built + one for the overwrite, etc.; merge tables run
/// merge_insert + delete_where + index rebuilds) and the exact N
/// is hard to compute at sidecar-write time. The loose match accepts
/// - **Loose** (`SchemaApply`, `EnsureIndices`, `BranchMerge`,
/// `Optimize`): the writer advances the Lance HEAD by N ≥ 1 commits
/// per table (one per index built + one for the overwrite, etc.;
/// merge tables run merge_insert + delete_where + index rebuilds;
/// `Optimize` runs `compact_files`, which commits reserve-fragments +
/// rewrite) and the exact N is hard to compute at sidecar-write time.
/// The loose match accepts
/// any `lance_head > manifest_pinned` as `RolledPastExpected` when
/// `pin.expected_version == manifest_pinned` (the writer's CAS
/// target matches what the manifest currently shows). The risk this
@ -494,9 +502,12 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision
/// Skipping the restore in those cases would leave Lance HEAD ahead of
/// the manifest with no recovery artifact left.
///
/// Cost: under repeated mid-rollback crashes (rare), Lance HEAD
/// accumulates extra restore commits that `omnigraph cleanup` reclaims.
/// Bounded by the number of recovery iterations — typically 1.
/// Cost: a successful roll-back appends one restore commit and then publishes
/// the manifest to match (`roll_back_sidecar`), so the table converges
/// (`manifest == HEAD`) in one pass. Only repeated crashes *between* the restore
/// and that publish (rare) accumulate extra restore commits; each re-classified
/// roll-back restores again and `omnigraph cleanup` reclaims the surplus.
/// Bounded by the number of interrupted recovery iterations — typically 0.
pub(crate) async fn restore_table_to_version(
table_path: &str,
branch: Option<&str>,
@ -801,13 +812,24 @@ async fn roll_back_sidecar(
sidecar: &RecoverySidecar,
states: &[ClassifiedTable],
) -> Result<()> {
// Restore every table whose Lance HEAD has drifted from the
// manifest pin (RolledPastExpected, UnexpectedAtP1,
// UnexpectedMultistep). NoMovement tables are already at the
// manifest pin — no action. Restore is unconditional; repeated
// mid-rollback crashes accumulate a few extra Lance commits that
// `omnigraph cleanup` reclaims.
// Restore every drifted table (RolledPastExpected / UnexpectedAtP1 /
// UnexpectedMultistep) to its manifest-pinned content, then PUBLISH so
// `manifest == Lance HEAD` for each — symmetric with roll-forward. The
// restore commit's content equals the manifest-pinned version, so re-pinning
// the manifest to the new (restored) HEAD is content-correct and closes the
// orphaned-drift class (`HEAD > manifest` with no covering sidecar). This is
// what makes a failed-then-retried schema_apply converge: after one
// roll-back `manifest == HEAD`, so the retry's precondition passes instead of
// failing one version higher each iteration.
//
// NoMovement tables are already at the pin — excluded from both the restore
// and the publish. The audit `to_version` stays the *logical* rolled-back-to
// version (`manifest_pinned`), while the manifest is published at
// `manifest_pinned + 1` (the restore commit, same content) — keep that
// asymmetry so the audit records the drift (`from_version > to_version`).
let mut outcomes = Vec::with_capacity(sidecar.tables.len());
let mut updates: Vec<ManifestChange> = Vec::with_capacity(sidecar.tables.len());
let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
for (pin, state) in sidecar.tables.iter().zip(states.iter()) {
if matches!(
state.classification,
@ -821,10 +843,20 @@ async fn roll_back_sidecar(
state.manifest_pinned,
)
.await?;
// `from_version` records the Lance HEAD observed BEFORE the
// restore (the actual drift), not the manifest pin. Operators
// reading `_graph_commit_recoveries.lance` see "rolled back
// from v7 to v5" rather than "v5 → v5".
// Publish the post-restore HEAD, CAS against the current (unmoved)
// manifest pin — the same helper roll-forward uses.
push_table_update_at_head(
root_uri,
&pin.table_key,
&pin.table_path,
pin.table_branch.as_deref(),
state.manifest_pinned,
&mut updates,
&mut expected,
)
.await?;
// `from_version` records the Lance HEAD observed BEFORE the restore
// (the actual drift); `to_version` the logical pin we rolled back to.
outcomes.push(TableOutcome {
table_key: pin.table_key.clone(),
from_version: state.lance_head,
@ -832,13 +864,23 @@ async fn roll_back_sidecar(
});
}
}
// Manifest pin doesn't move on rollback; record an audit-only
// commit at the existing version so operators can correlate via
// `omnigraph commit list --filter actor=omnigraph:recovery`.
// Publish the restored HEADs so manifest == HEAD. A degenerate all-NoMovement
// roll-back restores nothing — there's nothing to publish, and the audit
// records the unchanged snapshot version.
let manifest_version = if updates.is_empty() {
snapshot.version()
} else {
let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref());
publisher
.publish(&updates, &expected)
.await?
.version()
.version
};
record_audit(
root_uri,
sidecar,
snapshot.version(),
manifest_version,
RecoveryKind::RolledBack,
outcomes,
)
@ -919,44 +961,20 @@ async fn roll_forward_all(
HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len());
for pin in &sidecar.tables {
// Open the dataset at its CURRENT Lance HEAD on the pin's branch
// (not at the sidecar's post_commit_pin). For strict-match writers
// (Mutation/Load) HEAD == post_commit_pin by construction. For
// loose-match writers (SchemaApply/EnsureIndices/BranchMerge) HEAD
// may be higher than post_commit_pin (multiple commit_staged
// calls per table); we want to publish to the actual current HEAD.
let head_ds = Dataset::open(&pin.table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let head_ds = match pin.table_branch.as_deref() {
Some(b) if b != "main" => head_ds
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => head_ds,
};
let head_version = head_ds.version().version;
let row_count = head_ds
.count_rows(None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
let table_relative_path = super::table_path_for_table_key(&pin.table_key)?;
let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
// Publish to the table's CURRENT Lance HEAD on the pin's branch (not the
// sidecar's `post_commit_pin`, a lower bound for loose-match writers that
// run multiple commit_staged calls per table). CAS against the pin's
// pre-write `expected_version`.
let head_version = push_table_update_at_head(
root_uri,
&table_relative_path,
&head_ds,
)?;
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: pin.table_key.clone(),
table_version: head_version,
table_branch: pin.table_branch.clone(),
row_count,
version_metadata,
}));
expected.insert(pin.table_key.clone(), pin.expected_version);
&pin.table_key,
&pin.table_path,
pin.table_branch.as_deref(),
pin.expected_version,
&mut updates,
&mut expected,
)
.await?;
published_versions.insert(pin.table_key.clone(), head_version);
}
@ -1047,6 +1065,57 @@ async fn roll_forward_all(
Ok((new_dataset.version().version, published_versions))
}
/// Open `table_path` at its branch HEAD, read the current Lance HEAD version,
/// row count, and version metadata, and push a `ManifestChange::Update` (plus
/// its CAS `expected` entry) that re-pins the manifest to that HEAD. Returns the
/// published HEAD version.
///
/// Shared by `roll_forward_all` (where `expected_version` is the sidecar's
/// pre-write pin) and `roll_back_sidecar` (where it is the manifest-pinned
/// version the table was just restored to). The HEAD is read AFTER any restore
/// in the same single-threaded sweep, so no concurrent writer can have advanced
/// it.
async fn push_table_update_at_head(
root_uri: &str,
table_key: &str,
table_path: &str,
branch: Option<&str>,
expected_version: u64,
updates: &mut Vec<ManifestChange>,
expected: &mut HashMap<String, u64>,
) -> Result<u64> {
let head_ds = Dataset::open(table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let head_ds = match branch {
Some(b) if b != "main" => head_ds
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => head_ds,
};
let head_version = head_ds.version().version;
let row_count = head_ds
.count_rows(None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
let table_relative_path = super::table_path_for_table_key(table_key)?;
let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&table_relative_path,
&head_ds,
)?;
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: table_key.to_string(),
table_version: head_version,
table_branch: branch.map(str::to_string),
row_count,
version_metadata,
}));
expected.insert(table_key.to_string(), expected_version);
Ok(head_version)
}
/// Append the audit row describing this recovery action.
///
/// Two-part write: (a) `_graph_commits.lance` row anchored on the recovery

View file

@ -8,8 +8,14 @@
//! Two dials:
//!
//! * `optimize_all_tables` — Lance `compact_files` on every table. Rewrites
//! small fragments into fewer large ones. Non-destructive (creates a new
//! version; old fragments remain reachable via older manifest versions).
//! small fragments into fewer large ones, then **publishes the compacted
//! version to the `__manifest`** so the manifest's `table_version` tracks the
//! compacted Lance HEAD (reads pin the manifest version, so without the
//! publish compaction would be invisible to readers and would break the
//! HEAD-vs-manifest precondition of schema apply / strict writes). Compaction
//! is content-preserving (Lance `Operation::Rewrite` "reorganizes data
//! without semantic modification"), so old fragments remain reachable via
//! older manifest versions until `cleanup` runs.
//! * `cleanup_all_tables` — Lance `cleanup_old_versions` on every table.
//! Removes manifests (and their unique fragments) older than the configured
//! retention. Destructive to version history — callers should gate this
@ -23,7 +29,9 @@ use std::time::Duration;
use chrono::Utc;
use futures::stream::StreamExt;
use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{CompactionMetrics, CompactionOptions, compact_files};
use lance::dataset::optimize::{
CompactionMetrics, CompactionOptions, compact_files, plan_compaction,
};
use super::*;
@ -111,7 +119,8 @@ pub struct TableOptimizeStats {
pub fragments_removed: usize,
/// Number of new, larger fragments Lance produced.
pub fragments_added: usize,
/// Did this table get a new Lance manifest version from the compaction?
/// Did this table get a new manifest version from the compaction? True when
/// compaction ran and its compacted version was published to `__manifest`.
pub committed: bool,
/// `Some(reason)` if this table was deliberately not compacted. When set,
/// `fragments_removed == 0`, `fragments_added == 0`, and `!committed`.
@ -153,12 +162,29 @@ pub struct TableCleanupStats {
pub error: Option<String>,
}
/// Run Lance `compact_files` on every node + edge table on `main`.
/// Tables run in parallel (bounded concurrency).
/// Run Lance `compact_files` on every node + edge table on `main`, publishing
/// each compacted table's new version to the `__manifest`. Tables run in
/// parallel (bounded concurrency); each is fault-isolated only at the Lance
/// level — a publish error is propagated (the recovery sidecar covers it).
pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStats>> {
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("optimize").await?;
// Refuse on an unrecovered graph. A pending recovery sidecar means a failed
// write left partial state that the open-time sweep must resolve (roll
// forward/back) first; compacting + publishing a table covered by such a
// sidecar could commit a partial write the sweep would roll back. Reopen the
// graph to run recovery, then re-run optimize.
if !crate::db::manifest::list_sidecars(db.root_uri(), db.storage_adapter())
.await?
.is_empty()
{
return Err(OmniError::manifest_conflict(
"optimize requires a clean recovery state; reopen the graph to run the \
recovery sweep before optimizing",
));
}
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
@ -183,49 +209,179 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
}
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
let table_store = &db.table_store;
let stats: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
.map(|(table_key, full_path, has_blob)| async move {
// Lance `compact_files` mis-decodes blob-v2 columns under the forced
// `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION).
// Skip blob-bearing tables and report it rather than aborting the
// whole sweep — the other tables still compact.
if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION {
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
"skipping compaction: table has blob columns the current Lance \
cannot rewrite (blob-v2 AllBinary decode bug); other tables \
unaffected rerun after the Lance fix",
);
return Ok(TableOptimizeStats::skipped(
table_key,
SkipReason::BlobColumnsUnsupportedByLance,
));
}
let mut ds = table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let version_before = ds.version().version;
let metrics: CompactionMetrics =
compact_files(&mut ds, CompactionOptions::default(), None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let version_after = ds.version().version;
Ok(TableOptimizeStats::compacted(
table_key,
&metrics,
version_after != version_before,
))
.map(move |(table_key, full_path, has_blob)| async move {
optimize_one_table(db, table_key, full_path, has_blob).await
})
.buffer_unordered(concurrency)
.collect()
.await;
// Invalidate caches for any table that published a compaction — done BEFORE
// propagating a sibling table's error, since the published versions are
// durable and reads must observe the new fragment layout (Lance invalidates
// the original row addresses on rewrite). The CSR/CSC graph topology index
// is rebuilt only when an edge table moved. Mirrors schema_apply's
// post-publish invalidation.
let any_committed = stats
.iter()
.any(|s| matches!(s, Ok(st) if st.committed));
let edge_committed = stats
.iter()
.any(|s| matches!(s, Ok(st) if st.committed && st.table_key.starts_with("edge:")));
if any_committed {
db.runtime_cache.invalidate_all().await;
if edge_committed {
db.invalidate_graph_index().await;
}
}
stats.into_iter().collect()
}
/// Compact one table and publish the compacted version to the `__manifest`.
///
/// Compaction (`compact_files`) advances the *dataset's* Lance HEAD via a
/// reserve-fragments + rewrite commit, but Lance knows nothing about the
/// `__manifest`. To keep the manifest the single authority for each table's
/// visible version (invariant 2), optimize must publish the compacted version.
/// The Lance-HEAD-before-manifest-publish gap is unavoidable (Lance has no
/// staged/uncommitted compaction), so it is covered by a recovery sidecar like
/// the other multi-commit writers; roll-forward is always safe because
/// compaction is content-preserving.
async fn optimize_one_table(
db: &Omnigraph,
table_key: String,
full_path: String,
has_blob: bool,
) -> Result<TableOptimizeStats> {
// Lance `compact_files` mis-decodes blob-v2 columns under the forced
// `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION). Skip
// blob-bearing tables and report it rather than aborting the whole sweep.
if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION {
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
"skipping compaction: table has blob columns the current Lance \
cannot rewrite (blob-v2 AllBinary decode bug); other tables \
unaffected rerun after the Lance fix",
);
return Ok(TableOptimizeStats::skipped(
table_key,
SkipReason::BlobColumnsUnsupportedByLance,
));
}
// Serialize the whole compact→publish against concurrent mutations on this
// (table, main): compaction is a Rewrite op that retryable-conflicts with a
// concurrent Merge/Update/Delete on overlapping fragments, and an
// interleaved write would also move the manifest version out from under the
// CAS below. Holding the queue makes the CAS baseline read under it exact.
let _guard = db
.write_queue()
.acquire_many(&[(table_key.clone(), None)])
.await;
let mut ds = db
.table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
// CAS baseline: the table's current manifest version, read under the queue
// (in-memory coordinator snapshot, no storage I/O — stable for this section).
let expected_version = db
.snapshot()
.await
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment). A
// steady-state already-compacted table yields an empty plan and is never
// pinned in a sidecar (a zero-commit pin would classify NoMovement on
// recovery and force an all-or-nothing rollback). There is no drift to
// reconcile here: optimize runs only on a recovered graph (the pending-
// sidecar guard above), and recovery roll-back now publishes, so
// `HEAD == manifest` holds going in.
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if plan.num_tasks() == 0 {
return Ok(TableOptimizeStats::compacted(
table_key,
&CompactionMetrics::default(),
false,
));
}
// Phase A: recovery sidecar BEFORE compaction advances the Lance HEAD, so a
// crash before the manifest publish rolls forward on next open.
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
// optimize is system-attributed (no `optimize_as` actor API today).
None,
vec![crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: full_path.clone(),
expected_version,
// Lower bound — compaction commits N≥1 versions (reserve + rewrite);
// the classifier loose-matches SidecarKind::Optimize.
post_commit_pin: expected_version + 1,
table_branch: None,
}],
);
let handle =
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;
// Phase B: compaction (reserve-fragments + rewrite commits advance HEAD).
let version_before = ds.version().version;
let metrics: CompactionMetrics = compact_files(&mut ds, options, None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let version_after = ds.version().version;
let committed = version_after != version_before;
// Pin the per-writer Phase B → Phase C residual for optimize: Lance HEAD has
// advanced but the manifest publish below hasn't run.
crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?;
// Phase C: publish the compacted version to the manifest (one CAS commit,
// expected = the version observed under the queue). On failure the sidecar
// is intentionally left for the open-time recovery sweep to roll forward.
if committed {
let state = db.table_store.table_state(&full_path, &ds).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.clone(), expected_version);
db.coordinator
.write()
.await
.commit_updates_with_actor_with_expected(&[update], &expected, None)
.await?;
}
// Phase D: delete the sidecar (best-effort; recovery resolves a leftover).
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it"
);
}
Ok(TableOptimizeStats::compacted(table_key, &metrics, committed))
}
/// Run Lance `cleanup_old_versions` on every node + edge table on `main`,
/// using [`CleanupPolicyOptions`]. The latest manifest is always preserved
/// regardless (Lance invariant).

View file

@ -294,21 +294,19 @@ async fn composite_flow_canonical_lifecycle() {
);
// ─────────────────────────────────────────────────────────────────
// Step 10: optimize the post-merge graph — verify indices stay
// valid and queryable.
// Step 10: optimize the post-merge graph — verify compaction is
// published to the manifest (so the manifest pin tracks the compacted
// Lance HEAD), indices stay valid and queryable, and a post-optimize
// strict write commits.
//
// **Known limitation**: `optimize_all_tables` calls Lance
// `compact_files` directly — it advances per-table Lance HEAD
// without updating the omnigraph `__manifest` pin. After optimize,
// the next writer's expected_table_versions captures the
// pre-optimize manifest pin, but the publisher's pre-check reads
// a higher version from the manifest dataset (because some other
// path — possibly schema-state recovery on reopen — wrote a newer
// __manifest row). The `ExpectedVersionMismatch` is benign
// (re-issuing the mutation after a snapshot refresh succeeds), but
// a composite test cannot reliably exercise post-optimize mutations
// until that path is investigated. Coverage of post-optimize
// mutations is left to a focused optimize+cleanup integration test.
// This step used to carry a "Known limitation": `optimize_all_tables`
// ran Lance `compact_files` without publishing the new version to
// `__manifest`, so the manifest pin lagged the Lance HEAD and the next
// strict write / schema apply failed with `ExpectedVersionMismatch`
// ("stale view … refresh and retry") — so post-optimize mutations were
// deliberately omitted here. optimize now publishes the compacted
// version, and this flow exercises exactly that previously-failing
// write below.
// ─────────────────────────────────────────────────────────────────
let optimize_stats = db.optimize().await.unwrap();
assert!(
@ -331,6 +329,28 @@ async fn composite_flow_canonical_lifecycle() {
"row counts unchanged by optimize"
);
// A strict update on a compacted table is exactly the write that
// failed with "stale view" before optimize published its compaction.
// It must now commit (Alice is one of the seed Persons; an update
// leaves the row count at 6).
let post_optimize_update = mutate_main(
&mut db,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Alice")], &[("$age", 41)]),
)
.await
.expect("post-optimize strict update must commit — optimize published the manifest");
assert_eq!(
post_optimize_update.affected_nodes, 1,
"post-optimize update must affect exactly Alice"
);
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"an update must not change the Person row count"
);
// ─────────────────────────────────────────────────────────────────
// Step 11: cleanup — keep last 10 versions, only purge versions
// older than 1 hour. With this small test, we have well under 10
@ -373,14 +393,27 @@ async fn composite_flow_canonical_lifecycle() {
branches,
);
// Final query exercise — full read path works post-reopen,
// post-cleanup. Post-cleanup mutation is omitted here pending
// resolution of the optimize-vs-manifest-pin interaction documented
// in Step 10.
// Final exercise — full read AND write path works post-reopen,
// post-cleanup. (The post-cleanup mutation was previously omitted
// pending resolution of the optimize-vs-manifest-pin interaction in
// Step 10; that is now fixed, so a strict write here must commit.)
let final_total = query_main(&mut db, TEST_QUERIES, "total_people", &ParamMap::default())
.await
.unwrap();
assert!(!final_total.batches().is_empty());
let post_reopen_update = mutate_main(
&mut db,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Alice")], &[("$age", 42)]),
)
.await
.expect("post-reopen, post-cleanup strict update must commit");
assert_eq!(
post_reopen_update.affected_nodes, 1,
"post-reopen update must affect exactly Alice"
);
}
/// Cross-handle sequence that exercises operations after a schema_apply

View file

@ -1933,3 +1933,87 @@ query docs_with_tag($tag: String) {
"contains-pushdown should return exactly the rows whose tags list contains 'red'"
);
}
// ─── Maintenance in the full lifecycle: optimize (compaction) ────────────────
/// `optimize` (Lance compaction) is part of a realistic graph lifecycle: it
/// advances the Lance HEAD and publishes the compacted version to the manifest.
/// The rest of the flow must keep working across that boundary — reads observe
/// the compacted data, strict updates (which check Lance HEAD == manifest
/// version) still commit, inserts still commit, and the state survives a reopen
/// (the open-time recovery sweep finds no leftover drift). Before optimize
/// published its compaction, the manifest lagged the Lance HEAD here and the
/// post-optimize update below failed with "stale view ... refresh and retry".
#[tokio::test]
async fn full_flow_optimize_then_query_update_and_reopen() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = init_and_load(&dir).await;
// Build several Person fragments so compaction has something to merge.
for (name, age) in [("Eve", 40), ("Frank", 41), ("Grace", 42)] {
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
let stats = db.optimize().await.unwrap();
assert!(
stats.iter().any(|s| s.committed),
"a multi-fragment table should have compacted in this flow"
);
// Reads observe the compacted data.
let qr = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&params(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
// Strict update after optimize commits (previously failed with "stale view"
// because the manifest lagged the compacted Lance HEAD).
let upd = mutate_main(
&mut db,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Alice")], &[("$age", 31)]),
)
.await
.unwrap();
assert_eq!(upd.affected_nodes, 1);
// Insert after optimize also commits.
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Ivan")], &[("$age", 50)]),
)
.await
.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, 8); // 4 seed + Eve/Frank/Grace + Ivan
// State survives a reopen — the recovery sweep runs and finds no drift.
drop(db);
let reopened = Omnigraph::open(&uri).await.unwrap();
assert_eq!(count_rows(&reopened, "node:Person").await, 8);
let alice = reopened
.entity_at_target(ReadTarget::branch("main"), "node:Person", "Alice")
.await
.unwrap()
.unwrap();
assert_eq!(
alice["age"],
serde_json::json!(31),
"Alice's post-optimize age update must persist across reopen"
);
}

View file

@ -1245,7 +1245,7 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
// the rollback (will use Dataset::restore safely; no concurrent
// writers at open time).
drop(db);
let _db = Omnigraph::open(&uri).await.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
// After full-sweep recovery, the sidecar should be processed
// (deleted). Sidecar's tables are eligible for rollback (UnexpectedAtP1):
// restore happens on Person (HEAD advances by 1).
@ -1268,6 +1268,19 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
"full sweep must run Dataset::restore (head advances); \
post_head={post_head}, final_head={final_head}",
);
// Convergence: roll-back published the restored HEAD, so the manifest pin
// tracks Lance HEAD afterward (no residual drift).
let entry_version = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.unwrap()
.table_version;
assert_eq!(
entry_version, final_head,
"full-sweep roll-back must publish so manifest pin ({entry_version}) == Lance HEAD ({final_head})",
);
}
/// Companion to the above — confirms that a finalize→publisher failure
@ -1461,10 +1474,15 @@ edge WorksAt: Person -> Company
}
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
version_main(&db).await.unwrap(),
pre_failure_version,
"manifest must remain on the old schema when no schema staging files existed"
// Roll-back now publishes the restored version, so the manifest version
// advances — but to the OLD-schema content: the migration never applied
// (asserted by count_rows + the `_schema.pg` checks below), and the sweep
// converges (`manifest == Lance HEAD`, asserted by
// assert_post_recovery_invariants's RolledBack arm).
assert!(
version_main(&db).await.unwrap() > pre_failure_version,
"roll-back publishes the restored (old-schema) version, advancing the manifest; \
pre={pre_failure_version}",
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
@ -1637,6 +1655,100 @@ edge WorksAt: Person -> Company
);
}
/// `optimize` Phase B → Phase C residual: `compact_files` advanced the Lance
/// HEAD but the manifest publish hasn't run. The `Optimize` recovery sidecar
/// (loose-match, like SchemaApply/EnsureIndices) must roll the compacted version
/// forward on next open so the manifest tracks the Lance HEAD — and the healed
/// table must then accept a schema apply (the original bug's victim).
#[tokio::test]
async fn optimize_phase_b_failure_recovered_on_next_open() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
// Seed: several separate Person inserts → multiple fragments, so compaction
// has real work and advances the Lance HEAD.
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
let pre_failure_version = {
let db = Omnigraph::open(&uri).await.unwrap();
version_main(&db).await.unwrap()
};
// Failpoint fires AFTER compact_files advanced the Lance HEAD but BEFORE the
// manifest publish. The Optimize sidecar persists (only node:Person has
// compactable fragments, so exactly one sidecar is written).
{
let db = Omnigraph::open(&uri).await.unwrap();
let _failpoint =
ScopedFailPoint::new("optimize.post_phase_b_pre_manifest_commit", "return");
let err = db.optimize().await.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: optimize.post_phase_b_pre_manifest_commit"),
"unexpected error: {err}"
);
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"exactly one Optimize sidecar must persist after optimize failure"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Recovery: reopen runs the sweep. The Optimize sidecar classifies
// RolledPastExpected (loose-match) → RollForward → manifest extends to the
// compacted Lance HEAD.
let db = Omnigraph::open(&uri).await.unwrap();
let post_recovery_version = version_main(&db).await.unwrap();
assert!(
post_recovery_version > pre_failure_version,
"manifest version must advance post-recovery (compaction rolled forward); \
pre={pre_failure_version}, post={post_recovery_version}",
);
drop(db);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![TableExpectation::main("node:Person")],
},
)
.await
.unwrap();
// The healed table accepts an additive schema apply — its HEAD-vs-manifest
// precondition is satisfied because recovery published the compacted version.
let db = Omnigraph::open(&uri).await.unwrap();
let desired = helpers::TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
db.apply_schema(&desired)
.await
.expect("schema apply after optimize recovery must succeed");
}
#[tokio::test]
async fn branch_merge_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};

View file

@ -181,6 +181,9 @@ pub async fn assert_post_recovery_invariants(
"audit row for {operation_id} recorded the wrong recovery_kind",
);
assert_rollback_outcomes_record_drift(&audit);
// Roll-back now publishes the restored HEAD, so manifest == Lance
// HEAD afterward (symmetric with roll-forward) — no residual drift.
assert_manifest_pins_match_lance_heads(graph_root, &tables).await?;
assert_recovery_commit_shape(graph_root, &audit, &tables).await?;
assert_non_main_did_not_move_main(graph_root, &tables).await?;
assert_idempotent_reopen(graph_root, operation_id).await?;

View file

@ -8,10 +8,12 @@ mod helpers;
use std::time::Duration;
use lance::Dataset;
use omnigraph::db::{CleanupPolicyOptions, Omnigraph, SkipReason};
use omnigraph::db::{CleanupPolicyOptions, Omnigraph, ReadTarget, SkipReason};
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load};
use helpers::{
MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, count_rows, init_and_load, mixed_params, mutate_main,
};
/// Filesystem URI of a node sub-table, mirroring the engine's layout
/// (FNV-1a of the type name under `nodes/`). Matches the helper in
@ -163,6 +165,124 @@ node Tag {\n slug: String @key\n}\n";
assert_eq!(tag.skipped, None, "non-blob table must not be skipped");
}
// Regression: `optimize` must publish its compaction to the `__manifest` so the
// manifest's recorded `table_version` tracks the compacted Lance HEAD.
//
// Lance `compact_files` advances the *dataset's* version (reserve-fragments +
// rewrite commits) but knows nothing about OmniGraph's `__manifest`. If optimize
// does not publish a manifest update, the manifest's `table_version` lags the
// Lance HEAD: reads stay pinned to the pre-compaction version (compaction is
// invisible to them) and any subsequent schema apply / strict update/delete
// fails its HEAD-vs-manifest precondition with
// "stale view of '<table>': expected manifest table version X but current is Y".
// This pins the fix — optimize publishes the compacted version, so manifest ==
// HEAD and migrations after a compaction succeed.
#[tokio::test]
async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap().trim_end_matches('/').to_string();
let mut db = init_and_load(&dir).await;
// Several separate inserts → multiple Person fragments, so `compact_files`
// actually merges and moves the Lance HEAD (a single fragment is a no-op).
for (name, age) in [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)] {
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age as i64)]),
)
.await
.expect("insert");
}
let stats = db.optimize().await.unwrap();
let person = stats
.iter()
.find(|s| s.table_key == "node:Person")
.expect("Person stat present");
assert!(
person.committed,
"Person is multi-fragment, so optimize must have compacted it"
);
// After optimize, the manifest's recorded table_version must equal the actual
// Lance HEAD — optimize published its compaction, so there is no drift.
let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
let entry = snap.entry("node:Person").unwrap();
let manifest_version = entry.table_version;
let full = format!("{}/{}", root, entry.table_path);
let lance_head = Dataset::open(&full).await.unwrap().version().version;
assert_eq!(
manifest_version, lance_head,
"after optimize, manifest table_version ({manifest_version}) must equal Lance HEAD ({lance_head})",
);
// Reads observe the compacted version with rows preserved (4 seed + 4 inserts).
assert_eq!(count_rows(&db, "node:Person").await, 8);
// The headline: an additive (nullable property) migration touching the
// just-compacted table succeeds, where it previously failed with "stale view".
let desired = TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
let result = db
.apply_schema(&desired)
.await
.expect("additive schema apply after optimize must succeed");
assert!(result.applied, "schema apply should report applied=true");
}
// Regression: `optimize` must REFUSE when an unresolved recovery sidecar is
// pending. Operating on an unrecovered graph could publish a partial write that
// the all-or-nothing recovery sweep would roll back; the operator must reopen
// (run the recovery sweep) first.
#[tokio::test]
async fn optimize_defers_when_recovery_sidecar_is_pending() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let db = init_and_load(&dir).await;
// Simulate an in-process failed write that left a recovery sidecar on disk.
let recovery_dir = dir.path().join("__recovery");
std::fs::create_dir_all(&recovery_dir).unwrap();
let person_path = node_table_uri(uri, "Person");
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H000000000000000000DEFR",
"started_at": "0",
"branch": null,
"actor_id": "act-test",
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": 1,
"post_commit_pin": 2
}}
]
}}"#,
person_path
);
std::fs::write(
recovery_dir.join("01H000000000000000000DEFR.json"),
sidecar_json,
)
.unwrap();
let err = db
.optimize()
.await
.expect_err("optimize must defer (error) while a recovery sidecar is pending");
assert!(
err.to_string().to_lowercase().contains("recovery"),
"optimize defer error should mention recovery; got: {err}",
);
}
#[tokio::test]
async fn cleanup_without_any_policy_option_errors() {
let dir = tempfile::tempdir().unwrap();

View file

@ -278,6 +278,97 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
);
}
/// Regression: recovery roll-back must PUBLISH the restored version so
/// `manifest == Lance HEAD` afterward (no residual "orphaned drift"). Before the
/// fix, roll-back restored via `Dataset::restore` but left the manifest pin
/// behind HEAD, so a subsequent strict write / schema apply failed its
/// HEAD-vs-manifest precondition ("stale view … refresh and retry") — and a
/// failed schema apply's own roll-back leaked +1 each retry (the original bug's
/// loop). With convergence, one roll-back leaves `manifest == HEAD` and the
/// follow-up succeeds.
#[tokio::test]
async fn recovery_rollback_converges_manifest_so_schema_apply_succeeds() {
use omnigraph::db::ReadTarget;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#,
LoadMode::Append,
)
.await
.unwrap();
drop(db);
// Forge a Phase-B residual: advance Person's Lance HEAD without publishing to
// the manifest (the manifest pin stays at the load's committed version).
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let manifest_pin = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
drop(ds);
// Roll-back-classified sidecar (post_commit_pin != observed head ⇒
// UnexpectedAtP1 ⇒ RollBack).
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H0000000000000000000CVG",
"started_at": "0",
"branch": null,
"actor_id": "act-test",
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": {},
"post_commit_pin": {}
}}
]
}}"#,
person_uri, manifest_pin, manifest_pin
);
write_sidecar_file(dir.path(), "01H0000000000000000000CVG", &sidecar_json);
// Reopen runs the sweep: restore Person to manifest_pin, then PUBLISH so the
// manifest tracks the restored Lance HEAD.
let db = Omnigraph::open(uri).await.unwrap();
// Convergence: manifest pin == Lance HEAD. Fails before the fix — the
// manifest stays at manifest_pin while HEAD advanced past it.
let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
let entry = snap.entry("node:Person").unwrap();
let lance_head = Dataset::open(&person_uri).await.unwrap().version().version;
assert_eq!(
entry.table_version, lance_head,
"roll-back must publish so manifest pin ({}) == Lance HEAD ({})",
entry.table_version, lance_head,
);
// The +1-loop victim: an additive schema apply must now succeed (its
// HEAD-vs-manifest precondition is satisfied). Before the fix this failed
// with "stale view … refresh and retry".
let desired = TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
db.apply_schema(&desired)
.await
.expect("schema apply after a converging roll-back must succeed");
}
// =====================================================================
// Phase 4 — roll-forward path + audit row recording
// =====================================================================

View file

@ -34,10 +34,10 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
| `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) |
| `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior |
| `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths |
| `maintenance.rs` | `optimize` (compaction) + `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the four per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`). |
| `maintenance.rs` | `optimize` (compaction) + `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes the compacted version so the manifest tracks the Lance HEAD and a subsequent schema apply succeeds (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), and reconciles a pre-existing manifest-behind-HEAD drift forged via raw Lance compaction (`optimize_reconciles_preexisting_manifest_head_drift`) |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`). |
| `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path |
| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories). |
| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). |
## Fixtures

View file

@ -157,10 +157,14 @@ are left at `Lance HEAD = manifest_pinned + 1`.
**Recovery protocol** (lifecycle of every staged-write writer —
`MutationStaging::finalize`, `schema_apply::apply_schema_with_lock`,
`branch_merge_on_current_target`, `ensure_indices_for_branch`):
`branch_merge_on_current_target`, `ensure_indices_for_branch`,
`optimize_all_tables`):
1. **Phase A**: writer writes a sidecar JSON to
`__recovery/{ulid}.json` BEFORE its first `commit_staged`. The
`__recovery/{ulid}.json` BEFORE its first HEAD-advancing commit
(`commit_staged`, or `compact_files` for `optimize_all_tables`,
which advances the Lance HEAD via a reserve-fragments + rewrite
commit rather than a staged write). The
sidecar names every `(table_key, table_path, expected_version,
post_commit_pin)` it intends to commit + the writer kind +
actor_id.
@ -195,8 +199,13 @@ recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`:
otherwise full open-time recovery rolls them back and refresh-time
recovery leaves them for the next read-write open.
- Otherwise **roll back**: per-table `Dataset::restore` to the
manifest-pinned table version for that branch. Rollback records the
actual restore target in the audit row's `to_version`.
manifest-pinned table version, then a single `ManifestBatchPublisher::publish`
of the restored HEAD — symmetric with roll-forward, so `manifest == HEAD`
after recovery (no residual drift). This convergence is what lets a
failed-then-retried schema apply succeed instead of failing one version higher
each iteration. The audit row's `to_version` records the logical
rolled-back-to version (`manifest_pinned`); the manifest is published at the
restore commit (`manifest_pinned + 1`, same content).
- After a successful roll-forward or roll-back, an audit row is
recorded — `_graph_commits.lance` carries
a commit tagged `actor_id = "omnigraph:recovery"`, and a sibling

View file

@ -58,6 +58,6 @@ Internal or legacy branch refs:
## L2 — Recovery audit trail
The four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) protect their multi-table commits with a sidecar at `__recovery/{ulid}.json` written before Phase B and deleted after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: classify per-table state, decide all-or-nothing per sidecar, roll forward / back, record an audit row.
The five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) protect their multi-table commits with a sidecar at `__recovery/{ulid}.json` written before Phase B and deleted after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: classify per-table state, decide all-or-nothing per sidecar, roll forward / back, record an audit row.
Audit rows live in `_graph_commit_recoveries.lance` (sibling to `_graph_commits.lance`) and reference the commit graph by `graph_commit_id`. The linked recovery commit is identified by that same `graph_commit_id`, and `actor_id="omnigraph:recovery"` is stored in `_graph_commit_actors.lance` (joined by `graph_commit_id`) — `_graph_commits.lance` itself does not carry the `actor_id` column. To find recoveries for a specific original actor: `omnigraph commit list --filter actor=omnigraph:recovery`, then join to `_graph_commit_recoveries.lance` by `graph_commit_id` to read `recovery_for_actor`. Schema: see `crates/omnigraph/src/db/recovery_audit.rs`.

View file

@ -4,8 +4,10 @@
## `optimize_all_tables(db)` — non-destructive
- Lance `compact_files()` on every node + edge table on `main`.
- Rewrites small fragments into fewer large ones; old fragments remain reachable via older manifests.
- Lance `compact_files()` on every node + edge table on `main`, then **publishes the compacted version to the `__manifest`** so the manifest's `table_version` tracks the compacted Lance HEAD. Reads pin the manifest version, so without this publish compaction would be invisible to readers *and* would break the HEAD-vs-manifest precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually compacted.
- Rewrites small fragments into fewer large ones; old fragments remain reachable via older manifests until `cleanup` runs.
- Each table's compact→publish runs under its per-`(table, main)` write queue (serializing with concurrent mutations — compaction is a Lance `Rewrite` op that retryable-conflicts with a concurrent merge/update/delete on overlapping fragments). The Lance-HEAD-before-manifest-publish gap is covered by a `SidecarKind::Optimize` recovery sidecar (loose-match): a crash in that window rolls the compacted version forward on the next `Omnigraph::open` (compaction is content-preserving, so roll-forward is always safe).
- **Requires a recovered graph.** `optimize` refuses (errors) when an unresolved recovery sidecar is present under `__recovery` — operating on an unrecovered graph could publish a partial write the open-time recovery sweep would roll back. Reopen the graph to run the recovery sweep, then re-run `optimize`. (Recovery roll-back now publishes its restored version, so a recovered graph always satisfies `manifest == Lance HEAD` going in; there is no leftover drift for `optimize` to interpret.)
- Bounded by `OMNIGRAPH_MAINTENANCE_CONCURRENCY` (default 8).
- Returns `[TableOptimizeStats { table_key, fragments_removed, fragments_added, committed, skipped }]`.
- **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: Some(BlobColumnsUnsupportedByLance)` (and logged via `tracing::warn`) instead of compacted, and the rest of the sweep proceeds normally. The current Lance `compact_files` mis-decodes blob-v2 columns under its forced `BlobHandling::AllBinary` read; **reads and writes are unaffected** — only compaction is. This is gated by `LANCE_SUPPORTS_BLOB_COMPACTION` (`db/omnigraph/optimize.rs`) and removed when the upstream Lance fix lands (see [docs/dev/lance.md](../dev/lance.md)). Consequence: fragment count and deleted-row space on blob tables are not reclaimed until then; query results are never affected.

View file

@ -94,7 +94,7 @@ flowchart TB
- **`nodes/`** and **`edges/`** are sibling directories holding one Lance dataset per declared type. Names are `fnv1a64-hex` of the type name to keep paths fixed-length and case-safe.
- **`_graph_commits.lance`** is an L2 dataset that records the graph-level commit DAG, with a paired `_graph_commit_actors.lance` for the actor map. (Pre-v0.4.0 graphs also have inert `_graph_runs.lance` / `_graph_run_actors.lance` from the removed Run state machine; the v2→v3 migration sweeps their stale `__run__*` branches, and the dataset bytes are reclaimed once `delete_prefix` lands.)
- **`_graph_commit_recoveries.lance`** — one row per recovery sweep action. Joined to `_graph_commits.lance` by `graph_commit_id`; the linked commit row carries `actor_id=omnigraph:recovery`. Operators correlate recoveries with the original mutations they rolled forward / back via this join. See `crates/omnigraph/src/db/recovery_audit.rs`.
- **`__recovery/{ulid}.json`** — transient sidecar files written by the four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) before Phase B begins, deleted after Phase C succeeds. A sidecar persisting after process exit means the writer crashed in the Phase B → Phase C window; the next `Omnigraph::open` recovery sweep processes it. Steady-state directory is empty. See `crates/omnigraph/src/db/manifest/recovery.rs`.
- **`__recovery/{ulid}.json`** — transient sidecar files written by the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) before Phase B begins, deleted after Phase C succeeds. A sidecar persisting after process exit means the writer crashed in the Phase B → Phase C window; the next `Omnigraph::open` recovery sweep processes it. Steady-state directory is empty. See `crates/omnigraph/src/db/manifest/recovery.rs`.
- **`_refs/branches/{name}.json`** is graph-level branch metadata — pointers from a branch name to the manifest version it heads.
- **Inside each Lance dataset** (orange): the standard Lance directory layout. `_versions/{n}.manifest` records every commit; `data/` holds the actual Arrow fragments; `_indices/{uuid}/` holds index segments with their own `fragment_bitmap` for partial coverage; `_refs/` holds Lance-native per-dataset branches and tags.