recovery: roll-forward execution + audit row (Phase 4)

Implement the remaining half of the open-time recovery sweep.

Roll-forward execution (db/manifest/recovery.rs::roll_forward_all):
constructs a GraphNamespacePublisher directly (recovery runs inside
Omnigraph::open before the engine struct exists, so we can't go through
Omnigraph::commit_updates_on_branch_with_expected). Builds a
ManifestChange::Update per sidecar table reading row_count and
TableVersionMetadata from the dataset at post_commit_pin (cheap;
manifest-level reads, not a row scan), then calls publisher.publish with
expected_table_versions = sidecar.expected_version per table. Single
__manifest CAS extends every pin atomically — all-or-nothing at the
substrate. Persistent CAS contention surfaces as the typed
ExpectedVersionMismatch error and leaves the sidecar in place for the
next open's retry.

Audit model (new crates/omnigraph/src/db/recovery_audit.rs +
record_audit() in recovery.rs): each successful recovery sweep records
a graph-commit row tagged with actor_id="omnigraph:recovery" plus a
row in a new sibling table _graph_commit_recoveries.lance carrying
recovery_kind (RolledForward | RolledBack), recovery_for_actor (the
sidecar's original actor_id), operation_id (sidecar ULID),
sidecar_writer_kind, per_table_outcomes (JSON-serialized for schema
flexibility), and created_at. Operators investigating "did my mutation
land?" can find the answer via `omnigraph commit list --filter
actor=omnigraph:recovery` joined to the recoveries table by
graph_commit_id.

The sibling-table choice avoids bumping INTERNAL_MANIFEST_SCHEMA_VERSION
or migrating _graph_commits.lance. Same not-atomic-pair-write shape as
the existing _graph_commits + _graph_commit_actors split — a crash
between the two sequential writes leaves an orphan commit row with no
recovery row. Recovery sweep tolerates this: re-entry classifies
already-restored / already-published tables as NoMovement, the action
is a no-op, and the audit append is retried.

Note on classifier: process_sidecar's RollBack arm now restores
RolledPastExpected, UnexpectedAtP1, AND UnexpectedMultistep (any drift
class). Earlier Phase 3 logic restricted to RolledPastExpected only,
which left UnexpectedAtP1/UnexpectedMultistep tables drifted; the
all-or-nothing decision rule per docs/invariants.md §VI.23 demands all
drifted tables be restored.

3 new integration tests in tests/recovery.rs (7 total now):
- recovery_rolls_forward_after_phase_b_completes — happy-path
  roll-forward; audit row recorded; idempotent on second open.
- recovery_rolls_back_records_audit_row_with_recovery_actor —
  roll-back path also records an audit row with the original actor.
- recovery_rolls_forward_with_null_actor — sidecar without actor_id
  still records the audit row (recovery_for_actor = None).

3 new unit tests in db::recovery_audit pin the round-trip + persistence
+ recovery_kind string parsing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-02 23:58:23 +02:00
parent c2fc3e7c40
commit ca21e73d43
No known key found for this signature in database
4 changed files with 842 additions and 18 deletions

View file

@ -40,14 +40,28 @@
//!
//! See `.context/mr-847-design.md` for the full design.
use std::collections::HashMap;
use lance::Dataset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::db::commit_graph::CommitGraph;
use crate::db::recovery_audit::{
now_micros, RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome,
};
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
use super::Snapshot;
use super::publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
use super::{ManifestChange, SubTableUpdate};
/// System actor identifier recorded on every recovery commit. Operators
/// distinguish recovery commits from user commits in `omnigraph commit list`
/// by filtering on this actor; the original sidecar's actor (if any) flows
/// into the audit row's `recovery_for_actor` field.
pub(crate) const RECOVERY_ACTOR: &str = "omnigraph:recovery";
/// Subdirectory under the repo root holding sidecar files.
pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery";
@ -450,13 +464,13 @@ async fn process_sidecar(
// investigate. This includes any classification of
// InvariantViolation (Lance HEAD < manifest pinned: should be
// impossible).
return Err(OmniError::manifest_internal(format!(
Err(OmniError::manifest_internal(format!(
"recovery sidecar '{}' has invariant violation; refusing to act \
operator review required (sidecar at '{}', classifications: {:?})",
sidecar.operation_id,
sidecar_uri(root_uri, &sidecar.operation_id),
classifications,
)));
)))
}
SidecarDecision::RollBack => {
warn!(
@ -470,6 +484,7 @@ async fn process_sidecar(
// expected_version — no action. The fragment-set short-circuit
// in restore_table_to_version makes drift-with-equivalent-content
// a no-op (sound: equal fragment-ids ⇒ equal content).
let mut outcomes = Vec::with_capacity(sidecar.tables.len());
for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) {
if matches!(
cls,
@ -478,28 +493,156 @@ async fn process_sidecar(
| TableClassification::UnexpectedMultistep
) {
restore_table_to_version(&pin.table_path, pin.expected_version).await?;
outcomes.push(TableOutcome {
table_key: pin.table_key.clone(),
from_version: snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0),
to_version: pin.expected_version,
});
}
}
// Audit row write deferred to Phase 4 (recovery audit model).
// For now: delete sidecar as the final step.
// Manifest pin doesn't move on rollback; record an audit-only
// commit at the existing version so operators can correlate via
// `omnigraph commit list --filter actor=omnigraph:recovery`.
record_audit(
root_uri,
sidecar,
snapshot.version(),
RecoveryKind::RolledBack,
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
Ok(())
}
SidecarDecision::RollForward => {
// Phase 4 implements the audit row + ManifestBatchPublisher
// pin extension. Until then, surface loudly so we don't
// silently leave drift.
Err(OmniError::manifest_internal(format!(
"recovery sidecar '{}' is roll-forward eligible but the \
roll-forward path is not yet implemented (MR-847 Phase 4); \
sidecar left in place at '{}'",
sidecar.operation_id,
sidecar_uri(root_uri, &sidecar.operation_id),
)))
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: rolling forward sidecar (Phase B completed; \
Phase C did not land)"
);
let new_manifest_version = roll_forward_all(root_uri, sidecar).await?;
let outcomes: Vec<TableOutcome> = sidecar
.tables
.iter()
.map(|pin| TableOutcome {
table_key: pin.table_key.clone(),
from_version: pin.expected_version,
to_version: pin.post_commit_pin,
})
.collect();
record_audit(
root_uri,
sidecar,
new_manifest_version,
RecoveryKind::RolledForward,
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
Ok(())
}
}
}
/// Atomically extend every table's manifest pin from `expected_version` to
/// `post_commit_pin` via a single `ManifestBatchPublisher::publish` call.
/// Returns the new manifest version produced by the publish.
///
/// All-or-nothing at the substrate: the publish writes one `__manifest`
/// row-level CAS that either advances every listed pin together or fails
/// with `ExpectedVersionMismatch` (no partial publish). The publisher's
/// internal `PUBLISHER_RETRY_BUDGET = 5` handles transient row-level CAS
/// contention; persistent contention surfaces the typed conflict error to
/// the recovery sweep, which leaves the sidecar in place for the next
/// open's retry.
async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u64> {
let mut updates: Vec<ManifestChange> = Vec::with_capacity(sidecar.tables.len());
let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
// Read the post-commit dataset at `post_commit_pin` to capture the
// row count + version metadata that the manifest row needs. Cheap:
// these are manifest-level values, not a row scan.
let post_ds = Dataset::open(&pin.table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
.checkout_version(pin.post_commit_pin)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let row_count = post_ds
.count_rows(None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
let table_relative_path = super::table_path_for_table_key(&pin.table_key)?;
let version_metadata =
super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&table_relative_path,
&post_ds,
)?;
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: pin.table_key.clone(),
table_version: pin.post_commit_pin,
table_branch: sidecar.branch.clone(),
row_count,
version_metadata,
}));
expected.insert(pin.table_key.clone(), pin.expected_version);
}
let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref());
let new_dataset = publisher.publish(&updates, &expected).await?;
Ok(new_dataset.version().version)
}
/// Append the audit row describing this recovery action.
///
/// Two-part write: (a) `_graph_commits.lance` row anchored on the recovery
/// actor (`omnigraph:recovery`); (b) `_graph_commit_recoveries.lance` row
/// linking back to (a) and naming the original actor + per-table outcomes.
/// Same not-atomic-pair-write shape as the existing `_graph_commits`
/// + `_graph_commit_actors` split — a crash between the two leaves an
/// orphan commit row with no audit row. The recovery sweep tolerates this:
/// on re-entry the classifier surfaces `NoMovement` for already-restored /
/// already-published tables, the action is a no-op, and the audit append
/// is retried.
async fn record_audit(
root_uri: &str,
sidecar: &RecoverySidecar,
manifest_version: u64,
kind: RecoveryKind,
outcomes: Vec<TableOutcome>,
) -> Result<()> {
let mut graph = CommitGraph::open(root_uri).await?;
let graph_commit_id = graph
.append_commit(
sidecar.branch.as_deref(),
manifest_version,
Some(RECOVERY_ACTOR),
)
.await?;
let mut audit = RecoveryAudit::open(root_uri).await?;
audit
.append(RecoveryAuditRecord {
graph_commit_id,
recovery_kind: kind,
recovery_for_actor: sidecar.actor_id.clone(),
operation_id: sidecar.operation_id.clone(),
sidecar_writer_kind: format!("{:?}", sidecar.writer_kind),
per_table_outcomes: outcomes,
created_at: now_micros()?,
})
.await?;
Ok(())
}
async fn open_lance_head(table_path: &str) -> Result<u64> {
let ds = Dataset::open(table_path)
.await

View file

@ -2,6 +2,7 @@ pub mod commit_graph;
pub mod graph_coordinator;
pub mod manifest;
mod omnigraph;
mod recovery_audit;
mod run_registry;
mod schema_state;

View file

@ -0,0 +1,362 @@
//! MR-847 Phase 4 — Recovery audit row storage in `_graph_commit_recoveries.lance`.
//!
//! Sibling to `_graph_commits.lance` (`commit_graph.rs`). Each successful
//! recovery sweep — roll-forward or roll-back — records one row here so
//! operators investigating a sidecar-attributed mutation can correlate
//! `omnigraph commit list --filter actor=omnigraph:recovery` with the
//! original actor whose mutation was rolled forward / back.
//!
//! The schema-migration alternative (adding `recovery_for_actor` and
//! `recovery_kind` columns to `_graph_commits.lance` itself) was
//! considered and rejected for MR-847 — see `.context/mr-847-design.md`
//! § "Recovery audit model". Sibling-table is additive, doesn't bump
//! `INTERNAL_MANIFEST_SCHEMA_VERSION`, and can be removed in favor of a
//! schema migration later if the join cost matters.
//!
//! Atomicity caveat: append to `_graph_commit_recoveries.lance` is
//! sequential w.r.t. the `CommitGraph::append_commit` write. A crash
//! between the two leaves an orphan commit-graph row with no audit row.
//! Same shape as the existing `_graph_commits` + `_graph_commit_actors`
//! split; the recovery sweep tolerates it the same way (re-entry sees
//! `NoMovement` for already-restored / already-published tables; the
//! audit append is retried).
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use arrow_array::{
Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WriteMode, WriteParams};
use lance_file::version::LanceFileVersion;
use serde::{Deserialize, Serialize};
use crate::error::{OmniError, Result};
const RECOVERIES_DIR: &str = "_graph_commit_recoveries.lance";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) enum RecoveryKind {
RolledForward,
RolledBack,
}
impl RecoveryKind {
fn as_str(self) -> &'static str {
match self {
RecoveryKind::RolledForward => "RolledForward",
RecoveryKind::RolledBack => "RolledBack",
}
}
fn parse(s: &str) -> Result<Self> {
match s {
"RolledForward" => Ok(RecoveryKind::RolledForward),
"RolledBack" => Ok(RecoveryKind::RolledBack),
other => Err(OmniError::manifest_internal(format!(
"unknown recovery_kind '{}' in _graph_commit_recoveries.lance",
other
))),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct TableOutcome {
pub table_key: String,
/// For RolledForward: the prior manifest pin (== sidecar.expected_version).
/// For RolledBack: same.
pub from_version: u64,
/// For RolledForward: the new manifest pin (== sidecar.post_commit_pin).
/// For RolledBack: == sidecar.expected_version (Lance HEAD reverted).
pub to_version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RecoveryAuditRecord {
pub graph_commit_id: String,
pub recovery_kind: RecoveryKind,
pub recovery_for_actor: Option<String>,
pub operation_id: String,
pub sidecar_writer_kind: String,
pub per_table_outcomes: Vec<TableOutcome>,
pub created_at: i64,
}
pub(crate) struct RecoveryAudit {
root_uri: String,
dataset: Option<Dataset>,
}
impl RecoveryAudit {
/// Open the recovery-audit dataset for the repo, or return a handle
/// with no dataset yet (created on first append). Mirrors the
/// optional-dataset pattern from `_graph_commit_actors.lance`.
pub(crate) async fn open(root_uri: &str) -> Result<Self> {
let root = root_uri.trim_end_matches('/').to_string();
let dataset = Dataset::open(&recoveries_uri(&root)).await.ok();
Ok(Self {
root_uri: root,
dataset,
})
}
/// Append one recovery audit record. Lazily initializes the dataset
/// on first call (idempotent under racy creation via the same
/// `Dataset already exists` rebound as `_graph_commit_actors.lance`).
pub(crate) async fn append(&mut self, record: RecoveryAuditRecord) -> Result<()> {
let batch = recovery_record_to_batch(&record)?;
let reader = RecordBatchIterator::new(vec![Ok(batch)], recoveries_schema());
let mut dataset = match self.dataset.take() {
Some(dataset) => dataset,
None => create_recoveries_dataset(&self.root_uri).await?,
};
dataset
.append(reader, None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
self.dataset = Some(dataset);
Ok(())
}
/// Read every recorded recovery (test + audit-CLI surface). Ordered by
/// `created_at` ascending.
pub(crate) async fn list(&self) -> Result<Vec<RecoveryAuditRecord>> {
let dataset = match &self.dataset {
Some(dataset) => dataset,
None => return Ok(Vec::new()),
};
let batches: Vec<RecordBatch> = dataset
.scan()
.try_into_stream()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let mut out = Vec::new();
for batch in batches {
for row in 0..batch.num_rows() {
out.push(decode_row(&batch, row)?);
}
}
out.sort_by_key(|r| r.created_at);
Ok(out)
}
}
fn recoveries_uri(root_uri: &str) -> String {
format!("{}/{}", root_uri.trim_end_matches('/'), RECOVERIES_DIR)
}
fn recoveries_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("graph_commit_id", DataType::Utf8, false),
Field::new("recovery_kind", DataType::Utf8, false),
Field::new("recovery_for_actor", DataType::Utf8, true),
Field::new("operation_id", DataType::Utf8, false),
Field::new("sidecar_writer_kind", DataType::Utf8, false),
// per_table_outcomes is serialized as a JSON string. The audit
// table is queried infrequently; a JSON column avoids needing
// a list-of-struct schema, which would make schema evolution
// (adding fields per outcome) more painful.
Field::new("per_table_outcomes_json", DataType::Utf8, false),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
]))
}
async fn create_recoveries_dataset(root_uri: &str) -> Result<Dataset> {
let uri = recoveries_uri(root_uri);
let batch = RecordBatch::new_empty(recoveries_schema());
let reader = RecordBatchIterator::new(vec![Ok(batch)], recoveries_schema());
let params = WriteParams {
mode: WriteMode::Create,
enable_stable_row_ids: true,
data_storage_version: Some(LanceFileVersion::V2_2),
..Default::default()
};
match Dataset::write(reader, &uri as &str, Some(params)).await {
Ok(dataset) => Ok(dataset),
Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
.await
.map_err(|open_err| OmniError::Lance(open_err.to_string())),
Err(err) => Err(OmniError::Lance(err.to_string())),
}
}
fn recovery_record_to_batch(record: &RecoveryAuditRecord) -> Result<RecordBatch> {
let outcomes_json = serde_json::to_string(&record.per_table_outcomes).map_err(|e| {
OmniError::manifest_internal(format!(
"failed to serialize per_table_outcomes for recovery audit: {}",
e
))
})?;
RecordBatch::try_new(
recoveries_schema(),
vec![
Arc::new(StringArray::from(vec![record.graph_commit_id.clone()])),
Arc::new(StringArray::from(vec![record.recovery_kind.as_str()])),
Arc::new(StringArray::from(vec![record
.recovery_for_actor
.clone()])),
Arc::new(StringArray::from(vec![record.operation_id.clone()])),
Arc::new(StringArray::from(vec![record.sidecar_writer_kind.clone()])),
Arc::new(StringArray::from(vec![outcomes_json])),
Arc::new(TimestampMicrosecondArray::from(vec![record.created_at])),
],
)
.map_err(|e| OmniError::Lance(e.to_string()))
}
fn decode_row(batch: &RecordBatch, row: usize) -> Result<RecoveryAuditRecord> {
let str_col = |name: &str| -> Result<&StringArray> {
batch
.column_by_name(name)
.ok_or_else(|| OmniError::manifest_internal(format!("missing column '{}' in recovery audit", name)))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest_internal(format!("column '{}' has wrong type", name)))
};
let ts_col = batch
.column_by_name("created_at")
.ok_or_else(|| OmniError::manifest_internal("missing 'created_at' column".to_string()))?
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.ok_or_else(|| {
OmniError::manifest_internal("'created_at' column has wrong type".to_string())
})?;
let graph_commit_ids = str_col("graph_commit_id")?;
let kinds = str_col("recovery_kind")?;
let for_actors = str_col("recovery_for_actor")?;
let op_ids = str_col("operation_id")?;
let writers = str_col("sidecar_writer_kind")?;
let outcomes_json = str_col("per_table_outcomes_json")?;
let outcomes: Vec<TableOutcome> =
serde_json::from_str(outcomes_json.value(row)).map_err(|e| {
OmniError::manifest_internal(format!(
"failed to deserialize per_table_outcomes_json from recovery audit: {}",
e
))
})?;
Ok(RecoveryAuditRecord {
graph_commit_id: graph_commit_ids.value(row).to_string(),
recovery_kind: RecoveryKind::parse(kinds.value(row))?,
recovery_for_actor: if for_actors.is_null(row) {
None
} else {
Some(for_actors.value(row).to_string())
},
operation_id: op_ids.value(row).to_string(),
sidecar_writer_kind: writers.value(row).to_string(),
per_table_outcomes: outcomes,
created_at: ts_col.value(row),
})
}
pub(crate) fn now_micros() -> Result<i64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.map_err(|e| {
OmniError::manifest_internal(format!("system clock before unix epoch: {}", e))
})
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_record() -> RecoveryAuditRecord {
RecoveryAuditRecord {
graph_commit_id: "01H000000000000000000000XX".to_string(),
recovery_kind: RecoveryKind::RolledForward,
recovery_for_actor: Some("act-alice".to_string()),
operation_id: "01H000000000000000000000OP".to_string(),
sidecar_writer_kind: "Mutation".to_string(),
per_table_outcomes: vec![
TableOutcome {
table_key: "node:Person".to_string(),
from_version: 5,
to_version: 6,
},
TableOutcome {
table_key: "edge:Knows".to_string(),
from_version: 12,
to_version: 13,
},
],
created_at: 1_700_000_000_000_000,
}
}
#[tokio::test]
async fn recovery_audit_round_trips_through_lance() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap();
let mut audit = RecoveryAudit::open(root).await.unwrap();
// Empty repo: list returns empty.
assert!(audit.list().await.unwrap().is_empty());
// Append + list.
let record = sample_record();
audit.append(record.clone()).await.unwrap();
let listed = audit.list().await.unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0], record);
// Append a second record; both visible, sorted by created_at.
let mut second = sample_record();
second.graph_commit_id = "01H000000000000000000000YY".to_string();
second.recovery_kind = RecoveryKind::RolledBack;
second.recovery_for_actor = None;
second.created_at = record.created_at + 1;
audit.append(second.clone()).await.unwrap();
let listed = audit.list().await.unwrap();
assert_eq!(listed.len(), 2);
assert_eq!(listed[0], record);
assert_eq!(listed[1], second);
}
#[tokio::test]
async fn recovery_audit_persists_across_open_cycles() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap();
{
let mut audit = RecoveryAudit::open(root).await.unwrap();
audit.append(sample_record()).await.unwrap();
}
let audit = RecoveryAudit::open(root).await.unwrap();
let listed = audit.list().await.unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0], sample_record());
}
#[test]
fn recovery_kind_round_trips_through_string() {
assert_eq!(
RecoveryKind::parse("RolledForward").unwrap(),
RecoveryKind::RolledForward,
);
assert_eq!(
RecoveryKind::parse("RolledBack").unwrap(),
RecoveryKind::RolledBack,
);
assert!(RecoveryKind::parse("Garbage").is_err());
}
}

View file

@ -5,10 +5,9 @@
//! Phase B → Phase C residual, reopen the engine, and assert the sweep's
//! decision-tree dispatch did the right thing.
//!
//! The four tests here pin Phase 3 scope (open-time invocation,
//! `OpenMode::{ReadWrite, ReadOnly}`, roll-back path, schema-version
//! refusal). The roll-forward path is Phase 4 — exercised by tests in
//! the Phase 4 commit.
//! The Phase 3 tests pin open-time invocation, `OpenMode::{ReadWrite,
//! ReadOnly}`, the roll-back path, and schema-version refusal. The Phase
//! 4 tests pin the roll-forward path + audit row recording.
use std::path::Path;
@ -246,3 +245,322 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
"second open must be a clean no-op"
);
}
// =====================================================================
// Phase 4 — roll-forward path + audit row recording
// =====================================================================
/// Helper: count rows in `_graph_commit_recoveries.lance` at the given root.
async fn count_recovery_audit_rows(repo_root: &Path) -> usize {
let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance");
if !recoveries_dir.exists() {
return 0;
}
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.expect("recoveries dataset opens");
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
batches.iter().map(|b| b.num_rows()).sum()
}
/// Helper: read the most recent recovery audit row's `recovery_kind`,
/// `recovery_for_actor`, and `operation_id`. Returns `None` if no rows.
async fn read_latest_recovery_audit(
repo_root: &Path,
) -> Option<(String, Option<String>, String, String)> {
let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance");
if !recoveries_dir.exists() {
return None;
}
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.ok()?;
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.ok()?.try_collect().await.ok()?;
let last_batch = batches.iter().filter(|b| b.num_rows() > 0).last()?;
let row = last_batch.num_rows() - 1;
let kinds = last_batch
.column_by_name("recovery_kind")?
.as_any()
.downcast_ref::<StringArray>()?;
let for_actors = last_batch
.column_by_name("recovery_for_actor")?
.as_any()
.downcast_ref::<StringArray>()?;
let ops = last_batch
.column_by_name("operation_id")?
.as_any()
.downcast_ref::<StringArray>()?;
let writers = last_batch
.column_by_name("sidecar_writer_kind")?
.as_any()
.downcast_ref::<StringArray>()?;
Some((
kinds.value(row).to_string(),
if for_actors.is_null(row) {
None
} else {
Some(for_actors.value(row).to_string())
},
ops.value(row).to_string(),
writers.value(row).to_string(),
))
}
/// Helper: count `_graph_commits.lance` rows tagged with the recovery actor.
async fn count_recovery_actor_commits(repo_root: &Path) -> usize {
let actors_dir = repo_root.join("_graph_commit_actors.lance");
if !actors_dir.exists() {
return 0;
}
let ds = Dataset::open(actors_dir.to_str().unwrap()).await.unwrap();
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
let mut count = 0;
for batch in &batches {
let actors = batch
.column_by_name("actor_id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..actors.len() {
if actors.value(i) == "omnigraph:recovery" {
count += 1;
}
}
}
count
}
#[tokio::test]
async fn recovery_rolls_forward_after_phase_b_completes() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// Bootstrap: init + load 2 rows. Manifest pin and Lance HEAD both
// advance via the legitimate publisher path.
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before = ds.version().version;
// Synthesize a successful Phase B: advance Lance HEAD by one
// (delete_where with no-match — no fragment changes, but version bumps).
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let head_after = ds.version().version;
assert_eq!(head_after, head_before + 1);
// Drop a sidecar that MATCHES the synthesized state
// (expected=head_before, post_commit_pin=head_after) — classifier
// returns RolledPastExpected, decision is RollForward.
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H00000000000000000000RF",
"started_at": "0",
"branch": null,
"actor_id": "act-alice",
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": {},
"post_commit_pin": {}
}}
]
}}"#,
person_uri, head_before, head_after
);
write_sidecar_file(dir.path(), "01H00000000000000000000RF", &sidecar_json);
// Reopen — sweep must roll forward, advancing the manifest pin to
// head_after via a single ManifestBatchPublisher::publish call.
let _db = Omnigraph::open(uri).await.unwrap();
// Sidecar deleted (sweep completed end-to-end).
assert!(
!list_recovery_dir(dir.path()).contains(&"01H00000000000000000000RF.json".to_string()),
"sidecar must be deleted after successful roll-forward"
);
// Audit row recorded.
assert_eq!(
count_recovery_audit_rows(dir.path()).await,
1,
"roll-forward must record exactly one audit row"
);
assert_eq!(
count_recovery_actor_commits(dir.path()).await,
1,
"roll-forward must record exactly one commit-graph row tagged with omnigraph:recovery"
);
let audit = read_latest_recovery_audit(dir.path()).await;
assert_eq!(
audit,
Some((
"RolledForward".to_string(),
Some("act-alice".to_string()),
"01H00000000000000000000RF".to_string(),
"Mutation".to_string(),
)),
"audit row content mismatch"
);
// Idempotency: reopen is a no-op.
let _db2 = Omnigraph::open(uri).await.unwrap();
assert!(
list_recovery_dir(dir.path()).is_empty(),
"second open must be a clean no-op"
);
assert_eq!(
count_recovery_audit_rows(dir.path()).await,
1,
"second open must NOT record a new audit row"
);
}
#[tokio::test]
async fn recovery_rolls_back_records_audit_row_with_recovery_actor() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let head_after = ds.version().version;
let _ = head_after;
// Sidecar with MISMATCHED post_commit_pin → classifier returns
// UnexpectedAtP1 → decision is RollBack.
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H00000000000000000000AB",
"started_at": "0",
"branch": null,
"actor_id": "act-bob",
"writer_kind": "Load",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": {},
"post_commit_pin": {}
}}
]
}}"#,
person_uri, head_before, head_before
);
write_sidecar_file(dir.path(), "01H00000000000000000000AB", &sidecar_json);
let _db = Omnigraph::open(uri).await.unwrap();
// Audit row recorded for RolledBack.
assert_eq!(count_recovery_audit_rows(dir.path()).await, 1);
assert_eq!(count_recovery_actor_commits(dir.path()).await, 1);
let audit = read_latest_recovery_audit(dir.path()).await;
assert_eq!(
audit,
Some((
"RolledBack".to_string(),
Some("act-bob".to_string()),
"01H00000000000000000000AB".to_string(),
"Load".to_string(),
)),
);
}
#[tokio::test]
async fn recovery_rolls_forward_with_null_actor() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
"#;
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
drop(db);
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = Dataset::open(&person_uri).await.unwrap();
let head_before = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let head_after = ds.version().version;
// Sidecar with no actor_id (CLI-driven mutation; common case).
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H00000000000000000000NA",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "EnsureIndices",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": {},
"post_commit_pin": {}
}}
]
}}"#,
person_uri, head_before, head_after
);
write_sidecar_file(dir.path(), "01H00000000000000000000NA", &sidecar_json);
let _db = Omnigraph::open(uri).await.unwrap();
let audit = read_latest_recovery_audit(dir.path()).await;
assert_eq!(
audit,
Some((
"RolledForward".to_string(),
None, // recovery_for_actor is None when sidecar.actor_id is None
"01H00000000000000000000NA".to_string(),
"EnsureIndices".to_string(),
)),
);
}