recovery: align merge sidecar branch with active_branch + record rollback drift

Two small PR #72 review findings addressed:

- merge.rs sidecar pin recorded `entry.table_branch` (where the table
  currently lives in the target manifest) instead of the merge target
  branch where commits actually land via `publish_rewritten_merge_table`
  → `open_for_mutation` → `fork_dataset_from_entry_state`. Recovery's
  `open_lance_head` would then check the wrong ref. Aligned with the
  pattern already used in `ensure_indices_for_branch` (table_ops.rs:115).
  Added `branch_merge_sidecar_pins_table_branch_to_active_branch`
  contract test that reads the sidecar JSON and asserts every per-pin
  `table_branch` equals the active (target) branch — catches the
  regression even when the values happen to coincide in the test setup.

- Rollback audit `from_version` previously equalled `to_version`
  (both `manifest_pinned`), telling operators nothing about the actual
  Lance HEAD drift before restore. Captured `lance_head` in
  `ClassifiedTable` and used it as `from_version` so audit rows now
  show "rolled back from v7 to v5" instead of "v5 → v5". Added
  `assert_rollback_outcomes_record_drift` invariant in the test helper,
  invoked automatically by every `RecoveryExpectation::RolledBack` test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-05 19:33:32 +02:00
parent 815ff743f5
commit 58a3ff0e48
No known key found for this signature in database
4 changed files with 148 additions and 5 deletions

View file

@ -562,6 +562,7 @@ async fn process_sidecar(
states.push(ClassifiedTable {
classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind),
manifest_pinned,
lance_head,
});
}
let classifications = states
@ -682,6 +683,12 @@ async fn process_sidecar(
struct ClassifiedTable {
classification: TableClassification,
manifest_pinned: u64,
/// Lance HEAD observed at classification time. Captured so the
/// rollback audit's `from_version` can record where Lance HEAD was
/// before `Dataset::restore` ran (operators reading
/// `_graph_commit_recoveries.lance` see actual drift, not
/// `from_version == to_version == manifest_pinned`).
lance_head: u64,
}
async fn roll_back_sidecar(
@ -711,12 +718,13 @@ async fn roll_back_sidecar(
state.manifest_pinned,
)
.await?;
// `from_version` records the Lance HEAD observed BEFORE the
// restore (the actual drift), not the manifest pin. Operators
// reading `_graph_commit_recoveries.lance` see "rolled back
// from v7 to v5" rather than "v5 → v5".
outcomes.push(TableOutcome {
table_key: pin.table_key.clone(),
from_version: snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0),
from_version: state.lance_head,
to_version: state.manifest_pinned,
});
}

View file

@ -1206,7 +1206,19 @@ impl Omnigraph {
table_path: self.table_store().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
// Use the merge target branch (where commits actually
// land), NOT entry.table_branch (where the table
// currently lives). publish_rewritten_merge_table calls
// open_for_mutation, which forks an inherited-from-main
// table to active_branch on first write — the resulting
// Lance commit lands on active_branch. Recovery's
// open_lance_head must check the same branch, otherwise
// an inherited-table feature-to-feature merge classifies
// as NoMovement and the all-or-nothing rollback skips
// the orphaned post-Phase-B HEAD on the target ref.
// Same rationale as table_ops.rs:115-120 in
// ensure_indices_for_branch.
table_branch: self.active_branch().map(str::to_string),
})
})
.collect();

View file

