diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 4a67130..651c10f 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -286,12 +286,43 @@ impl ManifestCoordinator { self.commit_changes(&changes).await } + /// Same as [`commit`], but with caller-supplied per-table expected + /// versions used for optimistic concurrency control. Each entry asserts + /// the manifest's current latest non-tombstoned `table_version` for that + /// `table_key` is exactly what the caller observed; mismatches surface + /// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`. + pub async fn commit_with_expected( + &mut self, + updates: &[SubTableUpdate], + expected_table_versions: &HashMap, + ) -> Result { + let changes = updates + .iter() + .cloned() + .map(ManifestChange::Update) + .collect::>(); + self.commit_changes_with_expected(&changes, expected_table_versions) + .await + } + pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result { - if changes.is_empty() { + self.commit_changes_with_expected(changes, &HashMap::new()) + .await + } + + pub(crate) async fn commit_changes_with_expected( + &mut self, + changes: &[ManifestChange], + expected_table_versions: &HashMap, + ) -> Result { + if changes.is_empty() && expected_table_versions.is_empty() { return Ok(self.version()); } - self.dataset = self.publisher.publish(changes).await?; + self.dataset = self + .publisher + .publish(changes, expected_table_versions) + .await?; self.known_state = read_manifest_state(&self.dataset).await?; Ok(self.version()) diff --git a/crates/omnigraph/src/db/manifest/publisher.rs b/crates/omnigraph/src/db/manifest/publisher.rs index f200ed4..0de87bb 100644 --- a/crates/omnigraph/src/db/manifest/publisher.rs +++ b/crates/omnigraph/src/db/manifest/publisher.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow_array::RecordBatchIterator; use async_trait::async_trait; use lance::Dataset; +use lance::Error as LanceError; use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched}; use lance_namespace::NamespaceError; use lance_namespace::models::CreateTableVersionRequest; @@ -38,9 +39,20 @@ use super::{ SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone, }; +/// Bound on the publisher-level retry loop that wraps Lance's row-level CAS +/// (`TooMuchWriteContention`). Lance's own `conflict_retries` is set to 0 in +/// `merge_rows` because its auto-rebase is "transparent merge" semantics — +/// wrong for an OCC contract — so retry is owned here instead, where each +/// iteration re-runs `load_publish_state` and the expected-version pre-check. +const PUBLISHER_RETRY_BUDGET: u32 = 5; + #[async_trait] pub(super) trait ManifestBatchPublisher: Send + Sync { - async fn publish(&self, changes: &[ManifestChange]) -> Result; + async fn publish( + &self, + changes: &[ManifestChange], + expected_table_versions: &HashMap, + ) -> Result; } pub(super) struct GraphNamespacePublisher { @@ -279,6 +291,77 @@ impl GraphNamespacePublisher { ) } + /// Reduce the loaded `(table_key, table_version) → entry` map and the + /// tombstone set to "latest non-tombstoned version per table" — the same + /// reduction performed by `read_manifest_state` on the visible snapshot. + /// Tombstoned tables fall back to their highest tombstone version so that + /// the resulting `actual` reported in `ExpectedVersionMismatch` is + /// meaningful even when the caller's expected table no longer exists. + fn latest_visible_per_table( + existing_versions: &HashMap<(String, u64), SubTableEntry>, + existing_tombstones: &HashMap<(String, u64), ()>, + ) -> HashMap { + let mut max_tombstones = HashMap::::new(); + for (key, version) in existing_tombstones.keys() { + max_tombstones + .entry(key.clone()) + .and_modify(|v| { + if *version > *v { + *v = *version; + } + }) + .or_insert(*version); + } + + let mut latest = HashMap::::new(); + for (key, version) in existing_versions.keys() { + let tombstoned = max_tombstones + .get(key) + .map(|t| *t >= *version) + .unwrap_or(false); + if tombstoned { + continue; + } + latest + .entry(key.clone()) + .and_modify(|v| { + if *version > *v { + *v = *version; + } + }) + .or_insert(*version); + } + + // For tables that have only tombstones (no visible entry), surface the + // tombstone version so callers see a non-zero `actual`. + for (key, tombstone) in &max_tombstones { + latest.entry(key.clone()).or_insert(*tombstone); + } + + latest + } + + /// Compare each caller-supplied expectation against the manifest's current + /// latest visible version per table. The first mismatch is returned as a + /// typed `ExpectedVersionMismatch` (`actual = 0` if the table isn't in the + /// manifest at all). + fn check_expected_table_versions( + latest_per_table: &HashMap, + expected: &HashMap, + ) -> Result<()> { + for (table_key, expected_version) in expected { + let actual = latest_per_table.get(table_key).copied().unwrap_or(0); + if actual != *expected_version { + return Err(OmniError::manifest_expected_version_mismatch( + table_key.clone(), + *expected_version, + actual, + )); + } + } + Ok(()) + } + async fn merge_rows(&self, dataset: Dataset, rows: Vec) -> Result { let batch = Self::pending_rows_to_batch(rows)?; let reader = RecordBatchIterator::new(vec![Ok(batch)], manifest_schema()); @@ -287,14 +370,18 @@ impl GraphNamespacePublisher { .map_err(|e| OmniError::Lance(e.to_string()))?; merge_builder.when_matched(WhenMatched::UpdateAll); merge_builder.when_not_matched(WhenNotMatched::InsertAll); - merge_builder.conflict_retries(5); + // 0 here is intentional: Lance's built-in retry uses transparent rebase, + // which would let a concurrent writer's row land alongside ours and + // silently break the OCC contract on `__manifest`. Retries are owned by + // the publisher loop above, where each attempt re-runs the pre-check. + merge_builder.conflict_retries(0); merge_builder.use_index(false); let (new_dataset, _stats) = merge_builder .try_build() .map_err(|e| OmniError::Lance(e.to_string()))? .execute_reader(Box::new(reader)) .await - .map_err(|e| OmniError::Lance(e.to_string()))?; + .map_err(map_lance_publish_error)?; Ok(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone())) } @@ -318,25 +405,90 @@ impl GraphNamespacePublisher { })) }) .collect::>>()?; - self.publish(&changes).await + self.publish(&changes, &HashMap::new()).await } } +/// `Error::TooMuchWriteContention` from Lance's row-level CAS bubbles up here +/// when a concurrent writer landed a row with the same `object_id` (the +/// merge-insert join key, annotated as an unenforced primary key on +/// `__manifest`). Translate it to a typed manifest conflict so callers can +/// match without parsing strings; everything else is opaque storage. +fn map_lance_publish_error(err: LanceError) -> OmniError { + if matches!(err, LanceError::TooMuchWriteContention { .. }) { + return OmniError::manifest_row_level_cas_contention(format!( + "manifest publish lost a row-level CAS race: {}", + err + )); + } + OmniError::Lance(err.to_string()) +} + #[async_trait] impl ManifestBatchPublisher for GraphNamespacePublisher { - async fn publish(&self, changes: &[ManifestChange]) -> Result { - if changes.is_empty() { + async fn publish( + &self, + changes: &[ManifestChange], + expected_table_versions: &HashMap, + ) -> Result { + if changes.is_empty() && expected_table_versions.is_empty() { return self.dataset().await; } - let (dataset, known_tables, existing_versions, existing_tombstones) = - self.load_publish_state().await?; - let rows = Self::build_pending_rows( - changes, - &known_tables, - &existing_versions, - &existing_tombstones, - )?; - self.merge_rows(dataset, rows).await + for attempt in 0..=PUBLISHER_RETRY_BUDGET { + let (dataset, known_tables, existing_versions, existing_tombstones) = + self.load_publish_state().await?; + + let latest_per_table = + Self::latest_visible_per_table(&existing_versions, &existing_tombstones); + // Pre-check on every attempt against freshly loaded state so a + // concurrent commit that broke the caller's expectation is + // surfaced as `ExpectedVersionMismatch` rather than retried. + Self::check_expected_table_versions(&latest_per_table, expected_table_versions)?; + + if changes.is_empty() { + return Ok(dataset); + } + + let rows = Self::build_pending_rows( + changes, + &known_tables, + &existing_versions, + &existing_tombstones, + )?; + + match self.merge_rows(dataset, rows).await { + Ok(new_dataset) => return Ok(new_dataset), + Err(err) => { + if attempt < PUBLISHER_RETRY_BUDGET && is_retryable_publish_conflict(&err) { + continue; + } + return Err(err); + } + } + } + + Err(OmniError::manifest_conflict(format!( + "manifest publish exhausted {} retries against concurrent writers", + PUBLISHER_RETRY_BUDGET + ))) } } + +/// A retryable conflict here means: Lance's row-level CAS rejected our commit +/// because someone else landed an `object_id` we were also inserting (mapped +/// from `Error::TooMuchWriteContention` to +/// `ManifestConflictDetails::RowLevelCasContention`). This is transparent +/// contention; if the caller's `expected_table_versions` still holds against +/// the new manifest state, we re-attempt. Other conflict variants (notably +/// `ExpectedVersionMismatch`) propagate so the caller learns immediately. +fn is_retryable_publish_conflict(err: &OmniError) -> bool { + matches!( + err, + OmniError::Manifest(m) + if matches!( + m.details, + Some(crate::error::ManifestConflictDetails::RowLevelCasContention) + ) + ) +} diff --git a/crates/omnigraph/src/db/manifest/state.rs b/crates/omnigraph/src/db/manifest/state.rs index eb36518..e222ede 100644 --- a/crates/omnigraph/src/db/manifest/state.rs +++ b/crates/omnigraph/src/db/manifest/state.rs @@ -42,8 +42,20 @@ struct ManifestScan { } pub(super) fn manifest_schema() -> SchemaRef { + // `object_id` is the merge-insert join key in the publisher; marking it as + // Lance's unenforced primary key engages row-level CAS at commit time, so + // two concurrent writers that try to land the same `object_id` row are + // detected by Lance via bloom-filter intersection (see + // `.context/merge-insert-cas-granularity.md`). Without this metadata, + // Lance's conflict resolver would silently rebase both writers' new + // fragments and admit duplicate rows. + let object_id_metadata: HashMap = + [("lance-schema:unenforced-primary-key", "true")] + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); Arc::new(Schema::new(vec![ - Field::new("object_id", DataType::Utf8, false), + Field::new("object_id", DataType::Utf8, false).with_metadata(object_id_metadata), Field::new("object_type", DataType::Utf8, false), Field::new("location", DataType::Utf8, true), Field::new("metadata", DataType::Utf8, true), diff --git a/crates/omnigraph/src/db/manifest/tests.rs b/crates/omnigraph/src/db/manifest/tests.rs index 9e14dd7..72738ea 100644 --- a/crates/omnigraph/src/db/manifest/tests.rs +++ b/crates/omnigraph/src/db/manifest/tests.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray, UInt64Array}; @@ -946,7 +947,11 @@ impl RecordingPublisher { #[async_trait] impl ManifestBatchPublisher for RecordingPublisher { - async fn publish(&self, changes: &[ManifestChange]) -> Result { + async fn publish( + &self, + changes: &[ManifestChange], + expected_table_versions: &HashMap, + ) -> Result { let requests: Vec = changes .iter() .filter_map(|change| match change { @@ -955,7 +960,7 @@ impl ManifestBatchPublisher for RecordingPublisher { }) .collect(); self.requests.lock().await.extend_from_slice(&requests); - self.inner.publish(changes).await + self.inner.publish(changes, expected_table_versions).await } } @@ -963,7 +968,11 @@ struct FailingPublisher; #[async_trait] impl ManifestBatchPublisher for FailingPublisher { - async fn publish(&self, _changes: &[ManifestChange]) -> Result { + async fn publish( + &self, + _changes: &[ManifestChange], + _expected_table_versions: &HashMap, + ) -> Result { Err(OmniError::manifest( "injected batch publisher failure".to_string(), )) @@ -1107,6 +1116,286 @@ async fn test_commit_failure_from_injected_batch_publisher_preserves_visible_sta ); } +/// Drive Person to a fresh on-disk dataset version `v` (returns the new +/// version number) and produce a `SubTableUpdate` ready to publish. +async fn append_person_and_make_update( + uri: &str, + person_entry: &SubTableEntry, + name: &str, +) -> SubTableUpdate { + let mut person_ds = Dataset::open(&format!("{}/{}", uri, person_entry.table_path)) + .await + .unwrap(); + let person_schema = Arc::new(person_ds.schema().into()); + let row = RecordBatch::try_new( + Arc::clone(&person_schema), + vec![ + Arc::new(StringArray::from(vec![format!("person-{name}")])), + Arc::new(StringArray::from(vec![Some(name.to_string())])), + Arc::new(Int32Array::from(vec![Some(30)])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(row)], person_schema); + person_ds.append(reader, None).await.unwrap(); + let new_version = person_ds.version().version; + let version_metadata = + table_version_metadata_for_state(uri, &person_entry.table_path, None, new_version) + .await + .unwrap(); + SubTableUpdate { + table_key: "node:Person".to_string(), + table_version: new_version, + table_branch: None, + row_count: 1, + version_metadata, + } +} + +#[tokio::test] +async fn test_commit_with_expected_accepts_matching_versions() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + let snap = mc.snapshot(); + let person_entry = snap.entry("node:Person").unwrap().clone(); + + let update = append_person_and_make_update(uri, &person_entry, "Alice").await; + let mut expected = HashMap::new(); + // After init, every table is at table_version=1 — assert that. + expected.insert("node:Person".to_string(), 1); + expected.insert("node:Company".to_string(), 1); + + mc.commit_with_expected(&[update.clone()], &expected) + .await + .expect("matching expected versions should publish cleanly"); + + let after = mc.snapshot(); + assert_eq!( + after.entry("node:Person").unwrap().table_version, + update.table_version + ); +} + +#[tokio::test] +async fn test_commit_with_expected_rejects_stale_with_typed_details() { + use crate::error::ManifestConflictDetails; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + let snap = mc.snapshot(); + let person_entry = snap.entry("node:Person").unwrap().clone(); + + // Writer A advances Person. + let update_a = append_person_and_make_update(uri, &person_entry, "Alice").await; + let advanced_version = update_a.table_version; + mc.commit(&[update_a]).await.unwrap(); + + // Writer B then tries to commit, asserting Person is still at v=1. + let update_b = append_person_and_make_update(uri, &person_entry, "Bob").await; + let mut stale_expected = HashMap::new(); + stale_expected.insert("node:Person".to_string(), 1); + + let err = mc + .commit_with_expected(&[update_b], &stale_expected) + .await + .expect_err("stale expected_table_versions should reject"); + + match err { + OmniError::Manifest(m) => match m.details { + Some(ManifestConflictDetails::ExpectedVersionMismatch { + table_key, + expected, + actual, + }) => { + assert_eq!(table_key, "node:Person"); + assert_eq!(expected, 1); + assert_eq!(actual, advanced_version); + } + other => panic!("expected ExpectedVersionMismatch details, got {:?}", other), + }, + other => panic!("expected OmniError::Manifest, got {:?}", other), + } +} + +#[tokio::test] +async fn test_commit_with_expected_catches_drift_on_untouched_table() { + use crate::error::ManifestConflictDetails; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + let snap = mc.snapshot(); + let person_entry = snap.entry("node:Person").unwrap().clone(); + let company_entry = snap.entry("node:Company").unwrap().clone(); + + // Writer A advances Company. + let mut company_ds = Dataset::open(&format!("{}/{}", uri, company_entry.table_path)) + .await + .unwrap(); + let company_schema = Arc::new(company_ds.schema().into()); + let row = RecordBatch::try_new( + Arc::clone(&company_schema), + vec![ + Arc::new(StringArray::from(vec!["company-1"])), + Arc::new(StringArray::from(vec!["Acme"])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(row)], company_schema); + company_ds.append(reader, None).await.unwrap(); + let company_version = company_ds.version().version; + let company_metadata = + table_version_metadata_for_state(uri, &company_entry.table_path, None, company_version) + .await + .unwrap(); + mc.commit(&[SubTableUpdate { + table_key: "node:Company".to_string(), + table_version: company_version, + table_branch: None, + row_count: 1, + version_metadata: company_metadata, + }]) + .await + .unwrap(); + + // Writer B writes Person but asserts Company is still at v=1. + let update_person = append_person_and_make_update(uri, &person_entry, "Bob").await; + let mut expected = HashMap::new(); + expected.insert("node:Company".to_string(), 1); + + let err = mc + .commit_with_expected(&[update_person], &expected) + .await + .expect_err("drift on an untouched expected table should reject"); + + let OmniError::Manifest(m) = err else { + panic!("expected OmniError::Manifest"); + }; + match m.details { + Some(ManifestConflictDetails::ExpectedVersionMismatch { + ref table_key, + expected, + actual, + }) => { + assert_eq!(table_key, "node:Company"); + assert_eq!(expected, 1); + assert_eq!(actual, company_version); + } + other => panic!("expected ExpectedVersionMismatch, got {:?}", other), + } +} + +#[tokio::test] +async fn test_commit_with_expected_unknown_table_reports_actual_zero() { + use crate::error::ManifestConflictDetails; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + + let mut expected = HashMap::new(); + expected.insert("node:DoesNotExist".to_string(), 7); + let err = mc + .commit_with_expected(&[], &expected) + .await + .expect_err("unknown expected table should reject"); + + let OmniError::Manifest(m) = err else { + panic!("expected OmniError::Manifest"); + }; + match m.details { + Some(ManifestConflictDetails::ExpectedVersionMismatch { + table_key, + expected, + actual, + }) => { + assert_eq!(table_key, "node:DoesNotExist"); + assert_eq!(expected, 7); + assert_eq!(actual, 0); + } + other => panic!("expected ExpectedVersionMismatch, got {:?}", other), + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_concurrent_publish_with_overlapping_expected_versions_one_succeeds() { + use crate::error::ManifestConflictDetails; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + let mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + let person_entry = mc.snapshot().entry("node:Person").unwrap().clone(); + + // Advance the Person dataset once so we have a real on-disk version 2 that + // both publishers can target. Both attempt to land the *same* + // `version:node:Person@v=2` row in `__manifest`, which is the row-level + // CAS conflict the publisher must detect: load_publish_state at the same + // baseline → pre-check passes for both → only one merge_insert can land + // the unique `object_id`. + let update = append_person_and_make_update(uri, &person_entry, "Alice").await; + + let mut expected = HashMap::new(); + expected.insert("node:Person".to_string(), 1); + + let publisher_a = GraphNamespacePublisher::new(uri, None); + let publisher_b = GraphNamespacePublisher::new(uri, None); + let changes_a = vec![ManifestChange::Update(update.clone())]; + let changes_b = vec![ManifestChange::Update(update)]; + let expected_a = expected.clone(); + let expected_b = expected; + + let (res_a, res_b) = tokio::join!( + async { publisher_a.publish(&changes_a, &expected_a).await }, + async { publisher_b.publish(&changes_b, &expected_b).await } + ); + + let (succeeded, err) = match (res_a, res_b) { + (Ok(_), Err(e)) => (1, e), + (Err(e), Ok(_)) => (1, e), + (Ok(_), Ok(_)) => panic!("both writers committed -- OCC failed"), + (Err(a), Err(b)) => panic!("both writers failed: {:?} / {:?}", a, b), + }; + assert_eq!(succeeded, 1, "exactly one writer must succeed"); + + let OmniError::Manifest(m) = err else { + panic!("expected OmniError::Manifest, got {:?}", err); + }; + // The losing writer surfaces either ExpectedVersionMismatch (its retry's + // pre-check observed the winner's advance) or a plain Conflict (Lance + // row-level CAS rejected, retry exhausted before the pre-check fired). + // Both are acceptable typed conflict signals; what matters is that the + // failure is not silent. + use crate::error::ManifestErrorKind; + assert!( + matches!(m.kind, ManifestErrorKind::Conflict), + "expected Conflict-kind manifest error, got {:?}: {}", + m.kind, + m.message, + ); + if let Some(ManifestConflictDetails::ExpectedVersionMismatch { + ref table_key, + expected, + .. + }) = m.details + { + assert_eq!(table_key, "node:Person"); + assert_eq!(expected, 1); + } + + // Manifest must reflect exactly one new commit on Person at the requested + // version (no duplicate version rows). + let mc = ManifestCoordinator::open(uri).await.unwrap(); + let entry = mc.snapshot().entry("node:Person").unwrap().clone(); + assert!(entry.table_version > 1, "Person should have advanced past v=1"); +} + #[test] fn manifest_column_helpers_return_error_for_bad_schema() { let batch = RecordBatch::try_new( diff --git a/crates/omnigraph/src/error.rs b/crates/omnigraph/src/error.rs index fe65ccb..fc91090 100644 --- a/crates/omnigraph/src/error.rs +++ b/crates/omnigraph/src/error.rs @@ -10,11 +10,31 @@ pub enum ManifestErrorKind { Internal, } +/// Structured details for a manifest-level conflict. Set on the `details` +/// field of `ManifestError` when callers need to match on the specific +/// concurrency-control failure rather than parse a string. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ManifestConflictDetails { + /// A caller-supplied per-table expected version did not match the + /// manifest's current latest non-tombstoned version for that table. + ExpectedVersionMismatch { + table_key: String, + expected: u64, + actual: u64, + }, + /// Lance's row-level CAS rejected the publish because a concurrent writer + /// landed a row with the same `object_id`. Distinct from + /// `ExpectedVersionMismatch`: the caller's expectations (if any) still + /// hold against the new manifest state, so the publisher will retry. + RowLevelCasContention, +} + #[derive(Debug, Clone, Error)] #[error("{message}")] pub struct ManifestError { pub kind: ManifestErrorKind, pub message: String, + pub details: Option, } impl ManifestError { @@ -22,8 +42,14 @@ impl ManifestError { Self { kind, message: message.into(), + details: None, } } + + pub fn with_details(mut self, details: ManifestConflictDetails) -> Self { + self.details = Some(details); + self + } } #[derive(Debug, Clone)] @@ -77,4 +103,32 @@ impl OmniError { pub fn manifest_internal(message: impl Into) -> Self { Self::Manifest(ManifestError::new(ManifestErrorKind::Internal, message)) } + + pub fn manifest_expected_version_mismatch( + table_key: impl Into, + expected: u64, + actual: u64, + ) -> Self { + let table_key = table_key.into(); + let message = format!( + "stale view of '{}': expected manifest table version {} but current is {} — refresh and retry", + table_key, expected, actual + ); + Self::Manifest( + ManifestError::new(ManifestErrorKind::Conflict, message).with_details( + ManifestConflictDetails::ExpectedVersionMismatch { + table_key, + expected, + actual, + }, + ), + ) + } + + pub fn manifest_row_level_cas_contention(message: impl Into) -> Self { + Self::Manifest( + ManifestError::new(ManifestErrorKind::Conflict, message) + .with_details(ManifestConflictDetails::RowLevelCasContention), + ) + } }