recovery: refresh-time roll-forward closes the in-process residual + invariants helper

Bundle of three correctness fixes plus a shared invariants helper that
existing tests now use.

1. SchemaApply atomicity: close the residual gap where a sidecar exists
   but staging files don't (e.g., Phase B failure BEFORE
   `_schema.pg.staging` write). `recover_schema_state_files` now returns
   a `SchemaStateRecovery` discriminator (`Noop` /
   `CleanedStaging` / `CompletedStagingRename { schema_apply_sidecar }`);
   the token threads through `recover_manifest_drift` →
   `process_sidecar`. SchemaApply sidecars are eligible for roll-forward
   ONLY when the staging rename completed in the same recovery pass.
   Full mode rolls back; RollForwardOnly defers. Without this, recovery
   would publish the manifest pin against new-schema data while
   `_schema.pg` stayed old (real corruption). New failpoint
   `schema_apply.before_staging_write` + new test
   `schema_apply_without_schema_staging_rolls_back_on_next_open` pin
   the gating.

2. Rollback target correction. Rollback now restores Lance HEAD to the
   current manifest pin (`state.manifest_pinned`) instead of the
   sidecar's `expected_version`. For UnexpectedAtP1/UnexpectedMultistep
   classifications these can differ; the old code could regress Lance
   HEAD past the manifest pin, re-introducing drift in the OTHER
   direction. The new behavior establishes `Lance HEAD == manifest pin`
   post-rollback — the canonical drift-free invariant. Param renamed
   from `expected_version` → `target_version` to match. Audit
   `to_version` records the actual restore target.

   This is a latent-behavior change. Any external consumer that compared
   `audit.to_version` against `sidecar.expected_version` for non-trivial
   classifications now sees the manifest pin instead.

