diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 7d00219..5fb3224 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -562,6 +562,7 @@ async fn process_sidecar( states.push(ClassifiedTable { classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind), manifest_pinned, + lance_head, }); } let classifications = states @@ -682,6 +683,12 @@ async fn process_sidecar( struct ClassifiedTable { classification: TableClassification, manifest_pinned: u64, + /// Lance HEAD observed at classification time. Captured so the + /// rollback audit's `from_version` can record where Lance HEAD was + /// before `Dataset::restore` ran (operators reading + /// `_graph_commit_recoveries.lance` see actual drift, not + /// `from_version == to_version == manifest_pinned`). + lance_head: u64, } async fn roll_back_sidecar( @@ -711,12 +718,13 @@ async fn roll_back_sidecar( state.manifest_pinned, ) .await?; + // `from_version` records the Lance HEAD observed BEFORE the + // restore (the actual drift), not the manifest pin. Operators + // reading `_graph_commit_recoveries.lance` see "rolled back + // from v7 to v5" rather than "v5 → v5". 
outcomes.push(TableOutcome { table_key: pin.table_key.clone(), - from_version: snapshot - .entry(&pin.table_key) - .map(|e| e.table_version) - .unwrap_or(0), + from_version: state.lance_head, to_version: state.manifest_pinned, }); } diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index fbcefc8..b466663 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1206,7 +1206,19 @@ impl Omnigraph { table_path: self.table_store().dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, - table_branch: entry.table_branch.clone(), + // Use the merge target branch (where commits actually + // land), NOT entry.table_branch (where the table + // currently lives). publish_rewritten_merge_table calls + // open_for_mutation, which forks an inherited-from-main + // table to active_branch on first write — the resulting + // Lance commit lands on active_branch. Recovery's + // open_lance_head must check the same branch, otherwise + // an inherited-table feature-to-feature merge classifies + // as NoMovement and the all-or-nothing rollback skips + // the orphaned post-Phase-B HEAD on the target ref. + // Same rationale as table_ops.rs:115-120 in + // ensure_indices_for_branch. 
+ table_branch: self.active_branch().map(str::to_string), }) }) .collect(); diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index c15bf80..d063169 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -1377,6 +1377,108 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { .unwrap(); } +/// Contract: the BranchMerge sidecar's per-table `table_branch` MUST be +/// the merge target branch (where commits land via +/// `publish_rewritten_merge_table` → `open_for_mutation` → potentially +/// `fork_dataset_from_entry_state`), NOT `entry.table_branch` (where +/// the table currently lives in the target's manifest snapshot). +/// +/// `ensure_indices_for_branch` already has this invariant pinned by an +/// explicit comment at `table_ops.rs:115-120`. Without the same fix in +/// `merge.rs`, a future change to candidate selection or the publish +/// path that produces a `RewriteMerged` whose entry.table_branch +/// diverges from active_branch would silently drift Lance HEAD on the +/// target ref while recovery checks the wrong ref and no-ops the +/// rollback. +/// +/// This test reads the sidecar JSON directly and asserts every per-pin +/// `table_branch` equals the active (target) branch. Even when the +/// values happen to coincide in practice (the strict candidate logic +/// keeps RewriteMerged tables on active_branch), the contract assertion +/// catches a regression that reverts to `entry.table_branch.clone()`. 
+#[tokio::test] +async fn branch_merge_sidecar_pins_table_branch_to_active_branch() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("target_branch").await.unwrap(); + db.mutate( + "target_branch", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + db.branch_create("source_branch").await.unwrap(); + db.mutate( + "source_branch", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Carol")], &[("$age", 50)]), + ) + .await + .unwrap(); + } + + { + let mut db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = + ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + let _ = db + .branch_merge("source_branch", "target_branch") + .await + .expect_err("failpoint must fire"); + } + + let operation_id = single_sidecar_operation_id(dir.path()); + let sidecar_path = dir + .path() + .join("__recovery") + .join(format!("{operation_id}.json")); + let sidecar_json = std::fs::read_to_string(&sidecar_path).unwrap(); + let sidecar: serde_json::Value = serde_json::from_str(&sidecar_json).unwrap(); + + let tables = sidecar["tables"] + .as_array() + .expect("sidecar tables must be an array"); + assert!( + !tables.is_empty(), + "sidecar must pin at least one RewriteMerged table — both branches mutated Person" + ); + for pin in tables { + let table_branch = pin + .get("table_branch") + .and_then(|v| v.as_str()) + .unwrap_or_else(|| { + panic!( + "sidecar pin must record table_branch as the merge target (active_branch); \ + got pin {pin:?}" + ) + }); + assert_eq!( + table_branch, "target_branch", + 
"sidecar pin must record `table_branch` as the merge target branch (where \ commits actually land via publish_rewritten_merge_table → open_for_mutation), \ NOT entry.table_branch from the target snapshot. See merge.rs filter_map and \ the rationale comment at table_ops.rs:115-120. Got pin: {pin:?}" + ); + } +} + /// `ensure_indices` only writes a sidecar when at least one table /// genuinely needs index work (per `needs_index_work_*` helpers in /// `db/omnigraph/table_ops.rs`). When all tables are steady-state diff --git a/crates/omnigraph/tests/helpers/recovery.rs b/crates/omnigraph/tests/helpers/recovery.rs index 7197c93..3a8505f 100644 --- a/crates/omnigraph/tests/helpers/recovery.rs +++ b/crates/omnigraph/tests/helpers/recovery.rs @@ -49,6 +49,7 @@ struct RecoveryAuditRow { #[derive(Debug, Clone, Deserialize)] struct TableOutcome { table_key: String, + from_version: u64, to_version: u64, } @@ -179,6 +180,7 @@ pub async fn assert_post_recovery_invariants( audit.recovery_kind, "RolledBack", "audit row for {operation_id} recorded the wrong recovery_kind", ); + assert_rollback_outcomes_record_drift(&audit); assert_recovery_commit_shape(repo_root, &audit, &tables).await?; assert_non_main_did_not_move_main(repo_root, &tables).await?; assert_idempotent_reopen(repo_root, operation_id).await?; @@ -279,6 +281,25 @@ async fn assert_audit_to_versions_match_lance_heads( Ok(()) } +/// For RolledBack outcomes, `from_version` records the Lance HEAD +/// observed BEFORE the restore (the actual drift) and `to_version` +/// records the manifest pin we restored to. If both are equal, the audit +/// row is uninformative — operators cannot tell how far Lance HEAD +/// drifted from the manifest. This assertion catches any regression +/// that reverts `from_version` to `manifest_pinned`. 
+fn assert_rollback_outcomes_record_drift(audit: &RecoveryAuditRow) {
+    for outcome in &audit.per_table_outcomes { // an empty outcome list passes vacuously
+        assert!(
+            outcome.from_version > outcome.to_version, // strict `>`: equal versions fail too
+            "rollback outcome for {} must record drift via `from_version > to_version` \
+             (Lance HEAD before restore > manifest pin restored to); got from={}, to={}",
+            outcome.table_key,
+            outcome.from_version,
+            outcome.to_version,
+        );
+    }
+}
+
 async fn assert_non_main_did_not_move_main(
     repo_root: &Path,
     tables: &[TableExpectation],