recovery: wire sidecar into MutationStaging::finalize + flip headline test (Phase 5)

Production wiring (~120 LOC):

- `MutationStaging::finalize` now takes a `SidecarKind` parameter and
  returns an additional `Option<RecoverySidecarHandle>`. Builds a
  Vec<SidecarTablePin> from `pending` BEFORE the per-table commit_staged
  loop and writes the sidecar via `recovery::write_sidecar`. Skips the
  sidecar when `pending` is empty (delete-only mutation; D₂ keeps these
  out of the staged-write path so the option is just a clean signal,
  not a code path users hit).
- `exec/mutation.rs::execute_mutation_as` (around line 740): destructure
  the new third element, pass `SidecarKind::Mutation`, delete the
  sidecar after `commit_updates_on_branch_with_expected` succeeds.
- `loader/mod.rs::ingest_loaded` (around line 540): same shape, with
  `SidecarKind::Load`. The Overwrite path stays inline-commit (legacy
  residual; out of MR-847 scope per docs/runs.md).
- New engine accessors `Omnigraph::storage_adapter()` and
  `Omnigraph::root_uri()` for the sidecar I/O. The pre-existing
  `db.storage` field stays private; no other engine code reaches around
  the accessor.
- Re-exports from `db::manifest`: `new_sidecar`, `write_sidecar`,
  `delete_sidecar`, plus the `RecoverySidecar*` types and `SidecarKind`,
  so consumers in `exec/` can use them via `crate::db::manifest::...`.

Bugfix folded in (~5 LOC): make `coordinator` mutable in
`Omnigraph::open_with_storage_and_mode` and call `coordinator.refresh()`
after the recovery sweep returns. Roll-forward advances the manifest
pin on disk; without the refresh the returned engine carried a stale
in-memory snapshot. The Phase 4 tests passed only because they
opened Lance datasets directly rather than going through `db.snapshot()`.

Storage adapter (~15 LOC): `LocalStorageAdapter::write_text` now ensures
the parent directory exists via `tokio::fs::create_dir_all`. Required
because the sidecar protocol writes into `__recovery/` which doesn't
pre-exist after `Omnigraph::init`. S3 has no equivalent; PutObject is
path-agnostic.

Headline test flip (~150 LOC):

- `tests/failpoints.rs::finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers`
  is replaced by `recovery_rolls_forward_after_finalize_publisher_failure`.
  Same setup (failpoint at `mutation.post_finalize_pre_publisher`) but
  after the synthetic failure the test:
  1. Asserts the sidecar persists in `__recovery/` for the recovery
     sweep to find.
  2. Drops the engine handle.
  3. Reopens via `Omnigraph::open` — recovery sweep classifies
     RolledPastExpected, decides RollForward, publishes the manifest
     update, records the audit row, deletes the sidecar.
  4. Asserts the sidecar is gone.
  5. Asserts the originally-attempted Eve insert is now visible
     (Person count = 1).
  6. Asserts a subsequent insert succeeds without
     ExpectedVersionMismatch (Person count = 2).
  7. Asserts the audit dataset `_graph_commit_recoveries.lance` exists.
  This is the headline contract the MR-847 acceptance criteria require.

All other failpoint and runs tests continue to pass (8 + 24 unchanged).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-03 00:15:37 +02:00
parent ca21e73d43
commit 49ca7e5068
No known key found for this signature in database
7 changed files with 209 additions and 61 deletions

View file

@ -32,7 +32,10 @@ pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::recover_manifest_drift;
pub(crate) use recovery::{
delete_sidecar, new_sidecar, recover_manifest_drift, write_sidecar, RecoverySidecar,
RecoverySidecarHandle, SidecarKind, SidecarTablePin,
};
use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
pub use state::SubTableEntry;
#[cfg(test)]

View file

@ -172,7 +172,7 @@ impl Omnigraph {
// compare its snapshot against any leftover staging files. Recovery
// either deletes staging (pre-commit crash) or completes the rename
// (post-commit crash) before the live schema files are read.
let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
// MR-847 recovery sweep: close the Phase B → Phase C residual on
// any sidecar left over from a crashed writer. ReadOnly skips —
@ -189,6 +189,11 @@ impl Omnigraph {
&coordinator.snapshot(),
)
.await?;
// Roll-forward advances the manifest pin and the audit appends
// commits to _graph_commits.lance + _graph_commit_recoveries.lance.
// The coordinator's in-memory snapshot is now stale; refresh so
// the returned Omnigraph carries the post-recovery state.
coordinator.refresh().await?;
}
// Read _schema.pg (post-recovery — may have just been renamed in).
let schema_path = schema_source_uri(&root);
@ -266,6 +271,20 @@ impl Omnigraph {
&self.table_store
}
/// Engine-level access to the object-store adapter (S3 / local fs).
/// Used by the MR-847 recovery sidecar protocol — writers in the
/// engine call this to write/delete sidecars at `__recovery/{ulid}.json`.
pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
self.storage.as_ref()
}
/// Engine-level access to the repo's normalized root URI. Used by
/// the MR-847 recovery sidecar protocol to compute `__recovery/`
/// paths.
pub(crate) fn root_uri(&self) -> &str {
&self.root_uri
}
pub(crate) async fn open_coordinator_for_branch(
&self,
branch: Option<&str>,

View file

@ -737,18 +737,23 @@ impl Omnigraph {
Err(e) => Err(e),
Ok(total) if staging.is_empty() => Ok(total),
Ok(total) => {
let (updates, expected_versions) = staging
.finalize(self, requested.as_deref())
let (updates, expected_versions, sidecar_handle) = staging
.finalize(
self,
requested.as_deref(),
crate::db::manifest::SidecarKind::Mutation,
)
.await?;
// Failpoint that wedges the documented finalize→publisher
// residual: per-table `commit_staged` calls already
// advanced Lance HEAD on every touched table; a failure
// injected here mirrors the production-rare case where
// the publisher's CAS pre-check rejects (or the manifest
// write throws) after staged commits succeeded. Used by
// `tests/failpoints.rs::finalize_publisher_residual_*`
// to pin the documented residual behavior. See
// `docs/runs.md` "Finalize → publisher residual".
// write throws) after staged commits succeeded. The
// sidecar written inside `staging.finalize()` persists
// across this failure so the next `Omnigraph::open`
// (MR-847 recovery sweep) can roll forward — see
// `tests/failpoints.rs::recovery_rolls_forward_after_finalize_publisher_failure`.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
self.commit_updates_on_branch_with_expected(
requested.as_deref(),
@ -756,6 +761,14 @@ impl Omnigraph {
&expected_versions,
)
.await?;
// Phase C succeeded — sidecar can be deleted. If this
// delete fails, the next open's sweep classifies every
// table as NoMovement (manifest pin == Lance HEAD ==
// post_commit_pin) and the sidecar is treated as a
// stale artifact (cleaned up via the Phase 2 logic).
if let Some(handle) = sidecar_handle {
crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await?;
}
Ok(total)
}
}

View file

@ -27,6 +27,9 @@ use lance::Dataset;
use omnigraph_compiler::catalog::EdgeType;
use crate::db::SubTableUpdate;
use crate::db::manifest::{
new_sidecar, write_sidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
};
use crate::error::{OmniError, Result};
/// Whether the per-table accumulator should commit via `stage_append`
@ -218,8 +221,13 @@ impl MutationStaging {
pub(crate) async fn finalize(
self,
db: &crate::db::Omnigraph,
_branch: Option<&str>,
) -> Result<(Vec<SubTableUpdate>, HashMap<String, u64>)> {
branch: Option<&str>,
sidecar_kind: SidecarKind,
) -> Result<(
Vec<SubTableUpdate>,
HashMap<String, u64>,
Option<RecoverySidecarHandle>,
)> {
let MutationStaging {
expected_versions,
paths,
@ -230,6 +238,49 @@ impl MutationStaging {
let mut updates: Vec<SubTableUpdate> =
inline_committed.into_values().collect();
// MR-847 — sidecar protocol. Build the per-table pin list BEFORE
// any Lance commit_staged runs, then write the sidecar so a crash
// between Phase B (this loop's commit_staged calls) and Phase C
// (the manifest publish in the caller) is recoverable on next
// open. Skipped when `pending` is empty (delete-only mutation;
// D₂ parse-time rule keeps deletes out of this code path so this
// branch is reached only for the inline-committed-only case).
let pins: Vec<SidecarTablePin> = pending
.iter()
.map(|(table_key, _)| {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing path for table '{}'",
table_key,
))
})?;
let expected = *expected_versions.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing expected version for table '{}'",
table_key,
))
})?;
Ok::<SidecarTablePin, OmniError>(SidecarTablePin {
table_key: table_key.clone(),
table_path: path.full_path.clone(),
expected_version: expected,
post_commit_pin: expected + 1,
})
})
.collect::<Result<Vec<_>>>()?;
let sidecar_handle = if pins.is_empty() {
None
} else {
let sidecar = new_sidecar(
sidecar_kind,
branch.map(|s| s.to_string()),
db.audit_actor_id.clone(),
pins,
);
Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?)
};
for (table_key, table) in pending {
let path = paths.get(&table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
@ -318,7 +369,7 @@ impl MutationStaging {
});
}
Ok((updates, expected_versions))
Ok((updates, expected_versions, sidecar_handle))
}
}

