fix(recovery): converge roll-forward on concurrent manifest advance (#296)

* refactor(storage): gate test-only TableStore::append_batch behind cfg(test)

The inherent append_batch is used only by in-source recovery test setup, but
the non-test lib build (cfg(test) off) cannot see those callers and emitted a
dead_code warning. Gating the method #[cfg(test)] silences the false positive
and enforces its own doc contract ("no new engine call sites") by construction
— engine code physically cannot call a cfg(test) method.

* test(failpoints): harden fault-injection harness + reproduce roll-forward CAS race

Hardens the test infrastructure around the process-global `fail` registry, and
adds a deterministic red repro for the open-time recovery sweep's roll-forward
CAS race (iss-schema-apply-reopen-recovery-race). The fix lands in the next
commit — this commit is intentionally red (rule 12: red→green visible in log).

Harness:
- One `ScopedFailPoint` (engine) gaining `with_callback`; the cluster duplicate
  is removed and cluster tests reuse the engine type via `omnigraph/failpoints`.
- `#[serial]` on every failpoint test (the registry is process-global, so shared
  names interfere under parallelism); `serial_test` added to cluster dev-deps.
- `helpers::failpoint::Rendezvous` (park-first / wait-until-reached / release)
  replaces fixed-`sleep` cross-thread coordination; the three concurrent tests
  now rendezvous deterministically. The reached flag doubles as a fired-assert.
- Compile-checked `failpoints::names` catalog (engine + cluster); every call
  site references a const, and `failpoint_names_guard.rs` enforces "no string
  literal names" by source-walk, so a typo is a build error not a silent no-fire.

Red repro:
- New `recovery.before_roll_forward_publish` failpoint at the sweep's
  classify -> publish-CAS window (the only injection point there).
- `open_sweep_roll_forward_converges_when_manifest_advances_concurrently`: two
  concurrent open-sweeps race one pending sidecar; the sweep parked at the
  failpoint loses its publish CAS to the other and fails the open with
  `ExpectedVersionMismatch`. FAILS at this commit by design.

* fix(recovery): converge roll-forward when the manifest advances concurrently

The open-time recovery sweep classified a pending sidecar as RolledPastExpected,
then published a manifest CAS at the sidecar's pinned expected_version. Under a
concurrent writer that advanced the manifest past expected during the
classify -> publish window, the CAS failed with ExpectedVersionMismatch and
`?`-propagated, failing the whole Omnigraph::open.
iss-schema-apply-reopen-recovery-race.

A roll-forward's postcondition is "the manifest reflects the sidecar's committed
Lance state", not "this sweep won the CAS" (invariants 7 & 15). On an
ExpectedVersionMismatch, re-read the live manifest and check whether the
sidecar's intent is already satisfied (every pinned table at a version >= the
one we observed and tried to publish; added tables registered; tombstones gone
— sound under the heal-first invariant, documented at the check). If satisfied,
this is convergence: record the RolledForward audit + delete the sidecar
idempotently. If only partway, defer to the next pass. Either way the open no
longer fails. Other errors still propagate; a genuine logical conflict
resurfaces via the classifier's InvariantViolation.

Turns the red repro from the previous commit green. The roll-BACK twin
(iss-recovery-sweep-live-writer-rollback) is destructive (Lance Restore) and
still needs a cross-process lease — the known-gap is updated accordingly.

* Address PR review: harden failpoint name guard + dedupe converge audit

Two issues surfaced in PR review of the failpoint hardening + recovery fix:

1. Name guard had a line-split blind spot. It scanned per line, so a call
   wrapped across lines (`park_first(\n    "name",\n)`) put the literal on a
   different line than the call prefix and bypassed the "no string-literal
   failpoint names" check — and one such literal
   (`mutation.delete_node_pre_primary_delete`) had slipped through. Make the
   guard whitespace/newline-tolerant (skip past the open paren to the first
   argument token) so wrapping can't hide a literal, and convert the bypassed
   site to the `names::` const.

2. Convergence path could append a duplicate recovery audit. When a
   roll-forward publish loses its CAS but the manifest already reached the
   sidecar's goal, `converge_or_defer_roll_forward` recorded a RolledForward
   audit unconditionally. Under the heal-first invariant, whoever advanced the
   manifest already healed this sidecar (audit + delete), so a second row
   landed in `_graph_commit_recoveries` for one recovery event. Gate the
   audit+delete on the sidecar still being present: absent => the winner
   completed it, return success with no duplicate row. The convergence
   regression test now asserts exactly one audit row.

* docs(dev): remove the schema-apply recovery-flake handoff (fixed by this PR)

The handoff was a transient investigation note for
`iss-schema-apply-reopen-recovery-race`, which this PR fixes (the converge
helper + the red→green regression). Its rationale now lives durably in the
dev-graph issue, the PR/commit history, and invariants.md, so the handoff is
obsolete. Drop the doc, its dev-index row, and the dangling reference from the
RFC-013 handoff; the doc cross-link check stays green.

* fix(recovery): include added-table registrations in the converge audit

The CAS-loss convergence audit built outcomes only from `sidecar.tables`,
omitting the `additional_registrations` that the normal `roll_forward_all`
audit includes. For a SchemaApply sidecar with added types, a converge-path
audit row would be incomplete versus the normal roll-forward path for the same
recovery kind. Mirror the roll-forward outcome construction (append a
registration outcome per added table) so both paths emit the same audit shape.
This commit is contained in:
Ragnor Comerford 2026-06-24 19:03:51 +02:00 committed by GitHub
parent 7d3a52d674
commit 4a5277b9c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 826 additions and 476 deletions

1
Cargo.lock generated
View file

@ -4894,6 +4894,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"serial_test",
"sha2 0.10.9",
"tempfile",
"thiserror",

View file

@ -30,5 +30,6 @@ tokio = { workspace = true }
ulid = { workspace = true }
[dev-dependencies]
serial_test = "3"
tempfile = { workspace = true }
tokio = { workspace = true }

View file

