recovery: wire open-time sweep + OpenMode (Phase 3)

Add `OpenMode::{ReadWrite, ReadOnly}` and route `Omnigraph::open` through
`open_with_storage_and_mode`. Recovery sweep runs only under
`OpenMode::ReadWrite` — read-only consumers (NDJSON export, commit list,
schema show) skip it via `Omnigraph::open_read_only`. Rationale: the
sweep performs Lance writes (Dataset::restore, manifest publish); a
read-only consumer with read-only object-store credentials shouldn't
trigger writes, and reads always resolve through the manifest pin
regardless of any drift on the per-table side.

`recover_manifest_drift` lands in db/manifest/recovery.rs and is wired
into Omnigraph::open AFTER recover_schema_state_files — schema-state
recovery operates on staging files; manifest-drift recovery operates on
Lance HEADs that may depend on schema-state being settled.

Roll-back path is fully implemented: classify each table per the
sidecar's intent, dispatch the all-or-nothing decision, and call
restore_table_to_version for any table with drift (RolledPastExpected,
UnexpectedAtP1, or UnexpectedMultistep). NoMovement tables are already
at expected_version — no action. Sidecar deleted as the final step.

Roll-forward path errors with a Phase-4 placeholder so it surfaces
loudly if reached without the audit + manifest-publish wiring landing
first.

Concurrency: today (pre-MR-686) recovery is naturally serialized by the
single-coordinator model. Open runs at server startup BEFORE
Arc<RwLock<Omnigraph>> wraps the engine (lib.rs:194), so no request
handlers can race. CLI is sequential by caller orchestration. Under
MR-686's per-(table_key, branch) queues + MR-856 (background recovery
reconciler), the queue acquisition will need to extend to recovery
sweeps — handoff documented on MR-686 ticket and in MR-856.

4 integration tests in tests/recovery.rs pin the Phase 3 contract:
- recovery_does_not_run_on_clean_open — no sidecars; sweep is a no-op.
- recovery_refuses_unknown_schema_version_on_open — sidecar v=99
  surfaces SidecarSchemaError and is left on disk for operator review.
- read_only_open_skips_recovery_sweep — even a sidecar with bogus
  table_path doesn't get classified under OpenMode::ReadOnly.
- recovery_rolls_back_synthetic_drift_on_open — sidecar with mismatched
  post_commit_pin classifies as UnexpectedAtP1, decision is RollBack,
  restore is invoked, sidecar is deleted, idempotent on second open.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-02 23:51:43 +02:00
parent 376d91d538
commit c2fc3e7c40
No known key found for this signature in database
5 changed files with 437 additions and 4 deletions

View file

@ -32,6 +32,7 @@ pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::recover_manifest_drift;
use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
pub use state::SubTableEntry;
#[cfg(test)]

View file