View file

@ -537,10 +537,24 @@ async fn load_jsonl_reader<R: BufRead>(
// Phase 4: Atomic manifest commit with publisher-level OCC.
if use_staging {
let (updates, expected_versions) = staging.finalize(db, branch).await?;
let (updates, expected_versions, sidecar_handle) = staging
.finalize(db, branch, crate::db::manifest::SidecarKind::Load)
.await?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
.await?;
// MR-847: sidecar protects the per-table commit_staged →
// manifest publish window. Phase C succeeded — clean up.
if let Some(handle) = sidecar_handle {
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?;
}
} else {
// LoadMode::Overwrite keeps the legacy inline-commit path —
// truncate-then-append doesn't fit the staged shape (see
// `docs/runs.md` "LoadMode::Overwrite residual"). MR-847 sidecar
// is not applicable here because the writer doesn't go through
// MutationStaging; per-table inline commits + a final manifest
// publish handle their own residual via documented operator
// workflow (re-run overwrite to recover).
db.commit_updates_on_branch_with_expected(
branch,
&overwrite_updates,

View file

@ -63,6 +63,16 @@ impl StorageAdapter for LocalStorageAdapter {
async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
let path = local_path_from_uri(uri)?;
// Ensure parent directory exists. S3 has no equivalent (PutObject
// is path-agnostic). For local fs, callers like the MR-847
// recovery sidecar protocol expect transparent directory
// creation under the repo root (the `__recovery/` directory
// doesn't pre-exist; first sidecar write creates it).
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await?;
}
}
tokio::fs::write(&path, contents).await?;
Ok(())
}

View file

@ -140,43 +140,53 @@ async fn schema_apply_recovers_partial_rename() {
assert_no_staging_files(dir.path());
}
/// Pin the documented "finalize → publisher residual" of the
/// staged-write commit path.
/// Prove the MR-847 recovery sweep closes the "finalize → publisher
/// residual" across one open cycle — the post-MR-847 contract.
///
/// `MutationStaging::finalize` runs `commit_staged` per touched table
/// sequentially before the publisher commits the manifest. Lance has no
/// multi-dataset atomic commit primitive, so a failure between the
/// per-table staged commits and the manifest commit leaves Lance HEAD
/// advanced on the touched tables with no manifest update — and the
/// next mutation surfaces `ExpectedVersionMismatch` on those tables.
/// advanced on the touched tables with no manifest update.
///
/// This isn't a code bug we can fix without an upstream Lance change;
/// it's the documented residual (see `docs/runs.md` "Finalize →
/// publisher residual"). The test pins the behavior so future code
/// changes catch any silent regression: if someone widens the residual
/// (e.g. failing earlier in finalize without rolling back), this test
/// will surface a different error than `ExpectedVersionMismatch`. If
/// someone narrows the residual (e.g. lance ships multi-dataset commit
/// and we plumb it), this test will start passing the next mutation
/// — and someone has to update the assertion + the docs.
/// Pre-MR-847: the next mutation surfaced `ExpectedVersionMismatch` and
/// the residual persisted until process restart. Post-MR-847: the
/// finalize writes a sidecar at `__recovery/{ulid}.json` BEFORE Phase B,
/// the failpoint fires AFTER finalize but BEFORE the publisher, the
/// engine handle is dropped, and the next `Omnigraph::open` runs the
/// recovery sweep. The sweep classifies every table in the sidecar as
/// `RolledPastExpected` (Lance HEAD == expected + 1, post_commit_pin
/// matches), decides RollForward, atomically extends every manifest pin
/// via `ManifestBatchPublisher::publish`, records an audit row, and
/// deletes the sidecar.
///
/// After this test passes:
/// - The originally-attempted insert ("Eve") is visible via a normal
/// query.
/// - The next mutation succeeds without `ExpectedVersionMismatch`.
/// - `_graph_commit_recoveries.lance` carries an audit row with
/// `recovery_kind=RolledForward` and the original sidecar's
/// `actor_id` in `recovery_for_actor`.
///
/// Continuous in-process recovery (no restart needed between failure
/// and recovery) is MR-856 (background reconciler).
#[tokio::test]
async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() {
use omnigraph::error::{ManifestConflictDetails, OmniError};
async fn recovery_rolls_forward_after_finalize_publisher_failure() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
.await
.unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Phase A: trigger the residual.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
// First mutation: finalize succeeds (commit_staged advances Lance
// HEAD on node:Person), then the failpoint kicks before the
// publisher's manifest commit. The caller sees the synthetic
// error.
// The mutation's finalize completes (commit_staged advances Lance
// HEAD on node:Person AND writes a `__recovery/{ulid}.json`
// sidecar). Then the failpoint kicks in before the publisher's
// manifest commit, so the manifest pin stays at the pre-write
// version. The sidecar persists for the next-open recovery sweep.
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
@ -191,42 +201,70 @@ async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recover
),
"unexpected error: {err}"
);
}
// Failpoint dropped — subsequent calls are not synthetic-failed.
// Next mutation against the same table surfaces the documented
// residual: Lance HEAD on node:Person advanced (commit_staged ran),
// manifest didn't, so the publisher CAS at next-mutation time
// surfaces ExpectedVersionMismatch.
let err = mutate_main(
// Sidecar must still exist on disk for the recovery sweep to find.
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"exactly one sidecar should persist after the finalize failure"
);
// Drop the failpoint scope and the engine handle.
}
// Phase B: reopen runs the recovery sweep. The sweep finds the
// sidecar, classifies node:Person as RolledPastExpected, decides
// RollForward, publishes the manifest update, records the audit
// row, deletes the sidecar.
let mut db = Omnigraph::open(&uri).await.unwrap();
// Sidecar gone — sweep completed end to end.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted after successful roll-forward; remaining: {:?}",
remaining,
);
}
// The originally-attempted "Eve" insert is now visible — the recovery
// sweep extended the manifest pin to include the staged commit.
let person_count = helpers::count_rows(&db, "node:Person").await;
assert_eq!(
person_count, 1,
"exactly one person (Eve) must be visible after roll-forward"
);
// The next mutation on the same table succeeds — no ExpectedVersionMismatch.
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.unwrap_err();
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
let Some(ManifestConflictDetails::ExpectedVersionMismatch {
ref table_key,
expected,
actual,
}) = manifest_err.details
else {
panic!(
"expected ExpectedVersionMismatch (the documented residual), got {:?}",
manifest_err.details
);
};
.expect("next mutation must succeed after recovery rolled forward");
let person_count = helpers::count_rows(&db, "node:Person").await;
assert_eq!(
table_key, "node:Person",
"drift should be on the table the failed finalize touched"
person_count, 2,
"Frank's insert must land normally after recovery"
);
// Audit row recorded.
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
assert!(
actual > expected,
"Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}",
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after a successful recovery"
);
}