diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 5fb3224..7f60fce 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -599,6 +599,50 @@ async fn process_sidecar( } }, SidecarDecision::RollBack => { + // Distinguish "stale sidecar from a previous successful + // roll-forward whose audit/delete failed" from a legitimate + // rollback. If every table is at NoMovement AND any pin's + // manifest_pinned has advanced past expected_version, the + // manifest already reflects the writer's intent — a previous + // recovery's `roll_forward_all` succeeded but `record_audit` + // or `delete_sidecar` failed, leaving the sidecar to be + // re-discovered. Recording this as RolledBack with empty + // outcomes (the naive RollBack path's behavior under all- + // NoMovement) misleads operators reading + // `_graph_commit_recoveries.lance` — the actual outcome was + // a successful roll-forward. + let all_no_movement = states + .iter() + .all(|s| matches!(s.classification, TableClassification::NoMovement)); + let any_pin_advanced = sidecar + .tables + .iter() + .zip(states.iter()) + .any(|(pin, state)| state.manifest_pinned > pin.expected_version); + if all_no_movement && any_pin_advanced { + if matches!(mode, RecoveryMode::RollForwardOnly) { + // Refresh-time audit-recovery is safe: no + // Dataset::restore involved; just an audit-row write + // and sidecar delete. 
+ warn!( + operation_id = sidecar.operation_id.as_str(), + writer_kind = ?sidecar.writer_kind, + "recovery: cleaning up stale sidecar from a prior successful \ + roll-forward (audit-recovery, in-process refresh)" + ); + } else { + warn!( + operation_id = sidecar.operation_id.as_str(), + writer_kind = ?sidecar.writer_kind, + "recovery: cleaning up stale sidecar from a prior successful \ + roll-forward (manifest already advanced; recording RolledForward audit)" + ); + } + return record_audit_recovery_rollforward( + root_uri, storage, snapshot, sidecar, &states, + ) + .await; + } if matches!(mode, RecoveryMode::RollForwardOnly) { // In-process recovery cannot run Dataset::restore safely // (would orphan a concurrent writer's commit). Leave the @@ -744,6 +788,49 @@ async fn roll_back_sidecar( Ok(()) } +/// Cleanup path for stale sidecars where a previous recovery's +/// roll-forward succeeded (manifest pin advanced past `expected_version`) +/// but `record_audit` or sidecar deletion failed, leaving the sidecar +/// to be re-discovered on a subsequent open. By the time we re-classify, +/// every table reads as `NoMovement` (lance_head == manifest_pinned), +/// which the naive `RollBack` arm would record as RolledBack-with-empty- +/// outcomes — misleading for operators because the actual outcome was +/// a successful roll-forward. +/// +/// This helper records the correct shape: a `RolledForward` audit row +/// whose `from_version` is the original `expected_version` and whose +/// `to_version` is the current `manifest_pinned` (the actual published +/// version after the prior roll-forward). No Lance writes are needed — +/// the substrate is already in the post-roll-forward state. 
+async fn record_audit_recovery_rollforward( + root_uri: &str, + storage: &dyn StorageAdapter, + snapshot: &Snapshot, + sidecar: &RecoverySidecar, + states: &[ClassifiedTable], +) -> Result<()> { + let outcomes: Vec<TableOutcome> = sidecar + .tables + .iter() + .zip(states.iter()) + .map(|(pin, state)| TableOutcome { + table_key: pin.table_key.clone(), + from_version: pin.expected_version, + to_version: state.manifest_pinned, + }) + .collect(); + record_audit( + root_uri, + sidecar, + snapshot.version(), + RecoveryKind::RolledForward, + outcomes, + ) + .await?; + delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; + Ok(()) +} + /// Atomically extend every table's manifest pin from `expected_version` to /// `post_commit_pin` via a single `ManifestBatchPublisher::publish` call. /// Returns the new manifest version produced by the publish. diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index 6ffe4fd..5ad87e8 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -497,6 +497,143 @@ async fn recovery_rolls_forward_after_phase_b_completes() { .unwrap(); } +/// A previous recovery's `roll_forward_all` succeeded (manifest pin +/// already advanced past the sidecar's `expected_version`) but +/// `record_audit` or sidecar deletion failed, leaving the sidecar to be +/// re-discovered on a subsequent open. The naive RollBack arm would +/// classify all tables as NoMovement and record a `RolledBack` audit row +/// with empty outcomes — misleading because the actual outcome was a +/// successful roll-forward. Recovery must detect this stale-after- +/// success shape and record `RolledForward` instead. +/// +/// Synthesizes the state by: +/// 1. Letting init + load advance the manifest pin AND Lance HEAD +/// legitimately to some version `v`. +/// 2. 
Writing a sidecar whose `expected_version < v` and +/// `post_commit_pin == v` — exactly the shape left over after a +/// publisher succeeds but audit fails. +/// +/// On reopen the classifier sees `lance_head == manifest_pinned == v` +/// → all NoMovement → decide returns RollBack. The new audit-recovery +/// branch must detect `manifest_pinned > expected_version` and record +/// `RolledForward` with `from_version=expected_version`, +/// `to_version=v`. +#[tokio::test] +async fn recovery_records_rolled_forward_for_stale_sidecar_after_successful_roll_forward() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} +{"type":"Person","data":{"name":"bob","age":25}} +"#; + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); + + // Capture the current manifest pin and Lance HEAD — these match + // because the load went through the publisher. + let person_entry = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .expect("Person entry exists post-load") + .clone(); + let manifest_pin = person_entry.table_version; + drop(db); + + let person_uri = node_table_uri(uri, "Person"); + let head_now = Dataset::open(&person_uri).await.unwrap().version().version; + assert_eq!( + head_now, manifest_pin, + "Lance HEAD must equal manifest pin in steady state" + ); + // Sidecar shape that simulates "publisher succeeded; audit/delete + // failed in a previous recovery pass". `expected_version` is less + // than the current manifest pin (the publish already ran) and + // `post_commit_pin` matches the current head. 
+ let stale_expected = manifest_pin - 1; + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H00000000000000000000SF", + "started_at": "0", + "branch": null, + "actor_id": "act-original", + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{}", + "expected_version": {}, + "post_commit_pin": {} + }} + ] + }}"#, + person_uri, stale_expected, manifest_pin + ); + write_sidecar_file(dir.path(), "01H00000000000000000000SF", &sidecar_json); + + // Reopen — sweep must classify Person as NoMovement (head_now == + // manifest_pinned) but recognize stale-after-success because + // manifest_pinned > stale_expected. Audit-recovery branch records + // RolledForward and deletes the sidecar. + let _db = Omnigraph::open(uri).await.unwrap(); + + // Sidecar deleted. + assert!( + list_recovery_dir(dir.path()).is_empty(), + "stale-after-success sidecar must be deleted after audit-recovery" + ); + + // Audit row says RolledForward (not RolledBack). + let audit = read_latest_recovery_audit(dir.path()).await; + assert_eq!( + audit, + Some(( + "RolledForward".to_string(), + Some("act-original".to_string()), + "01H00000000000000000000SF".to_string(), + "Mutation".to_string(), + )), + "stale-after-success sidecar must record RolledForward, not RolledBack" + ); + // Audit outcomes report from_version=stale_expected, to_version=manifest_pin. 
+ use arrow_array::{Array, StringArray}; + use futures::TryStreamExt; + let recoveries_dir = dir.path().join("_graph_commit_recoveries.lance"); + let ds = Dataset::open(recoveries_dir.to_str().unwrap()) + .await + .unwrap(); + let batches: Vec<arrow_array::RecordBatch> = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let last = batches.iter().filter(|b| b.num_rows() > 0).last().unwrap(); + let row = last.num_rows() - 1; + let outcomes_json = last + .column_by_name("per_table_outcomes_json") + .unwrap() + .as_any() + .downcast_ref::<StringArray>() + .unwrap() + .value(row); + let outcomes: serde_json::Value = serde_json::from_str(outcomes_json).unwrap(); + let arr = outcomes.as_array().unwrap(); + assert_eq!(arr.len(), 1, "outcomes must include the Person table"); + let outcome = &arr[0]; + assert_eq!(outcome["table_key"], "node:Person"); + assert_eq!(outcome["from_version"].as_u64().unwrap(), stale_expected); + assert_eq!(outcome["to_version"].as_u64().unwrap(), manifest_pin); +} + #[tokio::test] async fn recovery_rolls_back_records_audit_row_with_recovery_actor() { use omnigraph::loader::{LoadMode, load_jsonl};