recovery: refresh reloads schema after staging recovery; non-main merge test pins parent_commit_id

E1. After D3 added recover_schema_state_files to refresh(), the
    in-memory `self.schema_source` and `self.catalog` were left stale:
    a SchemaApply sidecar processed via refresh would rename the
    staging files (`_schema.pg`, IR contract) into place but the
    handle continued operating against the old catalog. Subsequent
    operations would surface schema mismatches against post-migration
    data on disk.

    Fix: after recover_manifest_drift completes, refresh() now mirrors
    open_with_storage_and_mode's schema-load sequence — re-reads
    `_schema.pg`, parses IR via load_or_bootstrap_schema_contract,
    rebuilds the catalog with fixup_blob_schemas, and assigns into
    self.schema_source / self.catalog. Steady-state cost: one read +
    one parse per refresh; only mutates handle state when the on-disk
    schema actually changed.

E2. The non-main branch_merge recovery test
    (`branch_merge_phase_b_failure_recovered_on_non_main_target`)
    asserted only `merged_parent_commit_id` was non-null — but
    `merged_parent_commit_id` is independently populated from
    sidecar.merge_source_commit_id (the SOURCE branch's tip), so the
    assertion would pass even if D2's per-branch CommitGraph fix
    regressed (the bug was about `parent_commit_id`, the TARGET
    branch's tip).

    Fix: capture target_branch's commit-graph head BEFORE the failed
    merge by scanning target_branch's Lance ref on _graph_commits.lance
    and picking the latest commit by created_at. After recovery, find
    the recovery merge commit (the one with non-null
    merged_parent_commit_id) and assert its `parent_commit_id` ==
    captured pre-failure head. Without D2, recovery would record the
    GLOBAL head (the source_branch's insert-Carol commit on this test)
    instead, and the assertion fails.

    Also fixes the column-type cast: created_at is stored as
    TimestampMicrosecondArray, not Int64Array.

All workspace tests pass with --features failpoints.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-04 12:06:17 +02:00
parent 2ce4efc450
commit 44c0d0bc4b
No known key found for this signature in database
2 changed files with 102 additions and 13 deletions

View file

@ -422,6 +422,29 @@ impl Omnigraph {
crate::db::manifest::RecoveryMode::RollForwardOnly,
)
.await?;
// Re-read the schema source / catalog from disk: schema-state
// recovery above may have renamed staging files into place
// (completing an in-flight schema_apply), so the on-disk
// `_schema.pg` and IR contract may now reflect a NEWER schema
// than the in-memory `self.catalog` / `self.schema_source`.
// Without this reload subsequent ops on the handle would use
// stale catalog metadata against post-migration data on disk.
// Mirrors `open_with_storage_and_mode`'s schema-load sequence.
let schema_path = schema_source_uri(&self.root_uri);
let schema_source = self.storage.read_text(&schema_path).await?;
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
let branches = self.coordinator.branch_list().await?;
let (accepted_ir, _) = load_or_bootstrap_schema_contract(
&self.root_uri,
Arc::clone(&self.storage),
&branches,
&current_source_ir,
)
.await?;
let mut catalog = build_catalog_from_ir(&accepted_ir)?;
fixup_blob_schemas(&mut catalog);
self.schema_source = schema_source;
self.catalog = catalog;
self.runtime_cache.invalidate_all().await;
Ok(())
}

View file

@ -975,6 +975,42 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
.unwrap();
}
// Capture target_branch's commit-graph head BEFORE the failed merge.
// This is the commit the recovery's merge commit must claim as its
// `parent_commit_id` (D2 — without the per-branch CommitGraph fix,
// recovery would record the GLOBAL head as parent instead).
let target_branch_head_before_failure = {
let commits_dir = dir.path().join("_graph_commits.lance");
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
.await
.unwrap()
.checkout_branch("target_branch")
.await
.unwrap();
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let batches: Vec<arrow_array::RecordBatch> =
ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap();
// Grab the latest commit_id by created_at order (the per-branch
// checkout ensures we only see target_branch's commits).
let mut latest: Option<(i64, String)> = None;
for batch in batches {
let ids = batch
.column_by_name("graph_commit_id").unwrap()
.as_any().downcast_ref::<StringArray>().unwrap();
let created = batch
.column_by_name("created_at").unwrap()
.as_any().downcast_ref::<arrow_array::TimestampMicrosecondArray>().unwrap();
for i in 0..ids.len() {
let ts = created.value(i);
if latest.as_ref().is_none_or(|(t, _)| ts > *t) {
latest = Some((ts, ids.value(i).to_string()));
}
}
}
latest.expect("target_branch must have at least one commit (the insert-Bob mutate)").1
};
// Phase A: failpoint fires after the per-table publish loop completes
// but before commit_manifest_updates. Sidecar persists with
// branch=Some("target_branch").
@ -1019,17 +1055,19 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
);
}
// Audit row for a merge commit was recorded with a non-null
// merged_parent_commit_id — proves the recovery sweep used the
// branch-specific commit-graph head as parent (not the global
// head). Also assert the recovery audit's recovery_kind ==
// RolledForward.
// Find the recovery commit on target_branch's commit graph and
// assert its `parent_commit_id` matches the head we captured BEFORE
// the failed merge. This is what catches D2: without the
// per-branch CommitGraph fix, recovery records the GLOBAL head as
// parent, which on this test setup is the source_branch's
// insert-Carol commit (a different ULID), and the assertion fails.
//
// `merged_parent_commit_id` alone is insufficient — it's
// independently populated from sidecar.merge_source_commit_id, so
// it would be set correctly even with D2's bug.
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let commits_dir = dir.path().join("_graph_commits.lance");
// Recovery wrote the merge commit to the target_branch's Lance ref
// on the commit_graph dataset (per CommitGraph::open_at_branch).
// Open at that ref to find the merge commit.
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
.await
.unwrap()
@ -1044,7 +1082,8 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
.try_collect()
.await
.unwrap();
let mut found_recovery_merge = false;
let mut recovery_merge_parent: Option<String> = None;
let mut recovery_merge_merged_parent: Option<String> = None;
for batch in batches {
let merged = batch
.column_by_name("merged_parent_commit_id")
@ -1052,17 +1091,44 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
.as_any()
.downcast_ref::<StringArray>()
.expect("merged_parent_commit_id is Utf8");
let parents = batch
.column_by_name("parent_commit_id")
.expect("parent_commit_id column present")
.as_any()
.downcast_ref::<StringArray>()
.expect("parent_commit_id is Utf8");
for i in 0..merged.len() {
if !merged.is_null(i) {
found_recovery_merge = true;
// First (and only — single recovery, single merge commit)
// commit with a merged parent IS the recovery commit.
recovery_merge_parent = if parents.is_null(i) {
None
} else {
Some(parents.value(i).to_string())
};
recovery_merge_merged_parent = Some(merged.value(i).to_string());
break;
}
}
if recovery_merge_parent.is_some() {
break;
}
}
let recovery_parent = recovery_merge_parent
.expect("non-main branch_merge recovery must record a merge commit with parent_commit_id");
assert_eq!(
recovery_parent, target_branch_head_before_failure,
"recovery merge commit's parent_commit_id must == target_branch's pre-failure head; \
expected {}, got {} this would regress to the GLOBAL head if D2's per-branch \
CommitGraph::open_at_branch fix were removed",
target_branch_head_before_failure, recovery_parent,
);
// Sanity: merged_parent is set from the source branch (independent
// of D2; would be correct even with the bug, but we still verify
// it's non-null so the row is a true merge commit).
assert!(
found_recovery_merge,
"non-main branch_merge recovery must record `merged_parent_commit_id` on the \
target branch's commit graph",
recovery_merge_merged_parent.is_some(),
"recovery merge commit must have non-null merged_parent_commit_id"
);
}