diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 776fb1a..7eedb2e 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -33,8 +33,8 @@ pub(crate) use namespace::open_table_head_for_write; use namespace::{branch_manifest_namespace, staged_table_namespace}; use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; pub(crate) use recovery::{ - delete_sidecar, new_sidecar, recover_manifest_drift, write_sidecar, RecoverySidecar, - RecoverySidecarHandle, SidecarKind, SidecarTablePin, + delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar, + RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, }; use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at}; pub use state::SubTableEntry; diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 8a8f23a..51bab6e 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -95,6 +95,15 @@ pub(crate) struct SidecarTablePin { /// Lance HEAD that the writer's `commit_staged` would produce /// (typically `expected_version + 1`). pub post_commit_pin: u64, + /// Lance branch ref this table lives on (mirrors + /// `SubTableEntry::table_branch`). Required for the recovery sweep + /// to open the dataset at the correct ref — `Dataset::open(path)` + /// alone returns the default ref (typically main), which would + /// classify a feature-branch sidecar against main's HEAD and silently + /// no-op or roll back the wrong table version. Optional for backward + /// compatibility with older sidecars; `None` means main / default. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub table_branch: Option, } /// In-memory representation of the on-disk JSON sidecar. @@ -107,6 +116,16 @@ pub(crate) struct RecoverySidecar { pub actor_id: Option, pub writer_kind: SidecarKind, pub tables: Vec, + /// For `SidecarKind::BranchMerge` only: the source branch's HEAD + /// commit id at the time the sidecar was written. Used by the + /// recovery sweep's audit step to call `append_merge_commit` + /// (recording `merged_parent_commit_id`) instead of `append_commit`, + /// so future merges between the same pair recognize "already up-to- + /// date" and merge-base computations stay correct. Optional for + /// backward compatibility — older sidecars (or non-BranchMerge + /// kinds) carry `None` and recovery falls back to `append_commit`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub merge_source_commit_id: Option, } /// Opaque handle returned by [`write_sidecar`] so the caller can delete @@ -391,32 +410,35 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision /// Restore a single table's Lance HEAD to `expected_version`, producing a /// new commit at HEAD+1 with content == content-at-`expected_version`. /// -/// Idempotency: if the latest Lance commit's fragment-id set already equals -/// the fragment-id set at `expected_version`, this is a no-op. Soundness — -/// Lance fragments are immutable; equal fragment-ids ⇒ equal content. -/// This guards against version pile-up under repeated mid-rollback crashes -/// (see `docs/runs.md` "Finalize → publisher residual" + `.context/mr-847-design.md` -/// §"Fragment-set equality short-circuit"). +/// Always runs the actual `Dataset::restore` — there is NO fragment-set +/// short-circuit because equal fragment IDs do NOT imply equal content: +/// Lance index commits and deletion-vector updates change the manifest +/// (and therefore the user-visible state) without changing fragment IDs. +/// Skipping the restore in those cases would leave Lance HEAD ahead of +/// the manifest with no recovery artifact left. +/// +/// Cost: under repeated mid-rollback crashes (rare), Lance HEAD +/// accumulates extra restore commits that `omnigraph cleanup` reclaims. +/// Bounded by the number of recovery iterations — typically 1. pub(crate) async fn restore_table_to_version( table_path: &str, + branch: Option<&str>, expected_version: u64, ) -> Result<()> { let head = Dataset::open(table_path) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - let target = head + let head = match branch { + Some(b) if b != "main" => head + .checkout_branch(b) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + _ => head, + }; + let mut to_restore = head .checkout_version(expected_version) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - - if fragment_ids(&head) == fragment_ids(&target) { - // Lance HEAD already reflects target content (a prior restore - // landed; we just didn't get to delete the sidecar). No-op. - return Ok(()); - } - - // checkout returns a NEW Dataset; restore() takes &mut self. - let mut to_restore = target; to_restore .restore() .await @@ -424,12 +446,6 @@ pub(crate) async fn restore_table_to_version( Ok(()) } -fn fragment_ids(ds: &Dataset) -> Vec { - let mut ids: Vec = ds.manifest.fragments.iter().map(|f| f.id).collect(); - ids.sort_unstable(); - ids -} - /// Open-time recovery sweep — the entry point invoked from /// `Omnigraph::open` (gated on `OpenMode::ReadWrite`). /// @@ -439,9 +455,9 @@ fn fragment_ids(ds: &Dataset) -> Vec { /// state), or abort (invariant violation). /// /// Idempotency: a crash mid-sweep leaves the sidecar (deletion is the -/// final step). Re-opening re-classifies; the fragment-set short-circuit -/// in [`restore_table_to_version`] prevents version pile-up under -/// repeated mid-rollback crashes. +/// final step). Re-opening re-classifies; repeated rollbacks of the +/// same table append extra Lance restore commits which `omnigraph +/// cleanup` reclaims. /// /// Concurrency: today recovery runs synchronously in `Omnigraph::open` /// *before* the engine is wrapped in the server's `Arc>`. @@ -450,24 +466,43 @@ fn fragment_ids(ds: &Dataset) -> Vec { /// queues before the sweep restores or publishes. pub(crate) async fn recover_manifest_drift( root_uri: &str, - storage: &dyn StorageAdapter, + storage: std::sync::Arc, coordinator: &mut GraphCoordinator, ) -> Result<()> { - let sidecars = list_sidecars(root_uri, storage).await?; + let sidecars = list_sidecars(root_uri, storage.as_ref()).await?; if sidecars.is_empty() { return Ok(()); } - // Refresh the coordinator snapshot BEFORE each sidecar's - // classification. Sidecar N's roll-forward writes manifest changes - // that sidecar N+1 must observe, otherwise sidecar N+1 classifies - // its tables against stale pins and may incorrectly roll back work - // that landed moments earlier. Refresh is cheap (one Lance manifest - // read). + // For each sidecar, classify against a FRESH snapshot AT THE + // SIDECAR'S BRANCH. Two reasons: + // 1. Per-sidecar refresh: sidecar N's roll-forward writes manifest + // changes that sidecar N+1 must observe, otherwise N+1 classifies + // its tables against stale pins. + // 2. Per-branch snapshot: a sidecar from a feature-branch writer + // pins entries on that feature branch. Classifying against the + // main coordinator's snapshot would compare to main's pins (and + // main's Lance HEAD if pin.table_branch isn't honored), silently + // no-op'ing or rolling back the wrong table version. Open a + // separate per-branch coordinator and use ITS snapshot. for sidecar in sidecars { - coordinator.refresh().await?; - let snapshot = coordinator.snapshot(); - process_sidecar(root_uri, storage, &snapshot, &sidecar).await?; + let branch_snapshot = match sidecar.branch.as_deref() { + Some(b) => { + let mut branch_coord = GraphCoordinator::open_branch( + root_uri, + b, + std::sync::Arc::clone(&storage), + ) + .await?; + branch_coord.refresh().await?; + branch_coord.snapshot() + } + None => { + coordinator.refresh().await?; + coordinator.snapshot() + } + }; + process_sidecar(root_uri, storage.as_ref(), &branch_snapshot, &sidecar).await?; } // Final refresh so the caller sees the post-sweep state. coordinator.refresh().await?; @@ -482,7 +517,8 @@ async fn process_sidecar( ) -> Result<()> { let mut classifications = Vec::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { - let lance_head = open_lance_head(&pin.table_path).await?; + let lance_head = + open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?; let manifest_pinned = snapshot .entry(&pin.table_key) .map(|e| e.table_version) @@ -518,9 +554,9 @@ async fn process_sidecar( // Restore every table whose Lance HEAD has drifted from the // manifest pin (RolledPastExpected, UnexpectedAtP1, // UnexpectedMultistep). NoMovement tables are already at - // expected_version — no action. The fragment-set short-circuit - // in restore_table_to_version makes drift-with-equivalent-content - // a no-op (sound: equal fragment-ids ⇒ equal content). + // expected_version — no action. Restore is unconditional; + // repeated mid-rollback crashes accumulate a few extra + // Lance commits that `omnigraph cleanup` reclaims. let mut outcomes = Vec::with_capacity(sidecar.tables.len()); for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) { if matches!( @@ -529,7 +565,12 @@ async fn process_sidecar( | TableClassification::UnexpectedAtP1 | TableClassification::UnexpectedMultistep ) { - restore_table_to_version(&pin.table_path, pin.expected_version).await?; + restore_table_to_version( + &pin.table_path, + pin.table_branch.as_deref(), + pin.expected_version, + ) + .await?; outcomes.push(TableOutcome { table_key: pin.table_key.clone(), from_version: snapshot @@ -601,15 +642,22 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result = HashMap::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { - // Open the dataset at its CURRENT Lance HEAD (not at the sidecar's - // post_commit_pin). For strict-match writers (Mutation/Load/ - // BranchMerge) HEAD == post_commit_pin by construction. For - // loose-match writers (SchemaApply/EnsureIndices) HEAD may be - // higher than post_commit_pin (multiple commit_staged calls per - // table); we want to publish to the actual current HEAD. + // Open the dataset at its CURRENT Lance HEAD on the pin's branch + // (not at the sidecar's post_commit_pin). For strict-match writers + // (Mutation/Load) HEAD == post_commit_pin by construction. For + // loose-match writers (SchemaApply/EnsureIndices/BranchMerge) HEAD + // may be higher than post_commit_pin (multiple commit_staged + // calls per table); we want to publish to the actual current HEAD. let head_ds = Dataset::open(&pin.table_path) .await .map_err(|e| OmniError::Lance(e.to_string()))?; + let head_ds = match pin.table_branch.as_deref() { + Some(b) if b != "main" => head_ds + .checkout_branch(b) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + _ => head_ds, + }; let head_version = head_ds.version().version; let row_count = head_ds @@ -628,7 +676,7 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result, ) -> Result<()> { let mut graph = CommitGraph::open(root_uri).await?; - let graph_commit_id = graph - .append_commit( - sidecar.branch.as_deref(), - manifest_version, - Some(RECOVERY_ACTOR), - ) - .await?; + // BranchMerge sidecars carry the source branch's HEAD commit id so + // recovery can record this as a MERGE commit (with parent linkage) + // instead of a plain commit. Without the merge parent, future + // `branch_merge feature → main` between the same pair would not + // recognize "already up-to-date" and merge-base computations break. + let graph_commit_id = match ( + sidecar.writer_kind, + sidecar.merge_source_commit_id.as_deref(), + kind, + ) { + (SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => { + // For BranchMerge roll-forward, fetch the current branch + // tip as the parent — at open-time recovery this is the + // pre-merge tip (no other writers have run yet). + let parent_commit_id = + graph.head_commit_id().await?.unwrap_or_default(); + graph + .append_merge_commit( + sidecar.branch.as_deref(), + manifest_version, + &parent_commit_id, + source_id, + Some(RECOVERY_ACTOR), + ) + .await? + } + _ => { + graph + .append_commit( + sidecar.branch.as_deref(), + manifest_version, + Some(RECOVERY_ACTOR), + ) + .await? + } + }; let mut audit = RecoveryAudit::open(root_uri).await?; audit .append(RecoveryAuditRecord { @@ -681,10 +758,42 @@ async fn record_audit( Ok(()) } -async fn open_lance_head(table_path: &str) -> Result { +/// Returns `true` if any `SchemaApply` sidecar is present in +/// `__recovery/`. Schema-state recovery (`recover_schema_state_files`) +/// uses this to skip its normal pre-vs-post-commit disambiguation — +/// when a SchemaApply sidecar is present, we know the writer reached +/// Phase B (Lance HEADs advanced) but didn't complete Phase C (manifest +/// publish + staging→final renames). The right action is to complete +/// the rename so the recovery sweep's roll-forward step sees the new +/// catalog. Without this, the disambiguation logic deletes the staging +/// files (since manifest still pins the old table set) and leaves the +/// repo with new-schema data on disk but the old `_schema.pg` live — +/// real corruption. +pub(crate) async fn has_schema_apply_sidecar( + root_uri: &str, + storage: &dyn StorageAdapter, +) -> Result { + let sidecars = list_sidecars(root_uri, storage).await?; + Ok(sidecars + .iter() + .any(|s| matches!(s.writer_kind, SidecarKind::SchemaApply))) +} + +/// Open the Lance dataset at `table_path` checked out at the given +/// branch ref (or default if `branch` is None or "main") and return its +/// HEAD version. Recovery uses this so feature-branch sidecars classify +/// against the feature-branch's Lance HEAD, not main's. +async fn open_lance_head(table_path: &str, branch: Option<&str>) -> Result { let ds = Dataset::open(table_path) .await .map_err(|e| OmniError::Lance(e.to_string()))?; + let ds = match branch { + Some(b) if b != "main" => ds + .checkout_branch(b) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + _ => ds, + }; Ok(ds.version().version) } @@ -718,6 +827,7 @@ pub(crate) fn new_sidecar( actor_id, writer_kind, tables, + merge_source_commit_id: None, } } @@ -756,6 +866,7 @@ mod tests { table_path: table_path.to_string(), expected_version: expected, post_commit_pin: post, + table_branch: None, } } @@ -985,7 +1096,7 @@ mod tests { let head_before = ds.version().version; assert_eq!(head_before, 3); - restore_table_to_version(&uri, 1).await.unwrap(); + restore_table_to_version(&uri, None, 1).await.unwrap(); let post = Dataset::open(&uri).await.unwrap(); assert_eq!(post.version().version, head_before + 1); @@ -1001,7 +1112,11 @@ mod tests { } #[tokio::test] - async fn restore_table_to_version_no_ops_when_fragments_already_match() { + async fn restore_table_to_version_always_appends_a_commit() { + // Restore is unconditional — equal fragment IDs do NOT imply + // equal content (Lance index commits and deletion-vector + // updates change the manifest without touching fragment IDs). + // Repeated restore calls each produce a new HEAD+1 commit. let dir = tempfile::tempdir().unwrap(); let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); let store = TableStore::new(dir.path().to_str().unwrap()); @@ -1014,16 +1129,19 @@ mod tests { .await .unwrap(); // First restore: HEAD goes from 2 to 3 (with content == v1). - restore_table_to_version(&uri, 1).await.unwrap(); + restore_table_to_version(&uri, None, 1).await.unwrap(); let mid = Dataset::open(&uri).await.unwrap().version().version; assert_eq!(mid, 3); - // Second restore to v1: content already matches; no-op. - restore_table_to_version(&uri, 1).await.unwrap(); + // Second restore to v1: still appends a commit (HEAD = 4) because + // restore is unconditional. The pile-up is bounded and reclaimed + // by `omnigraph cleanup`. + restore_table_to_version(&uri, None, 1).await.unwrap(); let post = Dataset::open(&uri).await.unwrap().version().version; assert_eq!( - post, mid, - "second restore must short-circuit via fragment-set equality" + post, + mid + 1, + "restore must always append a commit (no fragment-set short-circuit)" ); } diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 700ae30..cb44e17 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -188,7 +188,7 @@ impl Omnigraph { // separate background-reconciler effort. crate::db::manifest::recover_manifest_drift( &root, - storage.as_ref(), + Arc::clone(&storage), &mut coordinator, ) .await?; diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 5d12e2a..62a96d0 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -170,6 +170,7 @@ pub(super) async fn apply_schema_with_lock( table_path: db.table_store.dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + table_branch: entry.table_branch.clone(), }) }) .collect(); diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 343b666..1e8843b 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -81,6 +81,7 @@ pub(super) async fn ensure_indices_for_branch( table_path: full_path, expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + table_branch: entry.table_branch.clone(), }); } } @@ -101,6 +102,7 @@ pub(super) async fn ensure_indices_for_branch( table_path: full_path, expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + table_branch: entry.table_branch.clone(), }); } } diff --git a/crates/omnigraph/src/db/schema_state.rs b/crates/omnigraph/src/db/schema_state.rs index 8ba24f4..d307159 100644 --- a/crates/omnigraph/src/db/schema_state.rs +++ b/crates/omnigraph/src/db/schema_state.rs @@ -319,6 +319,25 @@ pub(crate) async fn recover_schema_state_files( return Ok(()); } + // Schema-apply atomicity: when a SchemaApply sidecar is present, + // the writer reached Phase B (Lance HEADs advanced) but didn't + // complete Phase C (manifest publish + staging→final renames). The + // recovery sweep about to run will roll the table versions forward + // to the new Lance HEADs; we MUST also rename the staging files + // forward so the catalog matches. Without this, the disambiguation + // logic below sees actual_keys == live_keys (manifest didn't move) + // and deletes the staging files, leaving the repo with new-schema + // data on disk but the old `_schema.pg` live — corruption. + if crate::db::manifest::has_schema_apply_sidecar(root_uri, storage.as_ref()).await? { + warn!( + "recovery: SchemaApply sidecar present; completing schema-staging rename so the \ + manifest-drift sweep's roll-forward sees the new catalog (manifest v{})", + snapshot.version() + ); + complete_staging_rename(root_uri, storage.as_ref()).await?; + return Ok(()); + } + if !pg_exists { // _schema.pg.staging is gone but at least one of the other staging // files is still present. This is a partial-rename: the post-commit diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 388ff92..fbcefc8 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1206,6 +1206,7 @@ impl Omnigraph { table_path: self.table_store().dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + table_branch: entry.table_branch.clone(), }) }) .collect(); @@ -1222,12 +1223,18 @@ impl Omnigraph { // before invoking this function, so `self.active_branch()` // is the target. let target_branch = self.active_branch().map(str::to_string); - let sidecar = crate::db::manifest::new_sidecar( + let mut sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::BranchMerge, target_branch, self.audit_actor_id.clone(), recovery_pins, ); + // Carry the source branch's HEAD commit id so the recovery + // sweep's audit step can record this as a MERGE commit + // (linked to the source) instead of a plain commit. Without + // this, future merges between the same pair lose + // already-up-to-date detection and merge-base correctness. + sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string()); Some( crate::db::manifest::write_sidecar( self.root_uri(), diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index c32b688..47433be 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -265,6 +265,7 @@ impl MutationStaging { table_path: path.full_path.clone(), expected_version: expected, post_commit_pin: expected + 1, + table_branch: path.table_branch.clone(), }) }) .collect::>>()?; diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index f9cdfb5..0b70ae0 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -515,6 +515,22 @@ edge WorksAt: Person -> Company "manifest version must advance post-recovery; pre={pre_failure_version}, \ post={post_recovery_version}", ); + + // Schema-apply atomicity: the live `_schema.pg` must reflect the + // NEW schema (city column on Person, Tag node type) — not the old. + // Without the schema-staging coordination, the schema-state + // recovery would have deleted the staging files (because manifest + // hadn't advanced when it ran), leaving a corrupt repo with new- + // schema data on disk but old-schema catalog. + let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap(); + assert!( + live_schema.contains("city: String?"), + "_schema.pg must reflect the NEW schema (city column added); got:\n{live_schema}", + ); + assert!( + live_schema.contains("node Tag"), + "_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}", + ); drop(db); } @@ -628,6 +644,55 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { "manifest version must advance post-recovery; pre={pre_failure_version}, \ post={post_recovery_version}", ); + + // The recovered branch_merge must record a MERGE commit (with + // `merged_parent_commit_id` set), not a plain commit. Without + // this, future merges between the same pair lose + // already-up-to-date detection. We verify by reading + // `_graph_commits.lance` and asserting the most recent commit + // tagged with the recovery actor has a non-null + // `merged_parent_commit_id`. + { + use arrow_array::{Array, StringArray}; + use futures::TryStreamExt; + let commits_dir = dir.path().join("_graph_commits.lance"); + let ds = lance::Dataset::open(commits_dir.to_str().unwrap()) + .await + .unwrap(); + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let mut found_recovery_merge = false; + for batch in batches { + let merged = batch + .column_by_name("merged_parent_commit_id") + .expect("merged_parent_commit_id column present") + .as_any() + .downcast_ref::() + .expect("merged_parent_commit_id is Utf8"); + // The actor_id lives in _graph_commit_actors; cross-checking + // is heavier than necessary. Detecting any non-null + // merged_parent_commit_id in the post-recovery state is + // sufficient: only a recovered branch_merge can produce one + // here (we never completed a normal merge in this test). + for i in 0..merged.len() { + if !merged.is_null(i) { + found_recovery_merge = true; + break; + } + } + } + assert!( + found_recovery_merge, + "recovered branch_merge must record `merged_parent_commit_id` so future \ + merges detect already-up-to-date — no merge-parent-tagged commit found", + ); + } drop(db); } diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index cef1257..e5f2388 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -17,6 +17,8 @@ use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use omnigraph::db::Omnigraph; +mod helpers; + const TEST_SCHEMA: &str = include_str!("fixtures/test.pg"); fn write_sidecar_file(repo_root: &Path, operation_id: &str, json: &str) { @@ -978,6 +980,155 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() { ); } +/// A sidecar from a feature-branch writer must be classified against +/// THAT FEATURE BRANCH's manifest pin and Lance HEAD — not main's. +/// Otherwise: +/// - `snapshot.entry(table_key)` returns main's entry (or None) and +/// `manifest_pinned` is wrong. +/// - `Dataset::open(path)` returns the default ref's HEAD (main), +/// missing the feature branch's actual drift. +/// Either way, the classifier sees NoMovement → RollBack as no-op → +/// sidecar deleted while feature's drift remains. Subsequent feature +/// writers surface ExpectedVersionMismatch. +/// +/// Setup: +/// - Load alice on main. +/// - Create `feature` branch. +/// - Mutate feature (insert bob) → feature's manifest pin AND Lance +/// HEAD on the feature branch advance. +/// - Capture feature's post-mutate manifest pin (v_pin) and Lance HEAD +/// (v_head). +/// - Synthesize a sidecar with `branch=Some("feature")`, pin Person at +/// `expected=v_pin, post=v_pin+1`, `table_branch=Some("feature")`. +/// - Drop the engine and append_batch on Person's feature branch to +/// advance HEAD to v_pin+1 (bypass manifest). +/// +/// On reopen, recovery must: +/// - Open a per-branch coordinator at `feature` for snapshot +/// classification. +/// - Open Person's Lance dataset at the `feature` ref for HEAD read. +/// - Classify as RolledPastExpected and roll forward. +#[tokio::test] +async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() { + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + helpers::MUTATION_QUERIES, + "insert_person", + &helpers::mixed_params(&[("$name", "bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + + // Capture feature-branch state. + let feature_snapshot = db + .snapshot_of(omnigraph::db::ReadTarget::branch("feature")) + .await + .unwrap(); + let feature_entry = feature_snapshot + .entry("node:Person") + .expect("feature snapshot must have Person entry"); + let v_pin = feature_entry.table_version; + let feature_branch_name = feature_entry.table_branch.clone(); + drop(db); + + // Bypass the manifest: append directly to Person's Lance HEAD on the + // feature branch ref to advance HEAD past v_pin. + let person_uri = node_table_uri(uri, "Person"); + let store = TableStore::new(uri); + let mut ds = store + .open_dataset_head(&person_uri, feature_branch_name.as_deref()) + .await + .unwrap(); + store + .append_batch( + &person_uri, + &mut ds, + person_batch(&[("carol-id", "carol", Some(40))]), + ) + .await + .unwrap(); + let v_head = ds.version().version; + assert_eq!(v_head, v_pin + 1, "append must advance HEAD by 1"); + + // Synthesize a sidecar saying the writer's intent was to publish + // feature's pin v_pin → v_pin+1. (Mutation kind = strict match.) + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H0000000000000000000FEAT", + "started_at": "0", + "branch": "feature", + "actor_id": "act-feature", + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key":"node:Person", + "table_path":"{}", + "expected_version":{}, + "post_commit_pin":{}, + "table_branch":{} + }} + ] + }}"#, + person_uri, + v_pin, + v_head, + match &feature_branch_name { + Some(b) => format!("\"{}\"", b), + None => "null".to_string(), + }, + ); + write_sidecar_file(dir.path(), "01H0000000000000000000FEAT", &sidecar_json); + + // Reopen — recovery sweep must process the feature-branch sidecar + // against feature's snapshot, not main's. With the fix, feature's + // manifest pin advances v_pin → v_head. + let db = Omnigraph::open(uri).await.unwrap(); + assert!( + list_recovery_dir(dir.path()).is_empty(), + "feature-branch sidecar must be processed (deleted) after recovery" + ); + + // The post-recovery feature snapshot must show Person pinned at v_head. + let post_feature_snapshot = db + .snapshot_of(omnigraph::db::ReadTarget::branch("feature")) + .await + .unwrap(); + let post_entry = post_feature_snapshot + .entry("node:Person") + .expect("Person must still be pinned on feature"); + assert_eq!( + post_entry.table_version, v_head, + "feature manifest pin must advance v_pin={} → v_head={}; got {} \ + — without branch-aware recovery, classification would have \ + compared against main and rolled back / no-op'd", + v_pin, v_head, post_entry.table_version, + ); + + // Audit row recorded for the recovery action. + assert_eq!( + count_recovery_audit_rows(dir.path()).await, + 1, + "feature-branch sidecar recovery must record one audit row", + ); +} + /// `OpenMode::ReadOnly` must NOT run `recover_schema_state_files`, /// which can delete or rename schema-staging files. Read-only consumers /// may run with read-only object-store credentials, and silent open-time