fix(engine): retry publish on RetryableCommitConflict (compaction vs publish)

RFC-013 step 2 compacts `__manifest` with no app-level lock (Lance OCC arbitrates,
validated against LanceDB + the lance-7.0.0 conflict resolver). compact_files'
`Operation::Rewrite` auto-retries 20x (CommitConfig default num_retries=20), so a
live publish usually wins the race and the compaction rebases. But the publish
runs its merge-insert with conflict_retries(0) = one rebase attempt; if the
compaction commits first AND the merge touched a fragment the Rewrite rewrote,
Lance preempts the publish with `Error::RetryableCommitConflict` — a DIFFERENT
variant from the row-level `TooMuchWriteContention` the publisher already retries.
Left unhandled, that surfaces a transient error to the caller: a maintenance
compaction (physical op) fails a live write (logical op), which violates invariant 7.

Map `LanceError::RetryableCommitConflict` to a new
`ManifestConflictDetails::RetryableCommitConflict` and treat it as retryable in the
publisher's outer loop (reload fresh state + re-merge), alongside
RowLevelCasContention. `ExpectedVersionMismatch` still propagates (a genuine
expectation break must not be blindly retried). This also hardens multi-process
concurrent writers generally, not just compaction.
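
A minimal sketch of the mapping and retry predicate described above, under
stated assumptions: `StoreError`, `ConflictDetails`, `PublishError`,
`map_store_error`, `is_retryable`, and `publish_with_retry` are hypothetical
stand-ins for illustration, not the real Lance or engine types, and the attempt
cap is an assumption this commit does not state. Each call to the closure stands
for one reload-and-re-merge attempt.

```rust
// Hypothetical stand-in for the storage-layer error (not the real LanceError).
#[derive(Debug, Clone, PartialEq)]
enum StoreError {
    TooMuchWriteContention,
    RetryableCommitConflict,
    Io(String),
}

// Hypothetical stand-in for the manifest conflict details.
#[derive(Debug, Clone, PartialEq)]
enum ConflictDetails {
    RowLevelCasContention,
    RetryableCommitConflict,
    ExpectedVersionMismatch,
}

#[derive(Debug, Clone, PartialEq)]
enum PublishError {
    Conflict(ConflictDetails),
    Storage(String),
}

/// Map each storage error to its own conflict detail; anything else stays opaque.
fn map_store_error(err: StoreError) -> PublishError {
    match err {
        StoreError::TooMuchWriteContention => {
            PublishError::Conflict(ConflictDetails::RowLevelCasContention)
        }
        StoreError::RetryableCommitConflict => {
            PublishError::Conflict(ConflictDetails::RetryableCommitConflict)
        }
        StoreError::Io(msg) => PublishError::Storage(msg),
    }
}

/// Only the two transparent-contention details are retried.
fn is_retryable(err: &PublishError) -> bool {
    matches!(
        err,
        PublishError::Conflict(
            ConflictDetails::RowLevelCasContention | ConflictDetails::RetryableCommitConflict
        )
    )
}

/// Outer loop: `attempt` is expected to reload fresh state and re-merge on every call.
/// `max_attempts` is an assumed cap, not a value taken from the commit.
fn publish_with_retry<F>(max_attempts: usize, mut attempt: F) -> Result<u64, PublishError>
where
    F: FnMut(usize) -> Result<u64, PublishError>,
{
    let mut last_err = None;
    for n in 0..max_attempts {
        match attempt(n) {
            Ok(version) => return Ok(version),
            // Retryable: remember the error and try again.
            Err(err) if is_retryable(&err) => last_err = Some(err),
            // Anything else (e.g. an expectation break) propagates immediately.
            Err(err) => return Err(err),
        }
    }
    Err(last_err.unwrap_or_else(|| PublishError::Storage("no attempts made".into())))
}
```

In this sketch only the two retryable details loop; every other error returns on
the first failure, which is how an `ExpectedVersionMismatch` would still reach
the caller.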

Normal publishes are insert-only (new object_ids -> new fragments, disjoint from
rewritten old ones), so the conflict is rare; the guard covers the
same-fragment-update edge and multi-process writers. Unit tests in publisher.rs
pin the mapping + the retry-predicate contract.
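
The rarity argument can be pictured with a deliberately simplified model of the
race. `publish_is_preempted` and its parameters are hypothetical names for
illustration; Lance's actual conflict resolver is more involved than a set
intersection, so treat this as a sketch of the condition stated in this message,
not of the library's logic.

```rust
use std::collections::HashSet;

/// Simplified model (an assumption, not Lance's resolver): the publish is
/// preempted only when the compaction committed first AND the merge touched at
/// least one fragment that the compaction rewrote.
fn publish_is_preempted(
    compaction_committed_first: bool,
    fragments_touched_by_merge: &HashSet<u64>,
    fragments_rewritten: &HashSet<u64>,
) -> bool {
    compaction_committed_first && !fragments_touched_by_merge.is_disjoint(fragments_rewritten)
}
```

Under this model an insert-only publish, whose rows land in a new fragment, never
intersects the rewritten set, while an update to a row in a rewritten fragment does.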
Ragnor Comerford 2026-06-20 20:49:38 +02:00
parent b2f65062c8
commit d138902e7e
2 changed files with 102 additions and 8 deletions


@@ -436,6 +436,29 @@ fn map_lance_publish_error(err: LanceError) -> OmniError {
             err
         ));
     }
+    // A *retryable* fragment-level commit conflict. The merge-insert publish runs
+    // with `conflict_retries(0)` (one rebase attempt), so when a concurrent
+    // transaction preempts it at the manifest-version level Lance returns
+    // `RetryableCommitConflict` rather than the row-level `TooMuchWriteContention`.
+    // The concrete source that motivated handling this: `optimize` now compacts
+    // `__manifest` itself (RFC-013 step 2), and that compaction is an
+    // `Operation::Rewrite` committed concurrently with no app-level lock (Lance OCC
+    // arbitrates — validated against LanceDB). If the Rewrite lands between this
+    // publish's state load and its commit AND our merge touched a fragment the
+    // Rewrite rewrote, Lance preempts us with `RetryableCommitConflict`. Lance
+    // itself says "please retry"; treat it like row-level contention so the
+    // publisher's outer loop reloads fresh state and re-merges — a maintenance
+    // compaction (a physical op) must never fail a live write (a logical op),
+    // invariant 7. (Normal publishes are insert-only — new `object_id`s land in new
+    // fragments, disjoint from rewritten old ones — so this is rare; the guard is
+    // for the same-fragment-update edge and for multi-process writers generally.)
+    if matches!(err, LanceError::RetryableCommitConflict { .. }) {
+        return OmniError::manifest_retryable_commit_conflict(format!(
+            "manifest publish was preempted by a concurrent transaction (e.g. an \
+             optimize compaction Rewrite of __manifest); reload and retry: {}",
+            err
+        ));
+    }
     OmniError::Lance(err.to_string())
 }
@@ -490,20 +513,74 @@ impl ManifestBatchPublisher for GraphNamespacePublisher {
     }
 }
-/// A retryable conflict here means: Lance's row-level CAS rejected our commit
-/// because someone else landed an `object_id` we were also inserting (mapped
-/// from `Error::TooMuchWriteContention` to
-/// `ManifestConflictDetails::RowLevelCasContention`). This is transparent
-/// contention; if the caller's `expected_table_versions` still holds against
-/// the new manifest state, we re-attempt. Other conflict variants (notably
-/// `ExpectedVersionMismatch`) propagate so the caller learns immediately.
+/// A retryable conflict here means a concurrent writer preempted our commit but
+/// our `expected_table_versions` still holds against fresh state, so re-loading
+/// and re-merging is safe and correct. Two shapes qualify:
+///
+/// - `RowLevelCasContention` — Lance's row-level CAS rejected us because someone
+///   landed an `object_id` we were also inserting (from `TooMuchWriteContention`).
+/// - `RetryableCommitConflict` — a fragment-level preemption (from Lance's
+///   `Error::RetryableCommitConflict`), notably an `optimize` compaction
+///   `Operation::Rewrite` of `__manifest` racing our merge (RFC-013 step 2). Lance
+///   marks it retryable; retrying upholds invariant 7 (a maintenance compaction
+///   must not fail a live write) rather than surfacing a transient error to the
+///   caller. See `map_lance_publish_error` for the full rationale.
+///
+/// Other conflict variants (notably `ExpectedVersionMismatch`) propagate so the
+/// caller learns immediately — the expectation genuinely broke and a blind retry
+/// would be wrong.
 fn is_retryable_publish_conflict(err: &OmniError) -> bool {
     matches!(
         err,
         OmniError::Manifest(m)
             if matches!(
                 m.details,
-                Some(crate::error::ManifestConflictDetails::RowLevelCasContention)
+                Some(
+                    crate::error::ManifestConflictDetails::RowLevelCasContention
+                        | crate::error::ManifestConflictDetails::RetryableCommitConflict
+                )
             )
     )
 }
+#[cfg(test)]
+mod tests {
+    use super::*;
+    /// The motivating case: an `optimize` compaction `Rewrite` of `__manifest`
+    /// preempts a concurrent publish, which Lance reports as
+    /// `Error::RetryableCommitConflict`. The publisher must classify it retryable
+    /// (invariant 7: a maintenance compaction must not fail a live write).
+    #[test]
+    fn retryable_commit_conflict_is_classified_retryable() {
+        let err = map_lance_publish_error(LanceError::retryable_commit_conflict_source(
+            7,
+            Box::new(std::io::Error::other("preempted by concurrent Rewrite")),
+        ));
+        assert!(
+            is_retryable_publish_conflict(&err),
+            "RetryableCommitConflict must be retried, got {err:?}"
+        );
+    }
+    /// The retry predicate's full contract: row-level CAS contention and a
+    /// retryable commit conflict are both retried; a genuine expectation break and
+    /// an opaque storage error are not.
+    #[test]
+    fn retry_predicate_contract() {
+        assert!(is_retryable_publish_conflict(
+            &OmniError::manifest_retryable_commit_conflict("rewrite preemption")
+        ));
+        assert!(is_retryable_publish_conflict(
+            &OmniError::manifest_row_level_cas_contention("row-level race")
+        ));
+        // A genuine expectation break must propagate so the caller learns.
+        assert!(!is_retryable_publish_conflict(
+            &OmniError::manifest_expected_version_mismatch("node:Person", 5, 7)
+        ));
+        // Opaque storage errors are not retryable conflicts.
+        assert!(!is_retryable_publish_conflict(&OmniError::Lance(
+            "some io error".into()
+        )));
+    }
+}


@@ -27,6 +27,16 @@ pub enum ManifestConflictDetails {
     /// `ExpectedVersionMismatch`: the caller's expectations (if any) still
     /// hold against the new manifest state, so the publisher will retry.
     RowLevelCasContention,
+    /// Lance rejected the publish with a *retryable* fragment-level commit
+    /// conflict (`Error::RetryableCommitConflict`): a concurrent transaction
+    /// preempted ours at the manifest-version level — notably an `optimize`
+    /// compaction `Operation::Rewrite` of `__manifest` landing in the window
+    /// between our state load and our commit, when our merge touched a fragment
+    /// it rewrote. Like `RowLevelCasContention` and unlike
+    /// `ExpectedVersionMismatch`, the caller's expectations still hold against
+    /// fresh state, so the publisher reloads and retries — a maintenance
+    /// compaction must never fail a live write (invariant 7).
+    RetryableCommitConflict,
 }
 #[derive(Debug, Clone, Error)]
@@ -146,4 +156,11 @@ impl OmniError {
                 .with_details(ManifestConflictDetails::RowLevelCasContention),
         )
     }
+    pub fn manifest_retryable_commit_conflict(message: impl Into<String>) -> Self {
+        Self::Manifest(
+            ManifestError::new(ManifestErrorKind::Conflict, message)
+                .with_details(ManifestConflictDetails::RetryableCommitConflict),
+        )
+    }
 }