recovery: wire sidecar into schema_apply, branch_merge, ensure_indices (Phases 6-8)

Three writers each follow the same shape established in Phase 5: build
SidecarTablePin list before the per-table commit_staged loop, write the
sidecar via recovery::write_sidecar, do the existing work, delete the
sidecar after the manifest publish succeeds.

Loose-match classifier (recovery.rs):

The classifier now distinguishes strict vs. loose match per
SidecarKind. Strict (Mutation, Load, BranchMerge): exactly one
commit_staged per table; lance_head == manifest_pinned + 1 AND
post_commit_pin == lance_head required. Loose (SchemaApply,
EnsureIndices): the writer may run N >= 1 commit_staged calls per
table — index builds + rewrites compound, and the exact N is hard to
compute at sidecar-write time. Loose accepts any
lance_head > manifest_pinned (with expected_version still matching the
manifest pin) as RolledPastExpected. The risk it admits — an external
agent advancing HEAD between sidecar write and recovery — is out of
scope for the single-coordinator model (MR-668 territory).

roll_forward_all now reads the CURRENT Lance HEAD per table (not the
sidecar's post_commit_pin) so the manifest publish reflects whatever
HEAD landed, even if the loose-match writer committed multiple times
per table.

Per-writer wiring:

- schema_apply::apply_schema_with_lock: sidecar covers
  rewritten_tables ∪ indexed_tables (the tables that go through
  stage_overwrite/stage_create_index commit_staged). Skips
  added_tables (fresh datasets, no Phase B residual class) and
  renamed_tables (handled by the existing schema-state staging
  recovery in recover_schema_state_files).
- branch_merge::branch_merge_on_current_target: sidecar covers every
  table in candidates (publish_adopted_source_state +
  publish_rewritten_merge_table do the per-table commit_staged work).
  Sidecar writes after validate_merge_candidates and deletes after
  commit_manifest_updates.
- ensure_indices_for_branch: sidecar covers every node + edge type in
  the catalog with a manifest entry (build_indices_on_dataset is
  per-table-per-index commit_staged). Skips when the catalog has
  nothing — steady-state calls incur no sidecar I/O when the manifest
  already pins all expected types.

Allow recovery_audit.rs in forbidden_apis.rs:

The new db/recovery_audit.rs uses Dataset::write to bootstrap the
_graph_commit_recoveries.lance dataset (same pattern as
commit_graph.rs which is already allow-listed). Add it to the
ALLOW_LIST_FILES list in tests/forbidden_apis.rs.

8 new unit tests in db::manifest::recovery cover the loose-match
classifier branches (SchemaApply + EnsureIndices accept multi-commit
drift, NoMovement and InvariantViolation behave the same as strict).

All 20 test binaries pass (350+ tests across the workspace).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-03 00:29:08 +02:00
parent 49ca7e5068
commit c6827919ca
No known key found for this signature in database
5 changed files with 253 additions and 25 deletions

View file

@ -313,10 +313,26 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySid
}
/// Classify one table's observed state vs. the sidecar's intent.
///
/// `kind` adjusts the precision of the `RolledPastExpected` predicate:
/// - **Strict** (`Mutation`, `Load`, `BranchMerge`): exactly one
/// `commit_staged` per table, so `lance_head == manifest_pinned + 1`
/// AND `post_commit_pin == lance_head` is required.
/// - **Loose** (`SchemaApply`, `EnsureIndices`): the writer may run
/// N ≥ 1 `commit_staged` calls per table (one per index built + one
/// for the overwrite, etc.) and the exact N is hard to compute at
/// sidecar-write time. The loose match accepts any
/// `lance_head > manifest_pinned` as `RolledPastExpected` when
/// `pin.expected_version == manifest_pinned` (the writer's CAS
/// target matches what the manifest currently shows). The risk this
/// admits — an external agent advancing HEAD between sidecar write
/// and recovery — is out of scope for the single-coordinator model
/// (MR-668 territory).
pub(crate) fn classify_table(
pin: &SidecarTablePin,
lance_head: u64,
manifest_pinned: u64,
kind: SidecarKind,
) -> TableClassification {
use TableClassification::*;
if lance_head < manifest_pinned {
@ -328,15 +344,30 @@ pub(crate) fn classify_table(
return NoMovement;
}
// lance_head > manifest_pinned
if lance_head == manifest_pinned + 1 {
if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
RolledPastExpected
let strict = matches!(
kind,
SidecarKind::Mutation | SidecarKind::Load | SidecarKind::BranchMerge,
);
if strict {
if lance_head == manifest_pinned + 1 {
if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
RolledPastExpected
} else {
UnexpectedAtP1
}
} else {
UnexpectedAtP1
// lance_head > manifest_pinned + 1
UnexpectedMultistep
}
} else {
// lance_head > manifest_pinned + 1
UnexpectedMultistep
// Loose match for multi-commit writers (SchemaApply, EnsureIndices).
if pin.expected_version == manifest_pinned {
RolledPastExpected
} else if lance_head == manifest_pinned + 1 {
UnexpectedAtP1
} else {
UnexpectedMultistep
}
}
}
@ -455,7 +486,12 @@ async fn process_sidecar(
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0);
classifications.push(classify_table(pin, lance_head, manifest_pinned));
classifications.push(classify_table(
pin,
lance_head,
manifest_pinned,
sidecar.writer_kind,
));
}
match decide(&classifications) {
@ -564,17 +600,18 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u
let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
// Read the post-commit dataset at `post_commit_pin` to capture the
// row count + version metadata that the manifest row needs. Cheap:
// these are manifest-level values, not a row scan.
let post_ds = Dataset::open(&pin.table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
.checkout_version(pin.post_commit_pin)
// Open the dataset at its CURRENT Lance HEAD (not at the sidecar's
// post_commit_pin). For strict-match writers (Mutation/Load/
// BranchMerge) HEAD == post_commit_pin by construction. For
// loose-match writers (SchemaApply/EnsureIndices) HEAD may be
// higher than post_commit_pin (multiple commit_staged calls per
// table); we want to publish to the actual current HEAD.
let head_ds = Dataset::open(&pin.table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let head_version = head_ds.version().version;
let row_count = post_ds
let row_count = head_ds
.count_rows(None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
@ -584,12 +621,12 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u
super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&table_relative_path,
&post_ds,
&head_ds,
)?;
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: pin.table_key.clone(),
table_version: pin.post_commit_pin,
table_version: head_version,
table_branch: sidecar.branch.clone(),
row_count,
version_metadata,
@ -763,33 +800,36 @@ mod tests {
#[test]
fn classify_no_movement_when_head_equals_pinned() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(classify_table(&pin, 5, 5), TableClassification::NoMovement);
assert_eq!(
classify_table(&pin, 5, 5, SidecarKind::Mutation),
TableClassification::NoMovement,
);
}
#[test]
fn classify_rolled_past_expected_when_sidecar_matches() {
fn classify_rolled_past_expected_when_sidecar_matches_strict() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 6, 5),
classify_table(&pin, 6, 5, SidecarKind::Mutation),
TableClassification::RolledPastExpected,
);
}
#[test]
fn classify_unexpected_at_p1_when_sidecar_does_not_match() {
fn classify_unexpected_at_p1_when_sidecar_does_not_match_strict() {
// Same +1 drift but post_commit_pin says it should be 7, not 6.
let pin = make_pin("node:Person", "irrelevant", 5, 7);
assert_eq!(
classify_table(&pin, 6, 5),
classify_table(&pin, 6, 5, SidecarKind::Mutation),
TableClassification::UnexpectedAtP1,
);
}
#[test]
fn classify_unexpected_multistep_when_head_jumped_more_than_one() {
fn classify_unexpected_multistep_when_head_jumped_more_than_one_strict() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 8, 5),
classify_table(&pin, 8, 5, SidecarKind::Mutation),
TableClassification::UnexpectedMultistep,
);
}
@ -798,7 +838,50 @@ mod tests {
fn classify_invariant_violation_when_head_below_pinned() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 3, 5),
classify_table(&pin, 3, 5, SidecarKind::Mutation),
TableClassification::InvariantViolation { observed: 3 },
);
}
// Loose-match writers (SchemaApply, EnsureIndices) accept any
// lance_head > expected_version as RolledPastExpected when the
// expected version still matches the manifest pin. The exact
// post_commit_pin is allowed to be a lower bound.
#[test]
fn classify_loose_match_accepts_multi_commit_drift_for_schema_apply() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
// Sidecar's post_commit_pin says 6, but Lance HEAD is 8 (SchemaApply
// built two indices). Strict would say UnexpectedMultistep; loose
// accepts it as RolledPastExpected.
assert_eq!(
classify_table(&pin, 8, 5, SidecarKind::SchemaApply),
TableClassification::RolledPastExpected,
);
}
#[test]
fn classify_loose_match_accepts_multi_commit_drift_for_ensure_indices() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 9, 5, SidecarKind::EnsureIndices),
TableClassification::RolledPastExpected,
);
}
#[test]
fn classify_loose_match_no_movement_unchanged() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 5, 5, SidecarKind::SchemaApply),
TableClassification::NoMovement,
);
}
#[test]
fn classify_loose_match_invariant_violation_unchanged() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 3, 5, SidecarKind::SchemaApply),
TableClassification::InvariantViolation { observed: 3 },
);
}