@ -1377,6 +1377,108 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
.unwrap();
}
/// Contract: the BranchMerge sidecar's per-table `table_branch` MUST be
/// the merge target branch (where commits land via
/// `publish_rewritten_merge_table` → `open_for_mutation` → potentially
/// `fork_dataset_from_entry_state`), NOT `entry.table_branch` (where
/// the table currently lives in the target's manifest snapshot).
///
/// `ensure_indices_for_branch` already has this invariant pinned by an
/// explicit comment at `table_ops.rs:115-120`. Without the same fix in
/// `merge.rs`, a future change to candidate selection or the publish
/// path that produces a `RewriteMerged` whose entry.table_branch
/// diverges from active_branch would silently drift Lance HEAD on the
/// target ref while recovery checks the wrong ref and no-ops the
/// rollback.
///
/// This test reads the sidecar JSON directly and asserts every per-pin
/// `table_branch` equals the active (target) branch. Even when the
/// values happen to coincide in practice (the strict candidate logic
/// keeps RewriteMerged tables on active_branch), the contract assertion
/// catches a regression that reverts to `entry.table_branch.clone()`.
#[tokio::test]
async fn branch_merge_sidecar_pins_table_branch_to_active_branch() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("target_branch").await.unwrap();
db.mutate(
"target_branch",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
)
.await
.unwrap();
db.branch_create("source_branch").await.unwrap();
db.mutate(
"source_branch",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
)
.await
.unwrap();
}
{
let mut db = Omnigraph::open(&uri).await.unwrap();
let _failpoint =
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
let _ = db
.branch_merge("source_branch", "target_branch")
.await
.expect_err("failpoint must fire");
}
let operation_id = single_sidecar_operation_id(dir.path());
let sidecar_path = dir
.path()
.join("__recovery")
.join(format!("{operation_id}.json"));
let sidecar_json = std::fs::read_to_string(&sidecar_path).unwrap();
let sidecar: serde_json::Value = serde_json::from_str(&sidecar_json).unwrap();
let tables = sidecar["tables"]
.as_array()
.expect("sidecar tables must be an array");
assert!(
!tables.is_empty(),
"sidecar must pin at least one RewriteMerged table — both branches mutated Person"
);
for pin in tables {
let table_branch = pin
.get("table_branch")
.and_then(|v| v.as_str())
.unwrap_or_else(|| {
panic!(
"sidecar pin must record table_branch as the merge target (active_branch); \
got pin {pin:?}"
)
});
assert_eq!(
table_branch, "target_branch",
"sidecar pin must record `table_branch` as the merge target branch (where \
commits actually land via publish_rewritten_merge_table open_for_mutation), \
NOT entry.table_branch from the target snapshot. See merge.rs filter_map and \
the rationale comment at table_ops.rs:115-120. Got pin: {pin:?}"
);
}
}
/// `ensure_indices` only writes a sidecar when at least one table
/// genuinely needs index work (per `needs_index_work_*` helpers in
/// `db/omnigraph/table_ops.rs`). When all tables are steady-state

View file

@ -49,6 +49,7 @@ struct RecoveryAuditRow {
#[derive(Debug, Clone, Deserialize)]
struct TableOutcome {
table_key: String,
from_version: u64,
to_version: u64,
}
@ -179,6 +180,7 @@ pub async fn assert_post_recovery_invariants(
audit.recovery_kind, "RolledBack",
"audit row for {operation_id} recorded the wrong recovery_kind",
);
assert_rollback_outcomes_record_drift(&audit);
assert_recovery_commit_shape(repo_root, &audit, &tables).await?;
assert_non_main_did_not_move_main(repo_root, &tables).await?;
assert_idempotent_reopen(repo_root, operation_id).await?;
@ -279,6 +281,25 @@ async fn assert_audit_to_versions_match_lance_heads(
Ok(())
}
/// For RolledBack outcomes, `from_version` records the Lance HEAD
/// observed BEFORE the restore (the actual drift) and `to_version`
/// records the manifest pin we restored to. If both equal, the audit
/// row is uninformative — operators cannot tell how far Lance HEAD
/// drifted from the manifest. This assertion catches any regression
/// that reverts `from_version` to `manifest_pinned`.
fn assert_rollback_outcomes_record_drift(audit: &RecoveryAuditRow) {
for outcome in &audit.per_table_outcomes {
assert!(
outcome.from_version > outcome.to_version,
"rollback outcome for {} must record drift via `from_version > to_version` \
(Lance HEAD before restore > manifest pin restored to); got from={}, to={}",
outcome.table_key,
outcome.from_version,
outcome.to_version,
);
}
}
async fn assert_non_main_did_not_move_main(
repo_root: &Path,
tables: &[TableExpectation],