mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-30 02:49:39 +02:00
Add per-table expected-version OCC to ManifestBatchPublisher (MR-766)
Layered approach selected by the CAS-granularity investigation
(.context/merge-insert-cas-granularity.md):
- Annotate __manifest.object_id with `lance-schema:unenforced-primary-key`,
enabling Lance row-level CAS via the bloom-filter conflict resolver.
Closes a latent silent-duplicate bug where two concurrent publishes of
the same `version:T@v=N+1` row could both land in disjoint fragments.
- Extend `ManifestBatchPublisher::publish` with `expected_table_versions:
&HashMap<String, u64>`. Empty map preserves today's behavior; populated
map asserts the manifest's latest non-tombstoned version per table
matches the caller's view. Mismatches surface as a typed
`ManifestConflictDetails::ExpectedVersionMismatch { table_key, expected,
actual }` so callers can match without parsing strings.
- Set `merge_builder.conflict_retries(0)` so Lance's transparent rebase
cannot silently break the OCC contract; retries are owned by the
publisher loop, where each attempt re-runs `load_publish_state` and
the expected-version pre-check.
- Surface `ManifestCoordinator::commit_with_expected` for the callers
that need strict OCC (the run-demotion ticket); existing `commit` and
`commit_changes` paths are unaffected.
New tests in `manifest/tests.rs` cover: matching expected versions,
stale expected with typed details, drift on an untouched expected
table, unknown expected table (actual=0), and the headline case of two
concurrent publishes with overlapping expected versions where exactly
one succeeds.
This commit is contained in:
parent
bb95fdceda
commit
df0e158190
5 changed files with 559 additions and 21 deletions
|
|
@ -286,12 +286,43 @@ impl ManifestCoordinator {
|
||||||
self.commit_changes(&changes).await
|
self.commit_changes(&changes).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Same as [`Self::commit`], but with caller-supplied per-table expected
|
||||||
|
/// versions used for optimistic concurrency control. Each entry asserts
|
||||||
|
/// the manifest's current latest non-tombstoned `table_version` for that
|
||||||
|
/// `table_key` is exactly what the caller observed; mismatches surface
|
||||||
|
/// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
|
||||||
|
pub async fn commit_with_expected(
|
||||||
|
&mut self,
|
||||||
|
updates: &[SubTableUpdate],
|
||||||
|
expected_table_versions: &HashMap<String, u64>,
|
||||||
|
) -> Result<u64> {
|
||||||
|
let changes = updates
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(ManifestChange::Update)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
self.commit_changes_with_expected(&changes, expected_table_versions)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
|
pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
|
||||||
if changes.is_empty() {
|
self.commit_changes_with_expected(changes, &HashMap::new())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn commit_changes_with_expected(
|
||||||
|
&mut self,
|
||||||
|
changes: &[ManifestChange],
|
||||||
|
expected_table_versions: &HashMap<String, u64>,
|
||||||
|
) -> Result<u64> {
|
||||||
|
if changes.is_empty() && expected_table_versions.is_empty() {
|
||||||
return Ok(self.version());
|
return Ok(self.version());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.dataset = self.publisher.publish(changes).await?;
|
self.dataset = self
|
||||||
|
.publisher
|
||||||
|
.publish(changes, expected_table_versions)
|
||||||
|
.await?;
|
||||||
|
|
||||||
self.known_state = read_manifest_state(&self.dataset).await?;
|
self.known_state = read_manifest_state(&self.dataset).await?;
|
||||||
Ok(self.version())
|
Ok(self.version())
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ use std::sync::Arc;
|
||||||
use arrow_array::RecordBatchIterator;
|
use arrow_array::RecordBatchIterator;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use lance::Dataset;
|
use lance::Dataset;
|
||||||
|
use lance::Error as LanceError;
|
||||||
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
|
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
|
||||||
use lance_namespace::NamespaceError;
|
use lance_namespace::NamespaceError;
|
||||||
use lance_namespace::models::CreateTableVersionRequest;
|
use lance_namespace::models::CreateTableVersionRequest;
|
||||||
|
|
@ -38,9 +39,20 @@ use super::{
|
||||||
SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone,
|
SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Bound on the publisher-level retry loop that wraps Lance's row-level CAS
|
||||||
|
/// (`TooMuchWriteContention`). Lance's own `conflict_retries` is set to 0 in
|
||||||
|
/// `merge_rows` because its auto-rebase is "transparent merge" semantics —
|
||||||
|
/// wrong for an OCC contract — so retry is owned here instead, where each
|
||||||
|
/// iteration re-runs `load_publish_state` and the expected-version pre-check.
|
||||||
|
const PUBLISHER_RETRY_BUDGET: u32 = 5;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub(super) trait ManifestBatchPublisher: Send + Sync {
|
pub(super) trait ManifestBatchPublisher: Send + Sync {
|
||||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset>;
|
async fn publish(
|
||||||
|
&self,
|
||||||
|
changes: &[ManifestChange],
|
||||||
|
expected_table_versions: &HashMap<String, u64>,
|
||||||
|
) -> Result<Dataset>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) struct GraphNamespacePublisher {
|
pub(super) struct GraphNamespacePublisher {
|
||||||
|
|
@ -279,6 +291,77 @@ impl GraphNamespacePublisher {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reduce the loaded `(table_key, table_version) → entry` map and the
|
||||||
|
/// tombstone set to "latest non-tombstoned version per table" — the same
|
||||||
|
/// reduction performed by `read_manifest_state` on the visible snapshot.
|
||||||
|
/// Tombstoned tables fall back to their highest tombstone version so that
|
||||||
|
/// the resulting `actual` reported in `ExpectedVersionMismatch` is
|
||||||
|
/// meaningful even when the caller's expected table no longer exists.
|
||||||
|
fn latest_visible_per_table(
|
||||||
|
existing_versions: &HashMap<(String, u64), SubTableEntry>,
|
||||||
|
existing_tombstones: &HashMap<(String, u64), ()>,
|
||||||
|
) -> HashMap<String, u64> {
|
||||||
|
let mut max_tombstones = HashMap::<String, u64>::new();
|
||||||
|
for (key, version) in existing_tombstones.keys() {
|
||||||
|
max_tombstones
|
||||||
|
.entry(key.clone())
|
||||||
|
.and_modify(|v| {
|
||||||
|
if *version > *v {
|
||||||
|
*v = *version;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.or_insert(*version);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut latest = HashMap::<String, u64>::new();
|
||||||
|
for (key, version) in existing_versions.keys() {
|
||||||
|
let tombstoned = max_tombstones
|
||||||
|
.get(key)
|
||||||
|
.map(|t| *t >= *version)
|
||||||
|
.unwrap_or(false);
|
||||||
|
if tombstoned {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
latest
|
||||||
|
.entry(key.clone())
|
||||||
|
.and_modify(|v| {
|
||||||
|
if *version > *v {
|
||||||
|
*v = *version;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.or_insert(*version);
|
||||||
|
}
|
||||||
|
|
||||||
|
// For tables that have only tombstones (no visible entry), surface the
|
||||||
|
// tombstone version so callers see a non-zero `actual`.
|
||||||
|
for (key, tombstone) in &max_tombstones {
|
||||||
|
latest.entry(key.clone()).or_insert(*tombstone);
|
||||||
|
}
|
||||||
|
|
||||||
|
latest
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compare each caller-supplied expectation against the manifest's current
|
||||||
|
/// latest visible version per table. The first mismatch hit while iterating
|
||||||
|
/// `expected` (a `HashMap`, so the order is unspecified) is returned as a typed
|
||||||
|
/// `ExpectedVersionMismatch` (`actual = 0` if the table isn't in the manifest).
|
||||||
|
fn check_expected_table_versions(
|
||||||
|
latest_per_table: &HashMap<String, u64>,
|
||||||
|
expected: &HashMap<String, u64>,
|
||||||
|
) -> Result<()> {
|
||||||
|
for (table_key, expected_version) in expected {
|
||||||
|
let actual = latest_per_table.get(table_key).copied().unwrap_or(0);
|
||||||
|
if actual != *expected_version {
|
||||||
|
return Err(OmniError::manifest_expected_version_mismatch(
|
||||||
|
table_key.clone(),
|
||||||
|
*expected_version,
|
||||||
|
actual,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn merge_rows(&self, dataset: Dataset, rows: Vec<PendingVersionRow>) -> Result<Dataset> {
|
async fn merge_rows(&self, dataset: Dataset, rows: Vec<PendingVersionRow>) -> Result<Dataset> {
|
||||||
let batch = Self::pending_rows_to_batch(rows)?;
|
let batch = Self::pending_rows_to_batch(rows)?;
|
||||||
let reader = RecordBatchIterator::new(vec![Ok(batch)], manifest_schema());
|
let reader = RecordBatchIterator::new(vec![Ok(batch)], manifest_schema());
|
||||||
|
|
@ -287,14 +370,18 @@ impl GraphNamespacePublisher {
|
||||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||||
merge_builder.when_matched(WhenMatched::UpdateAll);
|
merge_builder.when_matched(WhenMatched::UpdateAll);
|
||||||
merge_builder.when_not_matched(WhenNotMatched::InsertAll);
|
merge_builder.when_not_matched(WhenNotMatched::InsertAll);
|
||||||
merge_builder.conflict_retries(5);
|
// 0 here is intentional: Lance's built-in retry uses transparent rebase,
|
||||||
|
// which would let a concurrent writer's row land alongside ours and
|
||||||
|
// silently break the OCC contract on `__manifest`. Retries are owned by
|
||||||
|
// the retry loop in `publish`, where each attempt re-runs the pre-check.
|
||||||
|
merge_builder.conflict_retries(0);
|
||||||
merge_builder.use_index(false);
|
merge_builder.use_index(false);
|
||||||
let (new_dataset, _stats) = merge_builder
|
let (new_dataset, _stats) = merge_builder
|
||||||
.try_build()
|
.try_build()
|
||||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||||
.execute_reader(Box::new(reader))
|
.execute_reader(Box::new(reader))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
.map_err(map_lance_publish_error)?;
|
||||||
Ok(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone()))
|
Ok(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -318,25 +405,90 @@ impl GraphNamespacePublisher {
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>>>()?;
|
.collect::<Result<Vec<_>>>()?;
|
||||||
self.publish(&changes).await
|
self.publish(&changes, &HashMap::new()).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `Error::TooMuchWriteContention` from Lance's row-level CAS bubbles up here
|
||||||
|
/// when a concurrent writer landed a row with the same `object_id` (the
|
||||||
|
/// merge-insert join key, annotated as an unenforced primary key on
|
||||||
|
/// `__manifest`). Translate it to a typed manifest conflict so callers can
|
||||||
|
/// match without parsing strings; everything else is opaque storage.
|
||||||
|
fn map_lance_publish_error(err: LanceError) -> OmniError {
|
||||||
|
if matches!(err, LanceError::TooMuchWriteContention { .. }) {
|
||||||
|
return OmniError::manifest_row_level_cas_contention(format!(
|
||||||
|
"manifest publish lost a row-level CAS race: {}",
|
||||||
|
err
|
||||||
|
));
|
||||||
|
}
|
||||||
|
OmniError::Lance(err.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ManifestBatchPublisher for GraphNamespacePublisher {
|
impl ManifestBatchPublisher for GraphNamespacePublisher {
|
||||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
|
async fn publish(
|
||||||
if changes.is_empty() {
|
&self,
|
||||||
|
changes: &[ManifestChange],
|
||||||
|
expected_table_versions: &HashMap<String, u64>,
|
||||||
|
) -> Result<Dataset> {
|
||||||
|
if changes.is_empty() && expected_table_versions.is_empty() {
|
||||||
return self.dataset().await;
|
return self.dataset().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let (dataset, known_tables, existing_versions, existing_tombstones) =
|
for attempt in 0..=PUBLISHER_RETRY_BUDGET {
|
||||||
self.load_publish_state().await?;
|
let (dataset, known_tables, existing_versions, existing_tombstones) =
|
||||||
let rows = Self::build_pending_rows(
|
self.load_publish_state().await?;
|
||||||
changes,
|
|
||||||
&known_tables,
|
let latest_per_table =
|
||||||
&existing_versions,
|
Self::latest_visible_per_table(&existing_versions, &existing_tombstones);
|
||||||
&existing_tombstones,
|
// Pre-check on every attempt against freshly loaded state so a
|
||||||
)?;
|
// concurrent commit that broke the caller's expectation is
|
||||||
self.merge_rows(dataset, rows).await
|
// surfaced as `ExpectedVersionMismatch` rather than retried.
|
||||||
|
Self::check_expected_table_versions(&latest_per_table, expected_table_versions)?;
|
||||||
|
|
||||||
|
if changes.is_empty() {
|
||||||
|
return Ok(dataset);
|
||||||
|
}
|
||||||
|
|
||||||
|
let rows = Self::build_pending_rows(
|
||||||
|
changes,
|
||||||
|
&known_tables,
|
||||||
|
&existing_versions,
|
||||||
|
&existing_tombstones,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
match self.merge_rows(dataset, rows).await {
|
||||||
|
Ok(new_dataset) => return Ok(new_dataset),
|
||||||
|
Err(err) => {
|
||||||
|
if attempt < PUBLISHER_RETRY_BUDGET && is_retryable_publish_conflict(&err) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(OmniError::manifest_conflict(format!(
|
||||||
|
"manifest publish exhausted {} retries against concurrent writers",
|
||||||
|
PUBLISHER_RETRY_BUDGET
|
||||||
|
)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A retryable conflict here means: Lance's row-level CAS rejected our commit
|
||||||
|
/// because someone else landed an `object_id` we were also inserting (mapped
|
||||||
|
/// from `Error::TooMuchWriteContention` to
|
||||||
|
/// `ManifestConflictDetails::RowLevelCasContention`). This is transparent
|
||||||
|
/// contention; if the caller's `expected_table_versions` still holds against
|
||||||
|
/// the new manifest state, we re-attempt. Other conflict variants (notably
|
||||||
|
/// `ExpectedVersionMismatch`) propagate so the caller learns immediately.
|
||||||
|
fn is_retryable_publish_conflict(err: &OmniError) -> bool {
|
||||||
|
matches!(
|
||||||
|
err,
|
||||||
|
OmniError::Manifest(m)
|
||||||
|
if matches!(
|
||||||
|
m.details,
|
||||||
|
Some(crate::error::ManifestConflictDetails::RowLevelCasContention)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,8 +42,20 @@ struct ManifestScan {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn manifest_schema() -> SchemaRef {
|
pub(super) fn manifest_schema() -> SchemaRef {
|
||||||
|
// `object_id` is the merge-insert join key in the publisher; marking it as
|
||||||
|
// Lance's unenforced primary key engages row-level CAS at commit time, so
|
||||||
|
// two concurrent writers that try to land the same `object_id` row are
|
||||||
|
// detected by Lance via bloom-filter intersection (see
|
||||||
|
// `.context/merge-insert-cas-granularity.md`). Without this metadata,
|
||||||
|
// Lance's conflict resolver would silently rebase both writers' new
|
||||||
|
// fragments and admit duplicate rows.
|
||||||
|
let object_id_metadata: HashMap<String, String> =
|
||||||
|
[("lance-schema:unenforced-primary-key", "true")]
|
||||||
|
.into_iter()
|
||||||
|
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||||
|
.collect();
|
||||||
Arc::new(Schema::new(vec![
|
Arc::new(Schema::new(vec![
|
||||||
Field::new("object_id", DataType::Utf8, false),
|
Field::new("object_id", DataType::Utf8, false).with_metadata(object_id_metadata),
|
||||||
Field::new("object_type", DataType::Utf8, false),
|
Field::new("object_type", DataType::Utf8, false),
|
||||||
Field::new("location", DataType::Utf8, true),
|
Field::new("location", DataType::Utf8, true),
|
||||||
Field::new("metadata", DataType::Utf8, true),
|
Field::new("metadata", DataType::Utf8, true),
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray, UInt64Array};
|
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray, UInt64Array};
|
||||||
|
|
@ -946,7 +947,11 @@ impl RecordingPublisher {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ManifestBatchPublisher for RecordingPublisher {
|
impl ManifestBatchPublisher for RecordingPublisher {
|
||||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
|
async fn publish(
|
||||||
|
&self,
|
||||||
|
changes: &[ManifestChange],
|
||||||
|
expected_table_versions: &HashMap<String, u64>,
|
||||||
|
) -> Result<Dataset> {
|
||||||
let requests: Vec<CreateTableVersionRequest> = changes
|
let requests: Vec<CreateTableVersionRequest> = changes
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|change| match change {
|
.filter_map(|change| match change {
|
||||||
|
|
@ -955,7 +960,7 @@ impl ManifestBatchPublisher for RecordingPublisher {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
self.requests.lock().await.extend_from_slice(&requests);
|
self.requests.lock().await.extend_from_slice(&requests);
|
||||||
self.inner.publish(changes).await
|
self.inner.publish(changes, expected_table_versions).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -963,7 +968,11 @@ struct FailingPublisher;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ManifestBatchPublisher for FailingPublisher {
|
impl ManifestBatchPublisher for FailingPublisher {
|
||||||
async fn publish(&self, _changes: &[ManifestChange]) -> Result<Dataset> {
|
async fn publish(
|
||||||
|
&self,
|
||||||
|
_changes: &[ManifestChange],
|
||||||
|
_expected_table_versions: &HashMap<String, u64>,
|
||||||
|
) -> Result<Dataset> {
|
||||||
Err(OmniError::manifest(
|
Err(OmniError::manifest(
|
||||||
"injected batch publisher failure".to_string(),
|
"injected batch publisher failure".to_string(),
|
||||||
))
|
))
|
||||||
|
|
@ -1107,6 +1116,286 @@ async fn test_commit_failure_from_injected_batch_publisher_preserves_visible_sta
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Append one row to the Person dataset, creating a fresh on-disk version, and
|
||||||
|
/// return a `SubTableUpdate` targeting that new version, ready to publish.
|
||||||
|
async fn append_person_and_make_update(
|
||||||
|
uri: &str,
|
||||||
|
person_entry: &SubTableEntry,
|
||||||
|
name: &str,
|
||||||
|
) -> SubTableUpdate {
|
||||||
|
let mut person_ds = Dataset::open(&format!("{}/{}", uri, person_entry.table_path))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let person_schema = Arc::new(person_ds.schema().into());
|
||||||
|
let row = RecordBatch::try_new(
|
||||||
|
Arc::clone(&person_schema),
|
||||||
|
vec![
|
||||||
|
Arc::new(StringArray::from(vec![format!("person-{name}")])),
|
||||||
|
Arc::new(StringArray::from(vec![Some(name.to_string())])),
|
||||||
|
Arc::new(Int32Array::from(vec![Some(30)])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let reader = RecordBatchIterator::new(vec![Ok(row)], person_schema);
|
||||||
|
person_ds.append(reader, None).await.unwrap();
|
||||||
|
let new_version = person_ds.version().version;
|
||||||
|
let version_metadata =
|
||||||
|
table_version_metadata_for_state(uri, &person_entry.table_path, None, new_version)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
SubTableUpdate {
|
||||||
|
table_key: "node:Person".to_string(),
|
||||||
|
table_version: new_version,
|
||||||
|
table_branch: None,
|
||||||
|
row_count: 1,
|
||||||
|
version_metadata,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_commit_with_expected_accepts_matching_versions() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let catalog = build_test_catalog();
|
||||||
|
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||||
|
let snap = mc.snapshot();
|
||||||
|
let person_entry = snap.entry("node:Person").unwrap().clone();
|
||||||
|
|
||||||
|
let update = append_person_and_make_update(uri, &person_entry, "Alice").await;
|
||||||
|
let mut expected = HashMap::new();
|
||||||
|
// After init, every table is at table_version=1 — assert that.
|
||||||
|
expected.insert("node:Person".to_string(), 1);
|
||||||
|
expected.insert("node:Company".to_string(), 1);
|
||||||
|
|
||||||
|
mc.commit_with_expected(&[update.clone()], &expected)
|
||||||
|
.await
|
||||||
|
.expect("matching expected versions should publish cleanly");
|
||||||
|
|
||||||
|
let after = mc.snapshot();
|
||||||
|
assert_eq!(
|
||||||
|
after.entry("node:Person").unwrap().table_version,
|
||||||
|
update.table_version
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_commit_with_expected_rejects_stale_with_typed_details() {
|
||||||
|
use crate::error::ManifestConflictDetails;
|
||||||
|
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let catalog = build_test_catalog();
|
||||||
|
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||||
|
let snap = mc.snapshot();
|
||||||
|
let person_entry = snap.entry("node:Person").unwrap().clone();
|
||||||
|
|
||||||
|
// Writer A advances Person.
|
||||||
|
let update_a = append_person_and_make_update(uri, &person_entry, "Alice").await;
|
||||||
|
let advanced_version = update_a.table_version;
|
||||||
|
mc.commit(&[update_a]).await.unwrap();
|
||||||
|
|
||||||
|
// Writer B then tries to commit, asserting Person is still at v=1.
|
||||||
|
let update_b = append_person_and_make_update(uri, &person_entry, "Bob").await;
|
||||||
|
let mut stale_expected = HashMap::new();
|
||||||
|
stale_expected.insert("node:Person".to_string(), 1);
|
||||||
|
|
||||||
|
let err = mc
|
||||||
|
.commit_with_expected(&[update_b], &stale_expected)
|
||||||
|
.await
|
||||||
|
.expect_err("stale expected_table_versions should reject");
|
||||||
|
|
||||||
|
match err {
|
||||||
|
OmniError::Manifest(m) => match m.details {
|
||||||
|
Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||||
|
table_key,
|
||||||
|
expected,
|
||||||
|
actual,
|
||||||
|
}) => {
|
||||||
|
assert_eq!(table_key, "node:Person");
|
||||||
|
assert_eq!(expected, 1);
|
||||||
|
assert_eq!(actual, advanced_version);
|
||||||
|
}
|
||||||
|
other => panic!("expected ExpectedVersionMismatch details, got {:?}", other),
|
||||||
|
},
|
||||||
|
other => panic!("expected OmniError::Manifest, got {:?}", other),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_commit_with_expected_catches_drift_on_untouched_table() {
|
||||||
|
use crate::error::ManifestConflictDetails;
|
||||||
|
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let catalog = build_test_catalog();
|
||||||
|
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||||
|
let snap = mc.snapshot();
|
||||||
|
let person_entry = snap.entry("node:Person").unwrap().clone();
|
||||||
|
let company_entry = snap.entry("node:Company").unwrap().clone();
|
||||||
|
|
||||||
|
// Writer A advances Company.
|
||||||
|
let mut company_ds = Dataset::open(&format!("{}/{}", uri, company_entry.table_path))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let company_schema = Arc::new(company_ds.schema().into());
|
||||||
|
let row = RecordBatch::try_new(
|
||||||
|
Arc::clone(&company_schema),
|
||||||
|
vec![
|
||||||
|
Arc::new(StringArray::from(vec!["company-1"])),
|
||||||
|
Arc::new(StringArray::from(vec!["Acme"])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let reader = RecordBatchIterator::new(vec![Ok(row)], company_schema);
|
||||||
|
company_ds.append(reader, None).await.unwrap();
|
||||||
|
let company_version = company_ds.version().version;
|
||||||
|
let company_metadata =
|
||||||
|
table_version_metadata_for_state(uri, &company_entry.table_path, None, company_version)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
mc.commit(&[SubTableUpdate {
|
||||||
|
table_key: "node:Company".to_string(),
|
||||||
|
table_version: company_version,
|
||||||
|
table_branch: None,
|
||||||
|
row_count: 1,
|
||||||
|
version_metadata: company_metadata,
|
||||||
|
}])
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Writer B writes Person but asserts Company is still at v=1.
|
||||||
|
let update_person = append_person_and_make_update(uri, &person_entry, "Bob").await;
|
||||||
|
let mut expected = HashMap::new();
|
||||||
|
expected.insert("node:Company".to_string(), 1);
|
||||||
|
|
||||||
|
let err = mc
|
||||||
|
.commit_with_expected(&[update_person], &expected)
|
||||||
|
.await
|
||||||
|
.expect_err("drift on an untouched expected table should reject");
|
||||||
|
|
||||||
|
let OmniError::Manifest(m) = err else {
|
||||||
|
panic!("expected OmniError::Manifest");
|
||||||
|
};
|
||||||
|
match m.details {
|
||||||
|
Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||||
|
ref table_key,
|
||||||
|
expected,
|
||||||
|
actual,
|
||||||
|
}) => {
|
||||||
|
assert_eq!(table_key, "node:Company");
|
||||||
|
assert_eq!(expected, 1);
|
||||||
|
assert_eq!(actual, company_version);
|
||||||
|
}
|
||||||
|
other => panic!("expected ExpectedVersionMismatch, got {:?}", other),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_commit_with_expected_unknown_table_reports_actual_zero() {
|
||||||
|
use crate::error::ManifestConflictDetails;
|
||||||
|
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let catalog = build_test_catalog();
|
||||||
|
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||||
|
|
||||||
|
let mut expected = HashMap::new();
|
||||||
|
expected.insert("node:DoesNotExist".to_string(), 7);
|
||||||
|
let err = mc
|
||||||
|
.commit_with_expected(&[], &expected)
|
||||||
|
.await
|
||||||
|
.expect_err("unknown expected table should reject");
|
||||||
|
|
||||||
|
let OmniError::Manifest(m) = err else {
|
||||||
|
panic!("expected OmniError::Manifest");
|
||||||
|
};
|
||||||
|
match m.details {
|
||||||
|
Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||||
|
table_key,
|
||||||
|
expected,
|
||||||
|
actual,
|
||||||
|
}) => {
|
||||||
|
assert_eq!(table_key, "node:DoesNotExist");
|
||||||
|
assert_eq!(expected, 7);
|
||||||
|
assert_eq!(actual, 0);
|
||||||
|
}
|
||||||
|
other => panic!("expected ExpectedVersionMismatch, got {:?}", other),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_concurrent_publish_with_overlapping_expected_versions_one_succeeds() {
|
||||||
|
use crate::error::ManifestConflictDetails;
|
||||||
|
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let uri = dir.path().to_str().unwrap();
|
||||||
|
let catalog = build_test_catalog();
|
||||||
|
let mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||||
|
let person_entry = mc.snapshot().entry("node:Person").unwrap().clone();
|
||||||
|
|
||||||
|
// Advance the Person dataset once so we have a real on-disk version 2 that
|
||||||
|
// both publishers can target. Both attempt to land the *same*
|
||||||
|
// `version:node:Person@v=2` row in `__manifest`, which is the row-level
|
||||||
|
// CAS conflict the publisher must detect: load_publish_state at the same
|
||||||
|
// baseline → pre-check passes for both → only one merge_insert can land
|
||||||
|
// the unique `object_id`.
|
||||||
|
let update = append_person_and_make_update(uri, &person_entry, "Alice").await;
|
||||||
|
|
||||||
|
let mut expected = HashMap::new();
|
||||||
|
expected.insert("node:Person".to_string(), 1);
|
||||||
|
|
||||||
|
let publisher_a = GraphNamespacePublisher::new(uri, None);
|
||||||
|
let publisher_b = GraphNamespacePublisher::new(uri, None);
|
||||||
|
let changes_a = vec![ManifestChange::Update(update.clone())];
|
||||||
|
let changes_b = vec![ManifestChange::Update(update)];
|
||||||
|
let expected_a = expected.clone();
|
||||||
|
let expected_b = expected;
|
||||||
|
|
||||||
|
let (res_a, res_b) = tokio::join!(
|
||||||
|
async { publisher_a.publish(&changes_a, &expected_a).await },
|
||||||
|
async { publisher_b.publish(&changes_b, &expected_b).await }
|
||||||
|
);
|
||||||
|
|
||||||
|
let (succeeded, err) = match (res_a, res_b) {
|
||||||
|
(Ok(_), Err(e)) => (1, e),
|
||||||
|
(Err(e), Ok(_)) => (1, e),
|
||||||
|
(Ok(_), Ok(_)) => panic!("both writers committed -- OCC failed"),
|
||||||
|
(Err(a), Err(b)) => panic!("both writers failed: {:?} / {:?}", a, b),
|
||||||
|
};
|
||||||
|
assert_eq!(succeeded, 1, "exactly one writer must succeed");
|
||||||
|
|
||||||
|
let OmniError::Manifest(m) = err else {
|
||||||
|
panic!("expected OmniError::Manifest, got {:?}", err);
|
||||||
|
};
|
||||||
|
// The losing writer surfaces either ExpectedVersionMismatch (its retry's
|
||||||
|
// pre-check observed the winner's advance) or a plain Conflict (Lance
|
||||||
|
// row-level CAS rejected, retry exhausted before the pre-check fired).
|
||||||
|
// Both are acceptable typed conflict signals; what matters is that the
|
||||||
|
// failure is not silent.
|
||||||
|
use crate::error::ManifestErrorKind;
|
||||||
|
assert!(
|
||||||
|
matches!(m.kind, ManifestErrorKind::Conflict),
|
||||||
|
"expected Conflict-kind manifest error, got {:?}: {}",
|
||||||
|
m.kind,
|
||||||
|
m.message,
|
||||||
|
);
|
||||||
|
if let Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||||
|
ref table_key,
|
||||||
|
expected,
|
||||||
|
..
|
||||||
|
}) = m.details
|
||||||
|
{
|
||||||
|
assert_eq!(table_key, "node:Person");
|
||||||
|
assert_eq!(expected, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manifest must reflect exactly one new commit on Person at the requested
|
||||||
|
// version (no duplicate version rows).
|
||||||
|
let mc = ManifestCoordinator::open(uri).await.unwrap();
|
||||||
|
let entry = mc.snapshot().entry("node:Person").unwrap().clone();
|
||||||
|
assert!(entry.table_version > 1, "Person should have advanced past v=1");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn manifest_column_helpers_return_error_for_bad_schema() {
|
fn manifest_column_helpers_return_error_for_bad_schema() {
|
||||||
let batch = RecordBatch::try_new(
|
let batch = RecordBatch::try_new(
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,31 @@ pub enum ManifestErrorKind {
|
||||||
Internal,
|
Internal,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Structured details for a manifest-level conflict. Set on the `details`
|
||||||
|
/// field of `ManifestError` when callers need to match on the specific
|
||||||
|
/// concurrency-control failure rather than parse a string.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum ManifestConflictDetails {
|
||||||
|
/// A caller-supplied per-table expected version did not match the
|
||||||
|
/// manifest's current latest non-tombstoned version for that table.
|
||||||
|
ExpectedVersionMismatch {
|
||||||
|
table_key: String,
|
||||||
|
expected: u64,
|
||||||
|
actual: u64,
|
||||||
|
},
|
||||||
|
/// Lance's row-level CAS rejected the publish because a concurrent writer
|
||||||
|
/// landed a row with the same `object_id`. Distinct from
|
||||||
|
/// `ExpectedVersionMismatch`: the caller's expectations (if any) may still
|
||||||
|
/// hold, so the publisher reloads state, re-checks them, and retries.
|
||||||
|
RowLevelCasContention,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Error)]
|
#[derive(Debug, Clone, Error)]
|
||||||
#[error("{message}")]
|
#[error("{message}")]
|
||||||
pub struct ManifestError {
|
pub struct ManifestError {
|
||||||
pub kind: ManifestErrorKind,
|
pub kind: ManifestErrorKind,
|
||||||
pub message: String,
|
pub message: String,
|
||||||
|
pub details: Option<ManifestConflictDetails>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ManifestError {
|
impl ManifestError {
|
||||||
|
|
@ -22,8 +42,14 @@ impl ManifestError {
|
||||||
Self {
|
Self {
|
||||||
kind,
|
kind,
|
||||||
message: message.into(),
|
message: message.into(),
|
||||||
|
details: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn with_details(mut self, details: ManifestConflictDetails) -> Self {
|
||||||
|
self.details = Some(details);
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
|
@ -77,4 +103,32 @@ impl OmniError {
|
||||||
pub fn manifest_internal(message: impl Into<String>) -> Self {
|
pub fn manifest_internal(message: impl Into<String>) -> Self {
|
||||||
Self::Manifest(ManifestError::new(ManifestErrorKind::Internal, message))
|
Self::Manifest(ManifestError::new(ManifestErrorKind::Internal, message))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn manifest_expected_version_mismatch(
|
||||||
|
table_key: impl Into<String>,
|
||||||
|
expected: u64,
|
||||||
|
actual: u64,
|
||||||
|
) -> Self {
|
||||||
|
let table_key = table_key.into();
|
||||||
|
let message = format!(
|
||||||
|
"stale view of '{}': expected manifest table version {} but current is {} — refresh and retry",
|
||||||
|
table_key, expected, actual
|
||||||
|
);
|
||||||
|
Self::Manifest(
|
||||||
|
ManifestError::new(ManifestErrorKind::Conflict, message).with_details(
|
||||||
|
ManifestConflictDetails::ExpectedVersionMismatch {
|
||||||
|
table_key,
|
||||||
|
expected,
|
||||||
|
actual,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn manifest_row_level_cas_contention(message: impl Into<String>) -> Self {
|
||||||
|
Self::Manifest(
|
||||||
|
ManifestError::new(ManifestErrorKind::Conflict, message)
|
||||||
|
.with_details(ManifestConflictDetails::RowLevelCasContention),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue