mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-24 02:38:06 +02:00
fix(engine): retry publish on RetryableCommitConflict (compaction vs publish)
Step 2 compacts `__manifest` with no app-level lock (Lance OCC arbitrates, validated against LanceDB + the lance-7.0.0 conflict resolver). compact_files' `Operation::Rewrite` auto-retries 20x (CommitConfig default num_retries=20), so a live publish usually wins the race and the compaction rebases. But the publish runs its merge-insert with conflict_retries(0) = one rebase attempt; if the compaction commits first AND the merge touched a fragment the Rewrite rewrote, Lance preempts the publish with `Error::RetryableCommitConflict` — a DIFFERENT variant from the row-level `TooMuchWriteContention` the publisher already retries. Left unhandled, that surfaces a transient error to the caller, i.e. a maintenance compaction (physical op) failing a live write (logical op) — invariant 7. Map `LanceError::RetryableCommitConflict` to a new `ManifestConflictDetails::RetryableCommitConflict` and treat it as retryable in the publisher's outer loop (reload fresh state + re-merge), alongside RowLevelCasContention. `ExpectedVersionMismatch` still propagates (a genuine expectation break must not be blindly retried). This also hardens multi-process concurrent writers generally, not just compaction. Normal publishes are insert-only (new object_ids -> new fragments, disjoint from rewritten old ones), so the conflict is rare; the guard covers the same-fragment-update edge and multi-process writers. Unit tests in publisher.rs pin the mapping + the retry-predicate contract.
This commit is contained in:
parent
b2f65062c8
commit
d138902e7e
2 changed files with 102 additions and 8 deletions
|
|
@ -436,6 +436,29 @@ fn map_lance_publish_error(err: LanceError) -> OmniError {
|
||||||
err
|
err
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
// A *retryable* fragment-level commit conflict. The merge-insert publish runs
|
||||||
|
// with `conflict_retries(0)` (one rebase attempt), so when a concurrent
|
||||||
|
// transaction preempts it at the manifest-version level Lance returns
|
||||||
|
// `RetryableCommitConflict` rather than the row-level `TooMuchWriteContention`.
|
||||||
|
// The concrete source that motivated handling this: `optimize` now compacts
|
||||||
|
// `__manifest` itself (RFC-013 step 2), and that compaction is an
|
||||||
|
// `Operation::Rewrite` committed concurrently with no app-level lock (Lance OCC
|
||||||
|
// arbitrates — validated against LanceDB). If the Rewrite lands between this
|
||||||
|
// publish's state load and its commit AND our merge touched a fragment the
|
||||||
|
// Rewrite rewrote, Lance preempts us with `RetryableCommitConflict`. Lance
|
||||||
|
// itself says "please retry"; treat it like row-level contention so the
|
||||||
|
// publisher's outer loop reloads fresh state and re-merges — a maintenance
|
||||||
|
// compaction (a physical op) must never fail a live write (a logical op),
|
||||||
|
// invariant 7. (Normal publishes are insert-only — new `object_id`s land in new
|
||||||
|
// fragments, disjoint from rewritten old ones — so this is rare; the guard is
|
||||||
|
// for the same-fragment-update edge and for multi-process writers generally.)
|
||||||
|
if matches!(err, LanceError::RetryableCommitConflict { .. }) {
|
||||||
|
return OmniError::manifest_retryable_commit_conflict(format!(
|
||||||
|
"manifest publish was preempted by a concurrent transaction (e.g. an \
|
||||||
|
optimize compaction Rewrite of __manifest); reload and retry: {}",
|
||||||
|
err
|
||||||
|
));
|
||||||
|
}
|
||||||
OmniError::Lance(err.to_string())
|
OmniError::Lance(err.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -490,20 +513,74 @@ impl ManifestBatchPublisher for GraphNamespacePublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A retryable conflict here means: Lance's row-level CAS rejected our commit
|
/// A retryable conflict here means a concurrent writer preempted our commit but
|
||||||
/// because someone else landed an `object_id` we were also inserting (mapped
|
/// our `expected_table_versions` still holds against fresh state, so re-loading
|
||||||
/// from `Error::TooMuchWriteContention` to
|
/// and re-merging is safe and correct. Two shapes qualify:
|
||||||
/// `ManifestConflictDetails::RowLevelCasContention`). This is transparent
|
///
|
||||||
/// contention; if the caller's `expected_table_versions` still holds against
|
/// - `RowLevelCasContention` — Lance's row-level CAS rejected us because someone
|
||||||
/// the new manifest state, we re-attempt. Other conflict variants (notably
|
/// landed an `object_id` we were also inserting (from `TooMuchWriteContention`).
|
||||||
/// `ExpectedVersionMismatch`) propagate so the caller learns immediately.
|
/// - `RetryableCommitConflict` — a fragment-level preemption (from Lance's
|
||||||
|
/// `Error::RetryableCommitConflict`), notably an `optimize` compaction
|
||||||
|
/// `Operation::Rewrite` of `__manifest` racing our merge (RFC-013 step 2). Lance
|
||||||
|
/// marks it retryable; retrying upholds invariant 7 (a maintenance compaction
|
||||||
|
/// must not fail a live write) rather than surfacing a transient error to the
|
||||||
|
/// caller. See `map_lance_publish_error` for the full rationale.
|
||||||
|
///
|
||||||
|
/// Other conflict variants (notably `ExpectedVersionMismatch`) propagate so the
|
||||||
|
/// caller learns immediately — the expectation genuinely broke and a blind retry
|
||||||
|
/// would be wrong.
|
||||||
fn is_retryable_publish_conflict(err: &OmniError) -> bool {
|
fn is_retryable_publish_conflict(err: &OmniError) -> bool {
|
||||||
matches!(
|
matches!(
|
||||||
err,
|
err,
|
||||||
OmniError::Manifest(m)
|
OmniError::Manifest(m)
|
||||||
if matches!(
|
if matches!(
|
||||||
m.details,
|
m.details,
|
||||||
Some(crate::error::ManifestConflictDetails::RowLevelCasContention)
|
Some(
|
||||||
|
crate::error::ManifestConflictDetails::RowLevelCasContention
|
||||||
|
| crate::error::ManifestConflictDetails::RetryableCommitConflict
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// The motivating case: an `optimize` compaction `Rewrite` of `__manifest`
|
||||||
|
/// preempts a concurrent publish, which Lance reports as
|
||||||
|
/// `Error::RetryableCommitConflict`. The publisher must classify it retryable
|
||||||
|
/// (invariant 7: a maintenance compaction must not fail a live write).
|
||||||
|
#[test]
|
||||||
|
fn retryable_commit_conflict_is_classified_retryable() {
|
||||||
|
let err = map_lance_publish_error(LanceError::retryable_commit_conflict_source(
|
||||||
|
7,
|
||||||
|
Box::new(std::io::Error::other("preempted by concurrent Rewrite")),
|
||||||
|
));
|
||||||
|
assert!(
|
||||||
|
is_retryable_publish_conflict(&err),
|
||||||
|
"RetryableCommitConflict must be retried, got {err:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The retry predicate's full contract: row-level CAS contention and a
|
||||||
|
/// retryable commit conflict are both retried; a genuine expectation break and
|
||||||
|
/// an opaque storage error are not.
|
||||||
|
#[test]
|
||||||
|
fn retry_predicate_contract() {
|
||||||
|
assert!(is_retryable_publish_conflict(
|
||||||
|
&OmniError::manifest_retryable_commit_conflict("rewrite preemption")
|
||||||
|
));
|
||||||
|
assert!(is_retryable_publish_conflict(
|
||||||
|
&OmniError::manifest_row_level_cas_contention("row-level race")
|
||||||
|
));
|
||||||
|
// A genuine expectation break must propagate so the caller learns.
|
||||||
|
assert!(!is_retryable_publish_conflict(
|
||||||
|
&OmniError::manifest_expected_version_mismatch("node:Person", 5, 7)
|
||||||
|
));
|
||||||
|
// Opaque storage errors are not retryable conflicts.
|
||||||
|
assert!(!is_retryable_publish_conflict(&OmniError::Lance(
|
||||||
|
"some io error".into()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,16 @@ pub enum ManifestConflictDetails {
|
||||||
/// `ExpectedVersionMismatch`: the caller's expectations (if any) still
|
/// `ExpectedVersionMismatch`: the caller's expectations (if any) still
|
||||||
/// hold against the new manifest state, so the publisher will retry.
|
/// hold against the new manifest state, so the publisher will retry.
|
||||||
RowLevelCasContention,
|
RowLevelCasContention,
|
||||||
|
/// Lance rejected the publish with a *retryable* fragment-level commit
|
||||||
|
/// conflict (`Error::RetryableCommitConflict`): a concurrent transaction
|
||||||
|
/// preempted ours at the manifest-version level — notably an `optimize`
|
||||||
|
/// compaction `Operation::Rewrite` of `__manifest` landing in the window
|
||||||
|
/// between our state load and our commit, when our merge touched a fragment
|
||||||
|
/// it rewrote. Like `RowLevelCasContention` and unlike
|
||||||
|
/// `ExpectedVersionMismatch`, the caller's expectations still hold against
|
||||||
|
/// fresh state, so the publisher reloads and retries — a maintenance
|
||||||
|
/// compaction must never fail a live write (invariant 7).
|
||||||
|
RetryableCommitConflict,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Error)]
|
#[derive(Debug, Clone, Error)]
|
||||||
|
|
@ -146,4 +156,11 @@ impl OmniError {
|
||||||
.with_details(ManifestConflictDetails::RowLevelCasContention),
|
.with_details(ManifestConflictDetails::RowLevelCasContention),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn manifest_retryable_commit_conflict(message: impl Into<String>) -> Self {
|
||||||
|
Self::Manifest(
|
||||||
|
ManifestError::new(ManifestErrorKind::Conflict, message)
|
||||||
|
.with_details(ManifestConflictDetails::RetryableCommitConflict),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue