diff --git a/crates/omnigraph/src/db/manifest/publisher.rs b/crates/omnigraph/src/db/manifest/publisher.rs index ba1166d..1f4ad51 100644 --- a/crates/omnigraph/src/db/manifest/publisher.rs +++ b/crates/omnigraph/src/db/manifest/publisher.rs @@ -436,6 +436,29 @@ fn map_lance_publish_error(err: LanceError) -> OmniError { err )); } + // A *retryable* fragment-level commit conflict. The merge-insert publish runs + // with `conflict_retries(0)` (one rebase attempt), so when a concurrent + // transaction preempts it at the manifest-version level Lance returns + // `RetryableCommitConflict` rather than the row-level `TooMuchWriteContention`. + // The concrete source that motivated handling this: `optimize` now compacts + // `__manifest` itself (RFC-013 step 2), and that compaction is an + // `Operation::Rewrite` committed concurrently with no app-level lock (Lance OCC + // arbitrates — validated against LanceDB). If the Rewrite lands between this + // publish's state load and its commit AND our merge touched a fragment the + // Rewrite rewrote, Lance preempts us with `RetryableCommitConflict`. Lance + // itself says "please retry"; treat it like row-level contention so the + // publisher's outer loop reloads fresh state and re-merges — a maintenance + // compaction (a physical op) must never fail a live write (a logical op), + // invariant 7. (Normal publishes are insert-only — new `object_id`s land in new + // fragments, disjoint from rewritten old ones — so this is rare; the guard is + // for the same-fragment-update edge and for multi-process writers generally.) + if matches!(err, LanceError::RetryableCommitConflict { .. }) { + return OmniError::manifest_retryable_commit_conflict(format!( + "manifest publish was preempted by a concurrent transaction (e.g. an \ + optimize compaction Rewrite of __manifest); reload and retry: {}", + err + )); + } OmniError::Lance(err.to_string()) } @@ -490,20 +513,74 @@ impl ManifestBatchPublisher for GraphNamespacePublisher { } } -/// A retryable conflict here means: Lance's row-level CAS rejected our commit -/// because someone else landed an `object_id` we were also inserting (mapped -/// from `Error::TooMuchWriteContention` to -/// `ManifestConflictDetails::RowLevelCasContention`). This is transparent -/// contention; if the caller's `expected_table_versions` still holds against -/// the new manifest state, we re-attempt. Other conflict variants (notably -/// `ExpectedVersionMismatch`) propagate so the caller learns immediately. +/// A retryable conflict here means a concurrent writer preempted our commit but +/// our `expected_table_versions` still holds against fresh state, so re-loading +/// and re-merging is safe and correct. Two shapes qualify: +/// +/// - `RowLevelCasContention` — Lance's row-level CAS rejected us because someone +/// landed an `object_id` we were also inserting (from `TooMuchWriteContention`). +/// - `RetryableCommitConflict` — a fragment-level preemption (from Lance's +/// `Error::RetryableCommitConflict`), notably an `optimize` compaction +/// `Operation::Rewrite` of `__manifest` racing our merge (RFC-013 step 2). Lance +/// marks it retryable; retrying upholds invariant 7 (a maintenance compaction +/// must not fail a live write) rather than surfacing a transient error to the +/// caller. See `map_lance_publish_error` for the full rationale. +/// +/// Other conflict variants (notably `ExpectedVersionMismatch`) propagate so the +/// caller learns immediately — the expectation genuinely broke and a blind retry +/// would be wrong. fn is_retryable_publish_conflict(err: &OmniError) -> bool { matches!( err, OmniError::Manifest(m) if matches!( m.details, - Some(crate::error::ManifestConflictDetails::RowLevelCasContention) + Some( + crate::error::ManifestConflictDetails::RowLevelCasContention + | crate::error::ManifestConflictDetails::RetryableCommitConflict + ) ) ) } + +#[cfg(test)] +mod tests { + use super::*; + + /// The motivating case: an `optimize` compaction `Rewrite` of `__manifest` + /// preempts a concurrent publish, which Lance reports as + /// `Error::RetryableCommitConflict`. The publisher must classify it retryable + /// (invariant 7: a maintenance compaction must not fail a live write). + #[test] + fn retryable_commit_conflict_is_classified_retryable() { + let err = map_lance_publish_error(LanceError::retryable_commit_conflict_source( + 7, + Box::new(std::io::Error::other("preempted by concurrent Rewrite")), + )); + assert!( + is_retryable_publish_conflict(&err), + "RetryableCommitConflict must be retried, got {err:?}" + ); + } + + /// The retry predicate's full contract: row-level CAS contention and a + /// retryable commit conflict are both retried; a genuine expectation break and + /// an opaque storage error are not. + #[test] + fn retry_predicate_contract() { + assert!(is_retryable_publish_conflict( + &OmniError::manifest_retryable_commit_conflict("rewrite preemption") + )); + assert!(is_retryable_publish_conflict( + &OmniError::manifest_row_level_cas_contention("row-level race") + )); + // A genuine expectation break must propagate so the caller learns. + assert!(!is_retryable_publish_conflict( + &OmniError::manifest_expected_version_mismatch("node:Person", 5, 7) + )); + // Opaque storage errors are not retryable conflicts. + assert!(!is_retryable_publish_conflict(&OmniError::Lance( + "some io error".into() + ))); + } +} diff --git a/crates/omnigraph/src/error.rs b/crates/omnigraph/src/error.rs index a24f153..1bf321e 100644 --- a/crates/omnigraph/src/error.rs +++ b/crates/omnigraph/src/error.rs @@ -27,6 +27,16 @@ pub enum ManifestConflictDetails { /// `ExpectedVersionMismatch`: the caller's expectations (if any) still /// hold against the new manifest state, so the publisher will retry. RowLevelCasContention, + /// Lance rejected the publish with a *retryable* fragment-level commit + /// conflict (`Error::RetryableCommitConflict`): a concurrent transaction + /// preempted ours at the manifest-version level — notably an `optimize` + /// compaction `Operation::Rewrite` of `__manifest` landing in the window + /// between our state load and our commit, when our merge touched a fragment + /// it rewrote. Like `RowLevelCasContention` and unlike + /// `ExpectedVersionMismatch`, the caller's expectations still hold against + /// fresh state, so the publisher reloads and retries — a maintenance + /// compaction must never fail a live write (invariant 7). + RetryableCommitConflict, } #[derive(Debug, Clone, Error)] @@ -146,4 +156,11 @@ impl OmniError { .with_details(ManifestConflictDetails::RowLevelCasContention), ) } + + pub fn manifest_retryable_commit_conflict(message: impl Into) -> Self { + Self::Manifest( + ManifestError::new(ManifestErrorKind::Conflict, message) + .with_details(ManifestConflictDetails::RetryableCommitConflict), + ) + } }