recovery: refresh-time roll-forward closes the in-process residual

Adds RecoveryMode { Full, RollForwardOnly } and wires Omnigraph::refresh
to invoke roll-forward-only recovery. This closes the documented
"long-running server between Phase B failure and process restart"
residual without requiring a restart, for the common case (mutation /
load finalize → publisher failure).

Why roll-forward only and not full sweep:
  * Roll-forward is safe under concurrency (publisher uses row-level
    CAS).
  * Roll-back uses Dataset::restore, which "wins" against concurrent
    Append/Update/Delete/CreateIndex/Merge per check_restore_txn —
    silently orphaning the concurrent writer's commit (pinned by
    tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning).
    Sidecars that classify as RollBack-eligible are LEFT ON DISK for the
    next ReadWrite open, where no concurrent writers exist and full
    restore is safe.

Implementation:
  * recovery.rs: RecoveryMode enum; recover_manifest_drift takes mode;
    process_sidecar branches on mode for Abort and RollBack — both
    defer to next ReadWrite open under RollForwardOnly. RollForward
    behavior unchanged.
  * omnigraph.rs: Omnigraph::refresh promoted to pub; calls
    recover_manifest_drift in RollForwardOnly mode after coordinator
    refresh. Steady-state cost: one list_dir of __recovery (early
    return on empty). Adds refresh_coordinator_only — pub(crate) —
    for engine-internal callers that hold an in-flight sidecar (the
    schema_apply lease-check + lock-release paths). Without this split,
    refresh would race the in-flight sidecar.
  * schema_apply.rs: switch all 6 internal db.refresh() call sites to
    refresh_coordinator_only().

Tests:
  * refresh_runs_roll_forward_recovery_in_process — trigger
    mutation.post_finalize_pre_publisher; without restart, call
    db.refresh(); assert sidecar deleted, drifted row visible,
    subsequent mutation succeeds.
  * refresh_defers_rollback_eligible_sidecar_to_next_open — synthesize
    a Mutation sidecar with bogus expected (UnexpectedAtP1 → RollBack);
    refresh leaves it on disk and Lance HEAD unchanged; drop and reopen
    runs the full sweep which advances HEAD via restore.

Docs:
  * docs/runs.md "Long-running servers" caveat updated to describe the
    refresh-time roll-forward path and the rollback-defer behavior.
  * docs/invariants.md §VI.23 status line updated to reflect in-process
    closure of the common case.

Workspace tests pass with --features failpoints; no regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-04 00:15:42 +02:00
parent 8c6506f5cd
commit aaa031e834
No known key found for this signature in database
7 changed files with 361 additions and 29 deletions

View file

@ -34,7 +34,7 @@ use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar,
RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
};
use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
pub use state::SubTableEntry;

View file

