From ca21e73d439288a1bc4be2ab5d2551c6af713c18 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sat, 2 May 2026 23:58:23 +0200 Subject: [PATCH] recovery: roll-forward execution + audit row (Phase 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the remaining half of the open-time recovery sweep. Roll-forward execution (db/manifest/recovery.rs::roll_forward_all): constructs a GraphNamespacePublisher directly (recovery runs inside Omnigraph::open before the engine struct exists, so we can't go through Omnigraph::commit_updates_on_branch_with_expected). Builds a ManifestChange::Update per sidecar table reading row_count and TableVersionMetadata from the dataset at post_commit_pin (cheap; manifest-level reads, not a row scan), then calls publisher.publish with expected_table_versions = sidecar.expected_version per table. Single __manifest CAS extends every pin atomically — all-or-nothing at the substrate. Persistent CAS contention surfaces as the typed ExpectedVersionMismatch error and leaves the sidecar in place for the next open's retry. Audit model (new crates/omnigraph/src/db/recovery_audit.rs + record_audit() in recovery.rs): each successful recovery sweep records a graph-commit row tagged with actor_id="omnigraph:recovery" plus a row in a new sibling table _graph_commit_recoveries.lance carrying recovery_kind (RolledForward | RolledBack), recovery_for_actor (the sidecar's original actor_id), operation_id (sidecar ULID), sidecar_writer_kind, per_table_outcomes (JSON-serialized for schema flexibility), and created_at. Operators investigating "did my mutation land?" can find the answer via `omnigraph commit list --filter actor=omnigraph:recovery` joined to the recoveries table by graph_commit_id. The sibling-table choice avoids bumping INTERNAL_MANIFEST_SCHEMA_VERSION or migrating _graph_commits.lance. Same not-atomic-pair-write shape as the existing _graph_commits + _graph_commit_actors split — a crash between the two sequential writes leaves an orphan commit row with no recovery row. Recovery sweep tolerates this: re-entry classifies already-restored / already-published tables as NoMovement, the action is a no-op, and the audit append is retried. Note on classifier: process_sidecar's RollBack arm now restores RolledPastExpected, UnexpectedAtP1, AND UnexpectedMultistep (any drift class). Earlier Phase 3 logic restricted to RolledPastExpected only, which left UnexpectedAtP1/UnexpectedMultistep tables drifted; the all-or-nothing decision rule per docs/invariants.md §VI.23 demands all drifted tables be restored. 3 new integration tests in tests/recovery.rs (7 total now): - recovery_rolls_forward_after_phase_b_completes — happy-path roll-forward; audit row recorded; idempotent on second open. - recovery_rolls_back_records_audit_row_with_recovery_actor — roll-back path also records an audit row with the original actor. - recovery_rolls_forward_with_null_actor — sidecar without actor_id still records the audit row (recovery_for_actor = None). 3 new unit tests in db::recovery_audit pin the round-trip + persistence + recovery_kind string parsing. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/db/manifest/recovery.rs | 171 ++++++++- crates/omnigraph/src/db/mod.rs | 1 + crates/omnigraph/src/db/recovery_audit.rs | 362 +++++++++++++++++++ crates/omnigraph/tests/recovery.rs | 326 ++++++++++++++++- 4 files changed, 842 insertions(+), 18 deletions(-) create mode 100644 crates/omnigraph/src/db/recovery_audit.rs diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 5270c53..2a5ed72 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -40,14 +40,28 @@ //! //! See `.context/mr-847-design.md` for the full design. +use std::collections::HashMap; + use lance::Dataset; use serde::{Deserialize, Serialize}; use tracing::warn; +use crate::db::commit_graph::CommitGraph; +use crate::db::recovery_audit::{ + now_micros, RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome, +}; use crate::error::{OmniError, Result}; use crate::storage::StorageAdapter; use super::Snapshot; +use super::publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; +use super::{ManifestChange, SubTableUpdate}; + +/// System actor identifier recorded on every recovery commit. Operators +/// distinguish recovery commits from user commits in `omnigraph commit list` +/// by filtering on this actor; the original sidecar's actor (if any) flows +/// into the audit row's `recovery_for_actor` field. +pub(crate) const RECOVERY_ACTOR: &str = "omnigraph:recovery"; /// Subdirectory under the repo root holding sidecar files. pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery"; @@ -450,13 +464,13 @@ async fn process_sidecar( // investigate. This includes any classification of // InvariantViolation (Lance HEAD < manifest pinned: should be // impossible). - return Err(OmniError::manifest_internal(format!( + Err(OmniError::manifest_internal(format!( "recovery sidecar '{}' has invariant violation; refusing to act \ — operator review required (sidecar at '{}', classifications: {:?})", sidecar.operation_id, sidecar_uri(root_uri, &sidecar.operation_id), classifications, - ))); + ))) } SidecarDecision::RollBack => { warn!( @@ -470,6 +484,7 @@ async fn process_sidecar( // expected_version — no action. The fragment-set short-circuit // in restore_table_to_version makes drift-with-equivalent-content // a no-op (sound: equal fragment-ids ⇒ equal content). + let mut outcomes = Vec::with_capacity(sidecar.tables.len()); for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) { if matches!( cls, @@ -478,28 +493,156 @@ async fn process_sidecar( | TableClassification::UnexpectedMultistep ) { restore_table_to_version(&pin.table_path, pin.expected_version).await?; + outcomes.push(TableOutcome { + table_key: pin.table_key.clone(), + from_version: snapshot + .entry(&pin.table_key) + .map(|e| e.table_version) + .unwrap_or(0), + to_version: pin.expected_version, + }); } } - // Audit row write deferred to Phase 4 (recovery audit model). - // For now: delete sidecar as the final step. + // Manifest pin doesn't move on rollback; record an audit-only + // commit at the existing version so operators can correlate via + // `omnigraph commit list --filter actor=omnigraph:recovery`. + record_audit( + root_uri, + sidecar, + snapshot.version(), + RecoveryKind::RolledBack, + outcomes, + ) + .await?; delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; Ok(()) } SidecarDecision::RollForward => { - // Phase 4 implements the audit row + ManifestBatchPublisher - // pin extension. Until then, surface loudly so we don't - // silently leave drift. - Err(OmniError::manifest_internal(format!( - "recovery sidecar '{}' is roll-forward eligible but the \ - roll-forward path is not yet implemented (MR-847 Phase 4); \ - sidecar left in place at '{}'", - sidecar.operation_id, - sidecar_uri(root_uri, &sidecar.operation_id), - ))) + warn!( + operation_id = sidecar.operation_id.as_str(), + writer_kind = ?sidecar.writer_kind, + "recovery: rolling forward sidecar (Phase B completed; \ + Phase C did not land)" + ); + let new_manifest_version = roll_forward_all(root_uri, sidecar).await?; + let outcomes: Vec = sidecar + .tables + .iter() + .map(|pin| TableOutcome { + table_key: pin.table_key.clone(), + from_version: pin.expected_version, + to_version: pin.post_commit_pin, + }) + .collect(); + record_audit( + root_uri, + sidecar, + new_manifest_version, + RecoveryKind::RolledForward, + outcomes, + ) + .await?; + delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; + Ok(()) } } } +/// Atomically extend every table's manifest pin from `expected_version` to +/// `post_commit_pin` via a single `ManifestBatchPublisher::publish` call. +/// Returns the new manifest version produced by the publish. +/// +/// All-or-nothing at the substrate: the publish writes one `__manifest` +/// row-level CAS that either advances every listed pin together or fails +/// with `ExpectedVersionMismatch` (no partial publish). The publisher's +/// internal `PUBLISHER_RETRY_BUDGET = 5` handles transient row-level CAS +/// contention; persistent contention surfaces the typed conflict error to +/// the recovery sweep, which leaves the sidecar in place for the next +/// open's retry. +async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result { + let mut updates: Vec = Vec::with_capacity(sidecar.tables.len()); + let mut expected: HashMap = HashMap::with_capacity(sidecar.tables.len()); + + for pin in &sidecar.tables { + // Read the post-commit dataset at `post_commit_pin` to capture the + // row count + version metadata that the manifest row needs. Cheap: + // these are manifest-level values, not a row scan. + let post_ds = Dataset::open(&pin.table_path) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? + .checkout_version(pin.post_commit_pin) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + + let row_count = post_ds + .count_rows(None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? as u64; + + let table_relative_path = super::table_path_for_table_key(&pin.table_key)?; + let version_metadata = + super::metadata::TableVersionMetadata::from_dataset( + root_uri, + &table_relative_path, + &post_ds, + )?; + + updates.push(ManifestChange::Update(SubTableUpdate { + table_key: pin.table_key.clone(), + table_version: pin.post_commit_pin, + table_branch: sidecar.branch.clone(), + row_count, + version_metadata, + })); + expected.insert(pin.table_key.clone(), pin.expected_version); + } + + let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref()); + let new_dataset = publisher.publish(&updates, &expected).await?; + Ok(new_dataset.version().version) +} + +/// Append the audit row describing this recovery action. +/// +/// Two-part write: (a) `_graph_commits.lance` row anchored on the recovery +/// actor (`omnigraph:recovery`); (b) `_graph_commit_recoveries.lance` row +/// linking back to (a) and naming the original actor + per-table outcomes. +/// Same not-atomic-pair-write shape as the existing `_graph_commits` +/// + `_graph_commit_actors` split — a crash between the two leaves an +/// orphan commit row with no audit row. The recovery sweep tolerates this: +/// on re-entry the classifier surfaces `NoMovement` for already-restored / +/// already-published tables, the action is a no-op, and the audit append +/// is retried. +async fn record_audit( + root_uri: &str, + sidecar: &RecoverySidecar, + manifest_version: u64, + kind: RecoveryKind, + outcomes: Vec, +) -> Result<()> { + let mut graph = CommitGraph::open(root_uri).await?; + let graph_commit_id = graph + .append_commit( + sidecar.branch.as_deref(), + manifest_version, + Some(RECOVERY_ACTOR), + ) + .await?; + let mut audit = RecoveryAudit::open(root_uri).await?; + audit + .append(RecoveryAuditRecord { + graph_commit_id, + recovery_kind: kind, + recovery_for_actor: sidecar.actor_id.clone(), + operation_id: sidecar.operation_id.clone(), + sidecar_writer_kind: format!("{:?}", sidecar.writer_kind), + per_table_outcomes: outcomes, + created_at: now_micros()?, + }) + .await?; + Ok(()) +} + async fn open_lance_head(table_path: &str) -> Result { let ds = Dataset::open(table_path) .await diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 16b62b0..7a335fd 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -2,6 +2,7 @@ pub mod commit_graph; pub mod graph_coordinator; pub mod manifest; mod omnigraph; +mod recovery_audit; mod run_registry; mod schema_state; diff --git a/crates/omnigraph/src/db/recovery_audit.rs b/crates/omnigraph/src/db/recovery_audit.rs new file mode 100644 index 0000000..bf801ce --- /dev/null +++ b/crates/omnigraph/src/db/recovery_audit.rs @@ -0,0 +1,362 @@ +//! MR-847 Phase 4 — Recovery audit row storage in `_graph_commit_recoveries.lance`. +//! +//! Sibling to `_graph_commits.lance` (`commit_graph.rs`). Each successful +//! recovery sweep — roll-forward or roll-back — records one row here so +//! operators investigating a sidecar-attributed mutation can correlate +//! `omnigraph commit list --filter actor=omnigraph:recovery` with the +//! original actor whose mutation was rolled forward / back. +//! +//! The schema-migration alternative (adding `recovery_for_actor` and +//! `recovery_kind` columns to `_graph_commits.lance` itself) was +//! considered and rejected for MR-847 — see `.context/mr-847-design.md` +//! § "Recovery audit model". Sibling-table is additive, doesn't bump +//! `INTERNAL_MANIFEST_SCHEMA_VERSION`, and can be removed in favor of a +//! schema migration later if the join cost matters. +//! +//! Atomicity caveat: append to `_graph_commit_recoveries.lance` is +//! sequential w.r.t. the `CommitGraph::append_commit` write. A crash +//! between the two leaves an orphan commit-graph row with no audit row. +//! Same shape as the existing `_graph_commits` + `_graph_commit_actors` +//! split; the recovery sweep tolerates it the same way (re-entry sees +//! `NoMovement` for already-restored / already-published tables; the +//! audit append is retried). + +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use arrow_array::{ + Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, +}; +use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use futures::TryStreamExt; +use lance::Dataset; +use lance::dataset::{WriteMode, WriteParams}; +use lance_file::version::LanceFileVersion; +use serde::{Deserialize, Serialize}; + +use crate::error::{OmniError, Result}; + +const RECOVERIES_DIR: &str = "_graph_commit_recoveries.lance"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) enum RecoveryKind { + RolledForward, + RolledBack, +} + +impl RecoveryKind { + fn as_str(self) -> &'static str { + match self { + RecoveryKind::RolledForward => "RolledForward", + RecoveryKind::RolledBack => "RolledBack", + } + } + + fn parse(s: &str) -> Result { + match s { + "RolledForward" => Ok(RecoveryKind::RolledForward), + "RolledBack" => Ok(RecoveryKind::RolledBack), + other => Err(OmniError::manifest_internal(format!( + "unknown recovery_kind '{}' in _graph_commit_recoveries.lance", + other + ))), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct TableOutcome { + pub table_key: String, + /// For RolledForward: the prior manifest pin (== sidecar.expected_version). + /// For RolledBack: same. + pub from_version: u64, + /// For RolledForward: the new manifest pin (== sidecar.post_commit_pin). + /// For RolledBack: == sidecar.expected_version (Lance HEAD reverted). + pub to_version: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct RecoveryAuditRecord { + pub graph_commit_id: String, + pub recovery_kind: RecoveryKind, + pub recovery_for_actor: Option, + pub operation_id: String, + pub sidecar_writer_kind: String, + pub per_table_outcomes: Vec, + pub created_at: i64, +} + +pub(crate) struct RecoveryAudit { + root_uri: String, + dataset: Option, +} + +impl RecoveryAudit { + /// Open the recovery-audit dataset for the repo, or return a handle + /// with no dataset yet (created on first append). Mirrors the + /// optional-dataset pattern from `_graph_commit_actors.lance`. + pub(crate) async fn open(root_uri: &str) -> Result { + let root = root_uri.trim_end_matches('/').to_string(); + let dataset = Dataset::open(&recoveries_uri(&root)).await.ok(); + Ok(Self { + root_uri: root, + dataset, + }) + } + + /// Append one recovery audit record. Lazily initializes the dataset + /// on first call (idempotent under racy creation via the same + /// `Dataset already exists` rebound as `_graph_commit_actors.lance`). + pub(crate) async fn append(&mut self, record: RecoveryAuditRecord) -> Result<()> { + let batch = recovery_record_to_batch(&record)?; + let reader = RecordBatchIterator::new(vec![Ok(batch)], recoveries_schema()); + let mut dataset = match self.dataset.take() { + Some(dataset) => dataset, + None => create_recoveries_dataset(&self.root_uri).await?, + }; + dataset + .append(reader, None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + self.dataset = Some(dataset); + Ok(()) + } + + /// Read every recorded recovery (test + audit-CLI surface). Ordered by + /// `created_at` ascending. + pub(crate) async fn list(&self) -> Result> { + let dataset = match &self.dataset { + Some(dataset) => dataset, + None => return Ok(Vec::new()), + }; + let batches: Vec = dataset + .scan() + .try_into_stream() + .await + .map_err(|e| OmniError::Lance(e.to_string()))? + .try_collect() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + + let mut out = Vec::new(); + for batch in batches { + for row in 0..batch.num_rows() { + out.push(decode_row(&batch, row)?); + } + } + out.sort_by_key(|r| r.created_at); + Ok(out) + } +} + +fn recoveries_uri(root_uri: &str) -> String { + format!("{}/{}", root_uri.trim_end_matches('/'), RECOVERIES_DIR) +} + +fn recoveries_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("graph_commit_id", DataType::Utf8, false), + Field::new("recovery_kind", DataType::Utf8, false), + Field::new("recovery_for_actor", DataType::Utf8, true), + Field::new("operation_id", DataType::Utf8, false), + Field::new("sidecar_writer_kind", DataType::Utf8, false), + // per_table_outcomes is serialized as a JSON string. The audit + // table is queried infrequently; a JSON column avoids needing + // a list-of-struct schema, which would make schema evolution + // (adding fields per outcome) more painful. + Field::new("per_table_outcomes_json", DataType::Utf8, false), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ), + ])) +} + +async fn create_recoveries_dataset(root_uri: &str) -> Result { + let uri = recoveries_uri(root_uri); + let batch = RecordBatch::new_empty(recoveries_schema()); + let reader = RecordBatchIterator::new(vec![Ok(batch)], recoveries_schema()); + let params = WriteParams { + mode: WriteMode::Create, + enable_stable_row_ids: true, + data_storage_version: Some(LanceFileVersion::V2_2), + ..Default::default() + }; + match Dataset::write(reader, &uri as &str, Some(params)).await { + Ok(dataset) => Ok(dataset), + Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri) + .await + .map_err(|open_err| OmniError::Lance(open_err.to_string())), + Err(err) => Err(OmniError::Lance(err.to_string())), + } +} + +fn recovery_record_to_batch(record: &RecoveryAuditRecord) -> Result { + let outcomes_json = serde_json::to_string(&record.per_table_outcomes).map_err(|e| { + OmniError::manifest_internal(format!( + "failed to serialize per_table_outcomes for recovery audit: {}", + e + )) + })?; + RecordBatch::try_new( + recoveries_schema(), + vec![ + Arc::new(StringArray::from(vec![record.graph_commit_id.clone()])), + Arc::new(StringArray::from(vec![record.recovery_kind.as_str()])), + Arc::new(StringArray::from(vec![record + .recovery_for_actor + .clone()])), + Arc::new(StringArray::from(vec![record.operation_id.clone()])), + Arc::new(StringArray::from(vec![record.sidecar_writer_kind.clone()])), + Arc::new(StringArray::from(vec![outcomes_json])), + Arc::new(TimestampMicrosecondArray::from(vec![record.created_at])), + ], + ) + .map_err(|e| OmniError::Lance(e.to_string())) +} + +fn decode_row(batch: &RecordBatch, row: usize) -> Result { + let str_col = |name: &str| -> Result<&StringArray> { + batch + .column_by_name(name) + .ok_or_else(|| OmniError::manifest_internal(format!("missing column '{}' in recovery audit", name)))? + .as_any() + .downcast_ref::() + .ok_or_else(|| OmniError::manifest_internal(format!("column '{}' has wrong type", name))) + }; + let ts_col = batch + .column_by_name("created_at") + .ok_or_else(|| OmniError::manifest_internal("missing 'created_at' column".to_string()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OmniError::manifest_internal("'created_at' column has wrong type".to_string()) + })?; + + let graph_commit_ids = str_col("graph_commit_id")?; + let kinds = str_col("recovery_kind")?; + let for_actors = str_col("recovery_for_actor")?; + let op_ids = str_col("operation_id")?; + let writers = str_col("sidecar_writer_kind")?; + let outcomes_json = str_col("per_table_outcomes_json")?; + + let outcomes: Vec = + serde_json::from_str(outcomes_json.value(row)).map_err(|e| { + OmniError::manifest_internal(format!( + "failed to deserialize per_table_outcomes_json from recovery audit: {}", + e + )) + })?; + + Ok(RecoveryAuditRecord { + graph_commit_id: graph_commit_ids.value(row).to_string(), + recovery_kind: RecoveryKind::parse(kinds.value(row))?, + recovery_for_actor: if for_actors.is_null(row) { + None + } else { + Some(for_actors.value(row).to_string()) + }, + operation_id: op_ids.value(row).to_string(), + sidecar_writer_kind: writers.value(row).to_string(), + per_table_outcomes: outcomes, + created_at: ts_col.value(row), + }) +} + +pub(crate) fn now_micros() -> Result { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_micros() as i64) + .map_err(|e| { + OmniError::manifest_internal(format!("system clock before unix epoch: {}", e)) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_record() -> RecoveryAuditRecord { + RecoveryAuditRecord { + graph_commit_id: "01H000000000000000000000XX".to_string(), + recovery_kind: RecoveryKind::RolledForward, + recovery_for_actor: Some("act-alice".to_string()), + operation_id: "01H000000000000000000000OP".to_string(), + sidecar_writer_kind: "Mutation".to_string(), + per_table_outcomes: vec![ + TableOutcome { + table_key: "node:Person".to_string(), + from_version: 5, + to_version: 6, + }, + TableOutcome { + table_key: "edge:Knows".to_string(), + from_version: 12, + to_version: 13, + }, + ], + created_at: 1_700_000_000_000_000, + } + } + + #[tokio::test] + async fn recovery_audit_round_trips_through_lance() { + let dir = tempfile::tempdir().unwrap(); + let root = dir.path().to_str().unwrap(); + + let mut audit = RecoveryAudit::open(root).await.unwrap(); + // Empty repo: list returns empty. + assert!(audit.list().await.unwrap().is_empty()); + + // Append + list. + let record = sample_record(); + audit.append(record.clone()).await.unwrap(); + let listed = audit.list().await.unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0], record); + + // Append a second record; both visible, sorted by created_at. + let mut second = sample_record(); + second.graph_commit_id = "01H000000000000000000000YY".to_string(); + second.recovery_kind = RecoveryKind::RolledBack; + second.recovery_for_actor = None; + second.created_at = record.created_at + 1; + audit.append(second.clone()).await.unwrap(); + + let listed = audit.list().await.unwrap(); + assert_eq!(listed.len(), 2); + assert_eq!(listed[0], record); + assert_eq!(listed[1], second); + } + + #[tokio::test] + async fn recovery_audit_persists_across_open_cycles() { + let dir = tempfile::tempdir().unwrap(); + let root = dir.path().to_str().unwrap(); + + { + let mut audit = RecoveryAudit::open(root).await.unwrap(); + audit.append(sample_record()).await.unwrap(); + } + + let audit = RecoveryAudit::open(root).await.unwrap(); + let listed = audit.list().await.unwrap(); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0], sample_record()); + } + + #[test] + fn recovery_kind_round_trips_through_string() { + assert_eq!( + RecoveryKind::parse("RolledForward").unwrap(), + RecoveryKind::RolledForward, + ); + assert_eq!( + RecoveryKind::parse("RolledBack").unwrap(), + RecoveryKind::RolledBack, + ); + assert!(RecoveryKind::parse("Garbage").is_err()); + } +} diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index b9bf6f6..bc68fc1 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -5,10 +5,9 @@ //! Phase B → Phase C residual, reopen the engine, and assert the sweep's //! decision-tree dispatch did the right thing. //! -//! The four tests here pin Phase 3 scope (open-time invocation, -//! `OpenMode::{ReadWrite, ReadOnly}`, roll-back path, schema-version -//! refusal). The roll-forward path is Phase 4 — exercised by tests in -//! the Phase 4 commit. +//! The Phase 3 tests pin open-time invocation, `OpenMode::{ReadWrite, +//! ReadOnly}`, the roll-back path, and schema-version refusal. The Phase +//! 4 tests pin the roll-forward path + audit row recording. use std::path::Path; @@ -246,3 +245,322 @@ async fn recovery_rolls_back_synthetic_drift_on_open() { "second open must be a clean no-op" ); } + +// ===================================================================== +// Phase 4 — roll-forward path + audit row recording +// ===================================================================== + +/// Helper: count rows in `_graph_commit_recoveries.lance` at the given root. +async fn count_recovery_audit_rows(repo_root: &Path) -> usize { + let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance"); + if !recoveries_dir.exists() { + return 0; + } + let ds = Dataset::open(recoveries_dir.to_str().unwrap()) + .await + .expect("recoveries dataset opens"); + use futures::TryStreamExt; + let batches: Vec = + ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); + batches.iter().map(|b| b.num_rows()).sum() +} + +/// Helper: read the most recent recovery audit row's `recovery_kind`, +/// `recovery_for_actor`, and `operation_id`. Returns `None` if no rows. +async fn read_latest_recovery_audit( + repo_root: &Path, +) -> Option<(String, Option, String, String)> { + let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance"); + if !recoveries_dir.exists() { + return None; + } + let ds = Dataset::open(recoveries_dir.to_str().unwrap()) + .await + .ok()?; + use arrow_array::{Array, StringArray}; + use futures::TryStreamExt; + let batches: Vec = + ds.scan().try_into_stream().await.ok()?.try_collect().await.ok()?; + let last_batch = batches.iter().filter(|b| b.num_rows() > 0).last()?; + let row = last_batch.num_rows() - 1; + let kinds = last_batch + .column_by_name("recovery_kind")? + .as_any() + .downcast_ref::()?; + let for_actors = last_batch + .column_by_name("recovery_for_actor")? + .as_any() + .downcast_ref::()?; + let ops = last_batch + .column_by_name("operation_id")? + .as_any() + .downcast_ref::()?; + let writers = last_batch + .column_by_name("sidecar_writer_kind")? + .as_any() + .downcast_ref::()?; + Some(( + kinds.value(row).to_string(), + if for_actors.is_null(row) { + None + } else { + Some(for_actors.value(row).to_string()) + }, + ops.value(row).to_string(), + writers.value(row).to_string(), + )) +} + +/// Helper: count `_graph_commits.lance` rows tagged with the recovery actor. +async fn count_recovery_actor_commits(repo_root: &Path) -> usize { + let actors_dir = repo_root.join("_graph_commit_actors.lance"); + if !actors_dir.exists() { + return 0; + } + let ds = Dataset::open(actors_dir.to_str().unwrap()).await.unwrap(); + use arrow_array::{Array, StringArray}; + use futures::TryStreamExt; + let batches: Vec = + ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); + let mut count = 0; + for batch in &batches { + let actors = batch + .column_by_name("actor_id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..actors.len() { + if actors.value(i) == "omnigraph:recovery" { + count += 1; + } + } + } + count +} + +#[tokio::test] +async fn recovery_rolls_forward_after_phase_b_completes() { + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + // Bootstrap: init + load 2 rows. Manifest pin and Lance HEAD both + // advance via the legitimate publisher path. + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} +{"type":"Person","data":{"name":"bob","age":25}} +"#; + load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + drop(db); + + let person_uri = node_table_uri(uri, "Person"); + let store = TableStore::new(uri); + let mut ds = Dataset::open(&person_uri).await.unwrap(); + let head_before = ds.version().version; + + // Synthesize a successful Phase B: advance Lance HEAD by one + // (delete_where with no-match — no fragment changes, but version bumps). + let _ = store + .delete_where(&person_uri, &mut ds, "1 = 2") + .await + .unwrap(); + let head_after = ds.version().version; + assert_eq!(head_after, head_before + 1); + + // Drop a sidecar that MATCHES the synthesized state + // (expected=head_before, post_commit_pin=head_after) — classifier + // returns RolledPastExpected, decision is RollForward. + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H00000000000000000000RF", + "started_at": "0", + "branch": null, + "actor_id": "act-alice", + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{}", + "expected_version": {}, + "post_commit_pin": {} + }} + ] + }}"#, + person_uri, head_before, head_after + ); + write_sidecar_file(dir.path(), "01H00000000000000000000RF", &sidecar_json); + + // Reopen — sweep must roll forward, advancing the manifest pin to + // head_after via a single ManifestBatchPublisher::publish call. + let _db = Omnigraph::open(uri).await.unwrap(); + + // Sidecar deleted (sweep completed end-to-end). + assert!( + !list_recovery_dir(dir.path()).contains(&"01H00000000000000000000RF.json".to_string()), + "sidecar must be deleted after successful roll-forward" + ); + + // Audit row recorded. + assert_eq!( + count_recovery_audit_rows(dir.path()).await, + 1, + "roll-forward must record exactly one audit row" + ); + assert_eq!( + count_recovery_actor_commits(dir.path()).await, + 1, + "roll-forward must record exactly one commit-graph row tagged with omnigraph:recovery" + ); + let audit = read_latest_recovery_audit(dir.path()).await; + assert_eq!( + audit, + Some(( + "RolledForward".to_string(), + Some("act-alice".to_string()), + "01H00000000000000000000RF".to_string(), + "Mutation".to_string(), + )), + "audit row content mismatch" + ); + + // Idempotency: reopen is a no-op. + let _db2 = Omnigraph::open(uri).await.unwrap(); + assert!( + list_recovery_dir(dir.path()).is_empty(), + "second open must be a clean no-op" + ); + assert_eq!( + count_recovery_audit_rows(dir.path()).await, + 1, + "second open must NOT record a new audit row" + ); +} + +#[tokio::test] +async fn recovery_rolls_back_records_audit_row_with_recovery_actor() { + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} +"#; + load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + drop(db); + + let person_uri = node_table_uri(uri, "Person"); + let store = TableStore::new(uri); + let mut ds = Dataset::open(&person_uri).await.unwrap(); + let head_before = ds.version().version; + let _ = store + .delete_where(&person_uri, &mut ds, "1 = 2") + .await + .unwrap(); + let head_after = ds.version().version; + let _ = head_after; + + // Sidecar with MISMATCHED post_commit_pin → classifier returns + // UnexpectedAtP1 → decision is RollBack. + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H00000000000000000000AB", + "started_at": "0", + "branch": null, + "actor_id": "act-bob", + "writer_kind": "Load", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{}", + "expected_version": {}, + "post_commit_pin": {} + }} + ] + }}"#, + person_uri, head_before, head_before + ); + write_sidecar_file(dir.path(), "01H00000000000000000000AB", &sidecar_json); + + let _db = Omnigraph::open(uri).await.unwrap(); + + // Audit row recorded for RolledBack. + assert_eq!(count_recovery_audit_rows(dir.path()).await, 1); + assert_eq!(count_recovery_actor_commits(dir.path()).await, 1); + let audit = read_latest_recovery_audit(dir.path()).await; + assert_eq!( + audit, + Some(( + "RolledBack".to_string(), + Some("act-bob".to_string()), + "01H00000000000000000000AB".to_string(), + "Load".to_string(), + )), + ); +} + +#[tokio::test] +async fn recovery_rolls_forward_with_null_actor() { + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} +"#; + load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + drop(db); + + let person_uri = node_table_uri(uri, "Person"); + let store = TableStore::new(uri); + let mut ds = Dataset::open(&person_uri).await.unwrap(); + let head_before = ds.version().version; + let _ = store + .delete_where(&person_uri, &mut ds, "1 = 2") + .await + .unwrap(); + let head_after = ds.version().version; + + // Sidecar with no actor_id (CLI-driven mutation; common case). + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H00000000000000000000NA", + "started_at": "0", + "branch": null, + "actor_id": null, + "writer_kind": "EnsureIndices", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{}", + "expected_version": {}, + "post_commit_pin": {} + }} + ] + }}"#, + person_uri, head_before, head_after + ); + write_sidecar_file(dir.path(), "01H00000000000000000000NA", &sidecar_json); + + let _db = Omnigraph::open(uri).await.unwrap(); + + let audit = read_latest_recovery_audit(dir.path()).await; + assert_eq!( + audit, + Some(( + "RolledForward".to_string(), + None, // recovery_for_actor is None when sidecar.actor_id is None + "01H00000000000000000000NA".to_string(), + "EnsureIndices".to_string(), + )), + ); +}