@ -42,10 +42,13 @@
use lance::Dataset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
use super::Snapshot;
/// Subdirectory under the repo root holding sidecar files.
pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery";
@ -384,6 +387,134 @@ fn fragment_ids(ds: &Dataset) -> Vec<u64> {
ids
}
/// Open-time recovery sweep — the entry point invoked from
/// `Omnigraph::open` (gated on `OpenMode::ReadWrite`).
///
/// Enumerates every sidecar in `__recovery/`, classifies each table per
/// the sidecar's intent, and applies the all-or-nothing decision:
/// roll-forward (every table eligible), roll-back (mixed or unexpected
/// state), or abort (invariant violation).
///
/// **Phase 3 scope** (this commit): roll-back path is fully implemented;
/// roll-forward errors out with a "Phase 4" placeholder so the
/// open-time wiring + sidecar I/O + classification + decision dispatch
/// can land independently of the audit/manifest-publish work. Tests
/// exercising the end-to-end roll-forward path land alongside Phase 4.
///
/// Idempotency: a crash mid-sweep leaves the sidecar (deletion is the
/// final step). Re-opening re-classifies; the fragment-set short-circuit
/// in [`restore_table_to_version`] prevents version pile-up under
/// repeated mid-rollback crashes.
///
/// Concurrency: today (pre-MR-686) recovery runs synchronously in
/// `Omnigraph::open` *before* the engine is wrapped in the server's
/// `Arc<RwLock<Omnigraph>>`. No request handlers can race. Under MR-686
/// + MR-856 (background reconciler) the per-(table_key, branch) queues
/// will need acquisition before the sweep restores or publishes — see
/// `.context/mr-847-design.md` "Concurrency policy" §"After MR-686".
pub(crate) async fn recover_manifest_drift(
root_uri: &str,
storage: &dyn StorageAdapter,
snapshot: &Snapshot,
) -> Result<()> {
let sidecars = list_sidecars(root_uri, storage).await?;
if sidecars.is_empty() {
return Ok(());
}
for sidecar in sidecars {
process_sidecar(root_uri, storage, snapshot, &sidecar).await?;
}
Ok(())
}
async fn process_sidecar(
root_uri: &str,
storage: &dyn StorageAdapter,
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
) -> Result<()> {
let mut classifications = Vec::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
let lance_head = open_lance_head(&pin.table_path).await?;
let manifest_pinned = snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0);
classifications.push(classify_table(pin, lance_head, manifest_pinned));
}
match decide(&classifications) {
SidecarDecision::Abort => {
// Surface loudly without deleting the sidecar — operator must
// investigate. This includes any classification of
// InvariantViolation (Lance HEAD < manifest pinned: should be
// impossible).
return Err(OmniError::manifest_internal(format!(
"recovery sidecar '{}' has invariant violation; refusing to act \
operator review required (sidecar at '{}', classifications: {:?})",
sidecar.operation_id,
sidecar_uri(root_uri, &sidecar.operation_id),
classifications,
)));
}
SidecarDecision::RollBack => {
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: rolling back sidecar (mixed or unexpected state)"
);
// Restore every table whose Lance HEAD has drifted from the
// manifest pin (RolledPastExpected, UnexpectedAtP1,
// UnexpectedMultistep). NoMovement tables are already at
// expected_version — no action. The fragment-set short-circuit
// in restore_table_to_version makes drift-with-equivalent-content
// a no-op (sound: equal fragment-ids ⇒ equal content).
for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) {
if matches!(
cls,
TableClassification::RolledPastExpected
| TableClassification::UnexpectedAtP1
| TableClassification::UnexpectedMultistep
) {
restore_table_to_version(&pin.table_path, pin.expected_version).await?;
}
}
// Audit row write deferred to Phase 4 (recovery audit model).
// For now: delete sidecar as the final step.
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
Ok(())
}
SidecarDecision::RollForward => {
// Phase 4 implements the audit row + ManifestBatchPublisher
// pin extension. Until then, surface loudly so we don't
// silently leave drift.
Err(OmniError::manifest_internal(format!(
"recovery sidecar '{}' is roll-forward eligible but the \
roll-forward path is not yet implemented (MR-847 Phase 4); \
sidecar left in place at '{}'",
sidecar.operation_id,
sidecar_uri(root_uri, &sidecar.operation_id),
)))
}
}
}
async fn open_lance_head(table_path: &str) -> Result<u64> {
let ds = Dataset::open(table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(ds.version().version)
}
async fn delete_sidecar_by_operation_id(
root_uri: &str,
storage: &dyn StorageAdapter,
operation_id: &str,
) -> Result<()> {
storage.delete(&sidecar_uri(root_uri, operation_id)).await
}
/// Convenience: build a [`RecoverySidecar`] with auto-generated
/// `operation_id` and `started_at`. The caller fills in the other fields.
pub(crate) fn new_sidecar(

View file

@ -9,8 +9,8 @@ pub use commit_graph::GraphCommit;
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub use omnigraph::{
CleanupPolicyOptions, MergeOutcome, Omnigraph, SchemaApplyResult, TableCleanupStats,
TableOptimizeStats,
CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyResult,
TableCleanupStats, TableOptimizeStats,
};
pub(crate) use omnigraph::ensure_public_branch_ref;
pub(crate) use run_registry::is_internal_run_branch;

View file

@ -81,6 +81,26 @@ pub struct Omnigraph {
pub(crate) audit_actor_id: Option<String>,
}
/// Whether [`Omnigraph::open`] runs the MR-847 recovery sweep.
///
/// Recovery requires Lance writes (`Dataset::restore`, `ManifestBatchPublisher::publish`).
/// Read-only consumers — NDJSON export, `commit list`, `read`, schema
/// inspection — should not trigger writes (they may run with read-only
/// object-store credentials, and silent open-time mutations are surprising).
/// They also don't need recovery: reads always resolve through the manifest
/// pin, which is the consistent snapshot regardless of any Phase B → Phase C
/// drift on the per-table side.
///
/// See `.context/mr-847-design.md` § "Read-only opens".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
/// Run the recovery sweep on open. Default for `Omnigraph::open`.
ReadWrite,
/// Skip the recovery sweep. Use for read-only consumers via
/// [`Omnigraph::open_read_only`].
ReadOnly,
}
impl Omnigraph {
/// Create a new repo at `uri` from schema source.
///
@ -119,16 +139,33 @@ impl Omnigraph {
})
}
/// Open an existing repo.
/// Open an existing repo (read-write).
///
/// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
/// Runs the MR-847 recovery sweep before returning — see [`OpenMode`].
pub async fn open(uri: &str) -> Result<Self> {
Self::open_with_storage(uri, storage_for_uri(uri)?).await
Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
}
/// Open an existing repo for read-only consumers (NDJSON export,
/// `commit list`, etc.). Skips the MR-847 recovery sweep — see [`OpenMode`].
pub async fn open_read_only(uri: &str) -> Result<Self> {
Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
}
/// `open_with_storage` retained for existing callers (init/test paths).
/// Defaults to `OpenMode::ReadWrite`.
pub(crate) async fn open_with_storage(
uri: &str,
storage: Arc<dyn StorageAdapter>,
) -> Result<Self> {
Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
}
pub(crate) async fn open_with_storage_and_mode(
uri: &str,
storage: Arc<dyn StorageAdapter>,
mode: OpenMode,
) -> Result<Self> {
let root = normalize_root_uri(uri)?;
// Open the coordinator first so the schema-staging recovery sweep can
@ -137,6 +174,22 @@ impl Omnigraph {
// (post-commit crash) before the live schema files are read.
let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
// MR-847 recovery sweep: close the Phase B → Phase C residual on
// any sidecar left over from a crashed writer. ReadOnly skips —
// recovery requires Lance writes (Dataset::restore, manifest publish);
// a read-only consumer (NDJSON export, commit list) sees the
// manifest-pinned content regardless of drift, so it doesn't need
// recovery and shouldn't trigger object-store writes. Continuous
// in-process recovery for long-running servers is MR-856 (background
// reconciler).
if matches!(mode, OpenMode::ReadWrite) {
crate::db::manifest::recover_manifest_drift(
&root,
storage.as_ref(),
&coordinator.snapshot(),
)
.await?;
}
// Read _schema.pg (post-recovery — may have just been renamed in).
let schema_path = schema_source_uri(&root);
let schema_source = storage.read_text(&schema_path).await?;

View file

@ -0,0 +1,248 @@
//! MR-847 — open-time recovery sweep integration tests.
//!
//! These exercise the full `Omnigraph::open` cycle: drop a synthetic
//! sidecar into `__recovery/`, advance some Lance HEADs to simulate the
//! Phase B → Phase C residual, reopen the engine, and assert the sweep's
//! decision-tree dispatch did the right thing.
//!
//! The four tests here pin Phase 3 scope (open-time invocation,
//! `OpenMode::{ReadWrite, ReadOnly}`, roll-back path, schema-version
//! refusal). The roll-forward path is Phase 4 — exercised by tests in
//! the Phase 4 commit.
use std::path::Path;
use lance::Dataset;
use omnigraph::db::Omnigraph;
const TEST_SCHEMA: &str = include_str!("fixtures/test.pg");
fn write_sidecar_file(repo_root: &Path, operation_id: &str, json: &str) {
let dir = repo_root.join("__recovery");
if !dir.exists() {
std::fs::create_dir(&dir).unwrap();
}
std::fs::write(dir.join(format!("{}.json", operation_id)), json).unwrap();
}
fn list_recovery_dir(repo_root: &Path) -> Vec<String> {
let dir = repo_root.join("__recovery");
if !dir.exists() {
return Vec::new();
}
std::fs::read_dir(&dir)
.unwrap()
.filter_map(|e| e.ok().map(|d| d.file_name().to_string_lossy().to_string()))
.collect()
}
/// Full URI of a node-type Lance dataset under a fresh Omnigraph repo.
/// Mirrors the `nodes/{fnv1a64-hex(type_name)}` layout in `db/manifest/layout.rs`.
fn node_table_uri(root: &str, type_name: &str) -> String {
let h: u64 = fnv1a64(type_name.as_bytes());
format!("{}/nodes/{:016x}", root.trim_end_matches('/'), h)
}
fn fnv1a64(bytes: &[u8]) -> u64 {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in bytes {
hash ^= b as u64;
hash = hash.wrapping_mul(0x100_0000_01b3);
}
hash
}
#[tokio::test]
async fn recovery_does_not_run_on_clean_open() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let _db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
drop(_db);
// Reopen — `__recovery/` doesn't exist; the sweep must be a clean no-op.
let _db = Omnigraph::open(uri).await.unwrap();
// Verify by side-effect: the recovery dir was not created by the sweep.
assert!(
!dir.path().join("__recovery").exists(),
"clean-open sweep must not create __recovery/"
);
}
#[tokio::test]
async fn recovery_refuses_unknown_schema_version_on_open() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let _db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
drop(_db);
// A sidecar from a hypothetical future writer; the older binary must
// refuse to interpret it (resolved-decisions §3 in the design doc).
let sidecar_json = r#"{
"schema_version": 99,
"operation_id": "01H000000000000000000000ZZ",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "Mutation",
"tables": []
}"#;
write_sidecar_file(dir.path(), "01H000000000000000000000ZZ", sidecar_json);
let err = Omnigraph::open(uri)
.await
.err()
.expect("expected open to fail because of unknown sidecar schema_version");
let msg = err.to_string();
assert!(
msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"),
"expected SidecarSchemaError mentioning the version mismatch, got: {}",
msg,
);
// Sidecar must still be on disk — we don't auto-delete unparseable files.
assert!(
list_recovery_dir(dir.path()).contains(&"01H000000000000000000000ZZ.json".to_string()),
"sidecar should remain on disk after refusal so an operator can inspect it"
);
}
#[tokio::test]
async fn read_only_open_skips_recovery_sweep() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let _db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
drop(_db);
// Drop a syntactically-valid but invariant-violating sidecar (HEAD < pin
// would error if classified). Read-only must NOT classify it — it must
// skip the sweep entirely.
let sidecar_json = r#"{
"schema_version": 1,
"operation_id": "01H000000000000000000000RO",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "Mutation",
"tables": [
{
"table_key": "node:Person",
"table_path": "/dev/null/nonexistent.lance",
"expected_version": 99,
"post_commit_pin": 100
}
]
}"#;
write_sidecar_file(dir.path(), "01H000000000000000000000RO", sidecar_json);
// ReadOnly open must succeed — the sweep is skipped, so the bogus
// sidecar is never inspected.
let _db = Omnigraph::open_read_only(uri).await.unwrap();
// And the sidecar is still there — ReadOnly never deletes anything.
assert!(
list_recovery_dir(dir.path()).contains(&"01H000000000000000000000RO.json".to_string()),
"ReadOnly open must leave the sidecar untouched"
);
}
#[tokio::test]
async fn recovery_rolls_back_synthetic_drift_on_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// Bootstrap a real graph with a Person table so we have a Lance dataset
// to advance synthetically.
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
drop(db);
// Synthetic drift: advance Person's Lance HEAD WITHOUT updating the
// manifest pin. This is the shape a Phase B → Phase C crash would
// leave (with no sidecar — the writer never wrote one because we're
// simulating the residual class directly).
//
// Use `delete_where` with a never-matching predicate: it inline-commits
// a Lance transaction (advancing HEAD by one) without removing data
// and without depending on the dataset's exact column set. The actual
// residual the sweep recovers from is the manifest-vs-Lance-HEAD gap;
// it's agnostic to *what* op caused the gap.
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before_drift = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let head_after_drift = ds.version().version;
assert_eq!(
head_after_drift,
head_before_drift + 1,
"synthetic drift must advance Lance HEAD by exactly 1"
);
drop(ds);
// Drop a sidecar that DOESN'T match the observed drift — sidecar says
// expected=head_before_drift, post_commit_pin=head_before_drift (i.e.,
// pretend no Phase B happened). Observed: head_after_drift =
// expected + 1. Classification: UnexpectedAtP1 (post_commit_pin doesn't
// match observed). Decision: RollBack.
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H00000000000000000000RB",
"started_at": "0",
"branch": null,
"actor_id": "act-test",
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": {},
"post_commit_pin": {}
}}
]
}}"#,
person_uri, head_before_drift, head_before_drift
);
write_sidecar_file(dir.path(), "01H00000000000000000000RB", &sidecar_json);
// Reopen. The sweep must classify Person as UnexpectedAtP1 (h=p+1 but
// sidecar.post_commit_pin != observed head), decide RollBack, and call
// restore_table_to_version(person_uri, head_before_drift). The
// fragment-set short-circuit may make this a no-op if the synthetic
// drift produced no fragment changes (delete_where with a never-matching
// predicate is one such case — Lance bumps version but fragments are
// unchanged). Either way the sweep must complete without error and
// delete the sidecar; the actual rollback HEAD-advance behavior is
// pinned by the Phase 2 unit test
// `restore_table_to_version_appends_one_commit`.
let _db = Omnigraph::open(uri).await.unwrap();
let post = Dataset::open(&person_uri).await.unwrap();
let _ = head_after_drift; // synthesized but no longer asserted on directly
assert!(
post.version().version >= head_after_drift,
"post-sweep Lance HEAD must not regress below the synthesized drift"
);
// Sidecar deleted as the final step — proves the sweep ran end to end.
let after = list_recovery_dir(dir.path());
assert!(
!after.contains(&"01H00000000000000000000RB.json".to_string()),
"sidecar must be deleted after successful sweep; remaining files: {:?}",
after,
);
// Idempotency: reopening should be a clean no-op (no error; no new sidecar).
let _db2 = Omnigraph::open(uri).await.unwrap();
assert!(
list_recovery_dir(dir.path()).is_empty(),
"second open must be a clean no-op"
);
}