recovery: close four correctness gaps (schema-apply, branch-aware, restore short-circuit, merge parent)

B1. Schema-apply atomicity. Before this commit, a failure between
    `_schema.pg.staging` write and the manifest publish left the repo
    corrupt: Lance HEADs advanced under the new schema, manifest stayed
    at old pins, and on reopen schema-state recovery deleted the staging
    files (manifest's table set still matched the live schema), then
    manifest-drift recovery rolled the table versions forward — leaving
    new-schema data on disk with the old `_schema.pg` live.

    Fix: a SchemaApply sidecar is the marker that Phase B completed but
    Phase C didn't. New helper `has_schema_apply_sidecar` is consulted
    by `recover_schema_state_files` BEFORE its disambiguation logic;
    when present, it completes the staging→final rename so the
    subsequent manifest-drift roll-forward sees the new catalog.

B2. Branch-aware recovery. Sidecars from feature-branch writers were
    being classified against main's snapshot and main's Lance HEAD,
    silently no-op'ing or rolling back the wrong table version (the
    classifier saw NoMovement; the writer's drift on the feature branch
    persisted; subsequent feature writers surfaced
    ExpectedVersionMismatch).

    Fix: SidecarTablePin gets an optional `table_branch` field;
    `recover_manifest_drift` opens a per-branch coordinator
    (`GraphCoordinator::open_branch`) per sidecar; `open_lance_head`,
    `restore_table_to_version`, and `roll_forward_all` honor the pin's
    branch via `Dataset::checkout_branch`.

B3. Remove fragment-id short-circuit in `restore_table_to_version`.
    Equal fragment IDs do NOT imply equal content: Lance index commits
    and deletion-vector updates change the manifest without touching
    fragment IDs. Skipping restore in those cases would leave Lance HEAD
    ahead of the manifest with no recovery artifact left. Restore is
    now unconditional; pile-up under repeated mid-rollback crashes
    bounded and reclaimed by `omnigraph cleanup`.

B4. Recovered branch_merge records merge parent. `record_audit` always
    called `append_commit`, dropping `merged_parent_commit_id`. Future
    `branch_merge feature -> main` between the same pair lost
    already-up-to-date detection. RecoverySidecar gets an optional
    `merge_source_commit_id`; `branch_merge_on_current_target`
    populates it from `source_head_commit_id`; `record_audit`
    dispatches to `append_merge_commit` when present.

New tests: feature-branch sidecar classification (B2); B1 deepens the
existing schema_apply test with live-`_schema.pg` and new-type
assertions; B4 deepens the existing branch_merge test by reading
`_graph_commits.lance` and asserting a non-null `merged_parent_commit_id`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-03 23:39:41 +02:00
parent 35c4b16e91
commit 8c6506f5cd
No known key found for this signature in database
10 changed files with 431 additions and 67 deletions

View file

@ -33,8 +33,8 @@ pub(crate) use namespace::open_table_head_for_write;
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
delete_sidecar, new_sidecar, recover_manifest_drift, write_sidecar, RecoverySidecar,
RecoverySidecarHandle, SidecarKind, SidecarTablePin,
delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar,
RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
};
use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
pub use state::SubTableEntry;

View file