@ -1,6 +1,13 @@
//! Fault-injection hooks for the cluster apply protocol, mirroring the
//! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature
//! off, every call site compiles to `Ok(())`.
//!
//! Only `maybe_fail` lives here — it returns the cluster's [`Diagnostic`]
//! error type. The test-side configuration guard is shared: use
//! [`omnigraph::failpoints::ScopedFailPoint`], which is registry-only
//! (error-type agnostic) and reachable because the cluster's `failpoints`
//! feature enables `omnigraph/failpoints`. One `ScopedFailPoint`, in the
//! lowest crate, avoids a drifting duplicate.
use crate::Diagnostic;
@ -19,38 +26,16 @@ pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> {
Ok(())
}
#[cfg(feature = "failpoints")]
pub struct ScopedFailPoint {
name: String,
}
#[cfg(feature = "failpoints")]
impl ScopedFailPoint {
pub fn new(name: &str, action: &str) -> Self {
fail::cfg(name, action).expect("configure failpoint");
Self {
name: name.to_string(),
}
}
/// Register a callback failpoint with the same Drop-based cleanup as
/// `new`. Without the guard, a panic while the point is active would
/// leak the callback into the process-global registry and fire it under
/// later tests in the same binary.
pub fn with_callback<F>(name: &str, callback: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
fail::cfg_callback(name, callback).expect("configure callback failpoint");
Self {
name: name.to_string(),
}
}
}
#[cfg(feature = "failpoints")]
impl Drop for ScopedFailPoint {
fn drop(&mut self) {
fail::remove(&self.name);
}
/// Compile-checked catalog of this crate's apply-protocol failpoint names.
/// Engine-scoped names referenced from cluster tests live in
/// [`omnigraph::failpoints::names`].
pub mod names {
pub const CLUSTER_APPLY_AFTER_GRAPH_CREATE: &str = "cluster_apply.after_graph_create";
pub const CLUSTER_APPLY_AFTER_GRAPH_DELETE: &str = "cluster_apply.after_graph_delete";
pub const CLUSTER_APPLY_AFTER_PAYLOAD_PHASE: &str = "cluster_apply.after_payload_phase";
pub const CLUSTER_APPLY_AFTER_SCHEMA_APPLY: &str = "cluster_apply.after_schema_apply";
pub const CLUSTER_APPLY_BEFORE_GRAPH_CREATE: &str = "cluster_apply.before_graph_create";
pub const CLUSTER_APPLY_BEFORE_GRAPH_DELETE: &str = "cluster_apply.before_graph_delete";
pub const CLUSTER_APPLY_BEFORE_SCHEMA_APPLY: &str = "cluster_apply.before_schema_apply";
pub const CLUSTER_APPLY_BEFORE_STATE_WRITE: &str = "cluster_apply.before_state_write";
}

View file

@ -510,7 +510,7 @@ pub async fn apply_config_dir_with_options(
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_create") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE) {
// Simulated crash before the init: the sidecar stays for the
// sweep (row 1: root absent -> intent removed next run).
diagnostics.push(diagnostic);
@ -587,7 +587,7 @@ pub async fn apply_config_dir_with_options(
// Crash point: the graph exists, the cluster state does not record it
// yet. A failure here must acknowledge nothing; the next run's sweep
// rolls the ledger forward (row 4).
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
@ -727,7 +727,7 @@ pub async fn apply_config_dir_with_options(
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY) {
// Simulated crash before the engine call: the sidecar stays; the
// sweep retires it next run (ledger still consistent with live).
diagnostics.push(diagnostic);
@ -787,7 +787,7 @@ pub async fn apply_config_dir_with_options(
}
// Crash point: the manifest moved, the ledger does not record it yet.
// A failure here acknowledges nothing; the sweep rolls forward.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
@ -872,7 +872,7 @@ pub async fn apply_config_dir_with_options(
// Crash point: payloads are on disk, state has not moved. A failure here
// must leave state.json byte-identical and acknowledge nothing; re-running
// apply repairs via the skip-if-exists blob reuse.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
@ -949,7 +949,7 @@ pub async fn apply_config_dir_with_options(
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE) {
// Simulated crash before removal: row 8 retires the intent and
// the still-valid approval lets a later run retry.
diagnostics.push(diagnostic);
@ -974,7 +974,7 @@ pub async fn apply_config_dir_with_options(
}
// Crash point: the root is gone, the ledger does not record it yet.
// The sweep rolls forward (row 7b) and consumes the approval.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") {
if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE) {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
@ -1080,7 +1080,7 @@ pub async fn apply_config_dir_with_options(
// persisted-statuses revert contract below is exercised; a cfg_callback
// on this point can mutate state.json to simulate a concurrent writer,
// making write_state's CAS check fail organically.
let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") {
let write_result = match failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE) {
Ok(()) => {
backend
.write_state(&new_state, expected_cas.as_deref(), &mut observations)

View file

@ -13,9 +13,11 @@ use std::fs;
use std::path::{Path, PathBuf};
use fail::FailScenario;
use serial_test::serial;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint as EngineScopedFailPoint;
use omnigraph_cluster::failpoints::ScopedFailPoint;
// One ScopedFailPoint for both engine- and cluster-scoped failpoint names:
// it is registry-only (error-type agnostic) and lives in the lowest crate.
use omnigraph::failpoints::ScopedFailPoint;
use omnigraph_cluster::{
ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir,
validate_config_dir,
@ -105,12 +107,13 @@ fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf
}
#[tokio::test]
#[serial]
async fn failpoint_wiring_returns_injected_diagnostic() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_applyable_state(dir.path());
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
@ -127,6 +130,7 @@ async fn failpoint_wiring_returns_injected_diagnostic() {
/// state.json is byte-identical, nothing is acknowledged — and a plain re-run
/// repairs by trusting the existing content-addressed blobs.
#[tokio::test]
#[serial]
async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -134,7 +138,7 @@ async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
let state_before = fs::read(state_path(dir.path())).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
@ -169,6 +173,7 @@ async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
/// (possible under `state.lock: false`) must surface `state_cas_mismatch`,
/// acknowledge nothing, and leave the concurrent writer's state on disk.
#[tokio::test]
#[serial]
async fn apply_cas_race_surfaces_state_cas_mismatch() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -179,7 +184,7 @@ async fn apply_cas_race_surfaces_state_cas_mismatch() {
// after apply read it but before apply writes. RAII-guarded so a panic
// inside apply cannot leak the callback into the global registry.
let race_path = state_path(dir.path());
let failpoint = ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || {
let failpoint = ScopedFailPoint::with_callback(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE, move || {
let mut state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap();
state["state_revision"] = serde_json::json!(99);
@ -256,13 +261,14 @@ fn recovery_sidecars(config_dir: &Path) -> Vec<PathBuf> {
/// The next run's sweep removes the intent (row 1) and the same run creates
/// the graph and converges.
#[tokio::test]
#[serial]
async fn create_crash_before_init_recovers_via_sweep() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_empty_state(dir.path());
{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
@ -298,6 +304,7 @@ async fn create_crash_before_init_recovers_via_sweep() {
/// ledger is stale, nothing was acknowledged. The next run's sweep rolls the
/// ledger forward (row 4) with an audit entry, and the run converges.
#[tokio::test]
#[serial]
async fn create_crash_after_init_rolls_state_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -305,7 +312,7 @@ async fn create_crash_after_init_rolls_state_forward() {
let state_before = fs::read(dir.path().join("__cluster/state.json")).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
@ -385,6 +392,7 @@ async fn live_schema_digest(dir: &Path) -> String {
/// live schema and ledger are untouched; the next run's sweep retires the
/// stale intent and the same run applies and converges.
#[tokio::test]
#[serial]
async fn schema_crash_before_apply_recovers_via_sweep() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -393,7 +401,7 @@ async fn schema_crash_before_apply_recovers_via_sweep() {
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY, "return");
let out = apply_config_dir_with_options(
dir.path(),
ApplyOptions {
@ -425,6 +433,7 @@ async fn schema_crash_before_apply_recovers_via_sweep() {
/// the graph manifest moves. The defensive cleanup proof should remove the
/// cluster sidecar immediately so a pre-movement error cannot brick boot.
#[tokio::test]
#[serial]
async fn schema_apply_error_before_graph_movement_removes_sidecar() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -433,7 +442,7 @@ async fn schema_apply_error_before_graph_movement_removes_sidecar() {
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
{
let _failpoint = EngineScopedFailPoint::new("schema_apply.before_staging_write", "return");
let _failpoint = ScopedFailPoint::new(omnigraph::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
@ -462,6 +471,7 @@ async fn schema_apply_error_before_graph_movement_removes_sidecar() {
/// prove this is a pre-movement failure, so the sidecar must survive for
/// explicit recovery/quarantine instead of being cleaned up defensively.
#[tokio::test]
#[serial]
async fn schema_apply_error_after_graph_movement_keeps_sidecar() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -472,7 +482,7 @@ async fn schema_apply_error_after_graph_movement_keeps_sidecar() {
let v2_digest = desired.resource_digests["schema.knowledge"].clone();
{
let _failpoint = EngineScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
let _failpoint = ScopedFailPoint::new(omnigraph::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
@ -524,6 +534,7 @@ async fn schema_apply_error_after_graph_movement_keeps_sidecar() {
/// moved, the ledger is stale, nothing acknowledged; the next run's sweep
/// rolls the ledger forward with an audit entry and the run converges.
#[tokio::test]
#[serial]
async fn schema_crash_after_apply_rolls_state_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -534,7 +545,7 @@ async fn schema_crash_after_apply_rolls_state_forward() {
let v2_digest = desired.resource_digests["schema.knowledge"].clone();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
@ -608,13 +619,14 @@ async fn seed_approved_delete(dir: &Path) -> String {
/// next run retires the stale intent (row 8) and the still-approved delete
/// completes in the same run.
#[tokio::test]
#[serial]
async fn delete_crash_before_removal_reproposes() {
let scenario = FailScenario::setup();
let dir = fixture();
let approval_id = seed_approved_delete(dir.path()).await;
{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(dir.path().join("graphs/old.omni").exists());
@ -650,6 +662,7 @@ async fn delete_crash_before_removal_reproposes() {
/// nothing acknowledged; the next run's sweep rolls the tombstone forward,
/// consumes the approval the sidecar carries, and audits the recovery.
#[tokio::test]
#[serial]
async fn delete_crash_after_removal_rolls_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
@ -657,7 +670,7 @@ async fn delete_crash_after_removal_rolls_forward() {
let state_before = fs::read(state_path(dir.path())).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return");
let _failpoint = ScopedFailPoint::new(omnigraph_cluster::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE, "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);

View file

@ -257,7 +257,7 @@ impl GraphCoordinator {
/// fresh, so any existing commit-graph branch with this name is provably
/// orphaned and is force-dropped before recreating.
async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
failpoints::maybe_fail("branch_create.after_manifest_branch_create")?;
failpoints::maybe_fail(crate::failpoints::names::BRANCH_CREATE_AFTER_MANIFEST_BRANCH_CREATE)?;
let Some(commit_graph) = &mut self.commit_graph else {
return Ok(());
};
@ -306,7 +306,7 @@ impl GraphCoordinator {
/// Best-effort, idempotent reclaim of the commit-graph branch `branch`.
/// Tolerates an absent commit-graph dataset (a graph that never committed).
async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> {
failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?;
failpoints::maybe_fail(crate::failpoints::names::BRANCH_DELETE_BEFORE_COMMIT_GRAPH_RECLAIM)?;
if let Some(commit_graph) = &mut self.commit_graph {
commit_graph.force_delete_branch(branch).await
} else if self
@ -486,7 +486,7 @@ impl GraphCoordinator {
updates: &[SubTableUpdate],
) -> Result<u64> {
let manifest_version = self.manifest.commit(updates).await?;
failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT)?;
Ok(manifest_version)
}
@ -499,7 +499,7 @@ impl GraphCoordinator {
.manifest
.commit_with_expected(updates, expected_table_versions)
.await?;
failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT)?;
Ok(manifest_version)
}
@ -508,7 +508,7 @@ impl GraphCoordinator {
changes: &[ManifestChange],
) -> Result<u64> {
let manifest_version = self.manifest.commit_changes(changes).await?;
failpoints::maybe_fail("graph_publish.after_manifest_commit")?;
failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT)?;
Ok(manifest_version)
}
@ -539,7 +539,7 @@ impl GraphCoordinator {
self.manifest_incarnation().e_tag.as_deref(),
));
};
failpoints::maybe_fail("graph_publish.before_commit_append")?;
failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_BEFORE_COMMIT_APPEND)?;
// Refresh the commit-graph head from storage before selecting the
// parent. `append_commit` parents the new commit on the IN-MEMORY head
// (`head_commit_id`, zero storage read), but the manifest was just
@ -571,7 +571,7 @@ impl GraphCoordinator {
let commit_graph = self.commit_graph.as_mut().ok_or_else(|| {
OmniError::manifest("branch merge requires _graph_commits.lance".to_string())
})?;
failpoints::maybe_fail("graph_publish.before_commit_append")?;
failpoints::maybe_fail(crate::failpoints::names::GRAPH_PUBLISH_BEFORE_COMMIT_APPEND)?;
let graph_commit_id = commit_graph
.append_merge_commit(
current_branch.as_deref(),

View file

@ -416,7 +416,7 @@ pub(crate) async fn write_sidecar(
) -> Result<RecoverySidecarHandle> {
// Failpoint: models a storage put failure (S3 PutObject / fs write)
// in Phase A — every writer must abort before any HEAD advance.
crate::failpoints::maybe_fail("recovery.sidecar_write")?;
crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_WRITE)?;
debug_assert_eq!(sidecar.schema_version, SIDECAR_SCHEMA_VERSION);
let uri = sidecar_uri(root_uri, &sidecar.operation_id);
let json = serde_json::to_string_pretty(sidecar).map_err(|err| {
@ -457,7 +457,7 @@ pub(crate) async fn confirm_sidecar_phase_b(
) -> Result<()> {
// Failpoint: models a storage failure on the confirmation write — the
// pre-confirm sidecar stays on disk, so recovery rolls the operation back.
crate::failpoints::maybe_fail("recovery.sidecar_confirm")?;
crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_CONFIRM)?;
for pin in &mut sidecar.tables {
// Every pinned table MUST have an achieved version. A miss means the
// pin set and the publish `updates` diverged — fail loudly at the
@ -489,7 +489,7 @@ pub(crate) async fn delete_sidecar(
// Failpoint: models a storage delete failure (S3 DeleteObject) in
// Phase D — callers swallow it (the write already published) and the
// stale sidecar is healed by the next write or open.
crate::failpoints::maybe_fail("recovery.sidecar_delete")?;
crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_DELETE)?;
storage.delete(&handle.sidecar_uri).await
}
@ -507,7 +507,7 @@ pub(crate) async fn list_sidecars(
// Failpoint: models a storage list failure (S3 ListObjectsV2) — every
// consumer (open-time sweep, write-entry heal) must fail loudly
// rather than silently skipping recovery.
crate::failpoints::maybe_fail("recovery.sidecar_list")?;
crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_SIDECAR_LIST)?;
let dir = recovery_dir_uri(root_uri);
let mut uris = storage.list_dir(&dir).await?;
// Sort by URI so the sweep processes sidecars deterministically.
@ -862,7 +862,7 @@ pub(crate) async fn heal_pending_sidecars_roll_forward(
};
if process_sidecar(
root_uri,
storage.as_ref(),
&storage,
&branch_snapshot,
&sidecar,
RecoveryMode::RollForwardOnly,
@ -928,7 +928,7 @@ async fn discard_orphaned_branch_sidecar(
.await?;
// Failpoint: the residual window above — commit appended, audit
// not yet durable.
crate::failpoints::maybe_fail("recovery.orphan_discard_audit_append")?;
crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_ORPHAN_DISCARD_AUDIT_APPEND)?;
audit
.append(RecoveryAuditRecord {
graph_commit_id,
@ -1036,7 +1036,7 @@ pub(crate) async fn recover_manifest_drift(
};
process_sidecar(
root_uri,
storage.as_ref(),
&storage,
&branch_snapshot,
&sidecar,
mode,
@ -1051,7 +1051,7 @@ pub(crate) async fn recover_manifest_drift(
async fn process_sidecar(
root_uri: &str,
storage: &dyn StorageAdapter,
storage: &std::sync::Arc<dyn StorageAdapter>,
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
mode: RecoveryMode,
@ -1154,7 +1154,7 @@ async fn process_sidecar(
);
}
return record_audit_recovery_rollforward(
root_uri, storage, snapshot, sidecar, &states,
root_uri, storage.as_ref(), snapshot, sidecar, &states,
)
.await
.map(|()| true);
@ -1176,7 +1176,7 @@ async fn process_sidecar(
writer_kind = ?sidecar.writer_kind,
"recovery: rolling back sidecar (mixed or unexpected state)"
);
roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states)
roll_back_sidecar(root_uri, storage.as_ref(), snapshot, sidecar, &states)
.await
.map(|()| true)
}
@ -1191,7 +1191,7 @@ async fn process_sidecar(
"recovery: rolling back SchemaApply sidecar because schema staging \
files were not promoted in this recovery pass"
);
roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states)
roll_back_sidecar(root_uri, storage.as_ref(), snapshot, sidecar, &states)
.await
.map(|()| true)
}
@ -1211,8 +1211,32 @@ async fn process_sidecar(
"recovery: rolling forward sidecar (Phase B completed; \
Phase C did not land)"
);
// TOCTOU window: between `classify_table` (which read the manifest
// pin) and the publish CAS below, a concurrent live writer can
// advance the manifest past our expected version. The failpoint
// lets a test force that interleave deterministically.
crate::failpoints::maybe_fail(
crate::failpoints::names::RECOVERY_BEFORE_ROLL_FORWARD_PUBLISH,
)?;
let (new_manifest_version, published_versions) =
roll_forward_all(root_uri, sidecar, &states, snapshot).await?;
match roll_forward_all(root_uri, sidecar, &states, snapshot).await {
Ok(published) => published,
// Convergence-idempotent (invariants 7 & 15): a roll-forward's
// postcondition is "the manifest reflects the sidecar's committed
// Lance state", NOT "this sweep personally won the CAS". A
// concurrent writer that advanced the manifest to/past that goal
// during the classify→publish window is convergence, not a logical
// conflict — so re-read and either record the already-achieved
// roll-forward or defer to the next pass; never fail the open.
// Any other error still propagates.
Err(err) if is_expected_version_mismatch(&err) => {
return converge_or_defer_roll_forward(
root_uri, storage, sidecar, &states, err,
)
.await;
}
Err(err) => return Err(err),
};
// `to_version` records the ACTUAL Lance HEAD published for
// each table (not pin.post_commit_pin, which is a lower bound
// for loose-match writers like SchemaApply / EnsureIndices /
@ -1247,12 +1271,182 @@ async fn process_sidecar(
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
delete_sidecar_by_operation_id(root_uri, storage.as_ref(), &sidecar.operation_id)
.await?;
Ok(true)
}
}
}
/// True if `err` is the publisher's per-table CAS precondition failure
/// (`ExpectedVersionMismatch`) — the signal that a concurrent writer advanced
/// the manifest past what this caller expected.
fn is_expected_version_mismatch(err: &OmniError) -> bool {
matches!(
err,
OmniError::Manifest(m)
if matches!(
m.details,
Some(crate::error::ManifestConflictDetails::ExpectedVersionMismatch { .. })
)
)
}
/// Whether the live manifest already reflects everything this sidecar intended
/// to publish.
///
/// SOUNDNESS: the per-table test is `current_version >= observed lance_head`, a
/// *proxy* for "the sidecar's committed Lance commit is an ancestor of the
/// published HEAD" (so a higher version is a descendant that contains it). The
/// proxy is sound only because of the heal-first invariant: every writer that
/// can advance a table's manifest version first heals pending sidecars
/// (`heal_pending_recovery_sidecars` runs at the head of `load`/`mutate`/
/// schema-apply/branch-merge) or refuses on an unrecovered graph (`optimize`).
/// So the only path past `expected_version` is one that first publishes THIS
/// sidecar's commit at `lance_head` — version ordering then implies lineage
/// containment. A future writer that advances a pinned table WITHOUT healing
/// first (e.g. a non-heal-first `Overwrite` that replaces rows) would void this
/// proxy and must be re-validated by row-id lineage, not version ordering.
/// Added tables must be registered; tombstoned tables must be gone.
fn sidecar_intent_satisfied(
snapshot: &Snapshot,
sidecar: &RecoverySidecar,
states: &[ClassifiedTable],
) -> bool {
for (pin, state) in sidecar.tables.iter().zip(states.iter()) {
let current = snapshot
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(0);
if current < state.lance_head {
return false;
}
}
for reg in &sidecar.additional_registrations {
if snapshot.entry(&reg.table_key).is_none() {
return false;
}
}
for tomb in &sidecar.tombstones {
if snapshot.entry(&tomb.table_key).is_some() {
return false;
}
}
true
}
/// Re-read the live manifest snapshot for the sidecar's branch.
async fn fresh_snapshot_for_sidecar(
root_uri: &str,
storage: &std::sync::Arc<dyn StorageAdapter>,
sidecar: &RecoverySidecar,
) -> Result<Snapshot> {
let mut coordinator = match sidecar.branch.as_deref() {
Some(branch) if branch != "main" => {
GraphCoordinator::open_branch(root_uri, branch, std::sync::Arc::clone(storage)).await?
}
_ => GraphCoordinator::open(root_uri, std::sync::Arc::clone(storage)).await?,
};
coordinator.refresh().await?;
Ok(coordinator.snapshot())
}
/// Convergence-idempotent handling of a roll-forward publish CAS that lost to a
/// concurrent writer (`ExpectedVersionMismatch`). A roll-forward's postcondition
/// is "the manifest reflects the sidecar's committed Lance state", not "this
/// sweep won the CAS" (invariants 7 & 15). Re-read the live manifest:
///
/// - if it already reached the sidecar's goal, the work is done (just not by us)
/// — record the `RolledForward` audit and delete the sidecar idempotently;
/// - otherwise the manifest is progressing but not yet at the goal — leave the
/// sidecar for the next open / the live writer's own Phase D.
///
/// Either way the open does NOT fail. A genuine logical conflict (a table below
/// `expected_version`, i.e. data lost) is not satisfiable here and re-surfaces
/// loudly via the classifier's `InvariantViolation` on the next pass.
/// See iss-schema-apply-reopen-recovery-race.
async fn converge_or_defer_roll_forward(
root_uri: &str,
storage: &std::sync::Arc<dyn StorageAdapter>,
sidecar: &RecoverySidecar,
states: &[ClassifiedTable],
conflict: OmniError,
) -> Result<bool> {
let fresh = fresh_snapshot_for_sidecar(root_uri, storage, sidecar).await?;
if !sidecar_intent_satisfied(&fresh, sidecar, states) {
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: roll-forward publish lost a CAS and the manifest has not \
yet reached the sidecar's goal; deferring to the next pass \
(conflict: {conflict})"
);
return Ok(false);
}
// The manifest already reached the sidecar's goal — some other actor
// advanced it. Under the heal-first invariant, whoever advanced past
// `expected_version` first healed THIS sidecar (recorded its RolledForward
// audit and deleted it). So the audit row already exists; recording another
// here would put two RolledForward rows in `_graph_commit_recoveries` for
// one recovery event (visible in `commit list --filter actor=…recovery`).
// Only finish the bookkeeping if the sidecar is still on disk (the winner
// crashed between audit and delete); if it is already gone, the winner
// completed it — return success WITHOUT a duplicate audit, keeping the
// audit append-idempotent per operation_id across concurrent sweeps.
let sidecar_path = sidecar_uri(root_uri, &sidecar.operation_id);
if !storage.exists(&sidecar_path).await? {
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: roll-forward publish lost a CAS; the winner already \
converged and cleaned up this sidecar nothing to do"
);
return Ok(true);
}
warn!(
operation_id = sidecar.operation_id.as_str(),
writer_kind = ?sidecar.writer_kind,
"recovery: roll-forward publish lost a CAS to a concurrent writer that \
already reached the goal; converging (RolledForward audit + delete)"
);
let mut outcomes: Vec<TableOutcome> = sidecar
.tables
.iter()
.map(|pin| TableOutcome {
table_key: pin.table_key.clone(),
from_version: pin.expected_version,
to_version: fresh
.entry(&pin.table_key)
.map(|e| e.table_version)
.unwrap_or(pin.post_commit_pin),
})
.collect();
// Mirror the normal roll-forward audit shape: SchemaApply sidecars also
// register added tables, so the audit must list them too (else a converge
// audit row is incomplete vs the `roll_forward_all` path for the same
// recovery kind).
for reg in &sidecar.additional_registrations {
outcomes.push(TableOutcome {
table_key: reg.table_key.clone(),
from_version: 0,
to_version: fresh
.entry(&reg.table_key)
.map(|e| e.table_version)
.unwrap_or(0),
});
}
record_audit(
root_uri,
sidecar,
fresh.version(),
RecoveryKind::RolledForward,
outcomes,
)
.await?;
delete_sidecar_by_operation_id(root_uri, storage.as_ref(), &sidecar.operation_id).await?;
Ok(true)
}
#[derive(Debug, Clone, Copy)]
struct ClassifiedTable {
classification: TableClassification,
@ -1622,7 +1816,7 @@ async fn record_audit(
// roll-back publish already landed — the sweep aborts, the sidecar
// stays, and re-entry records the audit row (see the retry note in
// the doc comment above).
crate::failpoints::maybe_fail("recovery.record_audit")?;
crate::failpoints::maybe_fail(crate::failpoints::names::RECOVERY_RECORD_AUDIT)?;
// Non-main recovery commits must be appended on the sidecar branch's
// commit graph, otherwise parent_commit_id comes from the global
// main head. BranchMerge additionally records the source branch's

View file

@ -317,7 +317,7 @@ impl Omnigraph {
{
return Err(OmniError::AlreadyInitialized { uri: root.clone() });
}
if let Err(err) = crate::failpoints::maybe_fail("init.after_schema_pg_written") {
if let Err(err) = crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN) {
best_effort_cleanup_init_artifacts(&root, storage.as_ref()).await;
return Err(err);
}
@ -1447,7 +1447,7 @@ impl Omnigraph {
for (table_key, table_path) in cleanup_targets {
let dataset_uri = self.storage().dataset_uri(&table_path);
let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
let outcome = match crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_DELETE_BEFORE_TABLE_CLEANUP)
{
Ok(()) => {
self.storage()
@ -2024,14 +2024,14 @@ async fn init_storage_phase(
if write_schema_pg {
let schema_path = join_uri(root, SCHEMA_SOURCE_FILENAME);
storage.write_text(&schema_path, schema_source).await?;
crate::failpoints::maybe_fail("init.after_schema_pg_written")?;
crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_PG_WRITTEN)?;
}
write_schema_contract(root, storage.as_ref(), schema_ir).await?;
crate::failpoints::maybe_fail("init.after_schema_contract_written")?;
crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_SCHEMA_CONTRACT_WRITTEN)?;
let coordinator = GraphCoordinator::init(root, catalog, Arc::clone(storage)).await?;
crate::failpoints::maybe_fail("init.after_coordinator_init")?;
crate::failpoints::maybe_fail(crate::failpoints::names::INIT_AFTER_COORDINATOR_INIT)?;
Ok(coordinator)
}

View file

@ -512,7 +512,7 @@ async fn optimize_one_table(
// Test seam: a concurrent (cross-process) writer can interleave here, before
// any Phase-B commit lands, to exercise the reopen+replan path.
crate::failpoints::maybe_fail("optimize.before_compact")?;
crate::failpoints::maybe_fail(crate::failpoints::names::OPTIMIZE_BEFORE_COMPACT)?;
// Phase B: scrub stale auto_cleanup (keeps optimize non-destructive on a
// graph upgraded from a pre-v7 binary whose `compact_files`/`optimize_indices`
@ -549,7 +549,7 @@ async fn optimize_one_table(
// committed (so HEAD is already ahead of the manifest from our own work),
// exercising the own-HEAD (not external) drift classification on the next
// reopened attempt.
if crate::failpoints::maybe_fail("optimize.inject_reindex_conflict").is_err()
if crate::failpoints::maybe_fail(crate::failpoints::names::OPTIMIZE_INJECT_REINDEX_CONFLICT).is_err()
&& attempt < COMPACTION_RETRY_BUDGET
{
continue;
@ -584,7 +584,7 @@ async fn optimize_one_table(
// Pin the per-writer Phase B → Phase C residual: Lance HEAD has advanced but the
// manifest publish below hasn't run.
crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?;
crate::failpoints::maybe_fail(crate::failpoints::names::OPTIMIZE_POST_PHASE_B_PRE_MANIFEST_COMMIT)?;
// Phase C: monotonic fast-forward publish. The compaction is committed at Lance
// HEAD `N`; publish a manifest pointer that includes it. If a concurrent writer
@ -921,7 +921,7 @@ pub async fn cleanup_all_tables(
let results: Vec<TableCleanupStats> = futures::stream::iter(table_tasks.into_iter())
.map(|(table_key, full_path)| async move {
let outcome: Result<RemovalStats> = async {
crate::failpoints::maybe_fail("cleanup.table_gc")?;
crate::failpoints::maybe_fail(crate::failpoints::names::CLEANUP_TABLE_GC)?;
// `cleanup_old_versions` is a Lance-only maintenance API not
// surfaced through `TableStorage` — see the optimize path
// above for the same rationale. Unwrap via `into_dataset()`.
@ -1079,7 +1079,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
}
if !branch_snapshots.contains_key(&branch) {
let branch_snapshot =
match crate::failpoints::maybe_fail("cleanup.resolve_branch_snapshot") {
match crate::failpoints::maybe_fail(crate::failpoints::names::CLEANUP_RESOLVE_BRANCH_SNAPSHOT) {
Ok(()) => db.snapshot_for_branch(Some(&branch)).await,
Err(injected) => Err(injected),
};
@ -1158,7 +1158,7 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
continue;
}
}
let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
let outcome = match crate::failpoints::maybe_fail(crate::failpoints::names::CLEANUP_RECONCILE_FORK) {
Ok(()) => storage.force_delete_branch(&full_path, &branch).await,
Err(injected) => Err(injected),
};
@ -1308,7 +1308,10 @@ mod tests {
ds.create_branch("feature", base, None).await.unwrap();
}
let _fp = ScopedFailPoint::new("cleanup.resolve_branch_snapshot", "return");
let _fp = ScopedFailPoint::new(
crate::failpoints::names::CLEANUP_RESOLVE_BRANCH_SNAPSHOT,
"return",
);
let stats = reconcile_orphaned_branches(&db).await.unwrap();
assert_eq!(

View file

@ -648,7 +648,7 @@ where
// `recover_schema_state_files`:
// - crash before commit → manifest unchanged; staging deleted on open
// - crash after commit → manifest advanced; staging renamed on open
crate::failpoints::maybe_fail("schema_apply.before_staging_write")?;
crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE)?;
let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
db.storage
@ -656,7 +656,7 @@ where
.await?;
write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
crate::failpoints::maybe_fail("schema_apply.after_staging_write")?;
crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_STAGING_WRITE)?;
// `apply_schema` doesn't currently take an actor; system-attributed.
let PublishedSnapshot {
@ -669,7 +669,7 @@ where
.commit_changes_with_actor(&manifest_changes, None)
.await?;
crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?;
crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT)?;
db.storage
.rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri))

View file

@ -296,7 +296,7 @@ pub(super) async fn ensure_indices_for_branch(
// (one commit_staged per index built) but the manifest publish below
// hasn't run. Used by
// `tests/failpoints.rs::ensure_indices_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?;
crate::failpoints::maybe_fail(crate::failpoints::names::ENSURE_INDICES_POST_PHASE_B_PRE_MANIFEST_COMMIT)?;
if !updates.is_empty() {
commit_prepared_updates_on_branch(db, branch, &updates, None).await?;
@ -671,7 +671,7 @@ pub(super) async fn open_owned_dataset_for_branch_write(
Ok((ds, Some(active_branch.to_string())))
}
source_branch => {
crate::failpoints::maybe_fail("fork.before_classify")?;
crate::failpoints::maybe_fail(crate::failpoints::names::FORK_BEFORE_CLASSIFY)?;
// Authority check before forking: re-read the live manifest. If this
// table is already forked on active_branch, a concurrent first-write
// won the race and our snapshot is stale — that is a retryable
@ -767,7 +767,7 @@ pub(crate) async fn classify_fork_ref(
// fresh-authority read (no-op without the `failpoints` feature). Lets a
// test exercise the Indeterminate path — a read failure on a live branch
// must classify as Indeterminate (skip), never Orphan (destroy).
let fresh = match crate::failpoints::maybe_fail("classify.fresh_read") {
let fresh = match crate::failpoints::maybe_fail(crate::failpoints::names::CLASSIFY_FRESH_READ) {
Ok(()) => db.fresh_snapshot_for_branch(Some(branch)).await,
Err(injected) => Err(injected),
};
@ -851,7 +851,7 @@ pub(super) async fn reclaim_orphaned_fork_and_refork(
}
}
crate::failpoints::maybe_fail("fork.before_reclaim")?;
crate::failpoints::maybe_fail(crate::failpoints::names::FORK_BEFORE_RECLAIM)?;
db.storage()
.force_delete_branch(full_path, active_branch)
.await
@ -1114,7 +1114,7 @@ async fn stage_and_commit_btree(
// to demonstrate that a stage-step failure in the staged-index
// path (`stage_create_btree_index` succeeded; `commit_staged` not
// yet called) leaves no Lance-HEAD drift on the touched table.
crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
crate::failpoints::maybe_fail(crate::failpoints::names::ENSURE_INDICES_POST_STAGE_PRE_COMMIT_BTREE)?;
let new_ds = db
.storage()
.commit_staged(ds.clone(), staged)

View file

@ -1128,7 +1128,7 @@ async fn publish_rewritten_merge_table(
// rows are on Lance HEAD but the delete has not committed and the
// achieved-version intent has not been recorded, so recovery must roll BACK.
// See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back.
crate::failpoints::maybe_fail("branch_merge.rewrite_after_merge_pre_delete")?;
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE)?;
// Phase 2: delete removed rows via deletion vectors.
//
@ -1159,7 +1159,7 @@ async fn publish_rewritten_merge_table(
// recorded, so recovery must roll BACK (the index is reconciler-owned derived
// state, but the merge itself never reached its commit boundary). See
// tests/failpoints.rs::branch_merge_rewrite_partial_after_delete_rolls_back.
crate::failpoints::maybe_fail("branch_merge.rewrite_after_delete_pre_index")?;
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_DELETE_PRE_INDEX)?;
// Phase 3: rebuild indices.
//
@ -1276,7 +1276,7 @@ async fn publish_adopted_delta(
// have not committed and the achieved-version intent has not been recorded, so
// recovery must roll BACK (not publish the appends-only state). See
// tests/failpoints.rs::branch_merge_adopt_partial_after_append_rolls_back.
crate::failpoints::maybe_fail("branch_merge.adopt_after_append_pre_upsert")?;
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_APPEND_PRE_UPSERT)?;
// Phase 1b: upsert the CHANGED rows. The merge_insert hash join is now
// bounded to the genuinely-changed set, not the whole delta. It runs against
@ -1308,7 +1308,7 @@ async fn publish_adopted_delta(
// has not committed and the achieved-version intent has not been recorded, so
// recovery must roll BACK. See
// tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back.
crate::failpoints::maybe_fail("branch_merge.adopt_after_upsert_pre_delete")?;
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE)?;
// Phase 2: delete removed rows via deletion vectors (inline-commit residual,
// same as the three-way path until Lance ships a public two-phase delete).
@ -1793,7 +1793,7 @@ impl Omnigraph {
// (publish_*) AND the sidecar is confirmed, but the manifest publish
// below hasn't run — so recovery rolls FORWARD. Used by
// `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT)?;
let manifest_version = if updates.is_empty() {
self.version().await

View file

@ -895,7 +895,7 @@ impl Omnigraph {
// across this failure so the next `Omnigraph::open`'s
// recovery sweep can roll forward — see
// `tests/failpoints.rs::recovery_rolls_forward_after_finalize_publisher_failure`.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?;
self.commit_updates_on_branch_with_expected(
requested.as_deref(),
&updates,
@ -1423,7 +1423,7 @@ impl Omnigraph {
crate::db::MutationOpKind::Delete,
)
.await?;
crate::failpoints::maybe_fail("mutation.delete_node_pre_primary_delete")?;
crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE)?;
let (_new_ds, delete_state) = self
.storage_inline_residual()
.delete_where(&full_path, ds, &pred_sql)

View file

@ -14,6 +14,49 @@ pub(crate) fn maybe_fail(_name: &str) -> Result<()> {
Ok(())
}
/// Compile-checked catalog of every failpoint name in this crate. Call sites
/// (`maybe_fail`) and tests (`ScopedFailPoint` / the test rendezvous helper)
/// reference these constants instead of bare string literals, so a typo is a
/// compile error rather than a silently-never-firing failpoint.
pub mod names {
pub const BRANCH_CREATE_AFTER_MANIFEST_BRANCH_CREATE: &str = "branch_create.after_manifest_branch_create";
pub const BRANCH_DELETE_BEFORE_COMMIT_GRAPH_RECLAIM: &str = "branch_delete.before_commit_graph_reclaim";
pub const BRANCH_DELETE_BEFORE_TABLE_CLEANUP: &str = "branch_delete.before_table_cleanup";
pub const BRANCH_MERGE_ADOPT_AFTER_APPEND_PRE_UPSERT: &str = "branch_merge.adopt_after_append_pre_upsert";
pub const BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE: &str = "branch_merge.adopt_after_upsert_pre_delete";
pub const BRANCH_MERGE_POST_PHASE_B_PRE_MANIFEST_COMMIT: &str = "branch_merge.post_phase_b_pre_manifest_commit";
pub const BRANCH_MERGE_REWRITE_AFTER_DELETE_PRE_INDEX: &str = "branch_merge.rewrite_after_delete_pre_index";
pub const BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE: &str = "branch_merge.rewrite_after_merge_pre_delete";
pub const CLASSIFY_FRESH_READ: &str = "classify.fresh_read";
pub const CLEANUP_RECONCILE_FORK: &str = "cleanup.reconcile_fork";
pub const CLEANUP_RESOLVE_BRANCH_SNAPSHOT: &str = "cleanup.resolve_branch_snapshot";
pub const CLEANUP_TABLE_GC: &str = "cleanup.table_gc";
pub const ENSURE_INDICES_POST_PHASE_B_PRE_MANIFEST_COMMIT: &str = "ensure_indices.post_phase_b_pre_manifest_commit";
pub const ENSURE_INDICES_POST_STAGE_PRE_COMMIT_BTREE: &str = "ensure_indices.post_stage_pre_commit_btree";
pub const FORK_BEFORE_CLASSIFY: &str = "fork.before_classify";
pub const FORK_BEFORE_RECLAIM: &str = "fork.before_reclaim";
pub const GRAPH_PUBLISH_AFTER_MANIFEST_COMMIT: &str = "graph_publish.after_manifest_commit";
pub const GRAPH_PUBLISH_BEFORE_COMMIT_APPEND: &str = "graph_publish.before_commit_append";
pub const INIT_AFTER_COORDINATOR_INIT: &str = "init.after_coordinator_init";
pub const INIT_AFTER_SCHEMA_CONTRACT_WRITTEN: &str = "init.after_schema_contract_written";
pub const INIT_AFTER_SCHEMA_PG_WRITTEN: &str = "init.after_schema_pg_written";
pub const MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE: &str = "mutation.delete_node_pre_primary_delete";
pub const MUTATION_POST_FINALIZE_PRE_PUBLISHER: &str = "mutation.post_finalize_pre_publisher";
pub const OPTIMIZE_BEFORE_COMPACT: &str = "optimize.before_compact";
pub const OPTIMIZE_INJECT_REINDEX_CONFLICT: &str = "optimize.inject_reindex_conflict";
pub const OPTIMIZE_POST_PHASE_B_PRE_MANIFEST_COMMIT: &str = "optimize.post_phase_b_pre_manifest_commit";
pub const RECOVERY_BEFORE_ROLL_FORWARD_PUBLISH: &str = "recovery.before_roll_forward_publish";
pub const RECOVERY_ORPHAN_DISCARD_AUDIT_APPEND: &str = "recovery.orphan_discard_audit_append";
pub const RECOVERY_RECORD_AUDIT: &str = "recovery.record_audit";
pub const RECOVERY_SIDECAR_CONFIRM: &str = "recovery.sidecar_confirm";
pub const RECOVERY_SIDECAR_DELETE: &str = "recovery.sidecar_delete";
pub const RECOVERY_SIDECAR_LIST: &str = "recovery.sidecar_list";
pub const RECOVERY_SIDECAR_WRITE: &str = "recovery.sidecar_write";
pub const SCHEMA_APPLY_AFTER_MANIFEST_COMMIT: &str = "schema_apply.after_manifest_commit";
pub const SCHEMA_APPLY_AFTER_STAGING_WRITE: &str = "schema_apply.after_staging_write";
pub const SCHEMA_APPLY_BEFORE_STAGING_WRITE: &str = "schema_apply.before_staging_write";
}
#[cfg(feature = "failpoints")]
pub struct ScopedFailPoint {
name: String,
@ -27,6 +70,20 @@ impl ScopedFailPoint {
name: name.to_string(),
}
}
/// Register a callback failpoint with the same Drop-based cleanup as
/// `new`. Without the guard, a panic while the point is active would
/// leak the callback into the process-global registry and fire it under
/// later tests in the same binary.
pub fn with_callback<F>(name: &str, callback: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
fail::cfg_callback(name, callback).expect("configure callback failpoint");
Self {
name: name.to_string(),
}
}
}
#[cfg(feature = "failpoints")]

View file

@ -625,7 +625,7 @@ async fn load_jsonl_reader<R: BufRead>(
// staged commits have advanced Lance HEAD, but the manifest
// publish has not run yet. Reuse the mutation failpoint name so
// one failpoint pins the shared `MutationStaging` boundary.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_POST_FINALIZE_PRE_PUBLISHER)?;
db.commit_updates_on_branch_with_expected(
branch,
&updates,

View file

@ -812,10 +812,12 @@ impl TableStore {
/// Legacy inline-commit append: writes fragments AND commits in one
/// call, advancing Lance HEAD as a side effect. Not on the
/// `TableStorage` trait surface — the staged primitive `stage_append`
/// + `commit_staged` is the engine write path. This inherent
/// `pub(crate)` method survives only for recovery test setup. Do not
/// add new engine call sites — they re-introduce the multi-phase
/// commit drift the trait surface was designed to eliminate.
/// + `commit_staged` is the engine write path. This inherent method
/// survives only for in-source recovery test setup, so it is
/// `#[cfg(test)]`-gated: engine code physically cannot call it (which
/// enforces "no new call sites" by construction and silences the
/// dead-code warning the non-test lib build would otherwise emit).
#[cfg(test)]
pub(crate) async fn append_batch(
&self,
dataset_uri: &str,

View file

@ -0,0 +1,96 @@
//! Guard: failpoint names must come from the compile-checked `names` catalog
//! (`omnigraph::failpoints::names` / `omnigraph_cluster::failpoints::names`),
//! never bare string literals.
//!
//! The `names` consts give compile-time typo protection only if every call
//! site uses them. A bare `maybe_fail("typo.literal")` still compiles (the
//! arg is `&str`), so a typo there would silently never fire. This
//! source-walk closes that gap by construction — the same defense-in-depth
//! shape as `forbidden_apis.rs`. Add a new failpoint by adding its const to
//! the catalog first; this guard then forces every call site to reference it.
use std::path::{Path, PathBuf};
/// Call-site prefixes whose first argument must be a `names::` constant. The
/// check is whitespace/newline-tolerant (it skips past the open paren to the
/// first non-whitespace token), so wrapping the call across lines cannot hide
/// a literal — a per-line `contains` scan would miss
/// `park_first(\n "name",\n)`.
const CALL_PREFIXES: &[&str] = &[
"maybe_fail(",
"ScopedFailPoint::new(",
"ScopedFailPoint::with_callback(",
"park_first(",
];
/// 1-based line number of `byte_off` within `contents`.
fn line_of(contents: &str, byte_off: usize) -> usize {
contents[..byte_off].bytes().filter(|&b| b == b'\n').count() + 1
}
fn manifest_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
}
/// Production call sites live under each crate's `src`; test call sites live
/// in the two failpoint integration binaries. This guard file is deliberately
/// not in the set (it names the patterns as literals itself).
fn files_to_scan() -> Vec<PathBuf> {
let engine = manifest_dir();
let cluster = engine.join("../omnigraph-cluster");
let mut out = Vec::new();
collect_rs(&engine.join("src"), &mut out);
collect_rs(&cluster.join("src"), &mut out);
out.push(engine.join("tests/failpoints.rs"));
out.push(cluster.join("tests/failpoints.rs"));
out
}
fn collect_rs(dir: &Path, out: &mut Vec<PathBuf>) {
let Ok(entries) = std::fs::read_dir(dir) else {
return;
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
collect_rs(&path, out);
} else if path.extension().is_some_and(|e| e == "rs") {
out.push(path);
}
}
}
#[test]
fn failpoint_names_use_the_compile_checked_catalog() {
let mut violations = Vec::new();
for file in files_to_scan() {
let Ok(contents) = std::fs::read_to_string(&file) else {
continue;
};
for prefix in CALL_PREFIXES {
let mut from = 0;
while let Some(rel) = contents[from..].find(prefix) {
let after_open = from + rel + prefix.len();
// Skip whitespace (incl. newlines) after the open paren. If the
// first argument token is a `"`, it's a literal failpoint name
// — across a line break or not.
if contents[after_open..].trim_start().starts_with('"') {
violations.push(format!(
"{}:{}: literal failpoint name at `{}` — use a `names::` const",
file.display(),
line_of(&contents, from + rel),
prefix.trim_end_matches('('),
));
}
from = after_open;
}
}
}
assert!(
violations.is_empty(),
"failpoint names must reference the compile-checked \
`omnigraph::failpoints::names::*` (or `omnigraph_cluster::failpoints::names::*`) \
constants, not string literals a literal typo would silently never fire:\n{}",
violations.join("\n")
);
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,84 @@
//! Deterministic rendezvous for concurrent failpoint tests.
//!
//! The pattern: park the FIRST thread that hits a failpoint until the test
//! explicitly releases it, while later arrivals fall through. This replaces
//! fixed "guess" `sleep`s for cross-thread coordination — the test waits on
//! the *condition* (the point was reached) with a bounded timeout that fails
//! loudly, instead of betting a fixed duration is long enough.
//!
//! Extracted from the open-coded `AtomicBool` + callback pattern that
//! `fork_collision_with_live_concurrent_fork_is_retryable` proved out.
//!
//! The `reached` flag also doubles as a fired-assertion: a point that is
//! never hit makes [`Rendezvous::wait_until_reached`] panic, so a typo'd or
//! misplaced failpoint cannot pass silently.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::time::Duration;
use omnigraph::failpoints::ScopedFailPoint;
/// A parked-on-first-arrival rendezvous bound to a failpoint name. The
/// underlying callback is RAII-cleaned when this guard drops.
pub struct Rendezvous {
name: String,
reached: Arc<AtomicBool>,
release: Arc<AtomicBool>,
_failpoint: ScopedFailPoint,
}
impl Rendezvous {
/// Register `name` so the FIRST thread to hit it records readiness and
/// blocks until [`release`](Self::release); later arrivals fall through
/// immediately. The park is bounded (~30s) so a test bug cannot hang the
/// suite forever.
pub fn park_first(name: &str) -> Self {
let reached = Arc::new(AtomicBool::new(false));
let release = Arc::new(AtomicBool::new(false));
let (cb_reached, cb_release) = (Arc::clone(&reached), Arc::clone(&release));
let _failpoint = ScopedFailPoint::with_callback(name, move || {
if cb_reached
.compare_exchange(false, true, SeqCst, SeqCst)
.is_ok()
{
// ~30s bound (6000 * 5ms); released earlier on the common path.
for _ in 0..6000 {
if cb_release.load(SeqCst) {
return;
}
std::thread::sleep(Duration::from_millis(5));
}
}
});
Self {
name: name.to_string(),
reached,
release,
_failpoint,
}
}
/// Async-wait until the parked thread has reached the failpoint, polling
/// the readiness condition with a bounded (~12s) timeout. Panics if the
/// point is never hit — the fired-assertion.
pub async fn wait_until_reached(&self) {
for _ in 0..2400 {
if self.reached.load(SeqCst) {
return;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
panic!("rendezvous: failpoint '{}' was never reached", self.name);
}
/// Whether the parked thread has reached the failpoint yet.
pub fn reached(&self) -> bool {
self.reached.load(SeqCst)
}
/// Release the parked thread so it resumes past the failpoint.
pub fn release(&self) {
self.release.store(true, SeqCst);
}
}

View file

@ -1,6 +1,8 @@
#![allow(dead_code)]
pub mod cost;
#[cfg(feature = "failpoints")]
pub mod failpoint;
pub mod recovery;
use arrow_array::{Array, RecordBatch, StringArray};

View file

@ -354,10 +354,9 @@ open; delete #6658 shipped). Track, don't build yet.
- **#254** — logical-class fix (schema-apply vs optimize false-fail). Same op-class family;
both are de-risking inputs for Design A's per-class commit models.
- **#296** — recovery roll-forward converges on concurrent manifest advance. This is the fix
for the flaky `iss-schema-apply-reopen-recovery-race` (the handoff in
`handoff-schema-apply-recovery-flake.md`). It touches `recovery.rs` and is *aligned* with
#297's "postcondition is the state, not winning the CAS" principle — reconcile the monotonic
publish with #296's converge helper if #296 lands first.
for the flaky `iss-schema-apply-reopen-recovery-race`. It touches `recovery.rs` and is
*aligned* with #297's "postcondition is the state, not winning the CAS" principle — reconcile
the monotonic publish with #296's converge helper if #296 lands first.
- **#295** — the step-3b RFC doc (apply §4's three corrections to it).
---

View file

@ -1,216 +0,0 @@
# Handoff: flaky schema-apply → reopen recovery race
**Type:** bug investigation handoff (not yet fixed)
**Status:** root-caused to a layer + hypothesis; exact mechanism and fix NOT yet validated
**Severity:** medium — flaky CI; a real (rare) schema-apply-then-reopen failure under load
**Scope:** pre-existing on `main`; **independent of** RFC-013 step 2 (internal-table
compaction, PR #291) and step 3a (#288) — those paths never touch schema apply or
the recovery sweep, and the full `--workspace` gate passes clean on a re-run.
> Do **not** "fix" this by changing the test to use a single handle. That was
> empirically shown to *reduce but not eliminate* the flake (see Experiments), so it
> would mask a real product race. This is a correct-by-design fix in the engine, not
> a test edit.
---
## 1. Symptom
The test
`crates/omnigraph-server/tests/schema_routes.rs::schema_apply_route_hard_drops_property_with_allow_data_loss`
intermittently fails. The HTTP schema apply **succeeds** (`applied == true`); the
*subsequent* `Omnigraph::open(graph)` (which the test does to verify the catalog)
panics on `.unwrap()` with:
```
OmniError::Manifest(Conflict,
"stale view of node:Person: expected manifest version 5 but current is 7",
ExpectedVersionMismatch { expected: 5, actual: 7 })
```
The values (5, 7) vary; the shape is always "recovery roll-forward expected version
N, manifest is at M > N." It is raised from the **open-time recovery sweep**, i.e.
inside `Omnigraph::open`, not from the apply itself.
---
## 2. Reproduction
- **Needs sibling-test parallelism (CPU contention).** Running the target test
*alone* is rock-solid (0/20 failures). The flake only appears when other tests in
the same binary run concurrently and perturb the timing inside the apply→reopen
sequence.
- Fast repro loop (≈1340% per run):
```bash
cargo test -p omnigraph-server --test schema_routes --no-run
for i in $(seq 1 15); do
cargo test -p omnigraph-server --test schema_routes 2>&1 \
| grep -q "schema_apply_route_hard_drops_property_with_allow_data_loss ... FAILED" \
&& echo "iter $i FAIL"
done
```
- It originally surfaced in a full `cargo test --workspace` run (max parallelism).
- Each test uses its own `tempfile::tempdir()`, so this is **not** cross-test shared
state — it's a timing race inside one test's own graph.
---
## 3. Experiments run (the discriminating evidence)
Each variant was stress-run under the full `schema_routes` suite (parallel siblings):
| Variant | Flake rate |
|---|---|
| Target test in isolation (no sibling parallelism) | **0/20** |
| **Control** — as written (server handle + out-of-band `Omnigraph::open` load + reopen) | 6/15 ≈ 40% |
| Drop the live server handle (`drop(app)`) before the reopen | 4/15 ≈ 27% |
| Remove the out-of-band separate-handle load | 2/15 ≈ 13% |
| Remove the load **and** drop the server handle (≈ single-handle) | 8/20 ≈ 40% |
**Interpretation:**
- It is **concurrency-triggered**, not a topology bug: 0% isolated, flaky under
parallel load.
- **No single factor eliminates it.** Removing the out-of-band load roughly halves
the rate (it amplifies the race) but leaves a ~13% base. Dropping the live server
handle does not clearly help. So the "single-handle test" patch is a **band-aid**,
not the fix.
- The residual base rate with the out-of-band load removed means there is a real
race in the **schema-apply → reopen → recovery** path itself.
Caveat on the experiments: `drop(app)` may not synchronously tear down the server's
engine handle (it can be held by an `Arc`/spawned task), so the "single-handle"
rows are not airtight. This is one of the things to validate (§6).
---
## 4. Root-cause hypothesis (NOT yet proven)
The failing path is the **open-time recovery sweep's roll-forward** raising
`ExpectedVersionMismatch` from the publisher's `check_expected_table_versions`.
The hard-drop schema apply (`allow_data_loss=true``DropMode::Hard`) is a
**multi-step migration**: it performs several Lance commits + `__manifest` publishes,
advancing `node:Person`'s manifest version across multiple versions (e.g. 5 → … → 7).
To be crash-safe across the Lance-HEAD-before-manifest-publish gap, schema apply
writes a **recovery sidecar** (`__recovery/{ulid}.json`) pinning per-table
`expected_version` / `post_commit_pin` before its Phase B.
Hypothesis: under CPU contention, the timing of (a) the migration's multi-version
advancement, (b) the sidecar's Phase-D deletion, and (c) a later/over­lapping
`Omnigraph::open` recovery sweep interleaves such that the recovery roll-forward
reads a sidecar whose pinned `expected` is **stale relative to a manifest that
legitimately advanced several versions**, and **re-publishes at the stale `expected`
instead of recognizing the migration already completed** → `expected 5, actual 7`.
In other words: the recovery classifier / roll-forward likely does not correctly
handle a table whose manifest is **already past `post_commit_pin`** by more than one
step (multi-step migration), or a sidecar whose operation has already fully
committed. The single-step assumption baked into the Optimize-style pin
(`post_commit_pin = expected_version + 1`) may not generalize to multi-commit schema
migrations.
---
## 5. Likely solution (correct-by-design, surgical)
Make the **open-time recovery classifier idempotent against a manifest that advanced
past the sidecar's pin**:
- If the table's current manifest/Lance version is already `>= post_commit_pin`
(operation completed, possibly across multiple versions), classify it as
*already-rolled-forward / completed* (the `RolledPastExpected` family) and **delete
the sidecar without republishing** — never attempt a publish at the stale
`expected`.
- Ensure the schema-apply sidecar records a pin that the classifier can interpret for
a **multi-step** migration (a range / "completed at or beyond" semantics), not a
strict single-step `expected + 1`.
This also hardens *real* crash recovery for multi-step schema apply (not just the
test), and is small + local to `recovery.rs` (+ possibly the schema-apply sidecar
write in `schema_apply.rs`). It does **not** rearchitect recovery.
Per repo rule 12 (test-first for bug fixes): land a **deterministic** repro first —
ideally a failpoint that forces the interleaving (pause after the migration's commits
but before sidecar delete, then run an open) so the red→green is reliable, not a
stress-loop probability. See the `failpoints.rs` pattern + the schema-apply failpoints
already in the tree.
---
## 6. What MUST be validated before fixing
1. **Which sidecar is being rolled forward?** Confirm it is the *schema-apply*
sidecar (vs the out-of-band `load`'s sidecar, vs another writer). Instrument /
log the sidecar `operation_id`, `kind`, and `SidecarTablePin` at the point the
recovery sweep raises the error.
2. **The exact classifier path.** Trace which `TableClassification` arm the failing
table hits (`recovery.rs::classify_table`, ~L600) and which roll-forward call
raises `ExpectedVersionMismatch` (`heal_pending_sidecars_roll_forward` ~L761,
`roll_forward_all` ~L1215, `restore`+publish ~L1275). Confirm it is the
multi-step-advanced / already-completed case being mishandled.
3. **Is `post_commit_pin = expected + 1` the bug?** Verify the hard-drop migration
advances `node:Person` by **>1** version, and that the sidecar pins a single-step
`+1`, so the classifier can't recognize completion at +2.
4. **Engine-level reproduction (no server).** Build a deterministic engine-level
repro: persistent handle applies a multi-step hard-drop, then a fresh
`Omnigraph::open` — ideally with a failpoint forcing the interleave — to confirm
the bug is in the engine recovery path and not server-specific (runtime, handle
lifecycle). The current evidence is server-test-only.
5. **Is the out-of-band load *necessary or only amplifying*?** Confirm the ~13% base
rate (load removed) is the same root cause, not a second distinct race. If the
load is required, the bug is specifically about a second writer's version
advancement; if not, it's purely intra-apply.
6. **`drop(app)` cleanliness.** Verify whether the server's engine handle is truly
gone after `drop(app)` (it may be `Arc`-held). If not, the "single-handle"
experiments don't isolate the live-handle factor and should be redone with a
genuinely single-handle setup.
---
## 7. Relationship to Lance MTT
This bug lives in the **recovery-sidecar roll-forward**, which exists only to bridge
the Lance-HEAD-before-manifest-publish gap in omnigraph's faked multi-table
atomicity. `invariants.md` already calls recovery sidecars "scaffolding to remove
once the substrate closes the gap." Lance **MTT** (native atomic multi-table commits,
RFC §8 / lance#7264) closes that gap → retires the sidecar → **eliminates this bug
class.**
Implications:
- **Don't wait for MTT** — it is the "strategic exit, not a current dependency,"
uncertain and far off, and this bug is live now.
- **Don't over-invest** — keep the fix surgical (classifier idempotency), because the
whole sidecar layer is MTT-disposable. A surgical fix retires cleanly with the
layer; a recovery rearchitecture would be throwaway.
---
## 8. Key pointers
- Failing test: `crates/omnigraph-server/tests/schema_routes.rs`
`schema_apply_route_hard_drops_property_with_allow_data_loss` (~L777,
`#[tokio::test(flavor = "multi_thread")]`).
- Error type: `OmniError::Manifest` / `ManifestConflictDetails::ExpectedVersionMismatch`
(`crates/omnigraph/src/error.rs`); raised by `check_expected_table_versions`
(`crates/omnigraph/src/db/manifest/publisher.rs`, ~L356).
- Recovery sweep + classifier: `crates/omnigraph/src/db/manifest/recovery.rs`
`TableClassification` (~L335), `classify_table` (~L600), roll-forward
(`heal_pending_sidecars_roll_forward` ~L761, `roll_forward_all` ~L1215, restore +
publish ~L1275).
- Schema-apply sidecar write: `crates/omnigraph/src/db/omnigraph/schema_apply.rs`
(the `SidecarKind` schema-apply pins; `db.coordinator.write().refresh()` ~L692).
- Open entry point that runs the sweep: `Omnigraph::open` (read-write mode) →
`db/manifest/recovery.rs` sweep.
- Repro: §2 above. Stress under `schema_routes` suite parallelism; 0% isolated.
---
## 9. Suggested next steps
1. Add tracing at the recovery roll-forward error site (sidecar kind/id, pins,
observed vs expected) and capture a failing run (§6.1, §6.2).
2. Reproduce deterministically at the engine level with a failpoint (§6.4) — this is
the red test (rule 12).
3. Implement the classifier-idempotency fix (§5) in a separate commit; confirm
red→green and that the stress loop goes to 0 failures over ≥50 iterations.
4. Keep it a standalone PR (not bundled with RFC-013 follow-ons).

View file

@ -94,7 +94,6 @@ Working documents for in-flight feature work. Removed when the work lands.
| Provider-independent embedding configuration — one resolved `EmbeddingConfig` + sealed provider enum (Gemini/OpenAI/Mock), identity recorded in the schema IR, query-time same-space validation, NFR floor | [rfc-012-embedding-provider-config.md](rfc-012-embedding-provider-config.md) |
| Write-path latency — capture-once `WriteTxn`, version-pinned opens, one `GraphPublishAuthority` fed declarative `PublishPlan`s, manifest-authoritative lineage, epoch fence, bounded history (compaction + cleanup), and an IO-counted cost contract (`iss-write-s3-roundtrip-amplification`, `iss-991`) | [rfc-013-write-path-latency.md](rfc-013-write-path-latency.md) |
| RFC-013 handoff — current-state map, latest validation, and concrete next actions for finishing write-path latency and correctness work | [handoff-rfc-013-write-path.md](handoff-rfc-013-write-path.md) |
| Schema-apply recovery flake handoff — investigation notes and validation plan for the intermittent schema-apply reopen race | [handoff-schema-apply-recovery-flake.md](handoff-schema-apply-recovery-flake.md) |
## Boundary

View file

@ -211,10 +211,21 @@ them explicit.
sweep has the same exposure, and always has): it may roll a live foreign
writer's sidecar forward, which degrades to publisher-CAS contention for
data writes but can race the schema-staging promotion for a foreign live
schema apply. Multi-process writers on one graph are already documented
one-winner-CAS territory; closing this fully needs a cross-process
serialization primitive (e.g. lease-based use of the schema-apply lock
branch) — design it before promoting multi-process write topologies.
schema apply. The roll-**forward** CAS contention is now
convergence-idempotent: when the publish loses the CAS to a concurrent
writer that already reached the sidecar's goal, the sweep treats it as
convergence (record the `RolledForward` audit + delete) rather than a fatal
`ExpectedVersionMismatch`, and defers when the manifest is only partway
(`converge_or_defer_roll_forward` in `db/manifest/recovery.rs`;
iss-schema-apply-reopen-recovery-race). So a concurrent advance no longer
fails the open. The schema-staging promotion race and the destructive
roll-**back** path (Lance `Restore` "trumps" a concurrent commit, so it
cannot be made idempotent — iss-recovery-sweep-live-writer-rollback) still
need the cross-process primitive. Multi-process writers on one graph are
already documented one-winner-CAS territory; closing this fully needs a
cross-process serialization primitive (e.g. lease-based use of the
schema-apply lock branch) — design it before promoting multi-process write
topologies.
- **Fork reclaim is in-process-safe only:** the first write to a table on a
branch forks it (a Lance `create_branch` that advances state before the
manifest publish). An interrupted fork (crash, or a cancelled request

View file

@ -46,7 +46,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
| `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths |
| `policy_engine_chassis.rs` | Engine-layer Cedar enforcement (MR-722): allow + deny through every `_as` writer via the SDK directly — no HTTP — proving embedded and CLI callers hit the same gate as the server, with action × scope shapes matching `authorize_request` |
| `maintenance.rs` | `optimize` (compaction), `repair` (explicit uncovered-drift publish), and `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes its own compaction (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), skips pre-existing uncovered drift (`optimize_skips_preexisting_manifest_head_drift`), and refuses to run while a `__recovery` sidecar is pending (`optimize_defers_when_recovery_sidecar_is_pending`); `repair` previews/heals verified maintenance drift, refuses raw semantic drift without `--force`, and forced repair publishes only by explicit operator choice; the index reconciler (iss-848): `index_build_tolerates_null_vector_rows` (an untrainable Vector column defers instead of aborting the build, sibling indexes still build) and `optimize_materializes_index_declared_but_unbuilt` (optimize creates a declared-but-deferred index) |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`). |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`) and the convergence-idempotent roll-forward regression (`open_sweep_roll_forward_converges_when_manifest_advances_concurrently`: two concurrent open-sweeps race one sidecar at the `recovery.before_roll_forward_publish` rendezvous; the CAS loser must converge, not fail the open — iss-schema-apply-reopen-recovery-race). |
| `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path |
| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). |
@ -64,10 +64,12 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
## Failpoints (fault injection)
- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` (in `crates/omnigraph/Cargo.toml` **and** `crates/omnigraph-cluster/Cargo.toml`; the cluster feature does not enable the engine's).
- Wrappers: `crates/omnigraph/src/failpoints.rs` and `crates/omnigraph-cluster/src/failpoints.rs` expose `maybe_fail("name")` and `ScopedFailPoint` for tests.
- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, cluster apply's payload→state-write window, etc.).
- Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (crash-mid-apply + state CAS race via `fail::cfg_callback`; integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`.
- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` in `crates/omnigraph/Cargo.toml`; the cluster's `failpoints` feature additionally enables `omnigraph/failpoints` (`crates/omnigraph-cluster/Cargo.toml`), so the shared test guard is available to cluster tests.
- Wrappers: `crates/omnigraph/src/failpoints.rs` and `crates/omnigraph-cluster/src/failpoints.rs` each expose `maybe_fail("name")` (per-crate error type). The test-side config guard `ScopedFailPoint` (`new` for action strings, `with_callback` for callbacks; RAII `Drop` removes the point) lives **once** in the engine and is reused by both test binaries.
- **Names are compile-checked.** Every failpoint name is a `pub const` in `omnigraph::failpoints::names` (engine) / `omnigraph_cluster::failpoints::names` (cluster). Call sites and tests reference the constant, never a bare literal — a typo is a compile error, not a silently-never-firing point. Add a new failpoint by adding its const first.
- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, the recovery sweep's classify→roll-forward-publish window, cluster apply's payload→state-write window, etc.).
- **Serialize and rendezvous, never sleep.** The `fail` registry is process-global, so every failpoint test carries `#[serial]` (`serial_test`). For concurrent tests, use `helpers::failpoint::Rendezvous` (`tests/helpers/failpoint.rs`): `park_first(name)` parks the first thread to hit the point until `release()`, and `wait_until_reached().await` blocks on that condition (it doubles as a fired-assertion). Do not coordinate threads with fixed `sleep`s.
- Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`.
## RustFS / S3 integration