3. Audit commit-graph unification. `record_audit` now opens the
   per-branch commit graph for ANY sidecar with `sidecar.branch.is_some()`
   — not just BranchMerge. Plain Mutation/Load/EnsureIndices commits on a
   feature branch now correctly land on that branch's commit graph,
   instead of main's. Closes the class of bug analogous to D2 but for
   non-merge writers.

   Pre-existing repos with non-main commits already on main's commit
   graph stay where they are; future recoveries write to the per-branch
   ref. Mixed-version compatibility is asymmetric but safe (old binaries
   ignore per-branch refs they don't know about; new binaries read both).

4. Recovery invariants helper + branch-axis cells. New
   `tests/helpers/recovery.rs` (~505 LOC) exports
   `assert_post_recovery_invariants(repo, op_id, RecoveryExpectation)`
   plus a `TableExpectation` builder. Six existing recovery tests
   refactored to call it; per-test bespoke assertions replaced. Two new
   branch-axis cells added in `tests/failpoints.rs`:
     - `recovery_rolls_forward_load_on_feature_branch`
     - `recovery_rolls_forward_ensure_indices_on_feature_branch`
   The loader gains a `mutation.post_finalize_pre_publisher` failpoint
   hook (gated on the `failpoints` feature; zero-cost in release) so the
   load test can pin the same Phase B → Phase C boundary the mutation
   path uses.

Misc:
   - `Omnigraph::refresh` extracts `reload_schema_if_source_changed`:
     early-return when schema source unchanged (saves IR parse + catalog
     rebuild on the steady-state refresh path).
   - New test injection point
     `failpoint_publish_table_head_without_index_rebuild_for_test`
     under `#[cfg(feature = "failpoints")]`.

Tests: 31 recovery + failpoint integration tests pass (14 + 17, up from
14 + 16). Full workspace sweep with `--features failpoints` clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-05 16:04:48 +02:00
parent 44c0d0bc4b
commit 815ff743f5
No known key found for this signature in database
12 changed files with 1379 additions and 449 deletions

View file

@ -15,7 +15,7 @@
//! `OpenMode::ReadWrite`) classifies each table in each sidecar and
//! either rolls forward all tables (if every table is at
//! `post_commit_pin` AND matches the sidecar) or rolls back all
//! `RolledPastExpected` tables to `expected_version`.
//! drifted tables to the manifest-pinned version.
//!
//! ## Verified Lance behavior the rollback path depends on
//!
@ -42,8 +42,9 @@ use tracing::warn;
use crate::db::commit_graph::CommitGraph;
use crate::db::graph_coordinator::GraphCoordinator;
use crate::db::recovery_audit::{
now_micros, RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome,
RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome, now_micros,
};
use crate::db::schema_state::SchemaStateRecovery;
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
@ -209,12 +210,11 @@ pub(crate) enum TableClassification {
RolledPastExpected,
/// `lance_head == manifest_pinned + 1` but the sidecar's
/// `expected_version`/`post_commit_pin` don't match. Some other writer
/// or recovery action moved this table. Roll back to
/// `sidecar.expected_version`.
/// or recovery action moved this table. Roll back to the manifest pin.
UnexpectedAtP1,
/// `lance_head > manifest_pinned + 1`. Multi-step orphan from a
/// previous restore attempt or an external mutation. Roll back to
/// `sidecar.expected_version`.
/// the manifest pin.
UnexpectedMultistep,
/// `lance_head < manifest_pinned`. Should be impossible: the manifest
/// pin can only advance after a successful Lance commit. Surface
@ -231,7 +231,7 @@ pub(crate) enum TableClassification {
///
/// - Any `InvariantViolation` → `Abort` (operator action required).
/// - Any `UnexpectedAtP1` / `UnexpectedMultistep` / `NoMovement` →
/// `RollBack` all `RolledPastExpected` tables to `expected_version`.
/// `RollBack` all drifted tables to the manifest pin.
/// - All `RolledPastExpected` → `RollForward` every table in one
/// manifest publish.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -419,7 +419,10 @@ pub(crate) fn classify_table(
pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision {
use SidecarDecision::*;
use TableClassification::*;
if classifications.iter().any(|c| matches!(c, InvariantViolation { .. })) {
if classifications
.iter()
.any(|c| matches!(c, InvariantViolation { .. }))
{
return Abort;
}
if classifications
@ -432,8 +435,8 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision
RollForward
}
/// Restore a single table's Lance HEAD to `expected_version`, producing a
/// new commit at HEAD+1 with content == content-at-`expected_version`.
/// Restore a single table's Lance HEAD to `target_version`, producing a
/// new commit at HEAD+1 with content == content-at-`target_version`.
///
/// Always runs the actual `Dataset::restore` — there is NO fragment-set
/// short-circuit because equal fragment IDs do NOT imply equal content:
@ -448,7 +451,7 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision
pub(crate) async fn restore_table_to_version(
table_path: &str,
branch: Option<&str>,
expected_version: u64,
target_version: u64,
) -> Result<()> {
let head = Dataset::open(table_path)
.await
@ -461,7 +464,7 @@ pub(crate) async fn restore_table_to_version(
_ => head,
};
let mut to_restore = head
.checkout_version(expected_version)
.checkout_version(target_version)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
to_restore
@ -494,6 +497,7 @@ pub(crate) async fn recover_manifest_drift(
storage: std::sync::Arc<dyn StorageAdapter>,
coordinator: &mut GraphCoordinator,
mode: RecoveryMode,
schema_state_recovery: SchemaStateRecovery,
) -> Result<()> {
let sidecars = list_sidecars(root_uri, storage.as_ref()).await?;
if sidecars.is_empty() {
@ -514,12 +518,9 @@ pub(crate) async fn recover_manifest_drift(
for sidecar in sidecars {
let branch_snapshot = match sidecar.branch.as_deref() {
Some(b) => {
let mut branch_coord = GraphCoordinator::open_branch(
root_uri,
b,
std::sync::Arc::clone(&storage),
)
.await?;
let mut branch_coord =
GraphCoordinator::open_branch(root_uri, b, std::sync::Arc::clone(&storage))
.await?;
branch_coord.refresh().await?;
branch_coord.snapshot()
}
@ -528,8 +529,15 @@ pub(crate) async fn recover_manifest_drift(
coordinator.snapshot()
}
};
process_sidecar(root_uri, storage.as_ref(), &branch_snapshot, &sidecar, mode)
.await?;
process_sidecar(
root_uri,
storage.as_ref(),
&branch_snapshot,
&sidecar,
mode,
schema_state_recovery,
)
.await?;
}
// Final refresh so the caller sees the post-sweep state.
coordinator.refresh().await?;
@ -542,22 +550,24 @@ async fn process_sidecar(
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
mode: RecoveryMode,
schema_state_recovery: SchemaStateRecovery,
) -> Result<()> {
let mut classifications = Vec::with_capacity(sidecar.tables.len());
let mut states = Vec::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
let lance_head =
open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?;
let lance_head = open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?;
let manifest_pinned = snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0);
classifications.push(classify_table(
pin,
lance_head,
states.push(ClassifiedTable {
classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind),
manifest_pinned,
sidecar.writer_kind,
));
});
}
let classifications = states
.iter()
.map(|state| state.classification)
.collect::<Vec<_>>();
match decide(&classifications) {
SidecarDecision::Abort => match mode {
@ -605,51 +615,31 @@ async fn process_sidecar(
writer_kind = ?sidecar.writer_kind,
"recovery: rolling back sidecar (mixed or unexpected state)"
);
// Restore every table whose Lance HEAD has drifted from the
// manifest pin (RolledPastExpected, UnexpectedAtP1,
// UnexpectedMultistep). NoMovement tables are already at
// expected_version — no action. Restore is unconditional;
// repeated mid-rollback crashes accumulate a few extra
// Lance commits that `omnigraph cleanup` reclaims.
let mut outcomes = Vec::with_capacity(sidecar.tables.len());
for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) {
if matches!(
cls,
TableClassification::RolledPastExpected
| TableClassification::UnexpectedAtP1
| TableClassification::UnexpectedMultistep
) {
restore_table_to_version(
&pin.table_path,
pin.table_branch.as_deref(),
pin.expected_version,
)
.await?;
outcomes.push(TableOutcome {
table_key: pin.table_key.clone(),
from_version: snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0),
to_version: pin.expected_version,
});
}
}
// Manifest pin doesn't move on rollback; record an audit-only
// commit at the existing version so operators can correlate via
// `omnigraph commit list --filter actor=omnigraph:recovery`.
record_audit(
root_uri,
sidecar,
snapshot.version(),
RecoveryKind::RolledBack,
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
Ok(())
roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await
}
SidecarDecision::RollForward => {
if matches!(sidecar.writer_kind, SidecarKind::SchemaApply)
&& !schema_state_recovery.completed_schema_apply_sidecar_rename()
{
return match mode {
RecoveryMode::Full => {
warn!(
operation_id = sidecar.operation_id.as_str(),
"recovery: rolling back SchemaApply sidecar because schema staging \
files were not promoted in this recovery pass"
);
roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await
}
RecoveryMode::RollForwardOnly => {
warn!(
operation_id = sidecar.operation_id.as_str(),
"recovery: deferring SchemaApply sidecar because schema staging files \
were not promoted in this recovery pass"
);
Ok(())
}
};
}
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
@ -688,6 +678,64 @@ async fn process_sidecar(
}
}
#[derive(Debug, Clone, Copy)]
struct ClassifiedTable {
classification: TableClassification,
manifest_pinned: u64,
}
async fn roll_back_sidecar(
root_uri: &str,
storage: &dyn StorageAdapter,
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
states: &[ClassifiedTable],
) -> Result<()> {
// Restore every table whose Lance HEAD has drifted from the
// manifest pin (RolledPastExpected, UnexpectedAtP1,
// UnexpectedMultistep). NoMovement tables are already at the
// manifest pin — no action. Restore is unconditional; repeated
// mid-rollback crashes accumulate a few extra Lance commits that
// `omnigraph cleanup` reclaims.
let mut outcomes = Vec::with_capacity(sidecar.tables.len());
for (pin, state) in sidecar.tables.iter().zip(states.iter()) {
if matches!(
state.classification,
TableClassification::RolledPastExpected
| TableClassification::UnexpectedAtP1
| TableClassification::UnexpectedMultistep
) {
restore_table_to_version(
&pin.table_path,
pin.table_branch.as_deref(),
state.manifest_pinned,
)
.await?;
outcomes.push(TableOutcome {
table_key: pin.table_key.clone(),
from_version: snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0),
to_version: state.manifest_pinned,
});
}
}
// Manifest pin doesn't move on rollback; record an audit-only
// commit at the existing version so operators can correlate via
// `omnigraph commit list --filter actor=omnigraph:recovery`.
record_audit(
root_uri,
sidecar,
snapshot.version(),
RecoveryKind::RolledBack,
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
Ok(())
}
/// Atomically extend every table's manifest pin from `expected_version` to
/// `post_commit_pin` via a single `ManifestBatchPublisher::publish` call.
/// Returns the new manifest version produced by the publish.
@ -710,8 +758,7 @@ async fn roll_forward_all(
) -> Result<(u64, HashMap<String, u64>)> {
let mut updates: Vec<ManifestChange> = Vec::with_capacity(sidecar.tables.len());
let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
let mut published_versions: HashMap<String, u64> =
HashMap::with_capacity(sidecar.tables.len());
let mut published_versions: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
// Open the dataset at its CURRENT Lance HEAD on the pin's branch
@ -738,12 +785,11 @@ async fn roll_forward_all(
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
let table_relative_path = super::table_path_for_table_key(&pin.table_key)?;
let version_metadata =
super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&table_relative_path,
&head_ds,
)?;
let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&table_relative_path,
&head_ds,
)?;
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: pin.table_key.clone(),
@ -779,33 +825,26 @@ async fn record_audit(
kind: RecoveryKind,
outcomes: Vec<TableOutcome>,
) -> Result<()> {
// BranchMerge sidecars carry the source branch's HEAD commit id so
// recovery can record this as a MERGE commit (with parent linkage)
// instead of a plain commit. Without the merge parent, future
// `branch_merge feature → main` between the same pair would not
// recognize "already up-to-date" and merge-base computations break.
//
// For BranchMerge on a non-main target, the parent commit id is the
// TARGET BRANCH's tip — `CommitGraph::open()` returns the global
// commit graph whose `head_commit_id()` is the global head and would
// record the wrong parent. Open the per-branch instance instead.
// Non-main recovery commits must be appended on the sidecar branch's
// commit graph, otherwise parent_commit_id comes from the global
// main head. BranchMerge additionally records the source branch's
// HEAD as merged_parent_commit_id so future merges between the same
// pair recognize "already up-to-date".
let target_branch = sidecar.branch.as_deref();
let mut graph = match target_branch {
Some(branch) => CommitGraph::open_at_branch(root_uri, branch).await?,
None => CommitGraph::open(root_uri).await?,
};
let graph_commit_id = match (
sidecar.writer_kind,
sidecar.merge_source_commit_id.as_deref(),
kind,
) {
(SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => {
let mut branch_graph = match sidecar.branch.as_deref() {
Some(target_branch) => {
CommitGraph::open_at_branch(root_uri, target_branch).await?
}
None => CommitGraph::open(root_uri).await?,
};
let parent_commit_id =
branch_graph.head_commit_id().await?.unwrap_or_default();
branch_graph
let parent_commit_id = graph.head_commit_id().await?.unwrap_or_default();
graph
.append_merge_commit(
sidecar.branch.as_deref(),
target_branch,
manifest_version,
&parent_commit_id,
source_id,
@ -814,13 +853,8 @@ async fn record_audit(
.await?
}
_ => {
let mut graph = CommitGraph::open(root_uri).await?;
graph
.append_commit(
sidecar.branch.as_deref(),
manifest_version,
Some(RECOVERY_ACTOR),
)
.append_commit(target_branch, manifest_version, Some(RECOVERY_ACTOR))
.await?
}
};
@ -915,11 +949,11 @@ pub(crate) fn new_sidecar(
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use crate::storage::LocalStorageAdapter;
use crate::table_store::TableStore;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
fn person_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
@ -1183,11 +1217,10 @@ mod tests {
assert_eq!(post.version().version, head_before + 1);
// Content matches v1 (just alice).
let scanner = post.scan();
let batches: Vec<RecordBatch> = futures::TryStreamExt::try_collect(
scanner.try_into_stream().await.unwrap(),
)
.await
.unwrap();
let batches: Vec<RecordBatch> =
futures::TryStreamExt::try_collect(scanner.try_into_stream().await.unwrap())
.await
.unwrap();
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, 1);
}
@ -1295,7 +1328,11 @@ mod tests {
// Write sidecars in REVERSE chronological order (newest first).
// The classifier shouldn't care, but the sweep needs deterministic
// processing for reproducibility.
let ids = ["01H000000000000000000000ZZ", "01H000000000000000000000MM", "01H000000000000000000000AA"];
let ids = [
"01H000000000000000000000ZZ",
"01H000000000000000000000MM",
"01H000000000000000000000AA",
];
for id in &ids {
let sc = new_sidecar(
SidecarKind::Mutation,

View file

@ -179,8 +179,9 @@ impl Omnigraph {
// the manifest pin is the consistent snapshot regardless of
// drift on the per-table side or leftover schema-staging files.
if matches!(mode, OpenMode::ReadWrite) {
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
.await?;
let schema_state_recovery =
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
.await?;
// Recovery sweep: close the Phase B → Phase C residual on
// any sidecar left over from a crashed writer. Continuous
// in-process recovery for long-running servers (no restart
@ -191,6 +192,7 @@ impl Omnigraph {
Arc::clone(&storage),
&mut coordinator,
crate::db::manifest::RecoveryMode::Full,
schema_state_recovery,
)
.await?;
}
@ -409,7 +411,7 @@ impl Omnigraph {
/// avoid the recovery sweep racing their own sidecar.
pub async fn refresh(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
recover_schema_state_files(
let schema_state_recovery = recover_schema_state_files(
&self.root_uri,
Arc::clone(&self.storage),
&self.coordinator.snapshot(),
@ -420,18 +422,20 @@ impl Omnigraph {
Arc::clone(&self.storage),
&mut self.coordinator,
crate::db::manifest::RecoveryMode::RollForwardOnly,
schema_state_recovery,
)
.await?;
// Re-read the schema source / catalog from disk: schema-state
// recovery above may have renamed staging files into place
// (completing an in-flight schema_apply), so the on-disk
// `_schema.pg` and IR contract may now reflect a NEWER schema
// than the in-memory `self.catalog` / `self.schema_source`.
// Without this reload subsequent ops on the handle would use
// stale catalog metadata against post-migration data on disk.
// Mirrors `open_with_storage_and_mode`'s schema-load sequence.
self.reload_schema_if_source_changed().await?;
self.runtime_cache.invalidate_all().await;
Ok(())
}
async fn reload_schema_if_source_changed(&mut self) -> Result<()> {
let schema_path = schema_source_uri(&self.root_uri);
let schema_source = self.storage.read_text(&schema_path).await?;
if schema_source == self.schema_source {
return Ok(());
}
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
let branches = self.coordinator.branch_list().await?;
let (accepted_ir, _) = load_or_bootstrap_schema_contract(
@ -445,7 +449,6 @@ impl Omnigraph {
fixup_blob_schemas(&mut catalog);
self.schema_source = schema_source;
self.catalog = catalog;
self.runtime_cache.invalidate_all().await;
Ok(())
}
@ -611,6 +614,23 @@ impl Omnigraph {
table_ops::ensure_indices_on(self, branch).await
}
#[cfg(feature = "failpoints")]
#[doc(hidden)]
pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
&mut self,
branch: &str,
table_key: &str,
table_branch: Option<&str>,
) -> Result<u64> {
table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
self,
branch,
table_key,
table_branch,
)
.await
}
/// Compact small Lance fragments into fewer larger ones across every
/// node + edge table on `main`. See [`optimize`] for details.
pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
@ -989,7 +1009,6 @@ impl Omnigraph {
pub(crate) async fn invalidate_graph_index(&self) {
table_ops::invalidate_graph_index(self).await
}
}
pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {

View file

@ -302,11 +302,7 @@ pub(super) async fn apply_schema_with_lock(
// open the wrong HEAD here.
let existing = db
.table_store
.open_dataset_head_for_write(
table_key,
&dataset_uri,
entry.table_branch.as_deref(),
)
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
let staged = db.table_store.stage_overwrite(&existing, batch).await?;
db.table_store
@ -398,6 +394,8 @@ pub(super) async fn apply_schema_with_lock(
// `recover_schema_state_files`:
// - crash before commit → manifest unchanged; staging deleted on open
// - crash after commit → manifest advanced; staging renamed on open
crate::failpoints::maybe_fail("schema_apply.before_staging_write")?;
let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
db.storage
.write_text(&staging_pg_uri, desired_schema_source)
@ -449,9 +447,7 @@ pub(super) async fn apply_schema_with_lock(
// Failing the schema_apply call would report failure for a migration
// that already succeeded.
if let Some(handle) = recovery_handle {
if let Err(err) =
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
{
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),

View file

@ -31,6 +31,37 @@ pub(super) async fn ensure_indices_on(db: &mut Omnigraph, branch: &str) -> Resul
ensure_indices_for_branch(db, branch.as_deref()).await
}
#[cfg(feature = "failpoints")]
pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
db: &mut Omnigraph,
branch: &str,
table_key: &str,
table_branch: Option<&str>,
) -> Result<u64> {
let branch = normalize_branch_name(branch)?;
let snapshot = db.snapshot_for_branch(branch.as_deref()).await?;
let entry = snapshot
.entry(table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
let ds = db
.table_store
.open_dataset_head_for_write(table_key, &full_path, table_branch)
.await?;
let state = db.table_store.table_state(&full_path, &ds).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: state.version,
table_branch: table_branch.map(str::to_string),
row_count: state.row_count,
version_metadata: state.version_metadata,
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.to_string(), entry.table_version);
commit_prepared_updates_on_branch_with_expected(db, branch.as_deref(), &[update], &expected)
.await
}
pub(super) async fn ensure_indices_for_branch(
db: &mut Omnigraph,
branch: Option<&str>,
@ -100,9 +131,7 @@ pub(super) async fn ensure_indices_for_branch(
continue;
}
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref())
.await?
{
if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()).await? {
recovery_pins.push(crate::db::manifest::SidecarTablePin {
table_key,
table_path: full_path,
@ -243,9 +272,7 @@ pub(super) async fn ensure_indices_for_branch(
// per-table commit window regardless). Best-effort cleanup; failing
// the user here would error a call that already succeeded.
if let Some(handle) = recovery_handle {
if let Err(err) =
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
{
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),

View file

@ -285,6 +285,24 @@ fn schema_lock_conflict(detail: impl Into<String>) -> OmniError {
))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SchemaStateRecovery {
Noop,
CleanedStaging,
CompletedStagingRename { schema_apply_sidecar: bool },
}
impl SchemaStateRecovery {
pub(crate) fn completed_schema_apply_sidecar_rename(self) -> bool {
matches!(
self,
Self::CompletedStagingRename {
schema_apply_sidecar: true,
}
)
}
}
/// Reconcile leftover schema staging files (`_schema.pg.staging`,
/// `_schema.ir.json.staging`, `__schema_state.json.staging`) against the
/// manifest snapshot.
@ -306,7 +324,7 @@ pub(crate) async fn recover_schema_state_files(
root_uri: &str,
storage: Arc<dyn StorageAdapter>,
snapshot: &Snapshot,
) -> Result<()> {
) -> Result<SchemaStateRecovery> {
let pg_staging = schema_source_staging_uri(root_uri);
let ir_staging = schema_ir_staging_uri(root_uri);
let state_staging = schema_state_staging_uri(root_uri);
@ -316,7 +334,7 @@ pub(crate) async fn recover_schema_state_files(
let state_exists = storage.exists(&state_staging).await?;
if !pg_exists && !ir_exists && !state_exists {
return Ok(());
return Ok(SchemaStateRecovery::Noop);
}
// Schema-apply atomicity: when a SchemaApply sidecar is present,
@ -335,7 +353,9 @@ pub(crate) async fn recover_schema_state_files(
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
return Ok(());
return Ok(SchemaStateRecovery::CompletedStagingRename {
schema_apply_sidecar: true,
});
}
if !pg_exists {
@ -365,7 +385,9 @@ pub(crate) async fn recover_schema_state_files(
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
return Ok(());
return Ok(SchemaStateRecovery::CompletedStagingRename {
schema_apply_sidecar: false,
});
}
let staging_source = storage.read_text(&pg_staging).await?;
@ -384,7 +406,7 @@ pub(crate) async fn recover_schema_state_files(
"removing leftover schema staging files matching the live schema (no-op apply that crashed)"
);
cleanup_staging_files(root_uri, storage.as_ref()).await?;
return Ok(());
return Ok(SchemaStateRecovery::CleanedStaging);
}
let live_keys = expected_table_keys(&live_ir);
@ -407,14 +429,16 @@ pub(crate) async fn recover_schema_state_files(
snapshot.version()
);
cleanup_staging_files(root_uri, storage.as_ref()).await?;
Ok(())
Ok(SchemaStateRecovery::CleanedStaging)
} else if actual_keys == staging_keys {
warn!(
"schema apply crashed after manifest commit; completing schema-file rename (manifest v{})",
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
Ok(())
Ok(SchemaStateRecovery::CompletedStagingRename {
schema_apply_sidecar: false,
})
} else {
Err(schema_lock_conflict(format!(
"found schema staging files but the manifest's table set ({:?}) matches neither the live schema ({:?}) nor the staging schema ({:?}); manual operator action required",
@ -426,9 +450,7 @@ pub(crate) async fn recover_schema_state_files(
async fn cleanup_staging_files(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> {
storage.delete(&schema_source_staging_uri(root_uri)).await?;
storage.delete(&schema_ir_staging_uri(root_uri)).await?;
storage
.delete(&schema_state_staging_uri(root_uri))
.await?;
storage.delete(&schema_state_staging_uri(root_uri)).await?;
Ok(())
}

View file

@ -540,6 +540,11 @@ async fn load_jsonl_reader<R: BufRead>(
let (updates, expected_versions, sidecar_handle) = staging
.finalize(db, branch, crate::db::manifest::SidecarKind::Load)
.await?;
// Same finalize → publisher residual as mutations: per-table
// staged commits have advanced Lance HEAD, but the manifest
// publish has not run yet. Reuse the mutation failpoint name so
// one failpoint pins the shared `MutationStaging` boundary.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
.await?;
// The recovery sidecar protects the per-table commit_staged →

View file

@ -6,12 +6,50 @@ use fail::FailScenario;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;
use helpers::recovery::{
FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants,
branch_head_commit_id, single_sidecar_operation_id,
};
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main};
const SCHEMA_V1: &str = "node Person { name: String @key }\n";
const SCHEMA_V2_ADDED_TYPE: &str =
"node Person { name: String @key }\nnode Company { name: String @key }\n";
fn node_table_uri(root: &str, type_name: &str) -> String {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in type_name.as_bytes() {
hash ^= b as u64;
hash = hash.wrapping_mul(0x100_0000_01b3);
}
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
fn person_batch(rows: &[(&str, &str, Option<i32>)]) -> arrow_array::RecordBatch {
use std::sync::Arc;
use arrow_array::{Int32Array, StringArray};
use arrow_schema::{DataType, Field, Schema};
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Int32, true),
Field::new("name", DataType::Utf8, false),
]));
let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect();
let names: Vec<&str> = rows.iter().map(|(_, name, _)| *name).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, _, age)| *age).collect();
arrow_array::RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
Arc::new(StringArray::from(names)),
],
)
.unwrap()
}
#[tokio::test]
async fn branch_create_failpoint_triggers() {
let _scenario = FailScenario::setup();
@ -174,12 +212,12 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
// Phase A: trigger the residual.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
// The mutation's finalize completes (commit_staged advances Lance
// HEAD on node:Person AND writes a `__recovery/{ulid}.json`
@ -195,9 +233,8 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
.await
.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: mutation.post_finalize_pre_publisher"
),
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
@ -212,6 +249,7 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
1,
"exactly one sidecar should persist after the finalize failure"
);
operation_id = single_sidecar_operation_id(dir.path());
// Drop the failpoint scope and the engine handle.
}
@ -220,21 +258,7 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
// sidecar, classifies node:Person as RolledPastExpected, decides
// RollForward, publishes the manifest update, records the audit
// row, deletes the sidecar.
let mut db = Omnigraph::open(&uri).await.unwrap();
// Sidecar gone — sweep completed end to end.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted after successful roll-forward; remaining: {:?}",
remaining,
);
}
let db = Omnigraph::open(&uri).await.unwrap();
// The originally-attempted "Eve" insert is now visible — the recovery
// sweep extended the manifest pin to include the staged commit.
@ -243,27 +267,258 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
person_count, 1,
"exactly one person (Eve) must be visible after roll-forward"
);
drop(db);
// The next mutation on the same table succeeds — no ExpectedVersionMismatch.
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![TableExpectation::main("node:Person").follow_up_mutation(
FollowUpMutation::new(
"main",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
),
)],
},
)
.await
.expect("next mutation must succeed after recovery rolled forward");
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
let person_count = helpers::count_rows(&db, "node:Person").await;
assert_eq!(
person_count, 2,
"Frank's insert must land normally after recovery"
);
}
// Audit row recorded.
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
assert!(
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after a successful recovery"
#[tokio::test]
async fn recovery_rolls_forward_load_on_feature_branch() {
use omnigraph::loader::LoadMode;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
let main_person_pin;
let feature_parent_commit_id;
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "BeforeLoad")], &[("$age", 40)]),
)
.await
.unwrap();
main_person_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("main must have Person")
.table_version;
feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = db
.load(
"feature",
r#"{"type":"Person","data":{"name":"FeatureLoad","age":41}}
"#,
LoadMode::Append,
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
operation_id = single_sidecar_operation_id(dir.path());
}
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
2,
"feature branch load row must be visible after recovery"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
0,
"feature branch load recovery must not publish the row to main"
);
drop(db);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::branch("node:Person", "feature")
.expected_main_manifest_pin(main_person_pin)
.expected_recovery_parent_commit_id(feature_parent_commit_id)
.follow_up_mutation(FollowUpMutation::new(
"feature",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "AfterLoad")], &[("$age", 42)]),
)),
],
},
)
.await
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
3,
"follow-up feature mutation must succeed after load recovery"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
0,
"follow-up feature mutation must not move main"
);
}
#[tokio::test]
async fn recovery_rolls_forward_ensure_indices_on_feature_branch() {
use lance_index::DatasetIndexExt;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
let feature_parent_commit_id;
let main_person_pin;
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "BeforeEnsure")], &[("$age", 42)]),
)
.await
.unwrap();
main_person_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("main must have Person")
.table_version;
// Make the feature branch's Person table genuinely need index work
// while keeping the manifest internally consistent. The test-only
// publisher deliberately skips the normal index-rebuild preparation;
// the failed writer below is still the real `ensure_indices_on`.
let person_uri = node_table_uri(&uri, "Person");
let store = TableStore::new(&uri);
let mut ds = store
.open_dataset_head(&person_uri, Some("feature"))
.await
.unwrap();
ds.drop_index("id_idx").await.unwrap();
let dropped_index_head = ds.version().version;
db.failpoint_publish_table_head_without_index_rebuild_for_test(
"feature",
"node:Person",
Some("feature"),
)
.await
.unwrap();
let feature_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
.await
.unwrap();
assert_eq!(
feature_snapshot
.entry("node:Person")
.expect("feature must have Person")
.table_version,
dropped_index_head,
"test setup must publish the dropped-index table head before ensure_indices runs",
);
feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();
{
let _failpoint =
ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
let err = db.ensure_indices_on("feature").await.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
),
"unexpected error: {err}"
);
operation_id = single_sidecar_operation_id(dir.path());
}
drop(db);
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
2,
"feature should see inherited alice plus recovered branch-local row"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"ensure_indices branch recovery must not move main"
);
drop(db);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::branch("node:Person", "feature")
.expected_main_manifest_pin(main_person_pin)
.expected_recovery_parent_commit_id(feature_parent_commit_id)
.follow_up_mutation(FollowUpMutation::new(
"feature",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "AfterEnsure")], &[("$age", 44)]),
)),
],
},
)
.await
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
3,
"follow-up feature mutation must succeed after ensure_indices recovery"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"follow-up feature mutation must not move main"
);
}
@ -286,8 +541,7 @@ async fn refresh_runs_roll_forward_recovery_in_process() {
// Phase A: trigger the residual (sidecar persists; manifest unchanged).
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
@ -297,9 +551,8 @@ async fn refresh_runs_roll_forward_recovery_in_process() {
.await
.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: mutation.post_finalize_pre_publisher"
),
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
let recovery_dir = dir.path().join("__recovery");
@ -447,7 +700,9 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
// Trigger refresh-time recovery directly. Sidecar is rollback-
// eligible (UnexpectedAtP1); RollForwardOnly mode defers it,
// leaving the sidecar on disk and Lance HEAD unchanged on Person.
db.refresh().await.expect("refresh must succeed (deferring rollback)");
db.refresh()
.await
.expect("refresh must succeed (deferring rollback)");
// Sidecar still on disk.
assert_eq!(
@ -509,8 +764,7 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
.unwrap();
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let _ = mutate_main(
&mut db,
MUTATION_QUERIES,
@ -570,13 +824,14 @@ async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable()
// node:Project table's btree-on-id build. (TEST_SCHEMA already
// has Person + Company + Knows + WorksAt — pick a name that isn't
// already declared.)
let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA);
let extended_schema = format!(
"{}\nnode Project {{ name: String @key }}\n",
helpers::TEST_SCHEMA
);
{
let _failpoint = ScopedFailPoint::new(
"ensure_indices.post_stage_pre_commit_btree",
"return",
);
let _failpoint =
ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return");
let err = db.apply_schema(&extended_schema).await.unwrap_err();
assert!(
err.to_string()
@ -629,6 +884,98 @@ fn assert_no_staging_files(repo: &std::path::Path) {
// recorded, sidecar deleted) and a follow-up operation succeeds without
// ExpectedVersionMismatch.
#[tokio::test]
async fn schema_apply_without_schema_staging_rolls_back_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
}
let pre_failure_version = {
let db = Omnigraph::open(&uri).await.unwrap();
version_main(&db).await.unwrap()
};
{
let mut db = Omnigraph::open(&uri).await.unwrap();
let _failpoint = ScopedFailPoint::new("schema_apply.before_staging_write", "return");
let v2_schema = r#"node Person {
name: String @key
age: I32?
city: String?
}
node Company {
name: String @key
}
node Tag {
label: String @key
}
edge Knows: Person -> Person {
since: Date?
}
edge WorksAt: Person -> Company
"#;
let err = db.apply_schema(v2_schema).await.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: schema_apply.before_staging_write"),
"unexpected error: {err}"
);
operation_id = single_sidecar_operation_id(dir.path());
}
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
version_main(&db).await.unwrap(),
pre_failure_version,
"manifest must remain on the old schema when no schema staging files existed"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"old-schema data must remain readable after rollback"
);
drop(db);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledBack {
tables: vec![TableExpectation::main("node:Person")],
},
)
.await
.unwrap();
let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
assert!(
!live_schema.contains("city: String?"),
"_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}",
);
assert!(
!live_schema.contains("node Tag"),
"_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}",
);
}
#[tokio::test]
async fn schema_apply_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
@ -636,6 +983,7 @@ async fn schema_apply_phase_b_failure_recovered_on_next_open() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
// Seed: a Person table with one row so the schema-apply rewritten_tables
// loop has actual work to do.
@ -710,6 +1058,7 @@ edge WorksAt: Person -> Company
1,
"exactly one sidecar must persist after schema_apply failure"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Phase B: reopen runs the recovery sweep. Sidecar's writer_kind is
@ -718,25 +1067,6 @@ edge WorksAt: Person -> Company
// current Lance HEAD.
let db = Omnigraph::open(&uri).await.unwrap();
// Sidecar gone, audit row recorded.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted; remaining: {:?}",
remaining,
);
}
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
assert!(
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after schema_apply recovery"
);
// Recovery sweep must have advanced the manifest pin on the rewritten
// table: roll-forward published the post-failure Lance HEAD.
let post_recovery_version = version_main(&db).await.unwrap();
@ -745,6 +1075,17 @@ edge WorksAt: Person -> Company
"manifest version must advance post-recovery; pre={pre_failure_version}, \
post={post_recovery_version}",
);
drop(db);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![TableExpectation::main("node:Person")],
},
)
.await
.unwrap();
// Schema-apply atomicity: the live `_schema.pg` must reflect the
// NEW schema (city column on Person, Tag node type) — not the old.
@ -761,7 +1102,6 @@ edge WorksAt: Person -> Company
live_schema.contains("node Tag"),
"_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}",
);
drop(db);
}
#[tokio::test]
@ -939,6 +1279,8 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let operation_id;
let target_parent_commit_id;
// Setup:
// main: alice
@ -975,41 +1317,18 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
.unwrap();
}
// Capture target_branch's commit-graph head BEFORE the failed merge.
// This is the commit the recovery's merge commit must claim as its
// `parent_commit_id` (D2 — without the per-branch CommitGraph fix,
// recovery would record the GLOBAL head as parent instead).
let target_branch_head_before_failure = {
let commits_dir = dir.path().join("_graph_commits.lance");
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
let main_person_pin = {
let db = Omnigraph::open(&uri).await.unwrap();
db.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.checkout_branch("target_branch")
.await
.unwrap();
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
// Grab the latest commit_id by created_at order (the per-branch
// checkout ensures we only see target_branch's commits).
let mut latest: Option<(i64, String)> = None;
for batch in batches {
let ids = batch
.column_by_name("graph_commit_id").unwrap()
.as_any().downcast_ref::<StringArray>().unwrap();
let created = batch
.column_by_name("created_at").unwrap()
.as_any().downcast_ref::<arrow_array::TimestampMicrosecondArray>().unwrap();
for i in 0..ids.len() {
let ts = created.value(i);
if latest.as_ref().is_none_or(|(t, _)| ts > *t) {
latest = Some((ts, ids.value(i).to_string()));
}
}
}
latest.expect("target_branch must have at least one commit (the insert-Bob mutate)").1
.entry("node:Person")
.expect("main must have Person")
.table_version
};
target_parent_commit_id = branch_head_commit_id(dir.path(), "target_branch")
.await
.unwrap();
// Phase A: failpoint fires after the per-table publish loop completes
// but before commit_manifest_updates. Sidecar persists with
@ -1031,105 +1350,31 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
let recovery_dir = dir.path().join("__recovery");
let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count();
assert_eq!(
sidecar_count,
1,
sidecar_count, 1,
"exactly one sidecar must persist after non-main branch_merge failure"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Phase B: reopen runs full sweep. The BranchMerge sidecar's branch
// = Some("target_branch"); D2 fix opens a per-branch CommitGraph
// for the audit append so the merge-parent linkage is correct.
let _db = Omnigraph::open(&uri).await.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
drop(db);
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted; remaining: {:?}",
remaining,
);
}
// Find the recovery commit on target_branch's commit graph and
// assert its `parent_commit_id` matches the head we captured BEFORE
// the failed merge. This is what catches D2: without the
// per-branch CommitGraph fix, recovery records the GLOBAL head as
// parent, which on this test setup is the source_branch's
// insert-Carol commit (a different ULID), and the assertion fails.
//
// `merged_parent_commit_id` alone is insufficient — it's
// independently populated from sidecar.merge_source_commit_id, so
// it would be set correctly even with D2's bug.
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let commits_dir = dir.path().join("_graph_commits.lance");
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
.await
.unwrap()
.checkout_branch("target_branch")
.await
.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let mut recovery_merge_parent: Option<String> = None;
let mut recovery_merge_merged_parent: Option<String> = None;
for batch in batches {
let merged = batch
.column_by_name("merged_parent_commit_id")
.expect("merged_parent_commit_id column present")
.as_any()
.downcast_ref::<StringArray>()
.expect("merged_parent_commit_id is Utf8");
let parents = batch
.column_by_name("parent_commit_id")
.expect("parent_commit_id column present")
.as_any()
.downcast_ref::<StringArray>()
.expect("parent_commit_id is Utf8");
for i in 0..merged.len() {
if !merged.is_null(i) {
// First (and only — single recovery, single merge commit)
// commit with a merged parent IS the recovery commit.
recovery_merge_parent = if parents.is_null(i) {
None
} else {
Some(parents.value(i).to_string())
};
recovery_merge_merged_parent = Some(merged.value(i).to_string());
break;
}
}
if recovery_merge_parent.is_some() {
break;
}
}
let recovery_parent = recovery_merge_parent
.expect("non-main branch_merge recovery must record a merge commit with parent_commit_id");
assert_eq!(
recovery_parent, target_branch_head_before_failure,
"recovery merge commit's parent_commit_id must == target_branch's pre-failure head; \
expected {}, got {} this would regress to the GLOBAL head if D2's per-branch \
CommitGraph::open_at_branch fix were removed",
target_branch_head_before_failure, recovery_parent,
);
// Sanity: merged_parent is set from the source branch (independent
// of D2; would be correct even with the bug, but we still verify
// it's non-null so the row is a true merge commit).
assert!(
recovery_merge_merged_parent.is_some(),
"recovery merge commit must have non-null merged_parent_commit_id"
);
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::branch("node:Person", "target_branch")
.expected_main_manifest_pin(main_person_pin)
.expected_recovery_parent_commit_id(target_parent_commit_id),
],
},
)
.await
.unwrap();
}
/// `ensure_indices` only writes a sidecar when at least one table
@ -1180,10 +1425,8 @@ async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_neede
// still fires, surfacing the Err.
{
let mut db = Omnigraph::open(&uri).await.unwrap();
let _failpoint = ScopedFailPoint::new(
"ensure_indices.post_phase_b_pre_manifest_commit",
"return",
);
let _failpoint =
ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
let err = db.ensure_indices().await.unwrap_err();
assert!(
err.to_string().contains(

View file

@ -1,5 +1,7 @@
#![allow(dead_code)]
pub mod recovery;
use arrow_array::{Array, RecordBatch, StringArray};
use futures::TryStreamExt;

View file

@ -0,0 +1,559 @@
use std::path::{Path, PathBuf};
use arrow_array::{Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use lance::Dataset;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{GraphCommit, Omnigraph, ReadTarget, SubTableEntry};
use omnigraph::error::{OmniError, Result};
use omnigraph_compiler::ir::ParamMap;
use serde::Deserialize;
const RECOVERY_ACTOR: &str = "omnigraph:recovery";
#[derive(Debug)]
pub enum RecoveryExpectation {
RolledForward { tables: Vec<TableExpectation> },
RolledBack { tables: Vec<TableExpectation> },
Deferred,
NoOp,
}
#[derive(Debug)]
pub struct TableExpectation {
pub table_key: String,
pub branch: Option<String>,
pub expected_lance_head: Option<u64>,
pub expected_main_manifest_pin: Option<u64>,
pub expected_recovery_parent_commit_id: Option<String>,
pub follow_up_mutation: Option<FollowUpMutation>,
}
#[derive(Debug)]
pub struct FollowUpMutation {
pub branch: String,
pub query_source: String,
pub query_name: String,
pub params: ParamMap,
}
#[derive(Debug, Clone)]
struct RecoveryAuditRow {
graph_commit_id: String,
recovery_kind: String,
operation_id: String,
sidecar_writer_kind: String,
per_table_outcomes: Vec<TableOutcome>,
}
#[derive(Debug, Clone, Deserialize)]
struct TableOutcome {
table_key: String,
to_version: u64,
}
impl TableExpectation {
pub fn main(table_key: impl Into<String>) -> Self {
Self::new(table_key, None::<String>)
}
pub fn branch(table_key: impl Into<String>, branch: impl Into<String>) -> Self {
Self::new(table_key, Some(branch))
}
pub fn new(table_key: impl Into<String>, branch: Option<impl Into<String>>) -> Self {
Self {
table_key: table_key.into(),
branch: branch.map(Into::into),
expected_lance_head: None,
expected_main_manifest_pin: None,
expected_recovery_parent_commit_id: None,
follow_up_mutation: None,
}
}
pub fn expected_lance_head(mut self, version: u64) -> Self {
self.expected_lance_head = Some(version);
self
}
pub fn expected_main_manifest_pin(mut self, version: u64) -> Self {
self.expected_main_manifest_pin = Some(version);
self
}
pub fn expected_recovery_parent_commit_id(mut self, commit_id: impl Into<String>) -> Self {
self.expected_recovery_parent_commit_id = Some(commit_id.into());
self
}
pub fn follow_up_mutation(mut self, mutation: FollowUpMutation) -> Self {
self.follow_up_mutation = Some(mutation);
self
}
}
impl FollowUpMutation {
pub fn new(
branch: impl Into<String>,
query_source: impl Into<String>,
query_name: impl Into<String>,
params: ParamMap,
) -> Self {
Self {
branch: branch.into(),
query_source: query_source.into(),
query_name: query_name.into(),
params,
}
}
}
pub fn single_sidecar_operation_id(repo_root: &Path) -> String {
let ids = sidecar_operation_ids(repo_root);
assert_eq!(
ids.len(),
1,
"expected exactly one recovery sidecar under __recovery/, got {:?}",
ids,
);
ids.into_iter().next().unwrap()
}
pub fn sidecar_operation_ids(repo_root: &Path) -> Vec<String> {
let dir = repo_root.join("__recovery");
if !dir.exists() {
return Vec::new();
}
let mut ids = std::fs::read_dir(&dir)
.unwrap()
.filter_map(|entry| {
let entry = entry.ok()?;
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
return None;
}
path.file_stem()
.and_then(|stem| stem.to_str())
.map(str::to_string)
})
.collect::<Vec<_>>();
ids.sort();
ids
}
pub async fn branch_head_commit_id(repo_root: &Path, branch: &str) -> Result<String> {
let graph = match branch {
"main" => CommitGraph::open(&repo_uri(repo_root)).await?,
branch => CommitGraph::open_at_branch(&repo_uri(repo_root), branch).await?,
};
graph.head_commit_id().await?.ok_or_else(|| {
OmniError::manifest_internal(format!("commit graph for branch {branch} has no head"))
})
}
pub async fn assert_post_recovery_invariants(
repo_root: &Path,
operation_id: &str,
expectation: RecoveryExpectation,
) -> Result<()> {
match expectation {
RecoveryExpectation::RolledForward { tables } => {
assert_sidecar_absent(repo_root, operation_id);
let audit = read_audit_row(repo_root, operation_id).await?;
assert_eq!(
audit.recovery_kind, "RolledForward",
"audit row for {operation_id} recorded the wrong recovery_kind",
);
assert_manifest_pins_match_lance_heads(repo_root, &tables).await?;
assert_audit_to_versions_match_lance_heads(repo_root, &audit, &tables).await?;
assert_recovery_commit_shape(repo_root, &audit, &tables).await?;
assert_non_main_did_not_move_main(repo_root, &tables).await?;
assert_idempotent_reopen(repo_root, operation_id).await?;
run_follow_up_mutations(repo_root, tables).await?;
}
RecoveryExpectation::RolledBack { tables } => {
assert_sidecar_absent(repo_root, operation_id);
let audit = read_audit_row(repo_root, operation_id).await?;
assert_eq!(
audit.recovery_kind, "RolledBack",
"audit row for {operation_id} recorded the wrong recovery_kind",
);
assert_recovery_commit_shape(repo_root, &audit, &tables).await?;
assert_non_main_did_not_move_main(repo_root, &tables).await?;
assert_idempotent_reopen(repo_root, operation_id).await?;
run_follow_up_mutations(repo_root, tables).await?;
}
RecoveryExpectation::Deferred => {
assert!(
sidecar_path(repo_root, operation_id).exists(),
"deferred recovery must leave sidecar {operation_id} on disk",
);
assert!(
read_audit_row(repo_root, operation_id).await.is_err(),
"deferred recovery must not record an audit row for {operation_id}",
);
}
RecoveryExpectation::NoOp => {
assert_sidecar_absent(repo_root, operation_id);
assert!(
read_audit_row(repo_root, operation_id).await.is_err(),
"no-op recovery must not record an audit row for {operation_id}",
);
}
}
Ok(())
}
fn branch_context(tables: &[TableExpectation]) -> Option<String> {
tables
.iter()
.filter_map(|table| table.branch.as_deref())
.find(|branch| *branch != "main")
.map(str::to_string)
}
fn sidecar_path(repo_root: &Path, operation_id: &str) -> PathBuf {
repo_root
.join("__recovery")
.join(format!("{operation_id}.json"))
}
fn assert_sidecar_absent(repo_root: &Path, operation_id: &str) {
assert!(
!sidecar_path(repo_root, operation_id).exists(),
"recovery sidecar {operation_id} must be deleted after successful recovery",
);
}
async fn assert_manifest_pins_match_lance_heads(
repo_root: &Path,
tables: &[TableExpectation],
) -> Result<()> {
let uri = repo_uri(repo_root);
let db = Omnigraph::open(&uri).await?;
for table in tables {
let (entry, lance_head) = entry_and_lance_head(&db, &uri, table).await?;
assert_eq!(
entry.table_version, lance_head,
"manifest pin for {} on {:?} must match Lance HEAD after roll-forward",
table.table_key, table.branch,
);
if let Some(expected) = table.expected_lance_head {
assert_eq!(
lance_head, expected,
"Lance HEAD for {} on {:?} did not match the test's expected value",
table.table_key, table.branch,
);
}
}
Ok(())
}
async fn assert_audit_to_versions_match_lance_heads(
repo_root: &Path,
audit: &RecoveryAuditRow,
tables: &[TableExpectation],
) -> Result<()> {
let uri = repo_uri(repo_root);
let db = Omnigraph::open(&uri).await?;
for table in tables {
let (_, lance_head) = entry_and_lance_head(&db, &uri, table).await?;
let outcome = audit
.per_table_outcomes
.iter()
.find(|outcome| outcome.table_key == table.table_key)
.ok_or_else(|| {
OmniError::manifest_internal(format!(
"audit row for {} has no outcome for {}",
audit.operation_id, table.table_key,
))
})?;
assert_eq!(
outcome.to_version, lance_head,
"audit to_version for {} must match the published Lance HEAD",
table.table_key,
);
}
Ok(())
}
async fn assert_non_main_did_not_move_main(
repo_root: &Path,
tables: &[TableExpectation],
) -> Result<()> {
let uri = repo_uri(repo_root);
let db = Omnigraph::open(&uri).await?;
let main = db.snapshot_of(ReadTarget::branch("main")).await?;
for table in tables {
let Some(expected) = table.expected_main_manifest_pin else {
continue;
};
let entry = main.entry(&table.table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"main snapshot has no entry for {}",
table.table_key,
))
})?;
assert_eq!(
entry.table_version, expected,
"non-main recovery for {} on {:?} must not move main's manifest pin",
table.table_key, table.branch,
);
}
Ok(())
}
async fn assert_recovery_commit_shape(
repo_root: &Path,
audit: &RecoveryAuditRow,
tables: &[TableExpectation],
) -> Result<()> {
let branch = branch_context(tables);
let expected_parent = expected_recovery_parent(tables)?;
let branch = branch.as_deref();
let commit = read_recovery_commit(repo_root, audit, branch).await?;
assert_eq!(
commit.actor_id.as_deref(),
Some(RECOVERY_ACTOR),
"recovery commit {} for operation {} must use actor {}",
commit.graph_commit_id,
audit.operation_id,
RECOVERY_ACTOR,
);
if let Some(expected_parent) = expected_parent {
assert_eq!(
commit.parent_commit_id.as_deref(),
Some(expected_parent.as_str()),
"recovery commit {} for operation {} recorded the wrong parent",
commit.graph_commit_id,
audit.operation_id,
);
}
if audit.sidecar_writer_kind == "BranchMerge" {
assert!(
commit.merged_parent_commit_id.is_some(),
"recovered BranchMerge must record merged_parent_commit_id",
);
if let Some(branch) = branch {
let graph = CommitGraph::open_at_branch(&repo_uri(repo_root), branch).await?;
let commits = graph.load_commits().await?;
let parent = commit.parent_commit_id.as_deref().ok_or_else(|| {
OmniError::manifest_internal(format!(
"recovered BranchMerge commit {} has no parent_commit_id",
commit.graph_commit_id,
))
})?;
assert!(
commits
.iter()
.any(|candidate| candidate.graph_commit_id == parent),
"recovered BranchMerge parent_commit_id {} is not on target branch {}",
parent,
branch,
);
}
}
Ok(())
}
fn expected_recovery_parent(tables: &[TableExpectation]) -> Result<Option<String>> {
let mut expected = None;
for table in tables {
let Some(candidate) = &table.expected_recovery_parent_commit_id else {
continue;
};
match &expected {
None => expected = Some(candidate.clone()),
Some(existing) if existing == candidate => {}
Some(existing) => {
return Err(OmniError::manifest_internal(format!(
"conflicting expected recovery parents in table expectations: {existing} vs {candidate}",
)));
}
}
}
Ok(expected)
}
async fn assert_idempotent_reopen(repo_root: &Path, operation_id: &str) -> Result<()> {
let before = matching_audit_rows(repo_root, operation_id).await?;
let uri = repo_uri(repo_root);
let _db = Omnigraph::open(&uri).await?;
assert_sidecar_absent(repo_root, operation_id);
let after = matching_audit_rows(repo_root, operation_id).await?;
assert_eq!(
after.len(),
before.len(),
"immediate reopen after recovery must be a clean no-op for {operation_id}",
);
Ok(())
}
async fn run_follow_up_mutations(repo_root: &Path, tables: Vec<TableExpectation>) -> Result<()> {
let mut db: Option<Omnigraph> = None;
for table in tables {
let Some(mutation) = table.follow_up_mutation else {
continue;
};
if db.is_none() {
db = Some(Omnigraph::open(&repo_uri(repo_root)).await?);
}
let db = db.as_mut().unwrap();
db.mutate(
&mutation.branch,
&mutation.query_source,
&mutation.query_name,
&mutation.params,
)
.await
.map_err(|err| {
OmniError::manifest_internal(format!(
"follow-up mutation {} on {} after recovery failed: {}",
mutation.query_name, table.table_key, err,
))
})?;
}
Ok(())
}
async fn entry_and_lance_head(
db: &Omnigraph,
root_uri: &str,
table: &TableExpectation,
) -> Result<(SubTableEntry, u64)> {
let branch = table.branch.as_deref().unwrap_or("main");
let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?;
let entry = snapshot
.entry(&table.table_key)
.ok_or_else(|| {
OmniError::manifest_internal(format!(
"snapshot for branch {branch} has no entry for {}",
table.table_key,
))
})?
.clone();
let lance_head = lance_head_for_entry(root_uri, &entry).await?;
Ok((entry, lance_head))
}
async fn lance_head_for_entry(root_uri: &str, entry: &SubTableEntry) -> Result<u64> {
let table_uri = format!("{}/{}", root_uri.trim_end_matches('/'), entry.table_path);
let ds = Dataset::open(&table_uri)
.await
.map_err(|err| OmniError::Lance(err.to_string()))?;
let ds = match entry.table_branch.as_deref() {
Some(branch) if branch != "main" => ds
.checkout_branch(branch)
.await
.map_err(|err| OmniError::Lance(err.to_string()))?,
_ => ds,
};
Ok(ds.version().version)
}
async fn read_recovery_commit(
repo_root: &Path,
audit: &RecoveryAuditRow,
branch: Option<&str>,
) -> Result<GraphCommit> {
let uri = repo_uri(repo_root);
let graph = match branch {
Some(branch) => CommitGraph::open_at_branch(&uri, branch).await?,
None => CommitGraph::open(&uri).await?,
};
graph
.load_commits()
.await?
.into_iter()
.find(|commit| commit.graph_commit_id == audit.graph_commit_id)
.ok_or_else(|| {
OmniError::manifest_internal(format!(
"recovery commit {} for operation {} was not found",
audit.graph_commit_id, audit.operation_id,
))
})
}
async fn read_audit_row(repo_root: &Path, operation_id: &str) -> Result<RecoveryAuditRow> {
let mut rows = matching_audit_rows(repo_root, operation_id).await?;
if rows.len() != 1 {
return Err(OmniError::manifest_internal(format!(
"expected exactly one recovery audit row for {operation_id}, got {}",
rows.len(),
)));
}
Ok(rows.remove(0))
}
async fn matching_audit_rows(
repo_root: &Path,
operation_id: &str,
) -> Result<Vec<RecoveryAuditRow>> {
let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance");
if !recoveries_dir.exists() {
return Ok(Vec::new());
}
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.map_err(|err| OmniError::Lance(err.to_string()))?;
let batches: Vec<RecordBatch> = ds
.scan()
.try_into_stream()
.await
.map_err(|err| OmniError::Lance(err.to_string()))?
.try_collect()
.await
.map_err(|err| OmniError::Lance(err.to_string()))?;
let mut rows = Vec::new();
for batch in batches {
let graph_commit_ids = string_column(&batch, "graph_commit_id")?;
let kinds = string_column(&batch, "recovery_kind")?;
let ops = string_column(&batch, "operation_id")?;
let writers = string_column(&batch, "sidecar_writer_kind")?;
let outcomes_json = string_column(&batch, "per_table_outcomes_json")?;
for row in 0..batch.num_rows() {
if ops.value(row) != operation_id {
continue;
}
let per_table_outcomes =
serde_json::from_str(outcomes_json.value(row)).map_err(|err| {
OmniError::manifest_internal(format!(
"failed to parse recovery audit outcomes for {operation_id}: {err}",
))
})?;
rows.push(RecoveryAuditRow {
graph_commit_id: graph_commit_ids.value(row).to_string(),
recovery_kind: kinds.value(row).to_string(),
operation_id: ops.value(row).to_string(),
sidecar_writer_kind: writers.value(row).to_string(),
per_table_outcomes,
});
}
}
Ok(rows)
}
fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> {
batch
.column_by_name(name)
.ok_or_else(|| {
OmniError::manifest_internal(format!("recovery audit batch missing '{name}'"))
})?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
OmniError::manifest_internal(format!("recovery audit column '{name}' is not Utf8"))
})
}
fn repo_uri(repo_root: &Path) -> String {
repo_root.to_str().unwrap().to_string()
}

