From c6827919caad17e1ed215106fef3f84e436d71a2 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 3 May 2026 00:29:08 +0200 Subject: [PATCH] recovery: wire sidecar into schema_apply, branch_merge, ensure_indices (Phases 6-8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three writers each follow the same shape established in Phase 5: build SidecarTablePin list before the per-table commit_staged loop, write the sidecar via recovery::write_sidecar, do the existing work, delete the sidecar after the manifest publish succeeds. Loose-match classifier (recovery.rs): The classifier now distinguishes strict vs. loose match per SidecarKind. Strict (Mutation, Load, BranchMerge): exactly one commit_staged per table; lance_head == manifest_pinned + 1 AND post_commit_pin == lance_head required. Loose (SchemaApply, EnsureIndices): the writer may run N >= 1 commit_staged calls per table — index builds + rewrites compound, and the exact N is hard to compute at sidecar-write time. Loose accepts any lance_head > manifest_pinned (with expected_version still matching the manifest pin) as RolledPastExpected. The risk it admits — an external agent advancing HEAD between sidecar write and recovery — is out of scope for the single-coordinator model (MR-668 territory). roll_forward_all now reads the CURRENT Lance HEAD per table (not the sidecar's post_commit_pin) so the manifest publish reflects whatever HEAD landed, even if the loose-match writer committed multiple times per table. Per-writer wiring: - schema_apply::apply_schema_with_lock: sidecar covers rewritten_tables ∪ indexed_tables (the tables that go through stage_overwrite/stage_create_index commit_staged). Skips added_tables (fresh datasets, no Phase B residual class) and renamed_tables (handled by the existing schema-state staging recovery in recover_schema_state_files). - branch_merge::branch_merge_on_current_target: sidecar covers every table in candidates (publish_adopted_source_state + publish_rewritten_merge_table do the per-table commit_staged work). Sidecar writes after validate_merge_candidates and deletes after commit_manifest_updates. - ensure_indices_for_branch: sidecar covers every node + edge type in the catalog with a manifest entry (build_indices_on_dataset is per-table-per-index commit_staged). Skips when the catalog has nothing — steady-state calls incur no sidecar I/O when the manifest already pins all expected types. Allow recovery_audit.rs in forbidden_apis.rs: The new db/recovery_audit.rs uses Dataset::write to bootstrap the _graph_commit_recoveries.lance dataset (same pattern as commit_graph.rs which is already allow-listed). Add it to the ALLOW_LIST_FILES list in tests/forbidden_apis.rs. 8 new unit tests in db::manifest::recovery cover the loose-match classifier branches (SchemaApply + EnsureIndices accept multi-commit drift, NoMovement and InvariantViolation behave the same as strict). All 20 test binaries pass (350+ tests across the workspace). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/db/manifest/recovery.rs | 133 ++++++++++++++---- .../src/db/omnigraph/schema_apply.rs | 46 ++++++ .../omnigraph/src/db/omnigraph/table_ops.rs | 52 +++++++ crates/omnigraph/src/exec/merge.rs | 46 ++++++ crates/omnigraph/tests/forbidden_apis.rs | 1 + 5 files changed, 253 insertions(+), 25 deletions(-) diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 2a5ed72..c02c136 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -313,10 +313,26 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result manifest_pinned` as `RolledPastExpected` when +/// `pin.expected_version == manifest_pinned` (the writer's CAS +/// target matches what the manifest currently shows). The risk this +/// admits — an external agent advancing HEAD between sidecar write +/// and recovery — is out of scope for the single-coordinator model +/// (MR-668 territory). pub(crate) fn classify_table( pin: &SidecarTablePin, lance_head: u64, manifest_pinned: u64, + kind: SidecarKind, ) -> TableClassification { use TableClassification::*; if lance_head < manifest_pinned { @@ -328,15 +344,30 @@ pub(crate) fn classify_table( return NoMovement; } // lance_head > manifest_pinned - if lance_head == manifest_pinned + 1 { - if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head { - RolledPastExpected + let strict = matches!( + kind, + SidecarKind::Mutation | SidecarKind::Load | SidecarKind::BranchMerge, + ); + if strict { + if lance_head == manifest_pinned + 1 { + if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head { + RolledPastExpected + } else { + UnexpectedAtP1 + } } else { - UnexpectedAtP1 + // lance_head > manifest_pinned + 1 + UnexpectedMultistep } } else { - // lance_head > manifest_pinned + 1 - UnexpectedMultistep + // Loose match for multi-commit writers (SchemaApply, EnsureIndices). + if pin.expected_version == manifest_pinned { + RolledPastExpected + } else if lance_head == manifest_pinned + 1 { + UnexpectedAtP1 + } else { + UnexpectedMultistep + } } } @@ -455,7 +486,12 @@ async fn process_sidecar( .entry(&pin.table_key) .map(|e| e.table_version) .unwrap_or(0); - classifications.push(classify_table(pin, lance_head, manifest_pinned)); + classifications.push(classify_table( + pin, + lance_head, + manifest_pinned, + sidecar.writer_kind, + )); } match decide(&classifications) { @@ -564,17 +600,18 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result = HashMap::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { - // Read the post-commit dataset at `post_commit_pin` to capture the - // row count + version metadata that the manifest row needs. Cheap: - // these are manifest-level values, not a row scan. - let post_ds = Dataset::open(&pin.table_path) - .await - .map_err(|e| OmniError::Lance(e.to_string()))? - .checkout_version(pin.post_commit_pin) + // Open the dataset at its CURRENT Lance HEAD (not at the sidecar's + // post_commit_pin). For strict-match writers (Mutation/Load/ + // BranchMerge) HEAD == post_commit_pin by construction. For + // loose-match writers (SchemaApply/EnsureIndices) HEAD may be + // higher than post_commit_pin (multiple commit_staged calls per + // table); we want to publish to the actual current HEAD. + let head_ds = Dataset::open(&pin.table_path) .await .map_err(|e| OmniError::Lance(e.to_string()))?; + let head_version = head_ds.version().version; - let row_count = post_ds + let row_count = head_ds .count_rows(None) .await .map_err(|e| OmniError::Lance(e.to_string()))? as u64; @@ -584,12 +621,12 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result expected_version as RolledPastExpected when the + // expected version still matches the manifest pin. The exact + // post_commit_pin is allowed to be a lower bound. + #[test] + fn classify_loose_match_accepts_multi_commit_drift_for_schema_apply() { + let pin = make_pin("node:Person", "irrelevant", 5, 6); + // Sidecar's post_commit_pin says 6, but Lance HEAD is 8 (SchemaApply + // built two indices). Strict would say UnexpectedMultistep; loose + // accepts it as RolledPastExpected. + assert_eq!( + classify_table(&pin, 8, 5, SidecarKind::SchemaApply), + TableClassification::RolledPastExpected, + ); + } + + #[test] + fn classify_loose_match_accepts_multi_commit_drift_for_ensure_indices() { + let pin = make_pin("node:Person", "irrelevant", 5, 6); + assert_eq!( + classify_table(&pin, 9, 5, SidecarKind::EnsureIndices), + TableClassification::RolledPastExpected, + ); + } + + #[test] + fn classify_loose_match_no_movement_unchanged() { + let pin = make_pin("node:Person", "irrelevant", 5, 6); + assert_eq!( + classify_table(&pin, 5, 5, SidecarKind::SchemaApply), + TableClassification::NoMovement, + ); + } + + #[test] + fn classify_loose_match_invariant_violation_unchanged() { + let pin = make_pin("node:Person", "irrelevant", 5, 6); + assert_eq!( + classify_table(&pin, 3, 5, SidecarKind::SchemaApply), TableClassification::InvariantViolation { observed: 3 }, ); } diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 36f4b01..00d1da9 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -151,6 +151,43 @@ pub(super) async fn apply_schema_with_lock( let mut table_updates = HashMap::::new(); let mut table_tombstones = HashMap::::new(); + // MR-847 sidecar: protect the per-table commit_staged loop in + // rewritten_tables + indexed_tables. The post_commit_pin we record + // here is a lower bound (expected + 1); the classifier loose-matches + // for SidecarKind::SchemaApply because the actual N depends on how + // many indices need building. See classify_table's loose-match arm. + let recovery_pins: Vec = rewritten_tables + .iter() + .chain(indexed_tables.iter().filter(|t| { + !rewritten_tables.contains(*t) + && !added_tables.contains(*t) + && !renamed_tables.contains_key(*t) + })) + .filter_map(|table_key| { + let entry = snapshot.entry(table_key)?; + Some(crate::db::manifest::SidecarTablePin { + table_key: table_key.clone(), + table_path: db.table_store.dataset_uri(&entry.table_path), + expected_version: entry.table_version, + post_commit_pin: entry.table_version + 1, + }) + }) + .collect(); + let recovery_handle = if recovery_pins.is_empty() { + None + } else { + let sidecar = crate::db::manifest::new_sidecar( + crate::db::manifest::SidecarKind::SchemaApply, + Some("__schema_apply_lock__".to_string()), + db.audit_actor_id.clone(), + recovery_pins, + ); + Some( + crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar) + .await?, + ) + }; + for table_key in &added_tables { let table_path = table_path_for_table_key(table_key)?; let dataset_uri = db.table_store.dataset_uri(&table_path); @@ -396,6 +433,15 @@ pub(super) async fn apply_schema_with_lock( db.invalidate_graph_index().await; } + // MR-847 sidecar lifecycle: delete after the manifest commit succeeded. + // If this delete fails, the sidecar persists; on next open the sweep + // sees every table at the post-publish manifest pin (NoMovement) and + // the sidecar is treated as a stale artifact (recovery is a no-op + // and the sidecar is cleaned up). + if let Some(handle) = recovery_handle { + crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?; + } + Ok(SchemaApplyResult { supported: true, applied: true, diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index b343b4e..a02b4b1 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -42,6 +42,51 @@ pub(super) async fn ensure_indices_for_branch( let mut updates = Vec::new(); let active_branch = resolved.branch; + // MR-847 sidecar: protect the per-table commit_staged loop in + // build_indices_on_dataset (one commit per index built). Pins every + // node + edge table that's eligible for index work; the classifier + // loose-matches for SidecarKind::EnsureIndices (the actual N depends + // on which indices are missing). Skip sidecar entirely when the + // catalog has no tables that could need indexing — steady-state + // calls then incur no sidecar I/O. + let mut recovery_pins: Vec = Vec::new(); + for type_name in db.catalog.node_types.keys() { + let table_key = format!("node:{}", type_name); + if let Some(entry) = snapshot.entry(&table_key) { + recovery_pins.push(crate::db::manifest::SidecarTablePin { + table_key, + table_path: format!("{}/{}", db.root_uri, entry.table_path), + expected_version: entry.table_version, + post_commit_pin: entry.table_version + 1, + }); + } + } + for edge_name in db.catalog.edge_types.keys() { + let table_key = format!("edge:{}", edge_name); + if let Some(entry) = snapshot.entry(&table_key) { + recovery_pins.push(crate::db::manifest::SidecarTablePin { + table_key, + table_path: format!("{}/{}", db.root_uri, entry.table_path), + expected_version: entry.table_version, + post_commit_pin: entry.table_version + 1, + }); + } + } + let recovery_handle = if recovery_pins.is_empty() { + None + } else { + let sidecar = crate::db::manifest::new_sidecar( + crate::db::manifest::SidecarKind::EnsureIndices, + active_branch.clone(), + db.audit_actor_id.clone(), + recovery_pins, + ); + Some( + crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar) + .await?, + ) + }; + for type_name in db.catalog.node_types.keys() { let table_key = format!("node:{}", type_name); let Some(entry) = snapshot.entry(&table_key) else { @@ -140,6 +185,13 @@ pub(super) async fn ensure_indices_for_branch( commit_prepared_updates_on_branch(db, branch, &updates).await?; } + // MR-847 sidecar lifecycle: delete after the manifest publish (or no-op + // when there were no updates — sidecar covered the per-table commit + // window regardless). + if let Some(handle) = recovery_handle { + crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?; + } + Ok(()) } diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index df62a63..818be93 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1167,6 +1167,47 @@ impl Omnigraph { validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?; + // MR-847 sidecar: protect the per-table commit_staged loop. Pins + // every table that will be touched by `publish_adopted_source_state` + // or `publish_rewritten_merge_table`. BranchMerge uses loose + // classification — the publish path may run multiple commit_staged + // calls per table (publish_rewritten_merge_table does + // stage_merge_insert + delete_where + index rebuilds per the + // existing branch-merge code path). + let recovery_pins: Vec = ordered_table_keys + .iter() + .filter(|tk| candidates.contains_key(*tk)) + .filter_map(|table_key| { + let entry = target_snapshot.entry(table_key)?; + Some(crate::db::manifest::SidecarTablePin { + table_key: table_key.clone(), + table_path: self.table_store().dataset_uri(&entry.table_path), + expected_version: entry.table_version, + post_commit_pin: entry.table_version + 1, + }) + }) + .collect(); + let recovery_handle = if recovery_pins.is_empty() { + None + } else { + let sidecar = crate::db::manifest::new_sidecar( + crate::db::manifest::SidecarKind::BranchMerge, + target_snapshot + .entry(ordered_table_keys.first().map(String::as_str).unwrap_or("")) + .and_then(|e| e.table_branch.clone()), + self.audit_actor_id.clone(), + recovery_pins, + ); + Some( + crate::db::manifest::write_sidecar( + self.root_uri(), + self.storage_adapter(), + &sidecar, + ) + .await?, + ) + }; + let mut updates = Vec::new(); let mut changed_edge_tables = false; for table_key in &ordered_table_keys { @@ -1200,6 +1241,11 @@ impl Omnigraph { } else { self.commit_manifest_updates(&updates).await? }; + + // MR-847 sidecar lifecycle: delete after manifest publish. + if let Some(handle) = recovery_handle { + crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await?; + } self.record_merge_commit( manifest_version, target_head_commit_id, diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs index 055d96a..30185a3 100644 --- a/crates/omnigraph/tests/forbidden_apis.rs +++ b/crates/omnigraph/tests/forbidden_apis.rs @@ -99,6 +99,7 @@ const ALLOW_LIST_FILES: &[&str] = &[ "storage_layer.rs", // The trait module. "commit_graph.rs", // Maintains `_graph_commits.lance` system table. "graph_coordinator.rs", // Drives the manifest publisher / branch coordinator. + "recovery_audit.rs", // Maintains `_graph_commit_recoveries.lance` (MR-847 audit trail). ]; /// Directories exempt from the guard. Files under these paths may use