recovery: record RolledForward audit on stale-after-success sidecar

Cursor flagged that if `roll_forward_all` succeeds (manifest pin
advances) but `record_audit` then fails, the sidecar persists. On
the next open, every table classifies as NoMovement
(lance_head == manifest_pinned, both already reflect the prior
roll-forward) → `decide` returns RollBack → `roll_back_sidecar`
records a RolledBack audit row with empty per-table outcomes.
Operators reading `_graph_commit_recoveries.lance` see "RolledBack"
for an operation whose actual outcome was a successful roll-forward.

`process_sidecar`'s RollBack arm now distinguishes "stale-after-
success" from a legitimate rollback: when every classification is
NoMovement AND any pin's `manifest_pinned > expected_version` (the
manifest already advanced past the writer's CAS target), recovery
dispatches to `record_audit_recovery_rollforward` which writes a
RolledForward audit row with reconstructed outcomes
(`from_version = expected_version`,
`to_version = manifest_pinned`) and deletes the sidecar. No Lance
writes — the substrate is already in the post-roll-forward state.

Safe in `RollForwardOnly` mode (refresh-time recovery) because no
`Dataset::restore` is involved; the legitimate-rollback path stays
deferred to the next ReadWrite open as before.

Added `recovery_records_rolled_forward_for_stale_sidecar_after_successful_roll_forward`
integration test that synthesizes the state by writing a sidecar
whose `expected_version < manifest_pin` and asserts:
- audit row records `RolledForward` (not `RolledBack`)
- per-table outcome reports the correct `from_version` /
  `to_version` pair
- sidecar is deleted

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-05 20:12:43 +02:00
parent 11a9b3c8b9
commit 3ea7a1fd50
No known key found for this signature in database
2 changed files with 224 additions and 0 deletions

View file

