diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index bcbe769..7d00219 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -15,7 +15,7 @@ //! `OpenMode::ReadWrite`) classifies each table in each sidecar and //! either rolls forward all tables (if every table is at //! `post_commit_pin` AND matches the sidecar) or rolls back all -//! `RolledPastExpected` tables to `expected_version`. +//! drifted tables to the manifest-pinned version. //! //! ## Verified Lance behavior the rollback path depends on //! @@ -42,8 +42,9 @@ use tracing::warn; use crate::db::commit_graph::CommitGraph; use crate::db::graph_coordinator::GraphCoordinator; use crate::db::recovery_audit::{ - now_micros, RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome, + RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome, now_micros, }; +use crate::db::schema_state::SchemaStateRecovery; use crate::error::{OmniError, Result}; use crate::storage::StorageAdapter; @@ -209,12 +210,11 @@ pub(crate) enum TableClassification { RolledPastExpected, /// `lance_head == manifest_pinned + 1` but the sidecar's /// `expected_version`/`post_commit_pin` don't match. Some other writer - /// or recovery action moved this table. Roll back to - /// `sidecar.expected_version`. + /// or recovery action moved this table. Roll back to the manifest pin. UnexpectedAtP1, /// `lance_head > manifest_pinned + 1`. Multi-step orphan from a /// previous restore attempt or an external mutation. Roll back to - /// `sidecar.expected_version`. + /// the manifest pin. UnexpectedMultistep, /// `lance_head < manifest_pinned`. Should be impossible: the manifest /// pin can only advance after a successful Lance commit. Surface @@ -231,7 +231,7 @@ pub(crate) enum TableClassification { /// /// - Any `InvariantViolation` → `Abort` (operator action required). /// - Any `UnexpectedAtP1` / `UnexpectedMultistep` / `NoMovement` → -/// `RollBack` all `RolledPastExpected` tables to `expected_version`. +/// `RollBack` all drifted tables to the manifest pin. /// - All `RolledPastExpected` → `RollForward` every table in one /// manifest publish. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -419,7 +419,10 @@ pub(crate) fn classify_table( pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision { use SidecarDecision::*; use TableClassification::*; - if classifications.iter().any(|c| matches!(c, InvariantViolation { .. })) { + if classifications + .iter() + .any(|c| matches!(c, InvariantViolation { .. })) + { return Abort; } if classifications @@ -432,8 +435,8 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision RollForward } -/// Restore a single table's Lance HEAD to `expected_version`, producing a -/// new commit at HEAD+1 with content == content-at-`expected_version`. +/// Restore a single table's Lance HEAD to `target_version`, producing a +/// new commit at HEAD+1 with content == content-at-`target_version`. /// /// Always runs the actual `Dataset::restore` — there is NO fragment-set /// short-circuit because equal fragment IDs do NOT imply equal content: @@ -448,7 +451,7 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision pub(crate) async fn restore_table_to_version( table_path: &str, branch: Option<&str>, - expected_version: u64, + target_version: u64, ) -> Result<()> { let head = Dataset::open(table_path) .await @@ -461,7 +464,7 @@ pub(crate) async fn restore_table_to_version( _ => head, }; let mut to_restore = head - .checkout_version(expected_version) + .checkout_version(target_version) .await .map_err(|e| OmniError::Lance(e.to_string()))?; to_restore @@ -494,6 +497,7 @@ pub(crate) async fn recover_manifest_drift( storage: std::sync::Arc, coordinator: &mut GraphCoordinator, mode: RecoveryMode, + schema_state_recovery: SchemaStateRecovery, ) -> Result<()> { let sidecars = list_sidecars(root_uri, storage.as_ref()).await?; if sidecars.is_empty() { @@ -514,12 +518,9 @@ pub(crate) async fn recover_manifest_drift( for sidecar in sidecars { let branch_snapshot = match sidecar.branch.as_deref() { Some(b) => { - let mut branch_coord = GraphCoordinator::open_branch( - root_uri, - b, - std::sync::Arc::clone(&storage), - ) - .await?; + let mut branch_coord = + GraphCoordinator::open_branch(root_uri, b, std::sync::Arc::clone(&storage)) + .await?; branch_coord.refresh().await?; branch_coord.snapshot() } @@ -528,8 +529,15 @@ pub(crate) async fn recover_manifest_drift( coordinator.snapshot() } }; - process_sidecar(root_uri, storage.as_ref(), &branch_snapshot, &sidecar, mode) - .await?; + process_sidecar( + root_uri, + storage.as_ref(), + &branch_snapshot, + &sidecar, + mode, + schema_state_recovery, + ) + .await?; } // Final refresh so the caller sees the post-sweep state. coordinator.refresh().await?; @@ -542,22 +550,24 @@ async fn process_sidecar( snapshot: &Snapshot, sidecar: &RecoverySidecar, mode: RecoveryMode, + schema_state_recovery: SchemaStateRecovery, ) -> Result<()> { - let mut classifications = Vec::with_capacity(sidecar.tables.len()); + let mut states = Vec::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { - let lance_head = - open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?; + let lance_head = open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?; let manifest_pinned = snapshot .entry(&pin.table_key) .map(|e| e.table_version) .unwrap_or(0); - classifications.push(classify_table( - pin, - lance_head, + states.push(ClassifiedTable { + classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind), manifest_pinned, - sidecar.writer_kind, - )); + }); } + let classifications = states + .iter() + .map(|state| state.classification) + .collect::>(); match decide(&classifications) { SidecarDecision::Abort => match mode { @@ -605,51 +615,31 @@ async fn process_sidecar( writer_kind = ?sidecar.writer_kind, "recovery: rolling back sidecar (mixed or unexpected state)" ); - // Restore every table whose Lance HEAD has drifted from the - // manifest pin (RolledPastExpected, UnexpectedAtP1, - // UnexpectedMultistep). NoMovement tables are already at - // expected_version — no action. Restore is unconditional; - // repeated mid-rollback crashes accumulate a few extra - // Lance commits that `omnigraph cleanup` reclaims. - let mut outcomes = Vec::with_capacity(sidecar.tables.len()); - for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) { - if matches!( - cls, - TableClassification::RolledPastExpected - | TableClassification::UnexpectedAtP1 - | TableClassification::UnexpectedMultistep - ) { - restore_table_to_version( - &pin.table_path, - pin.table_branch.as_deref(), - pin.expected_version, - ) - .await?; - outcomes.push(TableOutcome { - table_key: pin.table_key.clone(), - from_version: snapshot - .entry(&pin.table_key) - .map(|e| e.table_version) - .unwrap_or(0), - to_version: pin.expected_version, - }); - } - } - // Manifest pin doesn't move on rollback; record an audit-only - // commit at the existing version so operators can correlate via - // `omnigraph commit list --filter actor=omnigraph:recovery`. - record_audit( - root_uri, - sidecar, - snapshot.version(), - RecoveryKind::RolledBack, - outcomes, - ) - .await?; - delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; - Ok(()) + roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await } SidecarDecision::RollForward => { + if matches!(sidecar.writer_kind, SidecarKind::SchemaApply) + && !schema_state_recovery.completed_schema_apply_sidecar_rename() + { + return match mode { + RecoveryMode::Full => { + warn!( + operation_id = sidecar.operation_id.as_str(), + "recovery: rolling back SchemaApply sidecar because schema staging \ + files were not promoted in this recovery pass" + ); + roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await + } + RecoveryMode::RollForwardOnly => { + warn!( + operation_id = sidecar.operation_id.as_str(), + "recovery: deferring SchemaApply sidecar because schema staging files \ + were not promoted in this recovery pass" + ); + Ok(()) + } + }; + } warn!( operation_id = sidecar.operation_id.as_str(), writer_kind = ?sidecar.writer_kind, @@ -688,6 +678,64 @@ async fn process_sidecar( } } +#[derive(Debug, Clone, Copy)] +struct ClassifiedTable { + classification: TableClassification, + manifest_pinned: u64, +} + +async fn roll_back_sidecar( + root_uri: &str, + storage: &dyn StorageAdapter, + snapshot: &Snapshot, + sidecar: &RecoverySidecar, + states: &[ClassifiedTable], +) -> Result<()> { + // Restore every table whose Lance HEAD has drifted from the + // manifest pin (RolledPastExpected, UnexpectedAtP1, + // UnexpectedMultistep). NoMovement tables are already at the + // manifest pin — no action. Restore is unconditional; repeated + // mid-rollback crashes accumulate a few extra Lance commits that + // `omnigraph cleanup` reclaims. + let mut outcomes = Vec::with_capacity(sidecar.tables.len()); + for (pin, state) in sidecar.tables.iter().zip(states.iter()) { + if matches!( + state.classification, + TableClassification::RolledPastExpected + | TableClassification::UnexpectedAtP1 + | TableClassification::UnexpectedMultistep + ) { + restore_table_to_version( + &pin.table_path, + pin.table_branch.as_deref(), + state.manifest_pinned, + ) + .await?; + outcomes.push(TableOutcome { + table_key: pin.table_key.clone(), + from_version: snapshot + .entry(&pin.table_key) + .map(|e| e.table_version) + .unwrap_or(0), + to_version: state.manifest_pinned, + }); + } + } + // Manifest pin doesn't move on rollback; record an audit-only + // commit at the existing version so operators can correlate via + // `omnigraph commit list --filter actor=omnigraph:recovery`. + record_audit( + root_uri, + sidecar, + snapshot.version(), + RecoveryKind::RolledBack, + outcomes, + ) + .await?; + delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; + Ok(()) +} + /// Atomically extend every table's manifest pin from `expected_version` to /// `post_commit_pin` via a single `ManifestBatchPublisher::publish` call. /// Returns the new manifest version produced by the publish. @@ -710,8 +758,7 @@ async fn roll_forward_all( ) -> Result<(u64, HashMap)> { let mut updates: Vec = Vec::with_capacity(sidecar.tables.len()); let mut expected: HashMap = HashMap::with_capacity(sidecar.tables.len()); - let mut published_versions: HashMap = - HashMap::with_capacity(sidecar.tables.len()); + let mut published_versions: HashMap = HashMap::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { // Open the dataset at its CURRENT Lance HEAD on the pin's branch @@ -738,12 +785,11 @@ async fn roll_forward_all( .map_err(|e| OmniError::Lance(e.to_string()))? as u64; let table_relative_path = super::table_path_for_table_key(&pin.table_key)?; - let version_metadata = - super::metadata::TableVersionMetadata::from_dataset( - root_uri, - &table_relative_path, - &head_ds, - )?; + let version_metadata = super::metadata::TableVersionMetadata::from_dataset( + root_uri, + &table_relative_path, + &head_ds, + )?; updates.push(ManifestChange::Update(SubTableUpdate { table_key: pin.table_key.clone(), @@ -779,33 +825,26 @@ async fn record_audit( kind: RecoveryKind, outcomes: Vec, ) -> Result<()> { - // BranchMerge sidecars carry the source branch's HEAD commit id so - // recovery can record this as a MERGE commit (with parent linkage) - // instead of a plain commit. Without the merge parent, future - // `branch_merge feature → main` between the same pair would not - // recognize "already up-to-date" and merge-base computations break. - // - // For BranchMerge on a non-main target, the parent commit id is the - // TARGET BRANCH's tip — `CommitGraph::open()` returns the global - // commit graph whose `head_commit_id()` is the global head and would - // record the wrong parent. Open the per-branch instance instead. + // Non-main recovery commits must be appended on the sidecar branch's + // commit graph, otherwise parent_commit_id comes from the global + // main head. BranchMerge additionally records the source branch's + // HEAD as merged_parent_commit_id so future merges between the same + // pair recognize "already up-to-date". + let target_branch = sidecar.branch.as_deref(); + let mut graph = match target_branch { + Some(branch) => CommitGraph::open_at_branch(root_uri, branch).await?, + None => CommitGraph::open(root_uri).await?, + }; let graph_commit_id = match ( sidecar.writer_kind, sidecar.merge_source_commit_id.as_deref(), kind, ) { (SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => { - let mut branch_graph = match sidecar.branch.as_deref() { - Some(target_branch) => { - CommitGraph::open_at_branch(root_uri, target_branch).await? - } - None => CommitGraph::open(root_uri).await?, - }; - let parent_commit_id = - branch_graph.head_commit_id().await?.unwrap_or_default(); - branch_graph + let parent_commit_id = graph.head_commit_id().await?.unwrap_or_default(); + graph .append_merge_commit( - sidecar.branch.as_deref(), + target_branch, manifest_version, &parent_commit_id, source_id, @@ -814,13 +853,8 @@ async fn record_audit( .await? } _ => { - let mut graph = CommitGraph::open(root_uri).await?; graph - .append_commit( - sidecar.branch.as_deref(), - manifest_version, - Some(RECOVERY_ACTOR), - ) + .append_commit(target_branch, manifest_version, Some(RECOVERY_ACTOR)) .await? } }; @@ -915,11 +949,11 @@ pub(crate) fn new_sidecar( #[cfg(test)] mod tests { use super::*; - use std::sync::Arc; - use arrow_array::{Int32Array, RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema}; use crate::storage::LocalStorageAdapter; use crate::table_store::TableStore; + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; fn person_schema() -> Arc { Arc::new(Schema::new(vec![ @@ -1183,11 +1217,10 @@ mod tests { assert_eq!(post.version().version, head_before + 1); // Content matches v1 (just alice). let scanner = post.scan(); - let batches: Vec = futures::TryStreamExt::try_collect( - scanner.try_into_stream().await.unwrap(), - ) - .await - .unwrap(); + let batches: Vec = + futures::TryStreamExt::try_collect(scanner.try_into_stream().await.unwrap()) + .await + .unwrap(); let total: usize = batches.iter().map(|b| b.num_rows()).sum(); assert_eq!(total, 1); } @@ -1295,7 +1328,11 @@ mod tests { // Write sidecars in REVERSE chronological order (newest first). // The classifier shouldn't care, but the sweep needs deterministic // processing for reproducibility. - let ids = ["01H000000000000000000000ZZ", "01H000000000000000000000MM", "01H000000000000000000000AA"]; + let ids = [ + "01H000000000000000000000ZZ", + "01H000000000000000000000MM", + "01H000000000000000000000AA", + ]; for id in &ids { let sc = new_sidecar( SidecarKind::Mutation, diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index c33864a..e54b6eb 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -179,8 +179,9 @@ impl Omnigraph { // the manifest pin is the consistent snapshot regardless of // drift on the per-table side or leftover schema-staging files. if matches!(mode, OpenMode::ReadWrite) { - recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()) - .await?; + let schema_state_recovery = + recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()) + .await?; // Recovery sweep: close the Phase B → Phase C residual on // any sidecar left over from a crashed writer. Continuous // in-process recovery for long-running servers (no restart @@ -191,6 +192,7 @@ impl Omnigraph { Arc::clone(&storage), &mut coordinator, crate::db::manifest::RecoveryMode::Full, + schema_state_recovery, ) .await?; } @@ -409,7 +411,7 @@ impl Omnigraph { /// avoid the recovery sweep racing their own sidecar. pub async fn refresh(&mut self) -> Result<()> { self.coordinator.refresh().await?; - recover_schema_state_files( + let schema_state_recovery = recover_schema_state_files( &self.root_uri, Arc::clone(&self.storage), &self.coordinator.snapshot(), @@ -420,18 +422,20 @@ impl Omnigraph { Arc::clone(&self.storage), &mut self.coordinator, crate::db::manifest::RecoveryMode::RollForwardOnly, + schema_state_recovery, ) .await?; - // Re-read the schema source / catalog from disk: schema-state - // recovery above may have renamed staging files into place - // (completing an in-flight schema_apply), so the on-disk - // `_schema.pg` and IR contract may now reflect a NEWER schema - // than the in-memory `self.catalog` / `self.schema_source`. - // Without this reload subsequent ops on the handle would use - // stale catalog metadata against post-migration data on disk. - // Mirrors `open_with_storage_and_mode`'s schema-load sequence. + self.reload_schema_if_source_changed().await?; + self.runtime_cache.invalidate_all().await; + Ok(()) + } + + async fn reload_schema_if_source_changed(&mut self) -> Result<()> { let schema_path = schema_source_uri(&self.root_uri); let schema_source = self.storage.read_text(&schema_path).await?; + if schema_source == self.schema_source { + return Ok(()); + } let current_source_ir = read_schema_ir_from_source(&schema_source)?; let branches = self.coordinator.branch_list().await?; let (accepted_ir, _) = load_or_bootstrap_schema_contract( @@ -445,7 +449,6 @@ impl Omnigraph { fixup_blob_schemas(&mut catalog); self.schema_source = schema_source; self.catalog = catalog; - self.runtime_cache.invalidate_all().await; Ok(()) } @@ -611,6 +614,23 @@ impl Omnigraph { table_ops::ensure_indices_on(self, branch).await } + #[cfg(feature = "failpoints")] + #[doc(hidden)] + pub async fn failpoint_publish_table_head_without_index_rebuild_for_test( + &mut self, + branch: &str, + table_key: &str, + table_branch: Option<&str>, + ) -> Result { + table_ops::failpoint_publish_table_head_without_index_rebuild_for_test( + self, + branch, + table_key, + table_branch, + ) + .await + } + /// Compact small Lance fragments into fewer larger ones across every /// node + edge table on `main`. See [`optimize`] for details. pub async fn optimize(&mut self) -> Result> { @@ -989,7 +1009,6 @@ impl Omnigraph { pub(crate) async fn invalidate_graph_index(&self) { table_ops::invalidate_graph_index(self).await } - } pub(crate) fn normalize_branch_name(branch: &str) -> Result> { diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 1f964e5..18b9219 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -302,11 +302,7 @@ pub(super) async fn apply_schema_with_lock( // open the wrong HEAD here. let existing = db .table_store - .open_dataset_head_for_write( - table_key, - &dataset_uri, - entry.table_branch.as_deref(), - ) + .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) .await?; let staged = db.table_store.stage_overwrite(&existing, batch).await?; db.table_store @@ -398,6 +394,8 @@ pub(super) async fn apply_schema_with_lock( // `recover_schema_state_files`: // - crash before commit → manifest unchanged; staging deleted on open // - crash after commit → manifest advanced; staging renamed on open + crate::failpoints::maybe_fail("schema_apply.before_staging_write")?; + let staging_pg_uri = schema_source_staging_uri(&db.root_uri); db.storage .write_text(&staging_pg_uri, desired_schema_source) @@ -449,9 +447,7 @@ pub(super) async fn apply_schema_with_lock( // Failing the schema_apply call would report failure for a migration // that already succeeded. if let Some(handle) = recovery_handle { - if let Err(err) = - crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await - { + if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await { tracing::warn!( error = %err, operation_id = handle.operation_id.as_str(), diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 97ff84d..ab3757c 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -31,6 +31,37 @@ pub(super) async fn ensure_indices_on(db: &mut Omnigraph, branch: &str) -> Resul ensure_indices_for_branch(db, branch.as_deref()).await } +#[cfg(feature = "failpoints")] +pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test( + db: &mut Omnigraph, + branch: &str, + table_key: &str, + table_branch: Option<&str>, +) -> Result { + let branch = normalize_branch_name(branch)?; + let snapshot = db.snapshot_for_branch(branch.as_deref()).await?; + let entry = snapshot + .entry(table_key) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + let ds = db + .table_store + .open_dataset_head_for_write(table_key, &full_path, table_branch) + .await?; + let state = db.table_store.table_state(&full_path, &ds).await?; + let update = crate::db::SubTableUpdate { + table_key: table_key.to_string(), + table_version: state.version, + table_branch: table_branch.map(str::to_string), + row_count: state.row_count, + version_metadata: state.version_metadata, + }; + let mut expected = std::collections::HashMap::new(); + expected.insert(table_key.to_string(), entry.table_version); + commit_prepared_updates_on_branch_with_expected(db, branch.as_deref(), &[update], &expected) + .await +} + pub(super) async fn ensure_indices_for_branch( db: &mut Omnigraph, branch: Option<&str>, @@ -100,9 +131,7 @@ pub(super) async fn ensure_indices_for_branch( continue; } let full_path = format!("{}/{}", db.root_uri, entry.table_path); - if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()) - .await? - { + if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()).await? { recovery_pins.push(crate::db::manifest::SidecarTablePin { table_key, table_path: full_path, @@ -243,9 +272,7 @@ pub(super) async fn ensure_indices_for_branch( // per-table commit window regardless). Best-effort cleanup; failing // the user here would error a call that already succeeded. if let Some(handle) = recovery_handle { - if let Err(err) = - crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await - { + if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await { tracing::warn!( error = %err, operation_id = handle.operation_id.as_str(), diff --git a/crates/omnigraph/src/db/schema_state.rs b/crates/omnigraph/src/db/schema_state.rs index d307159..13dfccc 100644 --- a/crates/omnigraph/src/db/schema_state.rs +++ b/crates/omnigraph/src/db/schema_state.rs @@ -285,6 +285,24 @@ fn schema_lock_conflict(detail: impl Into) -> OmniError { )) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum SchemaStateRecovery { + Noop, + CleanedStaging, + CompletedStagingRename { schema_apply_sidecar: bool }, +} + +impl SchemaStateRecovery { + pub(crate) fn completed_schema_apply_sidecar_rename(self) -> bool { + matches!( + self, + Self::CompletedStagingRename { + schema_apply_sidecar: true, + } + ) + } +} + /// Reconcile leftover schema staging files (`_schema.pg.staging`, /// `_schema.ir.json.staging`, `__schema_state.json.staging`) against the /// manifest snapshot. @@ -306,7 +324,7 @@ pub(crate) async fn recover_schema_state_files( root_uri: &str, storage: Arc, snapshot: &Snapshot, -) -> Result<()> { +) -> Result { let pg_staging = schema_source_staging_uri(root_uri); let ir_staging = schema_ir_staging_uri(root_uri); let state_staging = schema_state_staging_uri(root_uri); @@ -316,7 +334,7 @@ pub(crate) async fn recover_schema_state_files( let state_exists = storage.exists(&state_staging).await?; if !pg_exists && !ir_exists && !state_exists { - return Ok(()); + return Ok(SchemaStateRecovery::Noop); } // Schema-apply atomicity: when a SchemaApply sidecar is present, @@ -335,7 +353,9 @@ pub(crate) async fn recover_schema_state_files( snapshot.version() ); complete_staging_rename(root_uri, storage.as_ref()).await?; - return Ok(()); + return Ok(SchemaStateRecovery::CompletedStagingRename { + schema_apply_sidecar: true, + }); } if !pg_exists { @@ -365,7 +385,9 @@ pub(crate) async fn recover_schema_state_files( snapshot.version() ); complete_staging_rename(root_uri, storage.as_ref()).await?; - return Ok(()); + return Ok(SchemaStateRecovery::CompletedStagingRename { + schema_apply_sidecar: false, + }); } let staging_source = storage.read_text(&pg_staging).await?; @@ -384,7 +406,7 @@ pub(crate) async fn recover_schema_state_files( "removing leftover schema staging files matching the live schema (no-op apply that crashed)" ); cleanup_staging_files(root_uri, storage.as_ref()).await?; - return Ok(()); + return Ok(SchemaStateRecovery::CleanedStaging); } let live_keys = expected_table_keys(&live_ir); @@ -407,14 +429,16 @@ pub(crate) async fn recover_schema_state_files( snapshot.version() ); cleanup_staging_files(root_uri, storage.as_ref()).await?; - Ok(()) + Ok(SchemaStateRecovery::CleanedStaging) } else if actual_keys == staging_keys { warn!( "schema apply crashed after manifest commit; completing schema-file rename (manifest v{})", snapshot.version() ); complete_staging_rename(root_uri, storage.as_ref()).await?; - Ok(()) + Ok(SchemaStateRecovery::CompletedStagingRename { + schema_apply_sidecar: false, + }) } else { Err(schema_lock_conflict(format!( "found schema staging files but the manifest's table set ({:?}) matches neither the live schema ({:?}) nor the staging schema ({:?}); manual operator action required", @@ -426,9 +450,7 @@ pub(crate) async fn recover_schema_state_files( async fn cleanup_staging_files(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> { storage.delete(&schema_source_staging_uri(root_uri)).await?; storage.delete(&schema_ir_staging_uri(root_uri)).await?; - storage - .delete(&schema_state_staging_uri(root_uri)) - .await?; + storage.delete(&schema_state_staging_uri(root_uri)).await?; Ok(()) } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index cbb3051..f4cf7d1 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -540,6 +540,11 @@ async fn load_jsonl_reader( let (updates, expected_versions, sidecar_handle) = staging .finalize(db, branch, crate::db::manifest::SidecarKind::Load) .await?; + // Same finalize → publisher residual as mutations: per-table + // staged commits have advanced Lance HEAD, but the manifest + // publish has not run yet. Reuse the mutation failpoint name so + // one failpoint pins the shared `MutationStaging` boundary. + crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions) .await?; // The recovery sidecar protects the per-table commit_staged → diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index df224cf..c15bf80 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -6,12 +6,50 @@ use fail::FailScenario; use omnigraph::db::Omnigraph; use omnigraph::failpoints::ScopedFailPoint; +use helpers::recovery::{ + FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants, + branch_head_commit_id, single_sidecar_operation_id, +}; use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main}; const SCHEMA_V1: &str = "node Person { name: String @key }\n"; const SCHEMA_V2_ADDED_TYPE: &str = "node Person { name: String @key }\nnode Company { name: String @key }\n"; +fn node_table_uri(root: &str, type_name: &str) -> String { + let mut hash: u64 = 0xcbf2_9ce4_8422_2325; + for &b in type_name.as_bytes() { + hash ^= b as u64; + hash = hash.wrapping_mul(0x100_0000_01b3); + } + format!("{}/nodes/{hash:016x}", root.trim_end_matches('/')) +} + +fn person_batch(rows: &[(&str, &str, Option)]) -> arrow_array::RecordBatch { + use std::sync::Arc; + + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("age", DataType::Int32, true), + Field::new("name", DataType::Utf8, false), + ])); + let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect(); + let names: Vec<&str> = rows.iter().map(|(_, name, _)| *name).collect(); + let ages: Vec> = rows.iter().map(|(_, _, age)| *age).collect(); + arrow_array::RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(Int32Array::from(ages)), + Arc::new(StringArray::from(names)), + ], + ) + .unwrap() +} + #[tokio::test] async fn branch_create_failpoint_triggers() { let _scenario = FailScenario::setup(); @@ -174,12 +212,12 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; // Phase A: trigger the residual. { let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); - let _failpoint = - ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); // The mutation's finalize completes (commit_staged advances Lance // HEAD on node:Person AND writes a `__recovery/{ulid}.json` @@ -195,9 +233,8 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { .await .unwrap_err(); assert!( - err.to_string().contains( - "injected failpoint triggered: mutation.post_finalize_pre_publisher" - ), + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), "unexpected error: {err}" ); @@ -212,6 +249,7 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { 1, "exactly one sidecar should persist after the finalize failure" ); + operation_id = single_sidecar_operation_id(dir.path()); // Drop the failpoint scope and the engine handle. } @@ -220,21 +258,7 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { // sidecar, classifies node:Person as RolledPastExpected, decides // RollForward, publishes the manifest update, records the audit // row, deletes the sidecar. - let mut db = Omnigraph::open(&uri).await.unwrap(); - - // Sidecar gone — sweep completed end to end. - let recovery_dir = dir.path().join("__recovery"); - if recovery_dir.exists() { - let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) - .unwrap() - .filter_map(|e| e.ok()) - .collect(); - assert!( - remaining.is_empty(), - "sidecar must be deleted after successful roll-forward; remaining: {:?}", - remaining, - ); - } + let db = Omnigraph::open(&uri).await.unwrap(); // The originally-attempted "Eve" insert is now visible — the recovery // sweep extended the manifest pin to include the staged commit. @@ -243,27 +267,258 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() { person_count, 1, "exactly one person (Eve) must be visible after roll-forward" ); + drop(db); - // The next mutation on the same table succeeds — no ExpectedVersionMismatch. - mutate_main( - &mut db, - MUTATION_QUERIES, - "insert_person", - &mixed_params(&[("$name", "Frank")], &[("$age", 33)]), + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledForward { + tables: vec![TableExpectation::main("node:Person").follow_up_mutation( + FollowUpMutation::new( + "main", + MUTATION_QUERIES, + "insert_person", + mixed_params(&[("$name", "Frank")], &[("$age", 33)]), + ), + )], + }, ) .await - .expect("next mutation must succeed after recovery rolled forward"); + .unwrap(); + + let db = Omnigraph::open(&uri).await.unwrap(); let person_count = helpers::count_rows(&db, "node:Person").await; assert_eq!( person_count, 2, "Frank's insert must land normally after recovery" ); +} - // Audit row recorded. - let audit_dir = dir.path().join("_graph_commit_recoveries.lance"); - assert!( - audit_dir.exists(), - "_graph_commit_recoveries.lance must exist after a successful recovery" +#[tokio::test] +async fn recovery_rolls_forward_load_on_feature_branch() { + use omnigraph::loader::LoadMode; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; + let main_person_pin; + let feature_parent_commit_id; + + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "BeforeLoad")], &[("$age", 40)]), + ) + .await + .unwrap(); + main_person_pin = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .expect("main must have Person") + .table_version; + feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap(); + + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = db + .load( + "feature", + r#"{"type":"Person","data":{"name":"FeatureLoad","age":41}} +"#, + LoadMode::Append, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + operation_id = single_sidecar_operation_id(dir.path()); + } + + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows_branch(&db, "feature", "node:Person").await, + 2, + "feature branch load row must be visible after recovery" + ); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 0, + "feature branch load recovery must not publish the row to main" + ); + drop(db); + + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledForward { + tables: vec![ + TableExpectation::branch("node:Person", "feature") + .expected_main_manifest_pin(main_person_pin) + .expected_recovery_parent_commit_id(feature_parent_commit_id) + .follow_up_mutation(FollowUpMutation::new( + "feature", + MUTATION_QUERIES, + "insert_person", + mixed_params(&[("$name", "AfterLoad")], &[("$age", 42)]), + )), + ], + }, + ) + .await + .unwrap(); + + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows_branch(&db, "feature", "node:Person").await, + 3, + "follow-up feature mutation must succeed after load recovery" + ); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 0, + "follow-up feature mutation must not move main" + ); +} + +#[tokio::test] +async fn recovery_rolls_forward_ensure_indices_on_feature_branch() { + use lance_index::DatasetIndexExt; + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; + let feature_parent_commit_id; + let main_person_pin; + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "BeforeEnsure")], &[("$age", 42)]), + ) + .await + .unwrap(); + + main_person_pin = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .expect("main must have Person") + .table_version; + + // Make the feature branch's Person table genuinely need index work + // while keeping the manifest internally consistent. The test-only + // publisher deliberately skips the normal index-rebuild preparation; + // the failed writer below is still the real `ensure_indices_on`. + let person_uri = node_table_uri(&uri, "Person"); + let store = TableStore::new(&uri); + let mut ds = store + .open_dataset_head(&person_uri, Some("feature")) + .await + .unwrap(); + ds.drop_index("id_idx").await.unwrap(); + let dropped_index_head = ds.version().version; + db.failpoint_publish_table_head_without_index_rebuild_for_test( + "feature", + "node:Person", + Some("feature"), + ) + .await + .unwrap(); + let feature_snapshot = db + .snapshot_of(omnigraph::db::ReadTarget::branch("feature")) + .await + .unwrap(); + assert_eq!( + feature_snapshot + .entry("node:Person") + .expect("feature must have Person") + .table_version, + dropped_index_head, + "test setup must publish the dropped-index table head before ensure_indices runs", + ); + feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap(); + + { + let _failpoint = + ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return"); + let err = db.ensure_indices_on("feature").await.unwrap_err(); + assert!( + err.to_string().contains( + "injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit" + ), + "unexpected error: {err}" + ); + operation_id = single_sidecar_operation_id(dir.path()); + } + drop(db); + + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows_branch(&db, "feature", "node:Person").await, + 2, + "feature should see inherited alice plus recovered branch-local row" + ); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 1, + "ensure_indices branch recovery must not move main" + ); + drop(db); + + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledForward { + tables: vec![ + TableExpectation::branch("node:Person", "feature") + .expected_main_manifest_pin(main_person_pin) + .expected_recovery_parent_commit_id(feature_parent_commit_id) + .follow_up_mutation(FollowUpMutation::new( + "feature", + MUTATION_QUERIES, + "insert_person", + mixed_params(&[("$name", "AfterEnsure")], &[("$age", 44)]), + )), + ], + }, + ) + .await + .unwrap(); + + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows_branch(&db, "feature", "node:Person").await, + 3, + "follow-up feature mutation must succeed after ensure_indices recovery" + ); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 1, + "follow-up feature mutation must not move main" ); } @@ -286,8 +541,7 @@ async fn refresh_runs_roll_forward_recovery_in_process() { // Phase A: trigger the residual (sidecar persists; manifest unchanged). { - let _failpoint = - ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); let err = mutate_main( &mut db, MUTATION_QUERIES, @@ -297,9 +551,8 @@ async fn refresh_runs_roll_forward_recovery_in_process() { .await .unwrap_err(); assert!( - err.to_string().contains( - "injected failpoint triggered: mutation.post_finalize_pre_publisher" - ), + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), "unexpected error: {err}" ); let recovery_dir = dir.path().join("__recovery"); @@ -447,7 +700,9 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() { // Trigger refresh-time recovery directly. Sidecar is rollback- // eligible (UnexpectedAtP1); RollForwardOnly mode defers it, // leaving the sidecar on disk and Lance HEAD unchanged on Person. - db.refresh().await.expect("refresh must succeed (deferring rollback)"); + db.refresh() + .await + .expect("refresh must succeed (deferring rollback)"); // Sidecar still on disk. assert_eq!( @@ -509,8 +764,7 @@ async fn finalize_publisher_residual_does_not_drift_untouched_tables() { .unwrap(); { - let _failpoint = - ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); let _ = mutate_main( &mut db, MUTATION_QUERIES, @@ -570,13 +824,14 @@ async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable() // node:Project table's btree-on-id build. (TEST_SCHEMA already // has Person + Company + Knows + WorksAt — pick a name that isn't // already declared.) - let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA); + let extended_schema = format!( + "{}\nnode Project {{ name: String @key }}\n", + helpers::TEST_SCHEMA + ); { - let _failpoint = ScopedFailPoint::new( - "ensure_indices.post_stage_pre_commit_btree", - "return", - ); + let _failpoint = + ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return"); let err = db.apply_schema(&extended_schema).await.unwrap_err(); assert!( err.to_string() @@ -629,6 +884,98 @@ fn assert_no_staging_files(repo: &std::path::Path) { // recorded, sidecar deleted) and a follow-up operation succeeds without // ExpectedVersionMismatch. +#[tokio::test] +async fn schema_apply_without_schema_staging_rolls_back_on_next_open() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; + + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + } + + let pre_failure_version = { + let db = Omnigraph::open(&uri).await.unwrap(); + version_main(&db).await.unwrap() + }; + + { + let mut db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = ScopedFailPoint::new("schema_apply.before_staging_write", "return"); + let v2_schema = r#"node Person { + name: String @key + age: I32? + city: String? +} + +node Company { + name: String @key +} + +node Tag { + label: String @key +} + +edge Knows: Person -> Person { + since: Date? +} + +edge WorksAt: Person -> Company +"#; + let err = db.apply_schema(v2_schema).await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: schema_apply.before_staging_write"), + "unexpected error: {err}" + ); + operation_id = single_sidecar_operation_id(dir.path()); + } + + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + version_main(&db).await.unwrap(), + pre_failure_version, + "manifest must remain on the old schema when no schema staging files existed" + ); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 1, + "old-schema data must remain readable after rollback" + ); + drop(db); + + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledBack { + tables: vec![TableExpectation::main("node:Person")], + }, + ) + .await + .unwrap(); + + let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap(); + assert!( + !live_schema.contains("city: String?"), + "_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}", + ); + assert!( + !live_schema.contains("node Tag"), + "_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}", + ); +} + #[tokio::test] async fn schema_apply_phase_b_failure_recovered_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -636,6 +983,7 @@ async fn schema_apply_phase_b_failure_recovered_on_next_open() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; // Seed: a Person table with one row so the schema-apply rewritten_tables // loop has actual work to do. @@ -710,6 +1058,7 @@ edge WorksAt: Person -> Company 1, "exactly one sidecar must persist after schema_apply failure" ); + operation_id = single_sidecar_operation_id(dir.path()); } // Phase B: reopen runs the recovery sweep. Sidecar's writer_kind is @@ -718,25 +1067,6 @@ edge WorksAt: Person -> Company // current Lance HEAD. let db = Omnigraph::open(&uri).await.unwrap(); - // Sidecar gone, audit row recorded. - let recovery_dir = dir.path().join("__recovery"); - if recovery_dir.exists() { - let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) - .unwrap() - .filter_map(|e| e.ok()) - .collect(); - assert!( - remaining.is_empty(), - "sidecar must be deleted; remaining: {:?}", - remaining, - ); - } - let audit_dir = dir.path().join("_graph_commit_recoveries.lance"); - assert!( - audit_dir.exists(), - "_graph_commit_recoveries.lance must exist after schema_apply recovery" - ); - // Recovery sweep must have advanced the manifest pin on the rewritten // table: roll-forward published the post-failure Lance HEAD. let post_recovery_version = version_main(&db).await.unwrap(); @@ -745,6 +1075,17 @@ edge WorksAt: Person -> Company "manifest version must advance post-recovery; pre={pre_failure_version}, \ post={post_recovery_version}", ); + drop(db); + + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledForward { + tables: vec![TableExpectation::main("node:Person")], + }, + ) + .await + .unwrap(); // Schema-apply atomicity: the live `_schema.pg` must reflect the // NEW schema (city column on Person, Tag node type) — not the old. @@ -761,7 +1102,6 @@ edge WorksAt: Person -> Company live_schema.contains("node Tag"), "_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}", ); - drop(db); } #[tokio::test] @@ -939,6 +1279,8 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; + let target_parent_commit_id; // Setup: // main: alice @@ -975,41 +1317,18 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { .unwrap(); } - // Capture target_branch's commit-graph head BEFORE the failed merge. - // This is the commit the recovery's merge commit must claim as its - // `parent_commit_id` (D2 — without the per-branch CommitGraph fix, - // recovery would record the GLOBAL head as parent instead). - let target_branch_head_before_failure = { - let commits_dir = dir.path().join("_graph_commits.lance"); - let ds = lance::Dataset::open(commits_dir.to_str().unwrap()) + let main_person_pin = { + let db = Omnigraph::open(&uri).await.unwrap(); + db.snapshot_of(omnigraph::db::ReadTarget::branch("main")) .await .unwrap() - .checkout_branch("target_branch") - .await - .unwrap(); - use arrow_array::{Array, StringArray}; - use futures::TryStreamExt; - let batches: Vec = - ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); - // Grab the latest commit_id by created_at order (the per-branch - // checkout ensures we only see target_branch's commits). - let mut latest: Option<(i64, String)> = None; - for batch in batches { - let ids = batch - .column_by_name("graph_commit_id").unwrap() - .as_any().downcast_ref::().unwrap(); - let created = batch - .column_by_name("created_at").unwrap() - .as_any().downcast_ref::().unwrap(); - for i in 0..ids.len() { - let ts = created.value(i); - if latest.as_ref().is_none_or(|(t, _)| ts > *t) { - latest = Some((ts, ids.value(i).to_string())); - } - } - } - latest.expect("target_branch must have at least one commit (the insert-Bob mutate)").1 + .entry("node:Person") + .expect("main must have Person") + .table_version }; + target_parent_commit_id = branch_head_commit_id(dir.path(), "target_branch") + .await + .unwrap(); // Phase A: failpoint fires after the per-table publish loop completes // but before commit_manifest_updates. Sidecar persists with @@ -1031,105 +1350,31 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { let recovery_dir = dir.path().join("__recovery"); let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count(); assert_eq!( - sidecar_count, - 1, + sidecar_count, 1, "exactly one sidecar must persist after non-main branch_merge failure" ); + operation_id = single_sidecar_operation_id(dir.path()); } // Phase B: reopen runs full sweep. The BranchMerge sidecar's branch // = Some("target_branch"); D2 fix opens a per-branch CommitGraph // for the audit append so the merge-parent linkage is correct. - let _db = Omnigraph::open(&uri).await.unwrap(); + let db = Omnigraph::open(&uri).await.unwrap(); + drop(db); - let recovery_dir = dir.path().join("__recovery"); - if recovery_dir.exists() { - let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) - .unwrap() - .filter_map(|e| e.ok()) - .collect(); - assert!( - remaining.is_empty(), - "sidecar must be deleted; remaining: {:?}", - remaining, - ); - } - - // Find the recovery commit on target_branch's commit graph and - // assert its `parent_commit_id` matches the head we captured BEFORE - // the failed merge. This is what catches D2: without the - // per-branch CommitGraph fix, recovery records the GLOBAL head as - // parent, which on this test setup is the source_branch's - // insert-Carol commit (a different ULID), and the assertion fails. - // - // `merged_parent_commit_id` alone is insufficient — it's - // independently populated from sidecar.merge_source_commit_id, so - // it would be set correctly even with D2's bug. - use arrow_array::{Array, StringArray}; - use futures::TryStreamExt; - let commits_dir = dir.path().join("_graph_commits.lance"); - let ds = lance::Dataset::open(commits_dir.to_str().unwrap()) - .await - .unwrap() - .checkout_branch("target_branch") - .await - .unwrap(); - let batches: Vec = ds - .scan() - .try_into_stream() - .await - .unwrap() - .try_collect() - .await - .unwrap(); - let mut recovery_merge_parent: Option = None; - let mut recovery_merge_merged_parent: Option = None; - for batch in batches { - let merged = batch - .column_by_name("merged_parent_commit_id") - .expect("merged_parent_commit_id column present") - .as_any() - .downcast_ref::() - .expect("merged_parent_commit_id is Utf8"); - let parents = batch - .column_by_name("parent_commit_id") - .expect("parent_commit_id column present") - .as_any() - .downcast_ref::() - .expect("parent_commit_id is Utf8"); - for i in 0..merged.len() { - if !merged.is_null(i) { - // First (and only — single recovery, single merge commit) - // commit with a merged parent IS the recovery commit. - recovery_merge_parent = if parents.is_null(i) { - None - } else { - Some(parents.value(i).to_string()) - }; - recovery_merge_merged_parent = Some(merged.value(i).to_string()); - break; - } - } - if recovery_merge_parent.is_some() { - break; - } - } - let recovery_parent = recovery_merge_parent - .expect("non-main branch_merge recovery must record a merge commit with parent_commit_id"); - assert_eq!( - recovery_parent, target_branch_head_before_failure, - "recovery merge commit's parent_commit_id must == target_branch's pre-failure head; \ - expected {}, got {} — this would regress to the GLOBAL head if D2's per-branch \ - CommitGraph::open_at_branch fix were removed", - target_branch_head_before_failure, recovery_parent, - ); - // Sanity: merged_parent is set from the source branch (independent - // of D2; would be correct even with the bug, but we still verify - // it's non-null so the row is a true merge commit). - assert!( - recovery_merge_merged_parent.is_some(), - "recovery merge commit must have non-null merged_parent_commit_id" - ); + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledForward { + tables: vec![ + TableExpectation::branch("node:Person", "target_branch") + .expected_main_manifest_pin(main_person_pin) + .expected_recovery_parent_commit_id(target_parent_commit_id), + ], + }, + ) + .await + .unwrap(); } /// `ensure_indices` only writes a sidecar when at least one table @@ -1180,10 +1425,8 @@ async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_neede // still fires, surfacing the Err. { let mut db = Omnigraph::open(&uri).await.unwrap(); - let _failpoint = ScopedFailPoint::new( - "ensure_indices.post_phase_b_pre_manifest_commit", - "return", - ); + let _failpoint = + ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return"); let err = db.ensure_indices().await.unwrap_err(); assert!( err.to_string().contains( diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 66db1a7..e7e1efb 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -1,5 +1,7 @@ #![allow(dead_code)] +pub mod recovery; + use arrow_array::{Array, RecordBatch, StringArray}; use futures::TryStreamExt; diff --git a/crates/omnigraph/tests/helpers/recovery.rs b/crates/omnigraph/tests/helpers/recovery.rs new file mode 100644 index 0000000..7197c93 --- /dev/null +++ b/crates/omnigraph/tests/helpers/recovery.rs @@ -0,0 +1,559 @@ +use std::path::{Path, PathBuf}; + +use arrow_array::{Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use lance::Dataset; +use omnigraph::db::commit_graph::CommitGraph; +use omnigraph::db::{GraphCommit, Omnigraph, ReadTarget, SubTableEntry}; +use omnigraph::error::{OmniError, Result}; +use omnigraph_compiler::ir::ParamMap; +use serde::Deserialize; + +const RECOVERY_ACTOR: &str = "omnigraph:recovery"; + +#[derive(Debug)] +pub enum RecoveryExpectation { + RolledForward { tables: Vec }, + RolledBack { tables: Vec }, + Deferred, + NoOp, +} + +#[derive(Debug)] +pub struct TableExpectation { + pub table_key: String, + pub branch: Option, + pub expected_lance_head: Option, + pub expected_main_manifest_pin: Option, + pub expected_recovery_parent_commit_id: Option, + pub follow_up_mutation: Option, +} + +#[derive(Debug)] +pub struct FollowUpMutation { + pub branch: String, + pub query_source: String, + pub query_name: String, + pub params: ParamMap, +} + +#[derive(Debug, Clone)] +struct RecoveryAuditRow { + graph_commit_id: String, + recovery_kind: String, + operation_id: String, + sidecar_writer_kind: String, + per_table_outcomes: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct TableOutcome { + table_key: String, + to_version: u64, +} + +impl TableExpectation { + pub fn main(table_key: impl Into) -> Self { + Self::new(table_key, None::) + } + + pub fn branch(table_key: impl Into, branch: impl Into) -> Self { + Self::new(table_key, Some(branch)) + } + + pub fn new(table_key: impl Into, branch: Option>) -> Self { + Self { + table_key: table_key.into(), + branch: branch.map(Into::into), + expected_lance_head: None, + expected_main_manifest_pin: None, + expected_recovery_parent_commit_id: None, + follow_up_mutation: None, + } + } + + pub fn expected_lance_head(mut self, version: u64) -> Self { + self.expected_lance_head = Some(version); + self + } + + pub fn expected_main_manifest_pin(mut self, version: u64) -> Self { + self.expected_main_manifest_pin = Some(version); + self + } + + pub fn expected_recovery_parent_commit_id(mut self, commit_id: impl Into) -> Self { + self.expected_recovery_parent_commit_id = Some(commit_id.into()); + self + } + + pub fn follow_up_mutation(mut self, mutation: FollowUpMutation) -> Self { + self.follow_up_mutation = Some(mutation); + self + } +} + +impl FollowUpMutation { + pub fn new( + branch: impl Into, + query_source: impl Into, + query_name: impl Into, + params: ParamMap, + ) -> Self { + Self { + branch: branch.into(), + query_source: query_source.into(), + query_name: query_name.into(), + params, + } + } +} + +pub fn single_sidecar_operation_id(repo_root: &Path) -> String { + let ids = sidecar_operation_ids(repo_root); + assert_eq!( + ids.len(), + 1, + "expected exactly one recovery sidecar under __recovery/, got {:?}", + ids, + ); + ids.into_iter().next().unwrap() +} + +pub fn sidecar_operation_ids(repo_root: &Path) -> Vec { + let dir = repo_root.join("__recovery"); + if !dir.exists() { + return Vec::new(); + } + let mut ids = std::fs::read_dir(&dir) + .unwrap() + .filter_map(|entry| { + let entry = entry.ok()?; + let path = entry.path(); + if path.extension().and_then(|ext| ext.to_str()) != Some("json") { + return None; + } + path.file_stem() + .and_then(|stem| stem.to_str()) + .map(str::to_string) + }) + .collect::>(); + ids.sort(); + ids +} + +pub async fn branch_head_commit_id(repo_root: &Path, branch: &str) -> Result { + let graph = match branch { + "main" => CommitGraph::open(&repo_uri(repo_root)).await?, + branch => CommitGraph::open_at_branch(&repo_uri(repo_root), branch).await?, + }; + graph.head_commit_id().await?.ok_or_else(|| { + OmniError::manifest_internal(format!("commit graph for branch {branch} has no head")) + }) +} + +pub async fn assert_post_recovery_invariants( + repo_root: &Path, + operation_id: &str, + expectation: RecoveryExpectation, +) -> Result<()> { + match expectation { + RecoveryExpectation::RolledForward { tables } => { + assert_sidecar_absent(repo_root, operation_id); + let audit = read_audit_row(repo_root, operation_id).await?; + assert_eq!( + audit.recovery_kind, "RolledForward", + "audit row for {operation_id} recorded the wrong recovery_kind", + ); + assert_manifest_pins_match_lance_heads(repo_root, &tables).await?; + assert_audit_to_versions_match_lance_heads(repo_root, &audit, &tables).await?; + assert_recovery_commit_shape(repo_root, &audit, &tables).await?; + assert_non_main_did_not_move_main(repo_root, &tables).await?; + assert_idempotent_reopen(repo_root, operation_id).await?; + run_follow_up_mutations(repo_root, tables).await?; + } + RecoveryExpectation::RolledBack { tables } => { + assert_sidecar_absent(repo_root, operation_id); + let audit = read_audit_row(repo_root, operation_id).await?; + assert_eq!( + audit.recovery_kind, "RolledBack", + "audit row for {operation_id} recorded the wrong recovery_kind", + ); + assert_recovery_commit_shape(repo_root, &audit, &tables).await?; + assert_non_main_did_not_move_main(repo_root, &tables).await?; + assert_idempotent_reopen(repo_root, operation_id).await?; + run_follow_up_mutations(repo_root, tables).await?; + } + RecoveryExpectation::Deferred => { + assert!( + sidecar_path(repo_root, operation_id).exists(), + "deferred recovery must leave sidecar {operation_id} on disk", + ); + assert!( + read_audit_row(repo_root, operation_id).await.is_err(), + "deferred recovery must not record an audit row for {operation_id}", + ); + } + RecoveryExpectation::NoOp => { + assert_sidecar_absent(repo_root, operation_id); + assert!( + read_audit_row(repo_root, operation_id).await.is_err(), + "no-op recovery must not record an audit row for {operation_id}", + ); + } + } + + Ok(()) +} + +fn branch_context(tables: &[TableExpectation]) -> Option { + tables + .iter() + .filter_map(|table| table.branch.as_deref()) + .find(|branch| *branch != "main") + .map(str::to_string) +} + +fn sidecar_path(repo_root: &Path, operation_id: &str) -> PathBuf { + repo_root + .join("__recovery") + .join(format!("{operation_id}.json")) +} + +fn assert_sidecar_absent(repo_root: &Path, operation_id: &str) { + assert!( + !sidecar_path(repo_root, operation_id).exists(), + "recovery sidecar {operation_id} must be deleted after successful recovery", + ); +} + +async fn assert_manifest_pins_match_lance_heads( + repo_root: &Path, + tables: &[TableExpectation], +) -> Result<()> { + let uri = repo_uri(repo_root); + let db = Omnigraph::open(&uri).await?; + for table in tables { + let (entry, lance_head) = entry_and_lance_head(&db, &uri, table).await?; + assert_eq!( + entry.table_version, lance_head, + "manifest pin for {} on {:?} must match Lance HEAD after roll-forward", + table.table_key, table.branch, + ); + if let Some(expected) = table.expected_lance_head { + assert_eq!( + lance_head, expected, + "Lance HEAD for {} on {:?} did not match the test's expected value", + table.table_key, table.branch, + ); + } + } + Ok(()) +} + +async fn assert_audit_to_versions_match_lance_heads( + repo_root: &Path, + audit: &RecoveryAuditRow, + tables: &[TableExpectation], +) -> Result<()> { + let uri = repo_uri(repo_root); + let db = Omnigraph::open(&uri).await?; + for table in tables { + let (_, lance_head) = entry_and_lance_head(&db, &uri, table).await?; + let outcome = audit + .per_table_outcomes + .iter() + .find(|outcome| outcome.table_key == table.table_key) + .ok_or_else(|| { + OmniError::manifest_internal(format!( + "audit row for {} has no outcome for {}", + audit.operation_id, table.table_key, + )) + })?; + assert_eq!( + outcome.to_version, lance_head, + "audit to_version for {} must match the published Lance HEAD", + table.table_key, + ); + } + Ok(()) +} + +async fn assert_non_main_did_not_move_main( + repo_root: &Path, + tables: &[TableExpectation], +) -> Result<()> { + let uri = repo_uri(repo_root); + let db = Omnigraph::open(&uri).await?; + let main = db.snapshot_of(ReadTarget::branch("main")).await?; + for table in tables { + let Some(expected) = table.expected_main_manifest_pin else { + continue; + }; + let entry = main.entry(&table.table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "main snapshot has no entry for {}", + table.table_key, + )) + })?; + assert_eq!( + entry.table_version, expected, + "non-main recovery for {} on {:?} must not move main's manifest pin", + table.table_key, table.branch, + ); + } + Ok(()) +} + +async fn assert_recovery_commit_shape( + repo_root: &Path, + audit: &RecoveryAuditRow, + tables: &[TableExpectation], +) -> Result<()> { + let branch = branch_context(tables); + let expected_parent = expected_recovery_parent(tables)?; + let branch = branch.as_deref(); + let commit = read_recovery_commit(repo_root, audit, branch).await?; + + assert_eq!( + commit.actor_id.as_deref(), + Some(RECOVERY_ACTOR), + "recovery commit {} for operation {} must use actor {}", + commit.graph_commit_id, + audit.operation_id, + RECOVERY_ACTOR, + ); + + if let Some(expected_parent) = expected_parent { + assert_eq!( + commit.parent_commit_id.as_deref(), + Some(expected_parent.as_str()), + "recovery commit {} for operation {} recorded the wrong parent", + commit.graph_commit_id, + audit.operation_id, + ); + } + + if audit.sidecar_writer_kind == "BranchMerge" { + assert!( + commit.merged_parent_commit_id.is_some(), + "recovered BranchMerge must record merged_parent_commit_id", + ); + + if let Some(branch) = branch { + let graph = CommitGraph::open_at_branch(&repo_uri(repo_root), branch).await?; + let commits = graph.load_commits().await?; + let parent = commit.parent_commit_id.as_deref().ok_or_else(|| { + OmniError::manifest_internal(format!( + "recovered BranchMerge commit {} has no parent_commit_id", + commit.graph_commit_id, + )) + })?; + assert!( + commits + .iter() + .any(|candidate| candidate.graph_commit_id == parent), + "recovered BranchMerge parent_commit_id {} is not on target branch {}", + parent, + branch, + ); + } + } + + Ok(()) +} + +fn expected_recovery_parent(tables: &[TableExpectation]) -> Result> { + let mut expected = None; + for table in tables { + let Some(candidate) = &table.expected_recovery_parent_commit_id else { + continue; + }; + match &expected { + None => expected = Some(candidate.clone()), + Some(existing) if existing == candidate => {} + Some(existing) => { + return Err(OmniError::manifest_internal(format!( + "conflicting expected recovery parents in table expectations: {existing} vs {candidate}", + ))); + } + } + } + Ok(expected) +} + +async fn assert_idempotent_reopen(repo_root: &Path, operation_id: &str) -> Result<()> { + let before = matching_audit_rows(repo_root, operation_id).await?; + let uri = repo_uri(repo_root); + let _db = Omnigraph::open(&uri).await?; + assert_sidecar_absent(repo_root, operation_id); + let after = matching_audit_rows(repo_root, operation_id).await?; + assert_eq!( + after.len(), + before.len(), + "immediate reopen after recovery must be a clean no-op for {operation_id}", + ); + Ok(()) +} + +async fn run_follow_up_mutations(repo_root: &Path, tables: Vec) -> Result<()> { + let mut db: Option = None; + for table in tables { + let Some(mutation) = table.follow_up_mutation else { + continue; + }; + if db.is_none() { + db = Some(Omnigraph::open(&repo_uri(repo_root)).await?); + } + let db = db.as_mut().unwrap(); + db.mutate( + &mutation.branch, + &mutation.query_source, + &mutation.query_name, + &mutation.params, + ) + .await + .map_err(|err| { + OmniError::manifest_internal(format!( + "follow-up mutation {} on {} after recovery failed: {}", + mutation.query_name, table.table_key, err, + )) + })?; + } + Ok(()) +} + +async fn entry_and_lance_head( + db: &Omnigraph, + root_uri: &str, + table: &TableExpectation, +) -> Result<(SubTableEntry, u64)> { + let branch = table.branch.as_deref().unwrap_or("main"); + let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?; + let entry = snapshot + .entry(&table.table_key) + .ok_or_else(|| { + OmniError::manifest_internal(format!( + "snapshot for branch {branch} has no entry for {}", + table.table_key, + )) + })? + .clone(); + let lance_head = lance_head_for_entry(root_uri, &entry).await?; + Ok((entry, lance_head)) +} + +async fn lance_head_for_entry(root_uri: &str, entry: &SubTableEntry) -> Result { + let table_uri = format!("{}/{}", root_uri.trim_end_matches('/'), entry.table_path); + let ds = Dataset::open(&table_uri) + .await + .map_err(|err| OmniError::Lance(err.to_string()))?; + let ds = match entry.table_branch.as_deref() { + Some(branch) if branch != "main" => ds + .checkout_branch(branch) + .await + .map_err(|err| OmniError::Lance(err.to_string()))?, + _ => ds, + }; + Ok(ds.version().version) +} + +async fn read_recovery_commit( + repo_root: &Path, + audit: &RecoveryAuditRow, + branch: Option<&str>, +) -> Result { + let uri = repo_uri(repo_root); + let graph = match branch { + Some(branch) => CommitGraph::open_at_branch(&uri, branch).await?, + None => CommitGraph::open(&uri).await?, + }; + graph + .load_commits() + .await? + .into_iter() + .find(|commit| commit.graph_commit_id == audit.graph_commit_id) + .ok_or_else(|| { + OmniError::manifest_internal(format!( + "recovery commit {} for operation {} was not found", + audit.graph_commit_id, audit.operation_id, + )) + }) +} + +async fn read_audit_row(repo_root: &Path, operation_id: &str) -> Result { + let mut rows = matching_audit_rows(repo_root, operation_id).await?; + if rows.len() != 1 { + return Err(OmniError::manifest_internal(format!( + "expected exactly one recovery audit row for {operation_id}, got {}", + rows.len(), + ))); + } + Ok(rows.remove(0)) +} + +async fn matching_audit_rows( + repo_root: &Path, + operation_id: &str, +) -> Result> { + let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance"); + if !recoveries_dir.exists() { + return Ok(Vec::new()); + } + let ds = Dataset::open(recoveries_dir.to_str().unwrap()) + .await + .map_err(|err| OmniError::Lance(err.to_string()))?; + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .map_err(|err| OmniError::Lance(err.to_string()))? + .try_collect() + .await + .map_err(|err| OmniError::Lance(err.to_string()))?; + + let mut rows = Vec::new(); + for batch in batches { + let graph_commit_ids = string_column(&batch, "graph_commit_id")?; + let kinds = string_column(&batch, "recovery_kind")?; + let ops = string_column(&batch, "operation_id")?; + let writers = string_column(&batch, "sidecar_writer_kind")?; + let outcomes_json = string_column(&batch, "per_table_outcomes_json")?; + for row in 0..batch.num_rows() { + if ops.value(row) != operation_id { + continue; + } + let per_table_outcomes = + serde_json::from_str(outcomes_json.value(row)).map_err(|err| { + OmniError::manifest_internal(format!( + "failed to parse recovery audit outcomes for {operation_id}: {err}", + )) + })?; + rows.push(RecoveryAuditRow { + graph_commit_id: graph_commit_ids.value(row).to_string(), + recovery_kind: kinds.value(row).to_string(), + operation_id: ops.value(row).to_string(), + sidecar_writer_kind: writers.value(row).to_string(), + per_table_outcomes, + }); + } + } + Ok(rows) +} + +fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> { + batch + .column_by_name(name) + .ok_or_else(|| { + OmniError::manifest_internal(format!("recovery audit batch missing '{name}'")) + })? + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OmniError::manifest_internal(format!("recovery audit column '{name}' is not Utf8")) + }) +} + +fn repo_uri(repo_root: &Path) -> String { + repo_root.to_str().unwrap().to_string() +} diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index 2880692..6ffe4fd 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -18,6 +18,7 @@ use lance::Dataset; use omnigraph::db::Omnigraph; mod helpers; +use helpers::recovery::{RecoveryExpectation, TableExpectation, assert_post_recovery_invariants}; const TEST_SCHEMA: &str = include_str!("fixtures/test.pg"); @@ -185,7 +186,9 @@ async fn recovery_rolls_back_synthetic_drift_on_open() { let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} {"type":"Person","data":{"name":"bob","age":25}} "#; - load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); drop(db); // Synthetic drift: advance Person's Lance HEAD WITHOUT updating the @@ -289,8 +292,14 @@ async fn count_recovery_audit_rows(repo_root: &Path) -> usize { .await .expect("recoveries dataset opens"); use futures::TryStreamExt; - let batches: Vec = - ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); batches.iter().map(|b| b.num_rows()).sum() } @@ -303,13 +312,17 @@ async fn read_latest_recovery_audit( if !recoveries_dir.exists() { return None; } - let ds = Dataset::open(recoveries_dir.to_str().unwrap()) - .await - .ok()?; + let ds = Dataset::open(recoveries_dir.to_str().unwrap()).await.ok()?; use arrow_array::{Array, StringArray}; use futures::TryStreamExt; - let batches: Vec = - ds.scan().try_into_stream().await.ok()?.try_collect().await.ok()?; + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .ok()? + .try_collect() + .await + .ok()?; let last_batch = batches.iter().filter(|b| b.num_rows() > 0).last()?; let row = last_batch.num_rows() - 1; let kinds = last_batch @@ -349,11 +362,19 @@ async fn list_recovery_audit_kinds(repo_root: &Path) -> Vec { if !recoveries_dir.exists() { return Vec::new(); } - let ds = Dataset::open(recoveries_dir.to_str().unwrap()).await.unwrap(); + let ds = Dataset::open(recoveries_dir.to_str().unwrap()) + .await + .unwrap(); use arrow_array::{Array, StringArray}; use futures::TryStreamExt; - let batches: Vec = - ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); let mut out = Vec::new(); for batch in batches { let kinds = batch @@ -378,8 +399,14 @@ async fn count_recovery_actor_commits(repo_root: &Path) -> usize { let ds = Dataset::open(actors_dir.to_str().unwrap()).await.unwrap(); use arrow_array::{Array, StringArray}; use futures::TryStreamExt; - let batches: Vec = - ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); let mut count = 0; for batch in &batches { let actors = batch @@ -411,7 +438,9 @@ async fn recovery_rolls_forward_after_phase_b_completes() { let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} {"type":"Person","data":{"name":"bob","age":25}} "#; - load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); drop(db); let person_uri = node_table_uri(uri, "Person"); @@ -454,48 +483,18 @@ async fn recovery_rolls_forward_after_phase_b_completes() { // Reopen — sweep must roll forward, advancing the manifest pin to // head_after via a single ManifestBatchPublisher::publish call. - let _db = Omnigraph::open(uri).await.unwrap(); + let db = Omnigraph::open(uri).await.unwrap(); + drop(db); - // Sidecar deleted (sweep completed end-to-end). - assert!( - !list_recovery_dir(dir.path()).contains(&"01H00000000000000000000RF.json".to_string()), - "sidecar must be deleted after successful roll-forward" - ); - - // Audit row recorded. - assert_eq!( - count_recovery_audit_rows(dir.path()).await, - 1, - "roll-forward must record exactly one audit row" - ); - assert_eq!( - count_recovery_actor_commits(dir.path()).await, - 1, - "roll-forward must record exactly one commit-graph row tagged with omnigraph:recovery" - ); - let audit = read_latest_recovery_audit(dir.path()).await; - assert_eq!( - audit, - Some(( - "RolledForward".to_string(), - Some("act-alice".to_string()), - "01H00000000000000000000RF".to_string(), - "Mutation".to_string(), - )), - "audit row content mismatch" - ); - - // Idempotency: reopen is a no-op. - let _db2 = Omnigraph::open(uri).await.unwrap(); - assert!( - list_recovery_dir(dir.path()).is_empty(), - "second open must be a clean no-op" - ); - assert_eq!( - count_recovery_audit_rows(dir.path()).await, - 1, - "second open must NOT record a new audit row" - ); + assert_post_recovery_invariants( + dir.path(), + "01H00000000000000000000RF", + RecoveryExpectation::RolledForward { + tables: vec![TableExpectation::main("node:Person").expected_lance_head(head_after)], + }, + ) + .await + .unwrap(); } #[tokio::test] @@ -509,7 +508,9 @@ async fn recovery_rolls_back_records_audit_row_with_recovery_actor() { let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} "#; - load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); drop(db); let person_uri = node_table_uri(uri, "Person"); @@ -574,7 +575,9 @@ async fn recovery_rolls_forward_with_null_actor() { let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} "#; - load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); drop(db); let person_uri = node_table_uri(uri, "Person"); @@ -650,7 +653,9 @@ async fn recovery_processes_multiple_sidecars_with_fresh_snapshot_per_iter() { let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} {"type":"Company","data":{"name":"acme"}} "#; - load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); drop(db); // Synthesize drift on both tables independently. @@ -745,7 +750,9 @@ async fn recovery_ensure_indices_steady_state_no_sidecar() { let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}} {"type":"Company","data":{"name":"acme"}} "#; - load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap(); + load_jsonl(&mut db, test_data, LoadMode::Append) + .await + .unwrap(); db.ensure_indices().await.unwrap(); drop(db); @@ -1045,6 +1052,13 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() { .expect("feature snapshot must have Person entry"); let v_pin = feature_entry.table_version; let feature_branch_name = feature_entry.table_branch.clone(); + let main_pin = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .expect("main snapshot must have Person entry") + .table_version; drop(db); // Bypass the manifest: append directly to Person's Lance HEAD on the @@ -1100,37 +1114,21 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() { // against feature's snapshot, not main's. With the fix, feature's // manifest pin advances v_pin → v_head. let db = Omnigraph::open(uri).await.unwrap(); - assert!( - list_recovery_dir(dir.path()).is_empty(), - "feature-branch sidecar must be processed (deleted) after recovery" - ); + drop(db); - // The post-recovery feature snapshot must show Person pinned at v_head. - let post_feature_snapshot = db - .snapshot_of(omnigraph::db::ReadTarget::branch("feature")) - .await - .unwrap(); - let post_entry = post_feature_snapshot - .entry("node:Person") - .expect("Person must still be pinned on feature"); - assert_eq!( - post_entry.table_version, v_head, - "feature manifest pin must advance v_pin={} → v_head={}; got {} \ - — without branch-aware recovery, classification would have \ - compared against main and rolled back / no-op'd", - v_pin, v_head, post_entry.table_version, - ); - - // Audit row recorded for the recovery action — and the row's - // recovery_kind == RolledForward (proves the branch-aware classifier - // got us through the eligible path; without it, the snapshot lookup - // against main's pin would have produced NoMovement → RollBack). - let kinds = list_recovery_audit_kinds(dir.path()).await; - assert_eq!( - kinds, vec!["RolledForward".to_string()], - "feature-branch sidecar recovery must record exactly one RolledForward audit row; got {:?}", - kinds, - ); + assert_post_recovery_invariants( + dir.path(), + "01H0000000000000000000FEAT", + RecoveryExpectation::RolledForward { + tables: vec![ + TableExpectation::branch("node:Person", "feature") + .expected_lance_head(v_head) + .expected_main_manifest_pin(main_pin), + ], + }, + ) + .await + .unwrap(); } /// Companion to the roll-forward feature-branch test: branch-axis @@ -1176,6 +1174,13 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() { .expect("feature snapshot must have Person entry"); let v_pin = feature_entry.table_version; let feature_branch_name = feature_entry.table_branch.clone(); + let main_pin = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .expect("main snapshot must have Person entry") + .table_version; drop(db); // Bypass the manifest: append on the feature ref to advance HEAD past @@ -1230,21 +1235,21 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() { write_sidecar_file(dir.path(), "01H0000000000000000000FRB1", &sidecar_json); // Reopen with full sweep — RollBack is allowed at open time. - let _db = Omnigraph::open(uri).await.unwrap(); - assert!( - list_recovery_dir(dir.path()).is_empty(), - "feature-branch rollback sidecar must be deleted after recovery" - ); + let db = Omnigraph::open(uri).await.unwrap(); + drop(db); - // Audit kind == RolledBack (proves classifier saw feature's HEAD, - // not main's; main's view of Person would be NoMovement → no audit - // row attribution). - let kinds = list_recovery_audit_kinds(dir.path()).await; - assert_eq!( - kinds, vec!["RolledBack".to_string()], - "feature-branch rollback must record one RolledBack audit row; got {:?}", - kinds, - ); + assert_post_recovery_invariants( + dir.path(), + "01H0000000000000000000FRB1", + RecoveryExpectation::RolledBack { + tables: vec![ + TableExpectation::branch("node:Person", "feature") + .expected_main_manifest_pin(main_pin), + ], + }, + ) + .await + .unwrap(); // Lance HEAD on the feature ref must have advanced (real restore ran). let post = store @@ -1257,6 +1262,18 @@ async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() { v_head, post.version().version, ); + + let db = Omnigraph::open(uri).await.unwrap(); + assert_eq!( + helpers::count_rows_branch(&db, "feature", "node:Person").await, + 2, + "feature branch must still expose the manifest-pinned rows after rollback" + ); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 1, + "feature rollback must not move main" + ); } /// `OpenMode::ReadOnly` must NOT run `recover_schema_state_files`, diff --git a/docs/maintenance.md b/docs/maintenance.md index 6725269..08ae8da 100644 --- a/docs/maintenance.md +++ b/docs/maintenance.md @@ -16,7 +16,7 @@ - `CleanupPolicyOptions { keep_versions: Option, older_than: Option }` — at least one is required. - Returns `[TableCleanupStats { table_key, bytes_removed, old_versions_removed }]`. - CLI guards with `--confirm`; without it, prints a preview line. -- **Recovery floor:** `--keep < 3` may garbage-collect Lance versions that the open-time recovery sweep needs as a rollback target (the sweep restores to the manifest-pinned `expected_version`, which is HEAD-1 in the typical Phase B → Phase C drift case). Default `--keep 10` is safe. +- **Recovery floor:** `--keep < 3` may garbage-collect Lance versions that the open-time recovery sweep needs as a rollback target (the sweep restores to the branch's manifest-pinned table version, which is HEAD-1 in the typical Phase B → Phase C drift case). Default `--keep 10` is safe. ## Tombstones diff --git a/docs/runs.md b/docs/runs.md index 3f628b5..5e4d6b5 100644 --- a/docs/runs.md +++ b/docs/runs.md @@ -176,10 +176,13 @@ recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: sidecar on disk for operator review. - Otherwise, if every table is `RolledPastExpected`, **roll forward**: a single `ManifestBatchPublisher::publish` call extends every pin - atomically. + atomically. `SchemaApply` sidecars are eligible only when schema-state + recovery promoted the matching staging files in the same recovery pass; + otherwise full open-time recovery rolls them back and refresh-time + recovery leaves them for the next read-write open. - Otherwise **roll back**: per-table `Dataset::restore` to the - expected_version (with a fragment-set short-circuit so repeated - mid-sweep crashes don't pile up versions). + manifest-pinned table version for that branch. Rollback records the + actual restore target in the audit row's `to_version`. - After a successful roll-forward or roll-back, an audit row is recorded — `_graph_commits.lance` carries a commit tagged `actor_id = "omnigraph:recovery"`, and a sibling