diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 699d98f..c33864a 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -422,6 +422,29 @@ impl Omnigraph { crate::db::manifest::RecoveryMode::RollForwardOnly, ) .await?; + // Re-read the schema source / catalog from disk: schema-state + // recovery above may have renamed staging files into place + // (completing an in-flight schema_apply), so the on-disk + // `_schema.pg` and IR contract may now reflect a NEWER schema + // than the in-memory `self.catalog` / `self.schema_source`. + // Without this reload subsequent ops on the handle would use + // stale catalog metadata against post-migration data on disk. + // Mirrors `open_with_storage_and_mode`'s schema-load sequence. + let schema_path = schema_source_uri(&self.root_uri); + let schema_source = self.storage.read_text(&schema_path).await?; + let current_source_ir = read_schema_ir_from_source(&schema_source)?; + let branches = self.coordinator.branch_list().await?; + let (accepted_ir, _) = load_or_bootstrap_schema_contract( + &self.root_uri, + Arc::clone(&self.storage), + &branches, + &current_source_ir, + ) + .await?; + let mut catalog = build_catalog_from_ir(&accepted_ir)?; + fixup_blob_schemas(&mut catalog); + self.schema_source = schema_source; + self.catalog = catalog; self.runtime_cache.invalidate_all().await; Ok(()) } diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index b5acedb..df224cf 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -975,6 +975,42 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { .unwrap(); } + // Capture target_branch's commit-graph head BEFORE the failed merge. 
+ // This is the commit the recovery's merge commit must claim as its + // `parent_commit_id` (D2 — without the per-branch CommitGraph fix, + // recovery would record the GLOBAL head as parent instead). + let target_branch_head_before_failure = { + let commits_dir = dir.path().join("_graph_commits.lance"); + let ds = lance::Dataset::open(commits_dir.to_str().unwrap()) + .await + .unwrap() + .checkout_branch("target_branch") + .await + .unwrap(); + use arrow_array::{Array, StringArray}; + use futures::TryStreamExt; + let batches: Vec = + ds.scan().try_into_stream().await.unwrap().try_collect().await.unwrap(); + // Grab the latest commit_id by created_at order (the per-branch + // checkout ensures we only see target_branch's commits). + let mut latest: Option<(i64, String)> = None; + for batch in batches { + let ids = batch + .column_by_name("graph_commit_id").unwrap() + .as_any().downcast_ref::().unwrap(); + let created = batch + .column_by_name("created_at").unwrap() + .as_any().downcast_ref::().unwrap(); + for i in 0..ids.len() { + let ts = created.value(i); + if latest.as_ref().is_none_or(|(t, _)| ts > *t) { + latest = Some((ts, ids.value(i).to_string())); + } + } + } + latest.expect("target_branch must have at least one commit (the insert-Bob mutate)").1 + }; + // Phase A: failpoint fires after the per-table publish loop completes // but before commit_manifest_updates. Sidecar persists with // branch=Some("target_branch"). @@ -1019,17 +1055,19 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { ); } - // Audit row for a merge commit was recorded with a non-null - // merged_parent_commit_id — proves the recovery sweep used the - // branch-specific commit-graph head as parent (not the global - // head). Also assert the recovery audit's recovery_kind == - // RolledForward. + // Find the recovery commit on target_branch's commit graph and + // assert its `parent_commit_id` matches the head we captured BEFORE + // the failed merge. 
This is what catches D2: without the + // per-branch CommitGraph fix, recovery records the GLOBAL head as + // parent, which on this test setup is the source_branch's + // insert-Carol commit (a different ULID), and the assertion fails. + // + // `merged_parent_commit_id` alone is insufficient — it's + // independently populated from sidecar.merge_source_commit_id, so + // it would be set correctly even with D2's bug. use arrow_array::{Array, StringArray}; use futures::TryStreamExt; let commits_dir = dir.path().join("_graph_commits.lance"); - // Recovery wrote the merge commit to the target_branch's Lance ref - // on the commit_graph dataset (per CommitGraph::open_at_branch). - // Open at that ref to find the merge commit. let ds = lance::Dataset::open(commits_dir.to_str().unwrap()) .await .unwrap() @@ -1044,7 +1082,8 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { .try_collect() .await .unwrap(); - let mut found_recovery_merge = false; + let mut recovery_merge_parent: Option = None; + let mut recovery_merge_merged_parent: Option = None; for batch in batches { let merged = batch .column_by_name("merged_parent_commit_id") @@ -1052,17 +1091,44 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { .as_any() .downcast_ref::() .expect("merged_parent_commit_id is Utf8"); + let parents = batch + .column_by_name("parent_commit_id") + .expect("parent_commit_id column present") + .as_any() + .downcast_ref::() + .expect("parent_commit_id is Utf8"); for i in 0..merged.len() { if !merged.is_null(i) { - found_recovery_merge = true; + // First (and only — single recovery, single merge commit) + // commit with a merged parent IS the recovery commit. 
+ recovery_merge_parent = if parents.is_null(i) { + None + } else { + Some(parents.value(i).to_string()) + }; + recovery_merge_merged_parent = Some(merged.value(i).to_string()); break; } } + if recovery_merge_parent.is_some() { + break; + } } + let recovery_parent = recovery_merge_parent + .expect("non-main branch_merge recovery must record a merge commit with parent_commit_id"); + assert_eq!( + recovery_parent, target_branch_head_before_failure, + "recovery merge commit's parent_commit_id must == target_branch's pre-failure head; \ + expected {}, got {} — this would regress to the GLOBAL head if D2's per-branch \ + CommitGraph::open_at_branch fix were removed", + target_branch_head_before_failure, recovery_parent, + ); + // Sanity: merged_parent is set from the source branch (independent + // of D2; would be correct even with the bug, but we still verify + // it's non-null so the row is a true merge commit). assert!( - found_recovery_merge, - "non-main branch_merge recovery must record `merged_parent_commit_id` on the \ - target branch's commit graph", + recovery_merge_merged_parent.is_some(), + "recovery merge commit must have non-null merged_parent_commit_id" ); }