@ -599,6 +599,50 @@ async fn process_sidecar(
}
},
SidecarDecision::RollBack => {
// Distinguish "stale sidecar from a previous successful
// roll-forward whose audit/delete failed" from a legitimate
// rollback. If every table is at NoMovement AND any pin's
// manifest_pinned has advanced past expected_version, the
// manifest already reflects the writer's intent — a previous
// recovery's `roll_forward_all` succeeded but `record_audit`
// or `delete_sidecar` failed, leaving the sidecar to be
// re-discovered. Recording this as RolledBack with empty
// outcomes (the naive RollBack path's behavior under all-
// NoMovement) misleads operators reading
// `_graph_commit_recoveries.lance` — the actual outcome was
// a successful roll-forward.
let all_no_movement = states
.iter()
.all(|s| matches!(s.classification, TableClassification::NoMovement));
let any_pin_advanced = sidecar
.tables
.iter()
.zip(states.iter())
.any(|(pin, state)| state.manifest_pinned > pin.expected_version);
if all_no_movement && any_pin_advanced {
if matches!(mode, RecoveryMode::RollForwardOnly) {
// Refresh-time audit-recovery is safe: no
// Dataset::restore involved; just an audit-row write
// and sidecar delete.
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: cleaning up stale sidecar from a prior successful \
roll-forward (audit-recovery, in-process refresh)"
);
} else {
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: cleaning up stale sidecar from a prior successful \
roll-forward (manifest already advanced; recording RolledForward audit)"
);
}
return record_audit_recovery_rollforward(
root_uri, storage, snapshot, sidecar, &states,
)
.await;
}
if matches!(mode, RecoveryMode::RollForwardOnly) {
// In-process recovery cannot run Dataset::restore safely
// (would orphan a concurrent writer's commit). Leave the
@ -744,6 +788,49 @@ async fn roll_back_sidecar(
Ok(())
}
/// Cleanup path for stale sidecars where a previous recovery's
/// roll-forward succeeded (manifest pin advanced past `expected_version`)
/// but `record_audit` or sidecar deletion failed, leaving the sidecar
/// to be re-discovered on a subsequent open. By the time we re-classify,
/// every table reads as `NoMovement` (lance_head == manifest_pinned),
/// which the naive `RollBack` arm would record as RolledBack-with-empty-
/// outcomes — misleading for operators because the actual outcome was
/// a successful roll-forward.
///
/// This helper records the correct shape: a `RolledForward` audit row
/// whose `from_version` is the original `expected_version` and whose
/// `to_version` is the current `manifest_pinned` (the actual published
/// version after the prior roll-forward). No Lance writes are needed —
/// the substrate is already in the post-roll-forward state.
async fn record_audit_recovery_rollforward(
root_uri: &str,
storage: &dyn StorageAdapter,
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
states: &[ClassifiedTable],
) -> Result<()> {
let outcomes: Vec<TableOutcome> = sidecar
.tables
.iter()
.zip(states.iter())
.map(|(pin, state)| TableOutcome {
table_key: pin.table_key.clone(),
from_version: pin.expected_version,
to_version: state.manifest_pinned,
})
.collect();
record_audit(
root_uri,
sidecar,
snapshot.version(),
RecoveryKind::RolledForward,
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
Ok(())
}
/// Atomically extend every table's manifest pin from `expected_version` to
/// `post_commit_pin` via a single `ManifestBatchPublisher::publish` call.
/// Returns the new manifest version produced by the publish.

View file

@ -497,6 +497,143 @@ async fn recovery_rolls_forward_after_phase_b_completes() {
.unwrap();
}
/// A previous recovery's `roll_forward_all` succeeded (manifest pin
/// already advanced past the sidecar's `expected_version`) but
/// `record_audit` or sidecar deletion failed, leaving the sidecar to be
/// re-discovered on a subsequent open. The naive RollBack arm would
/// classify all tables as NoMovement and record a `RolledBack` audit row
/// with empty outcomes — misleading because the actual outcome was a
/// successful roll-forward. Recovery must detect this stale-after-
/// success shape and record `RolledForward` instead.
///
/// Synthesizes the state by:
/// 1. Letting init + load advance the manifest pin AND Lance HEAD
/// legitimately to some version `v`.
/// 2. Writing a sidecar whose `expected_version < v` and
/// `post_commit_pin == v` — exactly the shape left over after a
/// publisher succeeds but audit fails.
///
/// On reopen the classifier sees `lance_head == manifest_pinned == v`
/// → all NoMovement → decide returns RollBack. The new audit-recovery
/// branch must detect `manifest_pinned > expected_version` and record
/// `RolledForward` with `from_version=expected_version`,
/// `to_version=v`.
#[tokio::test]
async fn recovery_records_rolled_forward_for_stale_sidecar_after_successful_roll_forward() {
use omnigraph::loader::{LoadMode, load_jsonl};
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append)
.await
.unwrap();
// Capture the current manifest pin and Lance HEAD — these match
// because the load went through the publisher.
let person_entry = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("Person entry exists post-load")
.clone();
let manifest_pin = person_entry.table_version;
drop(db);
let person_uri = node_table_uri(uri, "Person");
let head_now = Dataset::open(&person_uri).await.unwrap().version().version;
assert_eq!(
head_now, manifest_pin,
"Lance HEAD must equal manifest pin in steady state"
);
// Sidecar shape that simulates "publisher succeeded; audit/delete
// failed in a previous recovery pass". `expected_version` is less
// than the current manifest pin (the publish already ran) and
// `post_commit_pin` matches the current head.
let stale_expected = manifest_pin - 1;
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H00000000000000000000SF",
"started_at": "0",
"branch": null,
"actor_id": "act-original",
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": {},
"post_commit_pin": {}
}}
]
}}"#,
person_uri, stale_expected, manifest_pin
);
write_sidecar_file(dir.path(), "01H00000000000000000000SF", &sidecar_json);
// Reopen — sweep must classify Person as NoMovement (head_now ==
// manifest_pinned) but recognize stale-after-success because
// manifest_pinned > stale_expected. Audit-recovery branch records
// RolledForward and deletes the sidecar.
let _db = Omnigraph::open(uri).await.unwrap();
// Sidecar deleted.
assert!(
list_recovery_dir(dir.path()).is_empty(),
"stale-after-success sidecar must be deleted after audit-recovery"
);
// Audit row says RolledForward (not RolledBack).
let audit = read_latest_recovery_audit(dir.path()).await;
assert_eq!(
audit,
Some((
"RolledForward".to_string(),
Some("act-original".to_string()),
"01H00000000000000000000SF".to_string(),
"Mutation".to_string(),
)),
"stale-after-success sidecar must record RolledForward, not RolledBack"
);
// Audit outcomes report from_version=stale_expected, to_version=manifest_pin.
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let recoveries_dir = dir.path().join("_graph_commit_recoveries.lance");
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let last = batches.iter().filter(|b| b.num_rows() > 0).last().unwrap();
let row = last.num_rows() - 1;
let outcomes_json = last
.column_by_name("per_table_outcomes_json")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(row);
let outcomes: serde_json::Value = serde_json::from_str(outcomes_json).unwrap();
let arr = outcomes.as_array().unwrap();
assert_eq!(arr.len(), 1, "outcomes must include the Person table");
let outcome = &arr[0];
assert_eq!(outcome["table_key"], "node:Person");
assert_eq!(outcome["from_version"].as_u64().unwrap(), stale_expected);
assert_eq!(outcome["to_version"].as_u64().unwrap(), manifest_pin);
}
#[tokio::test]
async fn recovery_rolls_back_records_audit_row_with_recovery_actor() {
use omnigraph::loader::{LoadMode, load_jsonl};