@ -65,6 +65,31 @@ pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery";
/// see [`SidecarSchemaError`]).
pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 1;
/// Selects which recovery actions are allowed in a sweep.
///
/// Open-time recovery (`Omnigraph::open` with `OpenMode::ReadWrite`)
/// runs the full sweep — `Dataset::restore` is safe because no other
/// writers are active yet. In-process recovery (called from
/// `Omnigraph::refresh` during a long-running server) must NOT call
/// `Dataset::restore`: it "wins" against concurrent Append/Update/
/// Delete/CreateIndex/Merge per `check_restore_txn`, silently orphaning
/// the concurrent writer's commit (pinned by
/// `tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`).
/// Roll-forward is safe under concurrency because
/// `ManifestBatchPublisher::publish` uses row-level CAS.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecoveryMode {
/// Open-time: the full sweep. RolledPastExpected → roll forward;
/// mixed/unexpected → roll back via `Dataset::restore`; invariant
/// violation → abort with a loud error.
Full,
/// In-process (refresh): roll-forward only. Sidecars that would
/// require restore or abort are LEFT ON DISK for the next ReadWrite
/// open. Closes the common case (mutation/load finalize → publisher
/// failure) without restart.
RollForwardOnly,
}
/// Categorizes the writer that produced a sidecar so audit trail and
/// observability can attribute recoveries to the right code path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@ -468,6 +493,7 @@ pub(crate) async fn recover_manifest_drift(
root_uri: &str,
storage: std::sync::Arc<dyn StorageAdapter>,
coordinator: &mut GraphCoordinator,
mode: RecoveryMode,
) -> Result<()> {
let sidecars = list_sidecars(root_uri, storage.as_ref()).await?;
if sidecars.is_empty() {
@ -502,7 +528,8 @@ pub(crate) async fn recover_manifest_drift(
coordinator.snapshot()
}
};
process_sidecar(root_uri, storage.as_ref(), &branch_snapshot, &sidecar).await?;
process_sidecar(root_uri, storage.as_ref(), &branch_snapshot, &sidecar, mode)
.await?;
}
// Final refresh so the caller sees the post-sweep state.
coordinator.refresh().await?;
@ -514,6 +541,7 @@ async fn process_sidecar(
storage: &dyn StorageAdapter,
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
mode: RecoveryMode,
) -> Result<()> {
let mut classifications = Vec::with_capacity(sidecar.tables.len());
for pin in &sidecar.tables {
@ -532,20 +560,46 @@ async fn process_sidecar(
}
match decide(&classifications) {
SidecarDecision::Abort => {
// Surface loudly without deleting the sidecar — operator must
// investigate. This includes any classification of
// InvariantViolation (Lance HEAD < manifest pinned: should be
// impossible).
Err(OmniError::manifest_internal(format!(
"recovery sidecar '{}' has invariant violation; refusing to act \
operator review required (sidecar at '{}', classifications: {:?})",
sidecar.operation_id,
sidecar_uri(root_uri, &sidecar.operation_id),
classifications,
)))
}
SidecarDecision::Abort => match mode {
RecoveryMode::Full => {
// Surface loudly without deleting the sidecar — operator
// must investigate. This includes any InvariantViolation
// classification (Lance HEAD < manifest pinned: should
// be impossible).
Err(OmniError::manifest_internal(format!(
"recovery sidecar '{}' has invariant violation; refusing to act \
operator review required (sidecar at '{}', classifications: {:?})",
sidecar.operation_id,
sidecar_uri(root_uri, &sidecar.operation_id),
classifications,
)))
}
RecoveryMode::RollForwardOnly => {
// In-process refresh-time recovery: leave the sidecar
// and defer the loud abort to the next ReadWrite open.
// Operator-actionable error surfacing belongs at open,
// not silently inside refresh.
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: deferring sidecar with invariant violation to next ReadWrite open"
);
Ok(())
}
},
SidecarDecision::RollBack => {
if matches!(mode, RecoveryMode::RollForwardOnly) {
// In-process recovery cannot run Dataset::restore safely
// (would orphan a concurrent writer's commit). Leave the
// sidecar in place; the next ReadWrite open will handle
// it via the full sweep.
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: deferring rollback-eligible sidecar to next ReadWrite open"
);
return Ok(());
}
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,

View file

@ -190,6 +190,7 @@ impl Omnigraph {
&root,
Arc::clone(&storage),
&mut coordinator,
crate::db::manifest::RecoveryMode::Full,
)
.await?;
}
@ -374,8 +375,43 @@ impl Omnigraph {
Ok(())
}
/// Re-read the handle-local coordinator state from storage.
pub(crate) async fn refresh(&mut self) -> Result<()> {
/// Re-read the handle-local coordinator state from storage AND run
/// roll-forward-only recovery: closes the in-process Phase B → Phase C
/// residual (e.g. `MutationStaging::finalize` crash mid-publish in a
/// long-running server) without restart. Roll-forward uses
/// `ManifestBatchPublisher::publish`'s row-level CAS — safe under
/// concurrency. Sidecars that would require `Dataset::restore` are
/// deferred to the next ReadWrite open (restore can silently orphan
/// a concurrent writer's commit if invoked under concurrency).
///
/// Steady state cost: one `list_dir` of `__recovery/` (typically
/// returns empty → early return). No additional Lance reads.
///
/// Engine-internal callers that already hold an in-flight sidecar
/// (e.g. `schema_apply` mid-write) MUST use
/// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
/// avoid the recovery sweep racing their own sidecar.
pub async fn refresh(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
crate::db::manifest::recover_manifest_drift(
&self.root_uri,
Arc::clone(&self.storage),
&mut self.coordinator,
crate::db::manifest::RecoveryMode::RollForwardOnly,
)
.await?;
self.runtime_cache.invalidate_all().await;
Ok(())
}
/// Refresh coordinator state and invalidate the runtime cache WITHOUT
/// running the recovery sweep. Engine-internal callers that hold an
/// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
/// internal lease-check refresh) need this variant: running recovery
/// here would observe the caller's own sidecar, classify it as
/// RolledPastExpected, and roll it forward — racing the caller's
/// own publish path.
pub(crate) async fn refresh_coordinator_only(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
self.runtime_cache.invalidate_all().await;
Ok(())

View file

@ -381,7 +381,7 @@ pub(super) async fn apply_schema_with_lock(
}));
}
db.refresh().await?;
db.refresh_coordinator_only().await?;
if db.version() != base_manifest_version {
return Err(OmniError::manifest_conflict(format!(
"schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
@ -469,13 +469,13 @@ pub(super) async fn apply_schema_with_lock(
}
pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str) -> Result<()> {
db.refresh().await?;
db.refresh_coordinator_only().await?;
ensure_schema_apply_not_locked(db, operation).await
}
pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.refresh().await?;
db.refresh_coordinator_only().await?;
let branches = db.coordinator.all_branches().await?;
if branches
.iter()
@ -489,7 +489,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
db.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh().await?;
db.refresh_coordinator_only().await?;
let blocking_branches = db
.coordinator
@ -513,7 +513,12 @@ pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
db.coordinator
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh().await
// Use refresh_coordinator_only — the full Omnigraph::refresh would
// run roll-forward-only recovery, and on the failure path the
// in-flight schema_apply sidecar is still on disk; recovery would
// race the caller's own publish (or roll forward an aborted apply
// we want to leave for next-open).
db.refresh_coordinator_only().await
}
pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {

View file

@ -267,6 +267,236 @@ async fn recovery_rolls_forward_after_finalize_publisher_failure() {
);
}
/// Refresh-time recovery (Option B): the in-process `Omnigraph::refresh`
/// runs roll-forward-only recovery, closing the long-running-server
/// residual without restart.
///
/// Setup: trigger `mutation.post_finalize_pre_publisher` once. The
/// sidecar persists. Without dropping the engine, call `db.refresh()`.
/// The post-condition: sidecar gone; Eve visible; subsequent mutation
/// on the same handle succeeds without restart and without
/// ExpectedVersionMismatch.
#[tokio::test]
async fn refresh_runs_roll_forward_recovery_in_process() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
// Phase A: trigger the residual (sidecar persists; manifest unchanged).
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: mutation.post_finalize_pre_publisher"
),
"unexpected error: {err}"
);
let recovery_dir = dir.path().join("__recovery");
assert_eq!(
std::fs::read_dir(&recovery_dir).unwrap().count(),
1,
"exactly one sidecar must persist after the finalize failure"
);
}
// Phase B: explicit refresh runs roll-forward-only recovery
// in-process — no restart needed. Sidecar finds the Person drift,
// classifies RolledPastExpected, rolls forward via publisher CAS,
// and deletes the sidecar.
db.refresh().await.expect("refresh must succeed");
// Sidecar must be gone — refresh-time recovery rolled it forward.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted by refresh-time roll-forward; remaining: {:?}",
remaining,
);
}
// Eve (the originally-attempted insert) is visible without restart.
let person_count = helpers::count_rows(&db, "node:Person").await;
assert_eq!(
person_count, 1,
"Eve must be visible after refresh-time roll-forward"
);
// A direct Person mutation also succeeds without ExpectedVersionMismatch.
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.expect("Person insert must succeed after refresh-time recovery");
assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
}
/// Refresh-time recovery must NOT call `Dataset::restore` — it can
/// silently orphan a concurrent writer's commit. Sidecars that would
/// require rollback must be left on disk for the next ReadWrite open.
///
/// Setup: synthesize a sidecar that would classify as `UnexpectedAtP1`
/// (rollback territory) — strict-match Mutation kind with
/// expected_version != manifest_pinned. Trigger refresh and assert:
/// sidecar still on disk, Lance HEAD unchanged (no restore commit).
/// Then drop + open: full sweep handles it.
#[tokio::test]
async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Bootstrap.
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
// Capture Person's full URI and manifest pin.
let snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
let entry = snapshot.entry("node:Person").unwrap();
let person_uri = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
let manifest_pin = entry.table_version;
// Drift Person's Lance HEAD ahead of the manifest pin (without
// touching the manifest) so the classifier can reach UnexpectedAtP1
// / UnexpectedMultistep / RolledPastExpected paths that require
// a real restore on rollback.
let store = TableStore::new(&uri);
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
store
.delete_where(&person_uri, &mut ds, "1 = 2")
.await
.unwrap();
let head_after_drift = ds.version().version;
assert_eq!(head_after_drift, manifest_pin + 1);
// Synthesize a sidecar with expected_version that DOES NOT match
// the current manifest pin AND post_commit_pin == lance_head →
// strict Mutation classifier sees lance_head == manifest_pinned + 1
// but expected != manifest_pinned → UnexpectedAtP1. decide → RollBack.
//
// expected_version must be a REAL Lance version (`restore_table_to_version`
// calls `checkout_version` on it, and an unknown version errors). Use
// manifest_pin - 1 which exists from the bootstrap commit chain.
let bogus_expected = manifest_pin - 1;
let bogus_post = head_after_drift;
let sidecar_json = format!(
r#"{{
"schema_version": 1,
"operation_id": "01H0000000000000000000RBCK",
"started_at": "0",
"branch": null,
"actor_id": "act-rollback",
"writer_kind": "Mutation",
"tables": [
{{
"table_key":"node:Person",
"table_path":"{}",
"expected_version":{},
"post_commit_pin":{}
}}
]
}}"#,
person_uri, bogus_expected, bogus_post,
);
let recovery_dir = dir.path().join("__recovery");
std::fs::create_dir_all(&recovery_dir).unwrap();
std::fs::write(
recovery_dir.join("01H0000000000000000000RBCK.json"),
&sidecar_json,
)
.unwrap();
// Capture pre-refresh Lance HEAD on Person.
let pre_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
// Trigger refresh-time recovery directly. Sidecar is rollback-
// eligible (UnexpectedAtP1); RollForwardOnly mode defers it,
// leaving the sidecar on disk and Lance HEAD unchanged on Person.
db.refresh().await.expect("refresh must succeed (deferring rollback)");
// Sidecar still on disk.
assert_eq!(
std::fs::read_dir(&recovery_dir).unwrap().count(),
1,
"rollback-eligible sidecar must be deferred to next ReadWrite open",
);
// Lance HEAD on Person unchanged — no restore ran.
let post_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
assert_eq!(
pre_head, post_head,
"refresh-time recovery must NOT call Dataset::restore on Person; \
pre_head={pre_head}, post_head={post_head}",
);
// Cross-check: drop the engine and reopen — full sweep handles
// the rollback (will use Dataset::restore safely; no concurrent
// writers at open time).
drop(db);
let _db = Omnigraph::open(&uri).await.unwrap();
// After full-sweep recovery, the sidecar should be processed
// (deleted). Sidecar's tables are eligible for rollback (UnexpectedAtP1):
// restore happens on Person (HEAD advances by 1).
let remaining = if recovery_dir.exists() {
std::fs::read_dir(&recovery_dir).unwrap().count()
} else {
0
};
assert_eq!(
remaining, 0,
"full sweep at next open must process the deferred sidecar",
);
let final_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
assert!(
final_head > post_head,
"full sweep must run Dataset::restore (head advances); \
post_head={post_head}, final_head={final_head}",
);
}
/// Companion to the above — confirms that a finalize→publisher failure
/// on one table leaves OTHER tables untouched. Subsequent writes to
/// non-drifted tables proceed normally; the drift is contained.