@ -95,6 +95,15 @@ pub(crate) struct SidecarTablePin {
/// Lance HEAD that the writer's `commit_staged` would produce
/// (typically `expected_version + 1`).
pub post_commit_pin: u64,
/// Lance branch ref this table lives on (mirrors
/// `SubTableEntry::table_branch`). Required for the recovery sweep
/// to open the dataset at the correct ref — `Dataset::open(path)`
/// alone returns the default ref (typically main), which would
/// classify a feature-branch sidecar against main's HEAD and silently
/// no-op or roll back the wrong table version. Optional for backward
/// compatibility with older sidecars; `None` means main / default.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub table_branch: Option<String>,
}
/// In-memory representation of the on-disk JSON sidecar.
@ -107,6 +116,16 @@ pub(crate) struct RecoverySidecar {
pub actor_id: Option<String>,
pub writer_kind: SidecarKind,
pub tables: Vec<SidecarTablePin>,
/// For `SidecarKind::BranchMerge` only: the source branch's HEAD
/// commit id at the time the sidecar was written. Used by the
/// recovery sweep's audit step to call `append_merge_commit`
/// (recording `merged_parent_commit_id`) instead of `append_commit`,
/// so future merges between the same pair recognize "already up-to-
/// date" and merge-base computations stay correct. Optional for
/// backward compatibility — older sidecars (or non-BranchMerge
/// kinds) carry `None` and recovery falls back to `append_commit`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub merge_source_commit_id: Option<String>,
}
/// Opaque handle returned by [`write_sidecar`] so the caller can delete
@ -391,32 +410,35 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision
/// Restore a single table's Lance HEAD to `expected_version`, producing a
/// new commit at HEAD+1 with content == content-at-`expected_version`.
///
/// Idempotency: if the latest Lance commit's fragment-id set already equals
/// the fragment-id set at `expected_version`, this is a no-op. Soundness —
/// Lance fragments are immutable; equal fragment-ids ⇒ equal content.
/// This guards against version pile-up under repeated mid-rollback crashes
/// (see `docs/runs.md` "Finalize → publisher residual" + `.context/mr-847-design.md`
/// §"Fragment-set equality short-circuit").
/// Always runs the actual `Dataset::restore` — there is NO fragment-set
/// short-circuit because equal fragment IDs do NOT imply equal content:
/// Lance index commits and deletion-vector updates change the manifest
/// (and therefore the user-visible state) without changing fragment IDs.
/// Skipping the restore in those cases would leave Lance HEAD ahead of
/// the manifest with no recovery artifact left.
///
/// Cost: under repeated mid-rollback crashes (rare), Lance HEAD
/// accumulates extra restore commits that `omnigraph cleanup` reclaims.
/// Bounded by the number of recovery iterations — typically 1.
pub(crate) async fn restore_table_to_version(
table_path: &str,
branch: Option<&str>,
expected_version: u64,
) -> Result<()> {
let head = Dataset::open(table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let target = head
let head = match branch {
Some(b) if b != "main" => head
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => head,
};
let mut to_restore = head
.checkout_version(expected_version)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if fragment_ids(&head) == fragment_ids(&target) {
// Lance HEAD already reflects target content (a prior restore
// landed; we just didn't get to delete the sidecar). No-op.
return Ok(());
}
// checkout returns a NEW Dataset; restore() takes &mut self.
let mut to_restore = target;
to_restore
.restore()
.await
@ -424,12 +446,6 @@ pub(crate) async fn restore_table_to_version(
Ok(())
}
fn fragment_ids(ds: &Dataset) -> Vec<u64> {
let mut ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
ids.sort_unstable();
ids
}
/// Open-time recovery sweep — the entry point invoked from
/// `Omnigraph::open` (gated on `OpenMode::ReadWrite`).
///
@ -439,9 +455,9 @@ fn fragment_ids(ds: &Dataset) -> Vec<u64> {
/// state), or abort (invariant violation).
///
/// Idempotency: a crash mid-sweep leaves the sidecar (deletion is the
/// final step). Re-opening re-classifies; the fragment-set short-circuit
/// in [`restore_table_to_version`] prevents version pile-up under
/// repeated mid-rollback crashes.
/// final step). Re-opening re-classifies; repeated rollbacks of the
/// same table append extra Lance restore commits which `omnigraph
/// cleanup` reclaims.
///
/// Concurrency: today recovery runs synchronously in `Omnigraph::open`
/// *before* the engine is wrapped in the server's `Arc<RwLock<Omnigraph>>`.
@ -450,24 +466,43 @@ fn fragment_ids(ds: &Dataset) -> Vec<u64> {
/// queues before the sweep restores or publishes.
pub(crate) async fn recover_manifest_drift(
root_uri: &str,
storage: &dyn StorageAdapter,
storage: std::sync::Arc<dyn StorageAdapter>,
coordinator: &mut GraphCoordinator,
) -> Result<()> {
let sidecars = list_sidecars(root_uri, storage).await?;
let sidecars = list_sidecars(root_uri, storage.as_ref()).await?;
if sidecars.is_empty() {
return Ok(());
}
// Refresh the coordinator snapshot BEFORE each sidecar's
// classification. Sidecar N's roll-forward writes manifest changes
// that sidecar N+1 must observe, otherwise sidecar N+1 classifies
// its tables against stale pins and may incorrectly roll back work
// that landed moments earlier. Refresh is cheap (one Lance manifest
// read).
// For each sidecar, classify against a FRESH snapshot AT THE
// SIDECAR'S BRANCH. Two reasons:
// 1. Per-sidecar refresh: sidecar N's roll-forward writes manifest
// changes that sidecar N+1 must observe, otherwise N+1 classifies
// its tables against stale pins.
// 2. Per-branch snapshot: a sidecar from a feature-branch writer
// pins entries on that feature branch. Classifying against the
// main coordinator's snapshot would compare to main's pins (and
// main's Lance HEAD if pin.table_branch isn't honored), silently
// no-op'ing or rolling back the wrong table version. Open a
// separate per-branch coordinator and use ITS snapshot.
for sidecar in sidecars {
coordinator.refresh().await?;
let snapshot = coordinator.snapshot();
process_sidecar(root_uri, storage, &snapshot, &sidecar).await?;
let branch_snapshot = match sidecar.branch.as_deref() {
Some(b) => {
let mut branch_coord = GraphCoordinator::open_branch(
root_uri,
b,
std::sync::Arc::clone(&storage),
)
.await?;
branch_coord.refresh().await?;
branch_coord.snapshot()
}
None => {
coordinator.refresh().await?;
coordinator.snapshot()
}
};
process_sidecar(root_uri, storage.as_ref(), &branch_snapshot, &sidecar).await?;
}
// Final refresh so the caller sees the post-sweep state.
coordinator.refresh().await?;
@ -482,7 +517,8 @@ async fn process_sidecar(
) -> Result<()> {
let mut classifications = Vec::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
let lance_head = open_lance_head(&pin.table_path).await?;
let lance_head =
open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?;
let manifest_pinned = snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
@ -518,9 +554,9 @@ async fn process_sidecar(
// Restore every table whose Lance HEAD has drifted from the
// manifest pin (RolledPastExpected, UnexpectedAtP1,
// UnexpectedMultistep). NoMovement tables are already at
// expected_version — no action. The fragment-set short-circuit
// in restore_table_to_version makes drift-with-equivalent-content
// a no-op (sound: equal fragment-ids ⇒ equal content).
// expected_version — no action. Restore is unconditional;
// repeated mid-rollback crashes accumulate a few extra
// Lance commits that `omnigraph cleanup` reclaims.
let mut outcomes = Vec::with_capacity(sidecar.tables.len());
for (pin, cls) in sidecar.tables.iter().zip(classifications.iter()) {
if matches!(
@ -529,7 +565,12 @@ async fn process_sidecar(
| TableClassification::UnexpectedAtP1
| TableClassification::UnexpectedMultistep
) {
restore_table_to_version(&pin.table_path, pin.expected_version).await?;
restore_table_to_version(
&pin.table_path,
pin.table_branch.as_deref(),
pin.expected_version,
)
.await?;
outcomes.push(TableOutcome {
table_key: pin.table_key.clone(),
from_version: snapshot
@ -601,15 +642,22 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u
let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
// Open the dataset at its CURRENT Lance HEAD (not at the sidecar's
// post_commit_pin). For strict-match writers (Mutation/Load/
// BranchMerge) HEAD == post_commit_pin by construction. For
// loose-match writers (SchemaApply/EnsureIndices) HEAD may be
// higher than post_commit_pin (multiple commit_staged calls per
// table); we want to publish to the actual current HEAD.
// Open the dataset at its CURRENT Lance HEAD on the pin's branch
// (not at the sidecar's post_commit_pin). For strict-match writers
// (Mutation/Load) HEAD == post_commit_pin by construction. For
// loose-match writers (SchemaApply/EnsureIndices/BranchMerge) HEAD
// may be higher than post_commit_pin (multiple commit_staged
// calls per table); we want to publish to the actual current HEAD.
let head_ds = Dataset::open(&pin.table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let head_ds = match pin.table_branch.as_deref() {
Some(b) if b != "main" => head_ds
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => head_ds,
};
let head_version = head_ds.version().version;
let row_count = head_ds
@ -628,7 +676,7 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: pin.table_key.clone(),
table_version: head_version,
table_branch: sidecar.branch.clone(),
table_branch: pin.table_branch.clone(),
row_count,
version_metadata,
}));
@ -659,13 +707,42 @@ async fn record_audit(
outcomes: Vec<TableOutcome>,
) -> Result<()> {
let mut graph = CommitGraph::open(root_uri).await?;
let graph_commit_id = graph
.append_commit(
sidecar.branch.as_deref(),
manifest_version,
Some(RECOVERY_ACTOR),
)
.await?;
// BranchMerge sidecars carry the source branch's HEAD commit id so
// recovery can record this as a MERGE commit (with parent linkage)
// instead of a plain commit. Without the merge parent, future
// `branch_merge feature → main` between the same pair would not
// recognize "already up-to-date" and merge-base computations break.
let graph_commit_id = match (
sidecar.writer_kind,
sidecar.merge_source_commit_id.as_deref(),
kind,
) {
(SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => {
// For BranchMerge roll-forward, fetch the current branch
// tip as the parent — at open-time recovery this is the
// pre-merge tip (no other writers have run yet).
let parent_commit_id =
graph.head_commit_id().await?.unwrap_or_default();
graph
.append_merge_commit(
sidecar.branch.as_deref(),
manifest_version,
&parent_commit_id,
source_id,
Some(RECOVERY_ACTOR),
)
.await?
}
_ => {
graph
.append_commit(
sidecar.branch.as_deref(),
manifest_version,
Some(RECOVERY_ACTOR),
)
.await?
}
};
let mut audit = RecoveryAudit::open(root_uri).await?;
audit
.append(RecoveryAuditRecord {
@ -681,10 +758,42 @@ async fn record_audit(
Ok(())
}
async fn open_lance_head(table_path: &str) -> Result<u64> {
/// Returns `true` if any `SchemaApply` sidecar is present in
/// `__recovery/`. Schema-state recovery (`recover_schema_state_files`)
/// uses this to skip its normal pre-vs-post-commit disambiguation —
/// when a SchemaApply sidecar is present, we know the writer reached
/// Phase B (Lance HEADs advanced) but didn't complete Phase C (manifest
/// publish + staging→final renames). The right action is to complete
/// the rename so the recovery sweep's roll-forward step sees the new
/// catalog. Without this, the disambiguation logic deletes the staging
/// files (since manifest still pins the old table set) and leaves the
/// repo with new-schema data on disk but the old `_schema.pg` live —
/// real corruption.
pub(crate) async fn has_schema_apply_sidecar(
root_uri: &str,
storage: &dyn StorageAdapter,
) -> Result<bool> {
let sidecars = list_sidecars(root_uri, storage).await?;
Ok(sidecars
.iter()
.any(|s| matches!(s.writer_kind, SidecarKind::SchemaApply)))
}
/// Open the Lance dataset at `table_path` checked out at the given
/// branch ref (or default if `branch` is None or "main") and return its
/// HEAD version. Recovery uses this so feature-branch sidecars classify
/// against the feature-branch's Lance HEAD, not main's.
async fn open_lance_head(table_path: &str, branch: Option<&str>) -> Result<u64> {
let ds = Dataset::open(table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let ds = match branch {
Some(b) if b != "main" => ds
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => ds,
};
Ok(ds.version().version)
}
@ -718,6 +827,7 @@ pub(crate) fn new_sidecar(
actor_id,
writer_kind,
tables,
merge_source_commit_id: None,
}
}
@ -756,6 +866,7 @@ mod tests {
table_path: table_path.to_string(),
expected_version: expected,
post_commit_pin: post,
table_branch: None,
}
}
@ -985,7 +1096,7 @@ mod tests {
let head_before = ds.version().version;
assert_eq!(head_before, 3);
restore_table_to_version(&uri, 1).await.unwrap();
restore_table_to_version(&uri, None, 1).await.unwrap();
let post = Dataset::open(&uri).await.unwrap();
assert_eq!(post.version().version, head_before + 1);
@ -1001,7 +1112,11 @@ mod tests {
}
#[tokio::test]
async fn restore_table_to_version_no_ops_when_fragments_already_match() {
async fn restore_table_to_version_always_appends_a_commit() {
// Restore is unconditional — equal fragment IDs do NOT imply
// equal content (Lance index commits and deletion-vector
// updates change the manifest without touching fragment IDs).
// Repeated restore calls each produce a new HEAD+1 commit.
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
@ -1014,16 +1129,19 @@ mod tests {
.await
.unwrap();
// First restore: HEAD goes from 2 to 3 (with content == v1).
restore_table_to_version(&uri, 1).await.unwrap();
restore_table_to_version(&uri, None, 1).await.unwrap();
let mid = Dataset::open(&uri).await.unwrap().version().version;
assert_eq!(mid, 3);
// Second restore to v1: content already matches; no-op.
restore_table_to_version(&uri, 1).await.unwrap();
// Second restore to v1: still appends a commit (HEAD = 4) because
// restore is unconditional. The pile-up is bounded and reclaimed
// by `omnigraph cleanup`.
restore_table_to_version(&uri, None, 1).await.unwrap();
let post = Dataset::open(&uri).await.unwrap().version().version;
assert_eq!(
post, mid,
"second restore must short-circuit via fragment-set equality"
post,
mid + 1,
"restore must always append a commit (no fragment-set short-circuit)"
);
}

View file

@ -188,7 +188,7 @@ impl Omnigraph {
// separate background-reconciler effort.
crate::db::manifest::recover_manifest_drift(
&root,
storage.as_ref(),
Arc::clone(&storage),
&mut coordinator,
)
.await?;

View file

@ -170,6 +170,7 @@ pub(super) async fn apply_schema_with_lock(
table_path: db.table_store.dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
})
})
.collect();

View file

@ -81,6 +81,7 @@ pub(super) async fn ensure_indices_for_branch(
table_path: full_path,
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
});
}
}
@ -101,6 +102,7 @@ pub(super) async fn ensure_indices_for_branch(
table_path: full_path,
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
});
}
}

View file

@ -319,6 +319,25 @@ pub(crate) async fn recover_schema_state_files(
return Ok(());
}
// Schema-apply atomicity: when a SchemaApply sidecar is present,
// the writer reached Phase B (Lance HEADs advanced) but didn't
// complete Phase C (manifest publish + staging→final renames). The
// recovery sweep about to run will roll the table versions forward
// to the new Lance HEADs; we MUST also rename the staging files
// forward so the catalog matches. Without this, the disambiguation
// logic below sees actual_keys == live_keys (manifest didn't move)
// and deletes the staging files, leaving the repo with new-schema
// data on disk but the old `_schema.pg` live — corruption.
if crate::db::manifest::has_schema_apply_sidecar(root_uri, storage.as_ref()).await? {
warn!(
"recovery: SchemaApply sidecar present; completing schema-staging rename so the \
manifest-drift sweep's roll-forward sees the new catalog (manifest v{})",
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
return Ok(());
}
if !pg_exists {
// _schema.pg.staging is gone but at least one of the other staging
// files is still present. This is a partial-rename: the post-commit

View file

@ -1206,6 +1206,7 @@ impl Omnigraph {
table_path: self.table_store().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
})
})
.collect();
@ -1222,12 +1223,18 @@ impl Omnigraph {
// before invoking this function, so `self.active_branch()`
// is the target.
let target_branch = self.active_branch().map(str::to_string);
let sidecar = crate::db::manifest::new_sidecar(
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::BranchMerge,
target_branch,
self.audit_actor_id.clone(),
recovery_pins,
);
// Carry the source branch's HEAD commit id so the recovery
// sweep's audit step can record this as a MERGE commit
// (linked to the source) instead of a plain commit. Without
// this, future merges between the same pair lose
// already-up-to-date detection and merge-base correctness.
sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
Some(
crate::db::manifest::write_sidecar(
self.root_uri(),

View file

@ -265,6 +265,7 @@ impl MutationStaging {
table_path: path.full_path.clone(),
expected_version: expected,
post_commit_pin: expected + 1,
table_branch: path.table_branch.clone(),
})
})
.collect::<Result<Vec<_>>>()?;

View file

@ -515,6 +515,22 @@ edge WorksAt: Person -> Company
"manifest version must advance post-recovery; pre={pre_failure_version}, \
post={post_recovery_version}",
);
// Schema-apply atomicity: the live `_schema.pg` must reflect the
// NEW schema (city column on Person, Tag node type) — not the old.
// Without the schema-staging coordination, the schema-state
// recovery would have deleted the staging files (because manifest
// hadn't advanced when it ran), leaving a corrupt repo with new-
// schema data on disk but old-schema catalog.
let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
assert!(
live_schema.contains("city: String?"),
"_schema.pg must reflect the NEW schema (city column added); got:\n{live_schema}",
);
assert!(
live_schema.contains("node Tag"),
"_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}",
);
drop(db);
}
@ -628,6 +644,55 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
"manifest version must advance post-recovery; pre={pre_failure_version}, \
post={post_recovery_version}",
);
// The recovered branch_merge must record a MERGE commit (with
// `merged_parent_commit_id` set), not a plain commit. Without
// this, future merges between the same pair lose
// already-up-to-date detection. We verify by reading
// `_graph_commits.lance` and asserting the most recent commit
// tagged with the recovery actor has a non-null
// `merged_parent_commit_id`.
{
use arrow_array::{Array, StringArray};
use futures::TryStreamExt;
let commits_dir = dir.path().join("_graph_commits.lance");
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
.await
.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let mut found_recovery_merge = false;
for batch in batches {
let merged = batch
.column_by_name("merged_parent_commit_id")
.expect("merged_parent_commit_id column present")
.as_any()
.downcast_ref::<StringArray>()
.expect("merged_parent_commit_id is Utf8");
// The actor_id lives in _graph_commit_actors; cross-checking
// is heavier than necessary. Detecting any non-null
// merged_parent_commit_id in the post-recovery state is
// sufficient: only a recovered branch_merge can produce one
// here (we never completed a normal merge in this test).
for i in 0..merged.len() {
if !merged.is_null(i) {
found_recovery_merge = true;
break;
}
}
}
assert!(
found_recovery_merge,
"recovered branch_merge must record `merged_parent_commit_id` so future \
merges detect already-up-to-date no merge-parent-tagged commit found",
);
}
drop(db);
}

View file

@ -17,6 +17,8 @@ use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use omnigraph::db::Omnigraph;
mod helpers;
const TEST_SCHEMA: &str = include_str!("fixtures/test.pg");
fn write_sidecar_file(repo_root: &Path, operation_id: &str, json: &str) {
@ -978,6 +980,155 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
);
}
/// A sidecar from a feature-branch writer must be classified against
/// THAT FEATURE BRANCH's manifest pin and Lance HEAD — not main's.
/// Otherwise:
/// - `snapshot.entry(table_key)` returns main's entry (or None) and
/// `manifest_pinned` is wrong.
/// - `Dataset::open(path)` returns the default ref's HEAD (main),
/// missing the feature branch's actual drift.
/// Either way, the classifier sees NoMovement → RollBack as no-op →
/// sidecar deleted while feature's drift remains. Subsequent feature
/// writers surface ExpectedVersionMismatch.
///
/// Setup:
/// - Load alice on main.
/// - Create `feature` branch.
/// - Mutate feature (insert bob) → feature's manifest pin AND Lance
/// HEAD on the feature branch advance.
/// - Capture feature's post-mutate manifest pin (v_pin) and Lance HEAD
/// (v_head).
/// - Synthesize a sidecar with `branch=Some("feature")`, pin Person at
/// `expected=v_pin, post=v_pin+1`, `table_branch=Some("feature")`.
/// - Drop the engine and append_batch on Person's feature branch to
/// advance HEAD to v_pin+1 (bypass manifest).
///
/// On reopen, recovery must:
/// - Open a per-branch coordinator at `feature` for snapshot
/// classification.
/// - Open Person's Lance dataset at the `feature` ref for HEAD read.
/// - Classify as RolledPastExpected and roll forward.
#[tokio::test]
async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
helpers::MUTATION_QUERIES,
"insert_person",
&helpers::mixed_params(&[("$name", "bob")], &[("$age", 40)]),
)
.await
.unwrap();
// Capture feature-branch state.
let feature_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
.await
.unwrap();
let feature_entry = feature_snapshot
.entry("node:Person")
.expect("feature snapshot must have Person entry");
let v_pin = feature_entry.table_version;
let feature_branch_name = feature_entry.table_branch.clone();
drop(db);
// Bypass the manifest: append directly to Person's Lance HEAD on the
// feature branch ref to advance HEAD past v_pin.
let person_uri = node_table_uri(uri, "Person");
let store = TableStore::new(uri);
let mut ds = store
.open_dataset_head(&person_uri, feature_branch_name.as_deref())
.await
.unwrap();
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("carol-id", "carol", Some(40))]),
)
.await
.unwrap();
let v_head = ds.version().version;
assert_eq!(v_head, v_pin + 1, "append must advance HEAD by 1");
// Synthesize a sidecar saying the writer's intent was to publish
// feature's pin v_pin → v_pin+1. (Mutation kind = strict match.)
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H0000000000000000000FEAT",
"started_at": "0",
"branch": "feature",
"actor_id": "act-feature",
"writer_kind": "Mutation",
"tables": [
{{
"table_key":"node:Person",
"table_path":"{}",
"expected_version":{},
"post_commit_pin":{},
"table_branch":{}
}}
]
}}"#,
person_uri,
v_pin,
v_head,
match &feature_branch_name {
Some(b) => format!("\"{}\"", b),
None => "null".to_string(),
},
);
write_sidecar_file(dir.path(), "01H0000000000000000000FEAT", &sidecar_json);
// Reopen — recovery sweep must process the feature-branch sidecar
// against feature's snapshot, not main's. With the fix, feature's
// manifest pin advances v_pin → v_head.
let db = Omnigraph::open(uri).await.unwrap();
assert!(
list_recovery_dir(dir.path()).is_empty(),
"feature-branch sidecar must be processed (deleted) after recovery"
);
// The post-recovery feature snapshot must show Person pinned at v_head.
let post_feature_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
.await
.unwrap();
let post_entry = post_feature_snapshot
.entry("node:Person")
.expect("Person must still be pinned on feature");
assert_eq!(
post_entry.table_version, v_head,
"feature manifest pin must advance v_pin={} → v_head={}; got {} \
without branch-aware recovery, classification would have \
compared against main and rolled back / no-op'd",
v_pin, v_head, post_entry.table_version,
);
// Audit row recorded for the recovery action.
assert_eq!(
count_recovery_audit_rows(dir.path()).await,
1,
"feature-branch sidecar recovery must record one audit row",
);
}
/// `OpenMode::ReadOnly` must NOT run `recover_schema_state_files`,
/// which can delete or rename schema-staging files. Read-only consumers
/// may run with read-only object-store credentials, and silent open-time