View file

@ -18,6 +18,7 @@ use lance::Dataset;
use omnigraph::db::Omnigraph;
mod helpers;
use helpers::recovery::{RecoveryExpectation, TableExpectation, assert_post_recovery_invariants};
const TEST_SCHEMA: &str = include_str!("fixtures/test.pg");
@ -185,7 +186,9 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
drop(db);
// Synthetic drift: advance Person's Lance HEAD WITHOUT updating the
@ -289,8 +292,14 @@ async fn count_recovery_audit_rows(repo_root: &Path) -> usize {
.await
.expect("recoveries dataset opens");
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
batches.iter().map(|b| b.num_rows()).sum()
}
@ -303,13 +312,17 @@ async fn read_latest_recovery_audit(
if !recoveries_dir.exists() {
return None;
}
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.ok()?;
let ds = Dataset::open(recoveries_dir.to_str().unwrap()).await.ok()?;
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.ok()?.try_collect().await.ok()?;
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.ok()?
.try_collect()
.await
.ok()?;
let last_batch = batches.iter().filter(|b| b.num_rows() > 0).last()?;
let row = last_batch.num_rows() - 1;
let kinds = last_batch
@ -349,11 +362,19 @@ async fn list_recovery_audit_kinds(repo_root: &Path) -> Vec<String> {
if !recoveries_dir.exists() {
return Vec::new();
}
let ds = Dataset::open(recoveries_dir.to_str().unwrap()).await.unwrap();
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.unwrap();
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let mut out = Vec::new();
for batch in batches {
let kinds = batch
@ -378,8 +399,14 @@ async fn count_recovery_actor_commits(repo_root: &Path) -> usize {
let ds = Dataset::open(actors_dir.to_str().unwrap()).await.unwrap();
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let mut count = 0;
for batch in &batches {
let actors = batch
@ -411,7 +438,9 @@ async fn recovery_rolls_forward_after_phase_b_completes() {
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
drop(db);
let person_uri = node_table_uri(uri, "Person");
@ -454,48 +483,18 @@ async fn recovery_rolls_forward_after_phase_b_completes() {
// Reopen — sweep must roll forward, advancing the manifest pin to
// head_after via a single ManifestBatchPublisher::publish call.
let _db = Omnigraph::open(uri).await.unwrap();
let db = Omnigraph::open(uri).await.unwrap();
drop(db);
// Sidecar deleted (sweep completed end-to-end).
assert!(
!list_recovery_dir(dir.path()).contains(&"01H00000000000000000000RF.json".to_string()),
"sidecar must be deleted after successful roll-forward"
);
// Audit row recorded.
assert_eq!(
count_recovery_audit_rows(dir.path()).await,
1,
"roll-forward must record exactly one audit row"
);
assert_eq!(
count_recovery_actor_commits(dir.path()).await,
1,
"roll-forward must record exactly one commit-graph row tagged with omnigraph:recovery"
);
let audit = read_latest_recovery_audit(dir.path()).await;
assert_eq!(
audit,
Some((
"RolledForward".to_string(),
Some("act-alice".to_string()),
"01H00000000000000000000RF".to_string(),
"Mutation".to_string(),
)),
"audit row content mismatch"
);
// Idempotency: reopen is a no-op.
let _db2 = Omnigraph::open(uri).await.unwrap();
assert!(
list_recovery_dir(dir.path()).is_empty(),
"second open must be a clean no-op"
);
assert_eq!(
count_recovery_audit_rows(dir.path()).await,
1,
"second open must NOT record a new audit row"
);
assert_post_recovery_invariants(
dir.path(),
"01H00000000000000000000RF",
RecoveryExpectation::RolledForward {
tables: vec![TableExpectation::main("node:Person").expected_lance_head(head_after)],
},
)
.await
.unwrap();
}
#[tokio::test]
@ -509,7 +508,9 @@ async fn recovery_rolls_back_records_audit_row_with_recovery_actor() {
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
drop(db);
let person_uri = node_table_uri(uri, "Person");
@ -574,7 +575,9 @@ async fn recovery_rolls_forward_with_null_actor() {
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
drop(db);
let person_uri = node_table_uri(uri, "Person");
@ -650,7 +653,9 @@ async fn recovery_processes_multiple_sidecars_with_fresh_snapshot_per_iter() {
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Company","data":{"name":"acme"}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
drop(db);
// Synthesize drift on both tables independently.
@ -745,7 +750,9 @@ async fn recovery_ensure_indices_steady_state_no_sidecar() {
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Company","data":{"name":"acme"}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
db.ensure_indices().await.unwrap();
drop(db);
@ -1045,6 +1052,13 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() {
.expect("feature snapshot must have Person entry");
let v_pin = feature_entry.table_version;
let feature_branch_name = feature_entry.table_branch.clone();
let main_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("main snapshot must have Person entry")
.table_version;
drop(db);
// Bypass the manifest: append directly to Person's Lance HEAD on the
@ -1100,37 +1114,21 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() {
// against feature's snapshot, not main's. With the fix, feature's
// manifest pin advances v_pin → v_head.
let db = Omnigraph::open(uri).await.unwrap();
assert!(
list_recovery_dir(dir.path()).is_empty(),
"feature-branch sidecar must be processed (deleted) after recovery"
);
drop(db);
// The post-recovery feature snapshot must show Person pinned at v_head.
let post_feature_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
.await
.unwrap();
let post_entry = post_feature_snapshot
.entry("node:Person")
.expect("Person must still be pinned on feature");
assert_eq!(
post_entry.table_version, v_head,
"feature manifest pin must advance v_pin={} → v_head={}; got {} \
without branch-aware recovery, classification would have \
compared against main and rolled back / no-op'd",
v_pin, v_head, post_entry.table_version,
);
// Audit row recorded for the recovery action — and the row's
// recovery_kind == RolledForward (proves the branch-aware classifier
// got us through the eligible path; without it, the snapshot lookup
// against main's pin would have produced NoMovement → RollBack).
let kinds = list_recovery_audit_kinds(dir.path()).await;
assert_eq!(
kinds, vec!["RolledForward".to_string()],
"feature-branch sidecar recovery must record exactly one RolledForward audit row; got {:?}",
kinds,
);
assert_post_recovery_invariants(
dir.path(),
"01H0000000000000000000FEAT",
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::branch("node:Person", "feature")
.expected_lance_head(v_head)
.expected_main_manifest_pin(main_pin),
],
},
)
.await
.unwrap();
}
/// Companion to the roll-forward feature-branch test: branch-axis
@ -1176,6 +1174,13 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() {
.expect("feature snapshot must have Person entry");
let v_pin = feature_entry.table_version;
let feature_branch_name = feature_entry.table_branch.clone();
let main_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("main snapshot must have Person entry")
.table_version;
drop(db);
// Bypass the manifest: append on the feature ref to advance HEAD past
@ -1230,21 +1235,21 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() {
write_sidecar_file(dir.path(), "01H0000000000000000000FRB1", &sidecar_json);
// Reopen with full sweep — RollBack is allowed at open time.
let _db = Omnigraph::open(uri).await.unwrap();
assert!(
list_recovery_dir(dir.path()).is_empty(),
"feature-branch rollback sidecar must be deleted after recovery"
);
let db = Omnigraph::open(uri).await.unwrap();
drop(db);
// Audit kind == RolledBack (proves classifier saw feature's HEAD,
// not main's; main's view of Person would be NoMovement → no audit
// row attribution).
let kinds = list_recovery_audit_kinds(dir.path()).await;
assert_eq!(
kinds, vec!["RolledBack".to_string()],
"feature-branch rollback must record one RolledBack audit row; got {:?}",
kinds,
);
assert_post_recovery_invariants(
dir.path(),
"01H0000000000000000000FRB1",
RecoveryExpectation::RolledBack {
tables: vec![
TableExpectation::branch("node:Person", "feature")
.expected_main_manifest_pin(main_pin),
],
},
)
.await
.unwrap();
// Lance HEAD on the feature ref must have advanced (real restore ran).
let post = store
@ -1257,6 +1262,18 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() {
v_head,
post.version().version,
);
let db = Omnigraph::open(uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
2,
"feature branch must still expose the manifest-pinned rows after rollback"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"feature rollback must not move main"
);
}
/// `OpenMode::ReadOnly` must NOT run `recover_schema_state_files`,

View file

@ -16,7 +16,7 @@
- `CleanupPolicyOptions { keep_versions: Option<u32>, older_than: Option<Duration> }` — at least one is required.
- Returns `[TableCleanupStats { table_key, bytes_removed, old_versions_removed }]`.
- CLI guards with `--confirm`; without it, prints a preview line.
- **Recovery floor:** `--keep < 3` may garbage-collect Lance versions that the open-time recovery sweep needs as a rollback target (the sweep restores to the manifest-pinned `expected_version`, which is HEAD-1 in the typical Phase B → Phase C drift case). Default `--keep 10` is safe.
- **Recovery floor:** `--keep < 3` may garbage-collect Lance versions that the open-time recovery sweep needs as a rollback target (the sweep restores to the branch's manifest-pinned table version, which is HEAD-1 in the typical Phase B → Phase C drift case). Default `--keep 10` is safe.
## Tombstones

View file

@ -176,10 +176,13 @@ recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`:
sidecar on disk for operator review.
- Otherwise, if every table is `RolledPastExpected`, **roll forward**:
a single `ManifestBatchPublisher::publish` call extends every pin
atomically.
atomically. `SchemaApply` sidecars are eligible only when schema-state
recovery promoted the matching staging files in the same recovery pass;
otherwise full open-time recovery rolls them back and refresh-time
recovery leaves them for the next read-write open.
- Otherwise **roll back**: per-table `Dataset::restore` to the
expected_version (with a fragment-set short-circuit so repeated
mid-sweep crashes don't pile up versions).
manifest-pinned table version for that branch. Rollback records the
actual restore target in the audit row's `to_version`.
- After a successful roll-forward or roll-back, an audit row is
recorded — `_graph_commits.lance` carries
a commit tagged `actor_id = "omnigraph:recovery"`, and a sibling