diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index e7072f6..776fb1a 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -32,7 +32,10 @@ pub(crate) use namespace::open_table_head_for_write; #[cfg(test)] use namespace::{branch_manifest_namespace, staged_table_namespace}; use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; -pub(crate) use recovery::recover_manifest_drift; +pub(crate) use recovery::{ + delete_sidecar, new_sidecar, recover_manifest_drift, write_sidecar, RecoverySidecar, + RecoverySidecarHandle, SidecarKind, SidecarTablePin, +}; use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at}; pub use state::SubTableEntry; #[cfg(test)] diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index c365021..9fa7c6e 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -172,7 +172,7 @@ impl Omnigraph { // compare its snapshot against any leftover staging files. Recovery // either deletes staging (pre-commit crash) or completes the rename // (post-commit crash) before the live schema files are read. - let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?; + let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?; recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?; // MR-847 recovery sweep: close the Phase B → Phase C residual on // any sidecar left over from a crashed writer. ReadOnly skips — @@ -189,6 +189,11 @@ impl Omnigraph { &coordinator.snapshot(), ) .await?; + // Roll-forward advances the manifest pin and the audit appends + // commits to _graph_commits.lance + _graph_commit_recoveries.lance. + // The coordinator's in-memory snapshot is now stale; refresh so + // the returned Omnigraph carries the post-recovery state. + coordinator.refresh().await?; } // Read _schema.pg (post-recovery — may have just been renamed in). let schema_path = schema_source_uri(&root); @@ -266,6 +271,20 @@ impl Omnigraph { &self.table_store } + /// Engine-level access to the object-store adapter (S3 / local fs). + /// Used by the MR-847 recovery sidecar protocol — writers in the + /// engine call this to write/delete sidecars at `__recovery/{ulid}.json`. + pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter { + self.storage.as_ref() + } + + /// Engine-level access to the repo's normalized root URI. Used by + /// the MR-847 recovery sidecar protocol to compute `__recovery/` + /// paths. + pub(crate) fn root_uri(&self) -> &str { + &self.root_uri + } + pub(crate) async fn open_coordinator_for_branch( &self, branch: Option<&str>, diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index e996241..ebd2ae0 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -737,18 +737,23 @@ impl Omnigraph { Err(e) => Err(e), Ok(total) if staging.is_empty() => Ok(total), Ok(total) => { - let (updates, expected_versions) = staging - .finalize(self, requested.as_deref()) + let (updates, expected_versions, sidecar_handle) = staging + .finalize( + self, + requested.as_deref(), + crate::db::manifest::SidecarKind::Mutation, + ) .await?; // Failpoint that wedges the documented finalize→publisher // residual: per-table `commit_staged` calls already // advanced Lance HEAD on every touched table; a failure // injected here mirrors the production-rare case where // the publisher's CAS pre-check rejects (or the manifest - // write throws) after staged commits succeeded. Used by - // `tests/failpoints.rs::finalize_publisher_residual_*` - // to pin the documented residual behavior. See - // `docs/runs.md` "Finalize → publisher residual". + // write throws) after staged commits succeeded. The + // sidecar written inside `staging.finalize()` persists + // across this failure so the next `Omnigraph::open` + // (MR-847 recovery sweep) can roll forward — see + // `tests/failpoints.rs::recovery_rolls_forward_after_finalize_publisher_failure`. crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; self.commit_updates_on_branch_with_expected( requested.as_deref(), @@ -756,6 +761,14 @@ impl Omnigraph { &expected_versions, ) .await?; + // Phase C succeeded — sidecar can be deleted. If this + // delete fails, the next open's sweep classifies every + // table as NoMovement (manifest pin == Lance HEAD == + // post_commit_pin) and the sidecar is treated as a + // stale artifact (cleaned up via the Phase 2 logic). + if let Some(handle) = sidecar_handle { + crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await?; + } Ok(total) } } diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 2c7eae3..b1d392d 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -27,6 +27,9 @@ use lance::Dataset; use omnigraph_compiler::catalog::EdgeType; use crate::db::SubTableUpdate; +use crate::db::manifest::{ + new_sidecar, write_sidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, +}; use crate::error::{OmniError, Result}; /// Whether the per-table accumulator should commit via `stage_append` @@ -218,8 +221,13 @@ impl MutationStaging { pub(crate) async fn finalize( self, db: &crate::db::Omnigraph, - _branch: Option<&str>, - ) -> Result<(Vec, HashMap)> { + branch: Option<&str>, + sidecar_kind: SidecarKind, + ) -> Result<( + Vec, + HashMap, + Option, + )> { let MutationStaging { expected_versions, paths, @@ -230,6 +238,49 @@ impl MutationStaging { let mut updates: Vec = inline_committed.into_values().collect(); + // MR-847 — sidecar protocol. Build the per-table pin list BEFORE + // any Lance commit_staged runs, then write the sidecar so a crash + // between Phase B (this loop's commit_staged calls) and Phase C + // (the manifest publish in the caller) is recoverable on next + // open. Skipped when `pending` is empty (delete-only mutation; + // D₂ parse-time rule keeps deletes out of this code path so this + // branch is reached only for the inline-committed-only case). + let pins: Vec = pending + .iter() + .map(|(table_key, _)| { + let path = paths.get(table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "MutationStaging::finalize: missing path for table '{}'", + table_key, + )) + })?; + let expected = *expected_versions.get(table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "MutationStaging::finalize: missing expected version for table '{}'", + table_key, + )) + })?; + Ok::(SidecarTablePin { + table_key: table_key.clone(), + table_path: path.full_path.clone(), + expected_version: expected, + post_commit_pin: expected + 1, + }) + }) + .collect::>>()?; + + let sidecar_handle = if pins.is_empty() { + None + } else { + let sidecar = new_sidecar( + sidecar_kind, + branch.map(|s| s.to_string()), + db.audit_actor_id.clone(), + pins, + ); + Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?) + }; + for (table_key, table) in pending { let path = paths.get(&table_key).ok_or_else(|| { OmniError::manifest_internal(format!( @@ -318,7 +369,7 @@ impl MutationStaging { }); } - Ok((updates, expected_versions)) + Ok((updates, expected_versions, sidecar_handle)) } } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index f275938..5af87fa 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -537,10 +537,24 @@ async fn load_jsonl_reader( // Phase 4: Atomic manifest commit with publisher-level OCC. if use_staging { - let (updates, expected_versions) = staging.finalize(db, branch).await?; + let (updates, expected_versions, sidecar_handle) = staging + .finalize(db, branch, crate::db::manifest::SidecarKind::Load) + .await?; db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions) .await?; + // MR-847: sidecar protects the per-table commit_staged → + // manifest publish window. Phase C succeeded — clean up. + if let Some(handle) = sidecar_handle { + crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?; + } } else { + // LoadMode::Overwrite keeps the legacy inline-commit path — + // truncate-then-append doesn't fit the staged shape (see + // `docs/runs.md` "LoadMode::Overwrite residual"). MR-847 sidecar + // is not applicable here because the writer doesn't go through + // MutationStaging; per-table inline commits + a final manifest + // publish handle their own residual via documented operator + // workflow (re-run overwrite to recover). db.commit_updates_on_branch_with_expected( branch, &overwrite_updates, diff --git a/crates/omnigraph/src/storage.rs b/crates/omnigraph/src/storage.rs index fe80c45..a895c02 100644 --- a/crates/omnigraph/src/storage.rs +++ b/crates/omnigraph/src/storage.rs @@ -63,6 +63,16 @@ impl StorageAdapter for LocalStorageAdapter { async fn write_text(&self, uri: &str, contents: &str) -> Result<()> { let path = local_path_from_uri(uri)?; + // Ensure parent directory exists. S3 has no equivalent (PutObject + // is path-agnostic). For local fs, callers like the MR-847 + // recovery sidecar protocol expect transparent directory + // creation under the repo root (the `__recovery/` directory + // doesn't pre-exist; first sidecar write creates it). + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + tokio::fs::create_dir_all(parent).await?; + } + } tokio::fs::write(&path, contents).await?; Ok(()) } diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 8f10b17..9d9edb9 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -140,43 +140,53 @@ async fn schema_apply_recovers_partial_rename() { assert_no_staging_files(dir.path()); } -/// Pin the documented "finalize → publisher residual" of the -/// staged-write commit path. +/// Prove the MR-847 recovery sweep closes the "finalize → publisher +/// residual" across one open cycle — the post-MR-847 contract. /// /// `MutationStaging::finalize` runs `commit_staged` per touched table /// sequentially before the publisher commits the manifest. Lance has no /// multi-dataset atomic commit primitive, so a failure between the /// per-table staged commits and the manifest commit leaves Lance HEAD -/// advanced on the touched tables with no manifest update — and the -/// next mutation surfaces `ExpectedVersionMismatch` on those tables. +/// advanced on the touched tables with no manifest update. /// -/// This isn't a code bug we can fix without an upstream Lance change; -/// it's the documented residual (see `docs/runs.md` "Finalize → -/// publisher residual"). The test pins the behavior so future code -/// changes catch any silent regression: if someone widens the residual -/// (e.g. failing earlier in finalize without rolling back), this test -/// will surface a different error than `ExpectedVersionMismatch`. If -/// someone narrows the residual (e.g. lance ships multi-dataset commit -/// and we plumb it), this test will start passing the next mutation -/// — and someone has to update the assertion + the docs. +/// Pre-MR-847: the next mutation surfaced `ExpectedVersionMismatch` and +/// the residual persisted until process restart. Post-MR-847: the +/// finalize writes a sidecar at `__recovery/{ulid}.json` BEFORE Phase B, +/// the failpoint fires AFTER finalize but BEFORE the publisher, the +/// engine handle is dropped, and the next `Omnigraph::open` runs the +/// recovery sweep. The sweep classifies every table in the sidecar as +/// `RolledPastExpected` (Lance HEAD == expected + 1, post_commit_pin +/// matches), decides RollForward, atomically extends every manifest pin +/// via `ManifestBatchPublisher::publish`, records an audit row, and +/// deletes the sidecar. +/// +/// After this test passes: +/// - The originally-attempted insert ("Eve") is visible via a normal +/// query. +/// - The next mutation succeeds without `ExpectedVersionMismatch`. +/// - `_graph_commit_recoveries.lance` carries an audit row with +/// `recovery_kind=RolledForward` and the original sidecar's +/// `actor_id` in `recovery_for_actor`. +/// +/// Continuous in-process recovery (no restart needed between failure +/// and recovery) is MR-856 (background reconciler). #[tokio::test] -async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() { - use omnigraph::error::{ManifestConflictDetails, OmniError}; - +async fn recovery_rolls_forward_after_finalize_publisher_failure() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); - let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA) - .await - .unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + // Phase A: trigger the residual. { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); - // First mutation: finalize succeeds (commit_staged advances Lance - // HEAD on node:Person), then the failpoint kicks before the - // publisher's manifest commit. The caller sees the synthetic - // error. + // The mutation's finalize completes (commit_staged advances Lance + // HEAD on node:Person AND writes a `__recovery/{ulid}.json` + // sidecar). Then the failpoint kicks in before the publisher's + // manifest commit, so the manifest pin stays at the pre-write + // version. The sidecar persists for the next-open recovery sweep. let err = mutate_main( &mut db, MUTATION_QUERIES, @@ -191,42 +201,70 @@ async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recover ), "unexpected error: {err}" ); - } - // Failpoint dropped — subsequent calls are not synthetic-failed. - // Next mutation against the same table surfaces the documented - // residual: Lance HEAD on node:Person advanced (commit_staged ran), - // manifest didn't, so the publisher CAS at next-mutation time - // surfaces ExpectedVersionMismatch. - let err = mutate_main( + // Sidecar must still exist on disk for the recovery sweep to find. + let recovery_dir = dir.path().join("__recovery"); + let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!( + sidecars.len(), + 1, + "exactly one sidecar should persist after the finalize failure" + ); + + // Drop the failpoint scope and the engine handle. + } + + // Phase B: reopen runs the recovery sweep. The sweep finds the + // sidecar, classifies node:Person as RolledPastExpected, decides + // RollForward, publishes the manifest update, records the audit + // row, deletes the sidecar. + let mut db = Omnigraph::open(&uri).await.unwrap(); + + // Sidecar gone — sweep completed end to end. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert!( + remaining.is_empty(), + "sidecar must be deleted after successful roll-forward; remaining: {:?}", + remaining, + ); + } + + // The originally-attempted "Eve" insert is now visible — the recovery + // sweep extended the manifest pin to include the staged commit. + let person_count = helpers::count_rows(&db, "node:Person").await; + assert_eq!( + person_count, 1, + "exactly one person (Eve) must be visible after roll-forward" + ); + + // The next mutation on the same table succeeds — no ExpectedVersionMismatch. + mutate_main( &mut db, MUTATION_QUERIES, "insert_person", &mixed_params(&[("$name", "Frank")], &[("$age", 33)]), ) .await - .unwrap_err(); - let OmniError::Manifest(manifest_err) = err else { - panic!("expected Manifest error, got {err:?}"); - }; - let Some(ManifestConflictDetails::ExpectedVersionMismatch { - ref table_key, - expected, - actual, - }) = manifest_err.details - else { - panic!( - "expected ExpectedVersionMismatch (the documented residual), got {:?}", - manifest_err.details - ); - }; + .expect("next mutation must succeed after recovery rolled forward"); + let person_count = helpers::count_rows(&db, "node:Person").await; assert_eq!( - table_key, "node:Person", - "drift should be on the table the failed finalize touched" + person_count, 2, + "Frank's insert must land normally after recovery" ); + + // Audit row recorded. + let audit_dir = dir.path().join("_graph_commit_recoveries.lance"); assert!( - actual > expected, - "Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}", + audit_dir.exists(), + "_graph_commit_recoveries.lance must exist after a successful recovery" ); }