Investigate Lance MergeInsertBuilder CAS granularity (MR-766 prereq)

Confirms Lance v4.0.0 has row-level CAS for merge_insert only when the
join-key column carries lance-schema:unenforced-primary-key=true. Our
__manifest schema does not, so the publisher silently allows duplicate
object_id rows under concurrent writers. Note + reproducible scratch
crate select the layered (pre-check + row-level CAS) approach for the
publisher API ticket.
This commit is contained in:
Claude 2026-04-28 23:30:17 +00:00
parent 58dba6210e
commit bb95fdceda
No known key found for this signature in database
4 changed files with 7268 additions and 0 deletions

View file

@ -0,0 +1,134 @@
# MergeInsertBuilder CAS granularity (Lance 4.0.0)
**Status:** investigated (2026-04-28)
**Consumed by:** publisher per-table version API ticket — see "Implication" below.
**Companion ticket:** zombie-run investigation, `Prerequisite` section under "The demotion".
**Repro:** `.context/scratch/merge-insert-cas-repro/``cargo test --release -- --nocapture`. Both tests pass against `lance = "=4.0.0"`, demonstrating the answer.
## TL;DR
**Lance v4.0.0 supports row-level CAS for `MergeInsertBuilder` — but only when the join-key columns are annotated `lance-schema:unenforced-primary-key=true`.** Without that annotation, two concurrent `execute_reader()` calls inserting the same key into disjoint fragments **both succeed silently**, producing duplicate rows under the same key.
Our `__manifest` schema (`crates/omnigraph/src/db/manifest/state.rs:44-60`) does **not** carry that annotation, so today's `GraphNamespacePublisher::publish` has **no row-level CAS protection**. The TOCTOU window between `load_publish_state` and the merge-insert commit is open — two concurrent publishers computing the same `(table_key, table_version)` row both land it.
## Implication for the publisher API ticket
Take both layers, not one or the other:
1. **Annotate `object_id` (or the version-row subset) `lance-schema:unenforced-primary-key=true`** in `manifest_schema()` so Lance enforces row-level CAS at commit time. This closes the silent-duplicate hole that exists today, independent of the new feature.
2. **Add a pre-check phase in `publish` that validates `expected_table_versions` against the manifest snapshot** loaded by `load_publish_state`. The pre-check covers "expected unchanged" assertions for tables the caller is *not* writing to (Lance's row-level CAS only covers rows we *are* writing).
3. **Set `merge_builder.conflict_retries(0)`** and let the publisher's own caller-level retry loop handle the rebase: refresh manifest, re-run pre-check, re-attempt merge-insert. This gives strict atomicity for the per-table expected-version contract; Lance's auto-rebase would otherwise let our commit through against unfamiliar manifest state.
The ticket's "Approach 1 (parameter-only)" by itself is **insufficient** — Lance row-level CAS only catches collisions on rows we are emitting; it does not enforce "expected unchanged" for untouched tables. The ticket's "Approach 2 (pre-check + CAS)" is correct **only if** the row-level CAS is also enabled, otherwise the pre-check is TOCTOU-vulnerable and the existing publisher already has the silent-duplicate bug.
## How Lance does it (source-quoted)
### 1. Filter is built only when ON columns match the unenforced primary key
`rust/lance/src/dataset/write/merge_insert/exec/write.rs:209-221`:
```rust
// Check if ON columns match the schema's unenforced primary key
let field_ids: Vec<i32> = params
.on
.iter()
.filter_map(|name| dataset.schema().field(name).map(|f| f.id))
.collect();
let pk_field_ids: Vec<i32> = dataset
.schema()
.unenforced_primary_key()
.iter()
.map(|f| f.id)
.collect();
let is_primary_key = !pk_field_ids.is_empty() && field_ids == pk_field_ids;
```
The filter is then attached only when `is_primary_key`:
`rust/lance/src/dataset/write/merge_insert/exec/write.rs:903-928`:
```rust
let inserted_rows_filter = if is_primary_key {
// ... build KeyExistenceFilter (bloom or exact set)
};
// ...
inserted_rows_filter: inserted_rows_filter.clone(),
```
The PK is sourced from field metadata; from `inserted_rows.rs:158`:
> `/// Tracks keys of inserted rows for conflict detection.`
> `/// Only created when ON columns match the schema's unenforced primary key.`
### 2. Conflict detection compares the bloom filters at commit time
`rust/lance/src/io/commit/conflict_resolver.rs::check_update_txn` (line 328+):
```rust
match (self_inserted_rows_filter, other_inserted_rows_filter) {
(Some(self_keys), Some(other_keys)) => {
// ...
let Ok((has_intersection, _maybe_false_positive)) =
self_keys.intersects(other_keys)
else { /* treat as conflict */ };
if has_intersection {
return Err(self.retryable_conflict_err(other_transaction, other_version));
}
}
(Some(_), None) => {
// Pessimistic: filter present on this side only -> conflict.
return Err(self.retryable_conflict_err(other_transaction, other_version));
}
_ => {}
}
```
`RetryableCommitConflict` is consumed by the merge-insert retry loop (`rust/lance/src/dataset/write/retry.rs:97`); when retries exhaust it surfaces as `Error::TooMuchWriteContention`.
### 3. Without the filter — fragment overlap only
If the filter is `None` on one side (no PK annotation), `check_update_txn` falls through to fragment-overlap checks (line 382+). For two `Operation::Update` transactions where each writer's merge-insert emits a **new** fragment with the new row, neither side's `updated_fragments` / `removed_fragment_ids` overlaps the other's `modified_fragment_ids`. The conflict resolver returns `Ok` and **both commits land**.
This is exactly what merge_insert does on a not-matched insert: `rust/lance/src/dataset/write/merge_insert.rs:1574` builds `Operation::Update` with `inserted_rows_filter: None` (the v1 path) — though the more common v2/`FullSchemaMergeInsertExec` path also leaves the filter `None` unless `is_primary_key` flipped to true at plan time.
### 4. Today's `__manifest` schema does NOT have the annotation
`crates/omnigraph/src/db/manifest/state.rs:44-60`:
```rust
pub(super) fn manifest_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("object_id", DataType::Utf8, false), // <-- no metadata
// ...
]))
}
```
`crates/omnigraph/src/db/manifest/publisher.rs:286` joins on `object_id`. With no PK metadata, `is_primary_key` is `false` for every commit; `inserted_rows_filter` is always `None`; the fallback fragment-overlap check accepts the duplicate.
## Empirical confirmation
`.context/scratch/merge-insert-cas-repro/src/lib.rs` builds two tests against a `__manifest`-shaped schema, deliberately using `MergeInsertBuilder` with the same `on=["object_id"]`, both writers based on the same dataset version:
- `without_pk_annotation_concurrent_inserts_both_succeed` — passes. Test output: `[without_pk] duplicate rows after both commits: 2`.
- `with_pk_annotation_concurrent_inserts_second_fails` — passes. Test output: `[with_pk] second writer correctly rejected with TooMuchWriteContention`.
Run:
```sh
cd .context/scratch/merge-insert-cas-repro
cargo test --release -- --nocapture
```
## Recommended publisher implementation outline
1. In `manifest_schema()` (`crates/omnigraph/src/db/manifest/state.rs:44`): attach `lance-schema:unenforced-primary-key=true` metadata to `object_id`.
2. Extend `ManifestBatchPublisher::publish` (`crates/omnigraph/src/db/manifest/publisher.rs:42-44`) to take `expected_table_versions: HashMap<String, u64>` (empty = back-compat).
3. In `GraphNamespacePublisher::publish`, after `load_publish_state` (`publisher.rs:77`), reduce `existing_versions` to "latest non-tombstoned version per table" (mirror `state.rs:65-94`) and reject any expectation that doesn't match with a typed `manifest_conflict { table_key, expected, actual }`. Add the structured variant on `ManifestError` (`crates/omnigraph/src/error.rs:5-46`).
4. Switch `merge_builder.conflict_retries(5)` to `(0)` (`publisher.rs:290`) so a concurrent commit fails fast instead of silently rebasing past our pre-check.
5. Wrap the publish in a bounded retry loop at the publisher level: on `TooMuchWriteContention` or `CommitConflict`, refresh manifest, re-run pre-check, re-attempt — bounded retries (5).
6. Existing tests at `crates/omnigraph/src/db/manifest/tests.rs:680-728` are the right place to extend with stale-expected-version cases.
## Out of scope here
- Whether to apply the same PK annotation to `_graph_commits.lance` / `_graph_runs.lance` — separate review when those code paths land in scope.
- Schema-migration story for existing manifests written before the annotation lands. Adding field metadata is a non-breaking schema change in Lance, but worth confirming with one round of `optimize` semantics before deploying.
- Whether the fast-fail (`conflict_retries(0)` + caller-level retry) is preferable to Lance's built-in rebase. The argument here is correctness: Lance's rebase is "transparent merge", which is the wrong semantic for an OCC contract.

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,20 @@
[package]
name = "merge-insert-cas-repro"
version = "0.1.0"
edition = "2024"
publish = false
[lib]
path = "src/lib.rs"
[dependencies]
lance = "=4.0.0"
arrow-array = "=57.3.0"
arrow-schema = "=57.3.0"
tokio = { version = "=1.49.0", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
[dev-dependencies]
tempfile = "3"
[workspace]

View file

@ -0,0 +1,152 @@
// Investigation repro for MR-766 (CAS-granularity).
//
// Two tests on a __manifest-shaped Lance dataset:
// - without_pk_annotation: today's __manifest schema. Two writers
// concurrently insert rows with the same `object_id`. Expectation:
// both succeed (silent duplicate) — proving the publisher has no
// row-level CAS today.
// - with_pk_annotation: same setup but `object_id` carries
// `lance-schema:unenforced-primary-key=true`. Expectation: the
// second writer fails with TooMuchWriteContention.
//
// Run: cargo test -- --nocapture
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use futures::stream::StreamExt;
use lance::Dataset;
use lance::dataset::{
InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched,
};
#[cfg(test)]
use lance::Error;
fn schema(with_pk: bool) -> Arc<Schema> {
let mut object_id = Field::new("object_id", DataType::Utf8, false);
if with_pk {
let mut md = HashMap::new();
md.insert(
"lance-schema:unenforced-primary-key".to_string(),
"true".to_string(),
);
object_id = object_id.with_metadata(md);
}
Arc::new(Schema::new(vec![
object_id,
Field::new("metadata", DataType::Utf8, true),
]))
}
fn batch(schema: Arc<Schema>, object_id: &str, metadata: &str) -> RecordBatch {
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec![object_id])),
Arc::new(StringArray::from(vec![Some(metadata)])),
],
)
.unwrap()
}
async fn count_rows_with_object_id(ds: &Dataset, object_id: &str) -> usize {
let mut scan = ds.scan();
scan.filter(&format!("object_id = '{}'", object_id)).unwrap();
let mut total = 0;
let mut stream = scan.try_into_stream().await.unwrap();
while let Some(b) = stream.next().await {
total += b.unwrap().num_rows();
}
total
}
async fn baseline_dataset(uri: &str, with_pk: bool) -> Dataset {
let s = schema(with_pk);
// Seed with a single distinct row so the dataset has at least one fragment;
// the conflict is on a different object_id.
let seed = batch(s.clone(), "table:Person", "{}");
InsertBuilder::new(uri).execute(vec![seed]).await.unwrap()
}
async fn try_merge_insert(
base: Arc<Dataset>,
schema: Arc<Schema>,
new_object_id: &str,
metadata: &str,
) -> lance::Result<()> {
let job = MergeInsertBuilder::try_new(base, vec!["object_id".to_string()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::InsertAll)
.conflict_retries(0)
.try_build()
.unwrap();
let b = batch(schema.clone(), new_object_id, metadata);
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(b)], schema);
job.execute_reader(Box::new(reader)).await.map(|_| ())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn without_pk_annotation_concurrent_inserts_both_succeed() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = baseline_dataset(uri, false).await;
let base = Arc::new(ds);
// Both writers see the same baseline dataset (same read_version).
// Both compute the same "next-version row" object_id.
try_merge_insert(base.clone(), schema(false), "version:T@v=1", "{\"by\":\"A\"}")
.await
.expect("first writer should succeed");
try_merge_insert(base.clone(), schema(false), "version:T@v=1", "{\"by\":\"B\"}")
.await
.expect("second writer should also succeed (no CAS)");
// Open at head and count duplicates.
let head = Dataset::open(uri).await.unwrap();
let n = count_rows_with_object_id(&head, "version:T@v=1").await;
println!("[without_pk] duplicate rows after both commits: {}", n);
assert!(
n >= 2,
"without unenforced-primary-key, both writers landed rows -> n={}",
n
);
}
#[tokio::test]
async fn with_pk_annotation_concurrent_inserts_second_fails() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let ds = baseline_dataset(uri, true).await;
let base = Arc::new(ds);
try_merge_insert(base.clone(), schema(true), "version:T@v=1", "{\"by\":\"A\"}")
.await
.expect("first writer should succeed");
let result = try_merge_insert(
base.clone(),
schema(true),
"version:T@v=1",
"{\"by\":\"B\"}",
)
.await;
match result {
Err(Error::TooMuchWriteContention { .. }) => {
println!("[with_pk] second writer correctly rejected with TooMuchWriteContention");
}
other => panic!("expected TooMuchWriteContention, got: {:?}", other),
}
let head = Dataset::open(uri).await.unwrap();
let n = count_rows_with_object_id(&head, "version:T@v=1").await;
assert_eq!(n, 1, "exactly one row should be visible");
}
}