View file

@ -151,6 +151,43 @@ pub(super) async fn apply_schema_with_lock(
let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
let mut table_tombstones = HashMap::<String, u64>::new();
// MR-847 sidecar: protect the per-table commit_staged loop in
// rewritten_tables + indexed_tables. The post_commit_pin we record
// here is a lower bound (expected + 1); the classifier loose-matches
// for SidecarKind::SchemaApply because the actual N depends on how
// many indices need building. See classify_table's loose-match arm.
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = rewritten_tables
.iter()
.chain(indexed_tables.iter().filter(|t| {
!rewritten_tables.contains(*t)
&& !added_tables.contains(*t)
&& !renamed_tables.contains_key(*t)
}))
.filter_map(|table_key| {
let entry = snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: db.table_store.dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
})
})
.collect();
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::SchemaApply,
Some("__schema_apply_lock__".to_string()),
db.audit_actor_id.clone(),
recovery_pins,
);
Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
.await?,
)
};
for table_key in &added_tables {
let table_path = table_path_for_table_key(table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
@ -396,6 +433,15 @@ pub(super) async fn apply_schema_with_lock(
db.invalidate_graph_index().await;
}
// MR-847 sidecar lifecycle: delete after the manifest commit succeeded.
// If this delete fails, the sidecar persists; on next open the sweep
// sees every table at the post-publish manifest pin (NoMovement) and
// the sidecar is treated as a stale artifact (recovery is a no-op
// and the sidecar is cleaned up).
if let Some(handle) = recovery_handle {
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?;
}
Ok(SchemaApplyResult {
supported: true,
applied: true,

View file

@ -42,6 +42,51 @@ pub(super) async fn ensure_indices_for_branch(
let mut updates = Vec::new();
let active_branch = resolved.branch;
// MR-847 sidecar: protect the per-table commit_staged loop in
// build_indices_on_dataset (one commit per index built). Pins every
// node + edge table that's eligible for index work; the classifier
// loose-matches for SidecarKind::EnsureIndices (the actual N depends
// on which indices are missing). Skip sidecar entirely when the
// catalog has no tables that could need indexing — steady-state
// calls then incur no sidecar I/O.
let mut recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = Vec::new();
for type_name in db.catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
if let Some(entry) = snapshot.entry(&table_key) {
recovery_pins.push(crate::db::manifest::SidecarTablePin {
table_key,
table_path: format!("{}/{}", db.root_uri, entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
});
}
}
for edge_name in db.catalog.edge_types.keys() {
let table_key = format!("edge:{}", edge_name);
if let Some(entry) = snapshot.entry(&table_key) {
recovery_pins.push(crate::db::manifest::SidecarTablePin {
table_key,
table_path: format!("{}/{}", db.root_uri, entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
});
}
}
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::EnsureIndices,
active_branch.clone(),
db.audit_actor_id.clone(),
recovery_pins,
);
Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
.await?,
)
};
for type_name in db.catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
let Some(entry) = snapshot.entry(&table_key) else {
@ -140,6 +185,13 @@ pub(super) async fn ensure_indices_for_branch(
commit_prepared_updates_on_branch(db, branch, &updates).await?;
}
// MR-847 sidecar lifecycle: delete after the manifest publish (or no-op
// when there were no updates — sidecar covered the per-table commit
// window regardless).
if let Some(handle) = recovery_handle {
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?;
}
Ok(())
}

View file

@ -1167,6 +1167,47 @@ impl Omnigraph {
validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
// MR-847 sidecar: protect the per-table commit_staged loop. Pins
// every table that will be touched by `publish_adopted_source_state`
// or `publish_rewritten_merge_table`. BranchMerge uses loose
// classification — the publish path may run multiple commit_staged
// calls per table (publish_rewritten_merge_table does
// stage_merge_insert + delete_where + index rebuilds per the
// existing branch-merge code path).
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
.iter()
.filter(|tk| candidates.contains_key(*tk))
.filter_map(|table_key| {
let entry = target_snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: self.table_store().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
})
})
.collect();
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::BranchMerge,
target_snapshot
.entry(ordered_table_keys.first().map(String::as_str).unwrap_or(""))
.and_then(|e| e.table_branch.clone()),
self.audit_actor_id.clone(),
recovery_pins,
);
Some(
crate::db::manifest::write_sidecar(
self.root_uri(),
self.storage_adapter(),
&sidecar,
)
.await?,
)
};
let mut updates = Vec::new();
let mut changed_edge_tables = false;
for table_key in &ordered_table_keys {
@ -1200,6 +1241,11 @@ impl Omnigraph {
} else {
self.commit_manifest_updates(&updates).await?
};
// MR-847 sidecar lifecycle: delete after manifest publish.
if let Some(handle) = recovery_handle {
crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await?;
}
self.record_merge_commit(
manifest_version,
target_head_commit_id,

View file

@ -99,6 +99,7 @@ const ALLOW_LIST_FILES: &[&str] = &[
"storage_layer.rs", // The trait module.
"commit_graph.rs", // Maintains `_graph_commits.lance` system table.
"graph_coordinator.rs", // Drives the manifest publisher / branch coordinator.
"recovery_audit.rs", // Maintains `_graph_commit_recoveries.lance` (MR-847 audit trail).
];
/// Directories exempt from the guard. Files under these paths may use