View file

@ -105,7 +105,7 @@ These are user-visible commitments. They state what the engine guarantees and wh
Specific defaults (timeout values, memory caps, TTL windows) are *configuration*, not invariants — see [docs/constants.md](constants.md) and per-deployment configuration. The invariant is that bounds and contracts exist, not their numerical values.
23. **Atomicity is per-query.** Every `.gq` query is atomic — multi-statement mutations are all-or-nothing via the substrate's atomic-commit primitive. No cross-query `BEGIN`/`COMMIT`; branches and merges fill that role for agent workflows.
*Status: upheld at the writer-trait surface AND across process boundaries — the sealed `TableStorage` trait routes inserts / updates / scalar-index builds / merge_insert / overwrite through `stage_*` + `commit_staged` (Phase A is drift-free), and the open-time recovery sweep in `db/manifest/recovery.rs` (sidecars at `__recovery/{ulid}.json` written by `MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) closes the per-table commit_staged → manifest publish residual on the next `Omnigraph::open`. The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures and recoverable across process boundaries for finalize→publisher failures. Continuous in-process recovery (no restart required between Phase B failure and recovery) is the goal of a future background reconciler. Two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).*
*Status: upheld at the writer-trait surface, across process boundaries, AND in-process for the common case — the sealed `TableStorage` trait routes inserts / updates / scalar-index builds / merge_insert / overwrite through `stage_*` + `commit_staged` (Phase A is drift-free); the open-time recovery sweep in `db/manifest/recovery.rs` (sidecars at `__recovery/{ulid}.json` written by `MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) closes the per-table commit_staged → manifest publish residual on the next `Omnigraph::open`; and `Omnigraph::refresh` runs roll-forward-only recovery in-process so long-running servers close the common case (mutation/load finalize → publisher failure) without restart. The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures, recoverable across process boundaries for all writer kinds, and recoverable in-process for roll-forward-eligible sidecars. Sidecars that would require `Dataset::restore` are deferred to the next ReadWrite open (restore unsafe under concurrency); continuous in-process recovery for that case requires per-(table, branch) writer-queue acquisition and is the goal of a future background reconciler. Two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).*
24. **Schema integrity is strict at commit.** Type validation, required-field presence (auto-filled from `@default` if declared), uniqueness across batches and versions, and referential integrity — all enforced before commit succeeds. Per-write softening flags are opt-in, never default.
*Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.*

View file

@ -193,12 +193,19 @@ Triggers for the residual: transient Lance write errors during finalize
(object-store retry budget exhaustion, disk full); persistent publisher
contention exceeding `PUBLISHER_RETRY_BUDGET = 5` retries.
**Long-running servers**: between Phase B failure and the next
`Omnigraph::open` (typically a server restart), subsequent writers on
the affected tables surface
`ManifestConflictDetails::ExpectedVersionMismatch`. Continuous
in-process recovery (no restart required) is the goal of a future
background reconciler.
**Long-running servers**: `Omnigraph::refresh` runs roll-forward-only
recovery in-process — the common Phase B → Phase C residual closes
without a restart. The next mutation on the same handle (after refresh)
no longer surfaces `ExpectedVersionMismatch` for the failed table.
Sidecars that would require a `Dataset::restore` (mixed / unexpected
state) are deferred to the next `OpenMode::ReadWrite` open: restore is
unsafe under concurrency because Lance's `check_restore_txn` accepts
the restore against in-flight Append/Update/Delete commits and
silently orphans them (pinned by
`tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`).
Continuous in-process recovery for the rollback path is the goal of a
future background reconciler with per-(table, branch) writer-queue
acquisition.
The publisher-CAS contract is unchanged: a *concurrent writer* that
advances any of our touched tables between snapshot capture and