mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Merge pull request #63 from ModernRelay/claude/extend-manifest-batch-publisher-GQzB6
This commit is contained in:
commit
4e5374a85e
16 changed files with 8112 additions and 23 deletions
134
.context/merge-insert-cas-granularity.md
Normal file
134
.context/merge-insert-cas-granularity.md
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
# MergeInsertBuilder CAS granularity (Lance 4.0.0)
|
||||
|
||||
**Status:** investigated (2026-04-28)
|
||||
**Consumed by:** publisher per-table version API ticket — see "Implication" below.
|
||||
**Companion ticket:** zombie-run investigation, `Prerequisite` section under "The demotion".
|
||||
**Repro:** `.context/scratch/merge-insert-cas-repro/` — `cargo test --release -- --nocapture`. Both tests pass against `lance = "=4.0.0"`, demonstrating the answer.
|
||||
|
||||
## TL;DR
|
||||
|
||||
**Lance v4.0.0 supports row-level CAS for `MergeInsertBuilder` — but only when the join-key columns are annotated `lance-schema:unenforced-primary-key=true`.** Without that annotation, two concurrent `execute_reader()` calls inserting the same key into disjoint fragments **both succeed silently**, producing duplicate rows under the same key.
|
||||
|
||||
Our `__manifest` schema (`crates/omnigraph/src/db/manifest/state.rs:44-60`) does **not** carry that annotation, so today's `GraphNamespacePublisher::publish` has **no row-level CAS protection**. The TOCTOU window between `load_publish_state` and the merge-insert commit is open — two concurrent publishers computing the same `(table_key, table_version)` row both land it.
|
||||
|
||||
## Implication for the publisher API ticket
|
||||
|
||||
Take both layers, not one or the other:
|
||||
|
||||
1. **Annotate `object_id` (or the version-row subset) `lance-schema:unenforced-primary-key=true`** in `manifest_schema()` so Lance enforces row-level CAS at commit time. This closes the silent-duplicate hole that exists today, independent of the new feature.
|
||||
2. **Add a pre-check phase in `publish` that validates `expected_table_versions` against the manifest snapshot** loaded by `load_publish_state`. The pre-check covers "expected unchanged" assertions for tables the caller is *not* writing to (Lance's row-level CAS only covers rows we *are* writing).
|
||||
3. **Set `merge_builder.conflict_retries(0)`** and let the publisher's own caller-level retry loop handle the rebase: refresh manifest, re-run pre-check, re-attempt merge-insert. This gives strict atomicity for the per-table expected-version contract; Lance's auto-rebase would otherwise let our commit through against unfamiliar manifest state.
|
||||
|
||||
The ticket's "Approach 1 (parameter-only)" by itself is **insufficient** — Lance row-level CAS only catches collisions on rows we are emitting; it does not enforce "expected unchanged" for untouched tables. The ticket's "Approach 2 (pre-check + CAS)" is correct **only if** the row-level CAS is also enabled, otherwise the pre-check is TOCTOU-vulnerable and the existing publisher already has the silent-duplicate bug.
|
||||
|
||||
## How Lance does it (source-quoted)
|
||||
|
||||
### 1. Filter is built only when ON columns match the unenforced primary key
|
||||
|
||||
`rust/lance/src/dataset/write/merge_insert/exec/write.rs:209-221`:
|
||||
|
||||
```rust
|
||||
// Check if ON columns match the schema's unenforced primary key
|
||||
let field_ids: Vec<i32> = params
|
||||
.on
|
||||
.iter()
|
||||
.filter_map(|name| dataset.schema().field(name).map(|f| f.id))
|
||||
.collect();
|
||||
let pk_field_ids: Vec<i32> = dataset
|
||||
.schema()
|
||||
.unenforced_primary_key()
|
||||
.iter()
|
||||
.map(|f| f.id)
|
||||
.collect();
|
||||
let is_primary_key = !pk_field_ids.is_empty() && field_ids == pk_field_ids;
|
||||
```
|
||||
|
||||
The filter is then attached only when `is_primary_key`:
|
||||
`rust/lance/src/dataset/write/merge_insert/exec/write.rs:903-928`:
|
||||
|
||||
```rust
|
||||
let inserted_rows_filter = if is_primary_key {
|
||||
// ... build KeyExistenceFilter (bloom or exact set)
|
||||
};
|
||||
// ...
|
||||
inserted_rows_filter: inserted_rows_filter.clone(),
|
||||
```
|
||||
|
||||
The PK is sourced from field metadata; from `inserted_rows.rs:158`:
|
||||
|
||||
> `/// Tracks keys of inserted rows for conflict detection.`
|
||||
> `/// Only created when ON columns match the schema's unenforced primary key.`
|
||||
|
||||
### 2. Conflict detection compares the bloom filters at commit time
|
||||
|
||||
`rust/lance/src/io/commit/conflict_resolver.rs::check_update_txn` (line 328+):
|
||||
|
||||
```rust
|
||||
match (self_inserted_rows_filter, other_inserted_rows_filter) {
|
||||
(Some(self_keys), Some(other_keys)) => {
|
||||
// ...
|
||||
let Ok((has_intersection, _maybe_false_positive)) =
|
||||
self_keys.intersects(other_keys)
|
||||
else { /* treat as conflict */ };
|
||||
if has_intersection {
|
||||
return Err(self.retryable_conflict_err(other_transaction, other_version));
|
||||
}
|
||||
}
|
||||
(Some(_), None) => {
|
||||
// Pessimistic: filter present on this side only -> conflict.
|
||||
return Err(self.retryable_conflict_err(other_transaction, other_version));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
```
|
||||
|
||||
`RetryableCommitConflict` is consumed by the merge-insert retry loop (`rust/lance/src/dataset/write/retry.rs:97`); when retries exhaust it surfaces as `Error::TooMuchWriteContention`.
|
||||
|
||||
### 3. Without the filter — fragment overlap only
|
||||
|
||||
If the filter is `None` on one side (no PK annotation), `check_update_txn` falls through to fragment-overlap checks (line 382+). For two `Operation::Update` transactions where each writer's merge-insert emits a **new** fragment with the new row, neither side's `updated_fragments` / `removed_fragment_ids` overlaps the other's `modified_fragment_ids`. The conflict resolver returns `Ok` and **both commits land**.
|
||||
|
||||
This is exactly what merge_insert does on a not-matched insert: `rust/lance/src/dataset/write/merge_insert.rs:1574` builds `Operation::Update` with `inserted_rows_filter: None` (the v1 path) — though the more common v2/`FullSchemaMergeInsertExec` path also leaves the filter `None` unless `is_primary_key` flipped to true at plan time.
|
||||
|
||||
### 4. Today's `__manifest` schema does NOT have the annotation
|
||||
|
||||
`crates/omnigraph/src/db/manifest/state.rs:44-60`:
|
||||
|
||||
```rust
|
||||
pub(super) fn manifest_schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
Field::new("object_id", DataType::Utf8, false), // <-- no metadata
|
||||
// ...
|
||||
]))
|
||||
}
|
||||
```
|
||||
|
||||
`crates/omnigraph/src/db/manifest/publisher.rs:286` joins on `object_id`. With no PK metadata, `is_primary_key` is `false` for every commit; `inserted_rows_filter` is always `None`; the fallback fragment-overlap check accepts the duplicate.
|
||||
|
||||
## Empirical confirmation
|
||||
|
||||
`.context/scratch/merge-insert-cas-repro/src/lib.rs` builds two tests against a `__manifest`-shaped schema, deliberately using `MergeInsertBuilder` with the same `on=["object_id"]`, both writers based on the same dataset version:
|
||||
|
||||
- `without_pk_annotation_concurrent_inserts_both_succeed` — passes. Test output: `[without_pk] duplicate rows after both commits: 2`.
|
||||
- `with_pk_annotation_concurrent_inserts_second_fails` — passes. Test output: `[with_pk] second writer correctly rejected with TooMuchWriteContention`.
|
||||
|
||||
Run:
|
||||
```sh
|
||||
cd .context/scratch/merge-insert-cas-repro
|
||||
cargo test --release -- --nocapture
|
||||
```
|
||||
|
||||
## Recommended publisher implementation outline
|
||||
|
||||
1. In `manifest_schema()` (`crates/omnigraph/src/db/manifest/state.rs:44`): attach `lance-schema:unenforced-primary-key=true` metadata to `object_id`.
|
||||
2. Extend `ManifestBatchPublisher::publish` (`crates/omnigraph/src/db/manifest/publisher.rs:42-44`) to take `expected_table_versions: HashMap<String, u64>` (empty = back-compat).
|
||||
3. In `GraphNamespacePublisher::publish`, after `load_publish_state` (`publisher.rs:77`), reduce `existing_versions` to "latest non-tombstoned version per table" (mirror `state.rs:65-94`) and reject any expectation that doesn't match with a typed `manifest_conflict { table_key, expected, actual }`. Add the structured variant on `ManifestError` (`crates/omnigraph/src/error.rs:5-46`).
|
||||
4. Switch `merge_builder.conflict_retries(5)` to `(0)` (`publisher.rs:290`) so a concurrent commit fails fast instead of silently rebasing past our pre-check.
|
||||
5. Wrap the publish in a bounded retry loop at the publisher level: on `TooMuchWriteContention` or `CommitConflict`, refresh manifest, re-run pre-check, re-attempt — bounded retries (5).
|
||||
6. Existing tests at `crates/omnigraph/src/db/manifest/tests.rs:680-728` are the right place to extend with stale-expected-version cases.
|
||||
|
||||
## Out of scope here
|
||||
|
||||
- Whether to apply the same PK annotation to `_graph_commits.lance` / `_graph_runs.lance` — separate review when those code paths land in scope.
|
||||
- Schema-migration story for existing manifests written before the annotation lands. Adding field metadata is a non-breaking schema change in Lance, but worth confirming with one round of `optimize` semantics before deploying.
|
||||
- Whether the fast-fail (`conflict_retries(0)` + caller-level retry) is preferable to Lance's built-in rebase. The argument here is correctness: Lance's rebase is "transparent merge", which is the wrong semantic for an OCC contract.
|
||||
6962
.context/scratch/merge-insert-cas-repro/Cargo.lock
generated
Normal file
6962
.context/scratch/merge-insert-cas-repro/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
20
.context/scratch/merge-insert-cas-repro/Cargo.toml
Normal file
20
.context/scratch/merge-insert-cas-repro/Cargo.toml
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
[package]
|
||||
name = "merge-insert-cas-repro"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
publish = false
|
||||
|
||||
[lib]
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
lance = "=4.0.0"
|
||||
arrow-array = "=57.3.0"
|
||||
arrow-schema = "=57.3.0"
|
||||
tokio = { version = "=1.49.0", features = ["macros", "rt-multi-thread"] }
|
||||
futures = "0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
||||
[workspace]
|
||||
152
.context/scratch/merge-insert-cas-repro/src/lib.rs
Normal file
152
.context/scratch/merge-insert-cas-repro/src/lib.rs
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
// Investigation repro for MR-766 (CAS-granularity).
|
||||
//
|
||||
// Two tests on a __manifest-shaped Lance dataset:
|
||||
// - without_pk_annotation: today's __manifest schema. Two writers
|
||||
// concurrently insert rows with the same `object_id`. Expectation:
|
||||
// both succeed (silent duplicate) — proving the publisher has no
|
||||
// row-level CAS today.
|
||||
// - with_pk_annotation: same setup but `object_id` carries
|
||||
// `lance-schema:unenforced-primary-key=true`. Expectation: the
|
||||
// second writer fails with TooMuchWriteContention.
|
||||
//
|
||||
// Run: cargo test -- --nocapture
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::{RecordBatch, StringArray};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use futures::stream::StreamExt;
|
||||
use lance::Dataset;
|
||||
use lance::dataset::{
|
||||
InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
use lance::Error;
|
||||
|
||||
fn schema(with_pk: bool) -> Arc<Schema> {
|
||||
let mut object_id = Field::new("object_id", DataType::Utf8, false);
|
||||
if with_pk {
|
||||
let mut md = HashMap::new();
|
||||
md.insert(
|
||||
"lance-schema:unenforced-primary-key".to_string(),
|
||||
"true".to_string(),
|
||||
);
|
||||
object_id = object_id.with_metadata(md);
|
||||
}
|
||||
Arc::new(Schema::new(vec![
|
||||
object_id,
|
||||
Field::new("metadata", DataType::Utf8, true),
|
||||
]))
|
||||
}
|
||||
|
||||
fn batch(schema: Arc<Schema>, object_id: &str, metadata: &str) -> RecordBatch {
|
||||
RecordBatch::try_new(
|
||||
schema,
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec![object_id])),
|
||||
Arc::new(StringArray::from(vec![Some(metadata)])),
|
||||
],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn count_rows_with_object_id(ds: &Dataset, object_id: &str) -> usize {
|
||||
let mut scan = ds.scan();
|
||||
scan.filter(&format!("object_id = '{}'", object_id)).unwrap();
|
||||
let mut total = 0;
|
||||
let mut stream = scan.try_into_stream().await.unwrap();
|
||||
while let Some(b) = stream.next().await {
|
||||
total += b.unwrap().num_rows();
|
||||
}
|
||||
total
|
||||
}
|
||||
|
||||
async fn baseline_dataset(uri: &str, with_pk: bool) -> Dataset {
|
||||
let s = schema(with_pk);
|
||||
// Seed with a single distinct row so the dataset has at least one fragment;
|
||||
// the conflict is on a different object_id.
|
||||
let seed = batch(s.clone(), "table:Person", "{}");
|
||||
InsertBuilder::new(uri).execute(vec![seed]).await.unwrap()
|
||||
}
|
||||
|
||||
async fn try_merge_insert(
|
||||
base: Arc<Dataset>,
|
||||
schema: Arc<Schema>,
|
||||
new_object_id: &str,
|
||||
metadata: &str,
|
||||
) -> lance::Result<()> {
|
||||
let job = MergeInsertBuilder::try_new(base, vec!["object_id".to_string()])
|
||||
.unwrap()
|
||||
.when_matched(WhenMatched::UpdateAll)
|
||||
.when_not_matched(WhenNotMatched::InsertAll)
|
||||
.conflict_retries(0)
|
||||
.try_build()
|
||||
.unwrap();
|
||||
let b = batch(schema.clone(), new_object_id, metadata);
|
||||
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(b)], schema);
|
||||
job.execute_reader(Box::new(reader)).await.map(|_| ())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn without_pk_annotation_concurrent_inserts_both_succeed() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let ds = baseline_dataset(uri, false).await;
|
||||
let base = Arc::new(ds);
|
||||
|
||||
// Both writers see the same baseline dataset (same read_version).
|
||||
// Both compute the same "next-version row" object_id.
|
||||
try_merge_insert(base.clone(), schema(false), "version:T@v=1", "{\"by\":\"A\"}")
|
||||
.await
|
||||
.expect("first writer should succeed");
|
||||
try_merge_insert(base.clone(), schema(false), "version:T@v=1", "{\"by\":\"B\"}")
|
||||
.await
|
||||
.expect("second writer should also succeed (no CAS)");
|
||||
|
||||
// Open at head and count duplicates.
|
||||
let head = Dataset::open(uri).await.unwrap();
|
||||
let n = count_rows_with_object_id(&head, "version:T@v=1").await;
|
||||
println!("[without_pk] duplicate rows after both commits: {}", n);
|
||||
assert!(
|
||||
n >= 2,
|
||||
"without unenforced-primary-key, both writers landed rows -> n={}",
|
||||
n
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn with_pk_annotation_concurrent_inserts_second_fails() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let ds = baseline_dataset(uri, true).await;
|
||||
let base = Arc::new(ds);
|
||||
|
||||
try_merge_insert(base.clone(), schema(true), "version:T@v=1", "{\"by\":\"A\"}")
|
||||
.await
|
||||
.expect("first writer should succeed");
|
||||
let result = try_merge_insert(
|
||||
base.clone(),
|
||||
schema(true),
|
||||
"version:T@v=1",
|
||||
"{\"by\":\"B\"}",
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(Error::TooMuchWriteContention { .. }) => {
|
||||
println!("[with_pk] second writer correctly rejected with TooMuchWriteContention");
|
||||
}
|
||||
other => panic!("expected TooMuchWriteContention, got: {:?}", other),
|
||||
}
|
||||
|
||||
let head = Dataset::open(uri).await.unwrap();
|
||||
let n = count_rows_with_object_id(&head, "version:T@v=1").await;
|
||||
assert_eq!(n, 1, "exactly one row should be visible");
|
||||
}
|
||||
}
|
||||
20
AGENTS.md
20
AGENTS.md
|
|
@ -96,6 +96,26 @@ Full diagram and concurrency model: [docs/architecture.md](docs/architecture.md)
|
|||
|
||||
---
|
||||
|
||||
## First principle: minimize ongoing liability
|
||||
|
||||
Every decision — adding code, removing code, picking an abstraction, choosing a layer, writing a doc paragraph — carries an ongoing maintenance cost. Before any change, ask: **which option has the lower ongoing cost over time?** Not "shorter now," not "fastest to ship," but which leaves the codebase narrower in the long run.
|
||||
|
||||
This is a decision lens, not a code-size rule. It cuts both ways. Sometimes the lower-liability option is:
|
||||
|
||||
- **More code.** A centralized dispatcher costs more lines than an ad-hoc heal hook, but each future change adds a match arm instead of a new hook scattered through the engine.
|
||||
- **Less code.** Three similar lines that may diverge later cost less to maintain than a premature abstraction that has to be retrofitted every time a caller deviates.
|
||||
- **DRYing.** Two copies of business logic that must stay in sync are a perpetual drift risk.
|
||||
- **Duplication.** Two callers that look similar today but have independent evolution pressure shouldn't be wedged through a shared helper just because the lines match.
|
||||
- **Removal.** A "just in case" code path with no caller is pure surface area: tests for it, docs that mention it, future changes that have to consider it.
|
||||
- **Addition.** A migration framework, a typed error variant, a feature flag — each adds code now and lowers the cost of every future change in its surface.
|
||||
- **A new abstraction**, when the absence forces every consumer to re-derive the same logic. Or **flattening one**, when the abstraction has accumulated more special-cases than the code it replaced.
|
||||
|
||||
When evaluating a design, ask: *"what does this look like after 5 more changes like it?"* If the answer is "this converges to one shape", cost is bounded. If it's "this forks every time", the option is mortgaging the future for present convenience — pick differently.
|
||||
|
||||
The always-on rules below and the §IX deny-list in [docs/invariants.md](docs/invariants.md) are specific applications of this principle; when the rules are silent, fall back to it.
|
||||
|
||||
---
|
||||
|
||||
## Always-on rules (load these into your working memory)
|
||||
|
||||
These are architectural rules that need to be in scope on every change. They're framed at the level that survives renames and refactors — the deeper implementation specifics (function names, lock names, branch-prefix conventions, enforcement points) live in the per-area docs and may evolve. The full architectural invariants and deny-list are in [docs/invariants.md](docs/invariants.md); §IX (deny-list) is the fastest first-pass when reviewing any change.
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ use omnigraph_compiler::catalog::Catalog;
|
|||
mod layout;
|
||||
#[path = "manifest/metadata.rs"]
|
||||
mod metadata;
|
||||
#[path = "manifest/migrations.rs"]
|
||||
mod migrations;
|
||||
#[path = "manifest/namespace.rs"]
|
||||
mod namespace;
|
||||
#[path = "manifest/publisher.rs"]
|
||||
|
|
@ -286,12 +288,43 @@ impl ManifestCoordinator {
|
|||
self.commit_changes(&changes).await
|
||||
}
|
||||
|
||||
/// Same as [`commit`], but with caller-supplied per-table expected
|
||||
/// versions used for optimistic concurrency control. Each entry asserts
|
||||
/// the manifest's current latest non-tombstoned `table_version` for that
|
||||
/// `table_key` is exactly what the caller observed; mismatches surface
|
||||
/// as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch`.
|
||||
pub async fn commit_with_expected(
|
||||
&mut self,
|
||||
updates: &[SubTableUpdate],
|
||||
expected_table_versions: &HashMap<String, u64>,
|
||||
) -> Result<u64> {
|
||||
let changes = updates
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(ManifestChange::Update)
|
||||
.collect::<Vec<_>>();
|
||||
self.commit_changes_with_expected(&changes, expected_table_versions)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn commit_changes(&mut self, changes: &[ManifestChange]) -> Result<u64> {
|
||||
if changes.is_empty() {
|
||||
self.commit_changes_with_expected(changes, &HashMap::new())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn commit_changes_with_expected(
|
||||
&mut self,
|
||||
changes: &[ManifestChange],
|
||||
expected_table_versions: &HashMap<String, u64>,
|
||||
) -> Result<u64> {
|
||||
if changes.is_empty() && expected_table_versions.is_empty() {
|
||||
return Ok(self.version());
|
||||
}
|
||||
|
||||
self.dataset = self.publisher.publish(changes).await?;
|
||||
self.dataset = self
|
||||
.publisher
|
||||
.publish(changes, expected_table_versions)
|
||||
.await?;
|
||||
|
||||
self.known_state = read_manifest_state(&self.dataset).await?;
|
||||
Ok(self.version())
|
||||
|
|
|
|||
131
crates/omnigraph/src/db/manifest/migrations.rs
Normal file
131
crates/omnigraph/src/db/manifest/migrations.rs
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
//! Internal schema migrations for the `__manifest` Lance dataset.
|
||||
//!
|
||||
//! ## Why this exists
|
||||
//!
|
||||
//! The on-disk shape of `__manifest` evolves alongside the engine. We do not
|
||||
//! want healing hooks scattered through every read/write path that ask
|
||||
//! "is this an old shape? am I supposed to upgrade it?" — that pattern
|
||||
//! accrues liability with every change. Instead this module is the *single*
|
||||
//! place where on-disk shape is reconciled with what the binary expects:
|
||||
//!
|
||||
//! - One constant `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape this
|
||||
//! binary writes.
|
||||
//! - One stamp `omnigraph:internal_schema_version` in the manifest dataset's
|
||||
//! schema-level metadata records the on-disk shape.
|
||||
//! - One dispatcher `migrate_internal_schema` walks the on-disk stamp forward
|
||||
//! to the expected version via `match`-arm steps. Each future change adds
|
||||
//! one arm + one test, never a new branch in unrelated code paths.
|
||||
//!
|
||||
//! After the dispatcher runs, the rest of the engine assumes current shape.
|
||||
//! No code outside this module should ever inspect the stamp.
|
||||
//!
|
||||
//! ## When it runs
|
||||
//!
|
||||
//! Only on open-for-write paths (the publisher's `load_publish_state`).
|
||||
//! Reads are side-effect-free by contract; an old-shape `__manifest` reads
|
||||
//! fine, it just lacks the protections introduced by later versions.
|
||||
//! `init_manifest_repo` stamps the current version at creation, so newly
|
||||
//! initialized repos never need migration.
|
||||
//!
|
||||
//! ## Forward-version protection
|
||||
//!
|
||||
//! A stamp *higher* than this binary's known version triggers a clear
|
||||
//! "upgrade omnigraph first" error. An old binary cannot clobber a newer
|
||||
//! schema by silently treating "unknown stamp" as "missing stamp".
|
||||
|
||||
use lance::Dataset;
|
||||
|
||||
use crate::error::{OmniError, Result};
|
||||
|
||||
/// Current internal schema version this binary expects to find on disk.
|
||||
///
|
||||
/// History:
|
||||
/// - v1 — implicit (pre-stamp). `__manifest.object_id` carried no
|
||||
/// `lance-schema:unenforced-primary-key` annotation; the publisher had
|
||||
/// no row-level CAS protection (see `.context/merge-insert-cas-granularity.md`).
|
||||
/// - v2 — `__manifest.object_id` carries the unenforced-PK annotation,
|
||||
/// engaging Lance's bloom-filter conflict resolver at commit time. Added
|
||||
/// alongside `expected_table_versions` OCC on `ManifestBatchPublisher::publish`.
|
||||
pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 2;
|
||||
|
||||
const INTERNAL_SCHEMA_VERSION_KEY: &str = "omnigraph:internal_schema_version";
|
||||
const OBJECT_ID_PK_KEY: &str = "lance-schema:unenforced-primary-key";
|
||||
|
||||
/// Read the on-disk stamp from `__manifest`'s schema-level metadata.
|
||||
/// Absent ⇒ v1 (pre-stamp world).
|
||||
pub(super) fn read_stamp(dataset: &Dataset) -> u32 {
|
||||
dataset
|
||||
.schema()
|
||||
.metadata
|
||||
.get(INTERNAL_SCHEMA_VERSION_KEY)
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(1)
|
||||
}
|
||||
|
||||
/// Stamp a freshly-initialized manifest with the current internal schema
|
||||
/// version. Idempotent — safe to call on an already-stamped dataset.
|
||||
pub(super) async fn stamp_current_version(dataset: &mut Dataset) -> Result<()> {
|
||||
set_stamp(dataset, INTERNAL_MANIFEST_SCHEMA_VERSION).await
|
||||
}
|
||||
|
||||
/// Apply any pending internal-schema migrations to the manifest dataset.
|
||||
///
|
||||
/// Idempotent: when the on-disk stamp matches the binary, this is a single
|
||||
/// metadata read with no writes.
|
||||
pub(super) async fn migrate_internal_schema(dataset: &mut Dataset) -> Result<()> {
|
||||
let mut current = read_stamp(dataset);
|
||||
|
||||
if current > INTERNAL_MANIFEST_SCHEMA_VERSION {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"__manifest is stamped at internal schema v{} but this binary expects v{} \
|
||||
— upgrade omnigraph before opening this repo for writes",
|
||||
current, INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
)));
|
||||
}
|
||||
|
||||
while current < INTERNAL_MANIFEST_SCHEMA_VERSION {
|
||||
match current {
|
||||
1 => {
|
||||
migrate_v1_to_v2(dataset).await?;
|
||||
current = 2;
|
||||
}
|
||||
other => {
|
||||
return Err(OmniError::manifest_internal(format!(
|
||||
"no internal-schema migration registered for v{} → v{}",
|
||||
other,
|
||||
other + 1,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v1 → v2: annotate `__manifest.object_id` as Lance's unenforced primary key
|
||||
/// so the merge-insert conflict resolver enforces row-level CAS at commit
|
||||
/// time, then bump the stamp.
|
||||
///
|
||||
/// Both steps are idempotent under retry: re-applying the field annotation
|
||||
/// at its current value is a no-op-ish bump in Lance, and the stamp is a
|
||||
/// simple key-value write. A crash between the two leaves the field set
|
||||
/// without a stamp; the next open re-runs this fn and only the stamp lands.
|
||||
async fn migrate_v1_to_v2(dataset: &mut Dataset) -> Result<()> {
|
||||
dataset
|
||||
.update_field_metadata()
|
||||
.update("object_id", [(OBJECT_ID_PK_KEY.to_string(), "true".to_string())])
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
set_stamp(dataset, 2).await
|
||||
}
|
||||
|
||||
async fn set_stamp(dataset: &mut Dataset, version: u32) -> Result<()> {
|
||||
dataset
|
||||
.update_schema_metadata([(
|
||||
INTERNAL_SCHEMA_VERSION_KEY.to_string(),
|
||||
version.to_string(),
|
||||
)])
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -21,6 +21,7 @@ use std::sync::Arc;
|
|||
use arrow_array::RecordBatchIterator;
|
||||
use async_trait::async_trait;
|
||||
use lance::Dataset;
|
||||
use lance::Error as LanceError;
|
||||
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
|
||||
use lance_namespace::NamespaceError;
|
||||
use lance_namespace::models::CreateTableVersionRequest;
|
||||
|
|
@ -29,6 +30,7 @@ use crate::error::{OmniError, Result};
|
|||
|
||||
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
|
||||
use super::metadata::parse_namespace_version_request;
|
||||
use super::migrations::migrate_internal_schema;
|
||||
use super::state::{
|
||||
manifest_rows_batch, manifest_schema, read_manifest_entries, read_registered_table_locations,
|
||||
read_tombstone_versions,
|
||||
|
|
@ -38,9 +40,20 @@ use super::{
|
|||
SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone,
|
||||
};
|
||||
|
||||
/// Bound on the publisher-level retry loop that wraps Lance's row-level CAS
|
||||
/// (`TooMuchWriteContention`). Lance's own `conflict_retries` is set to 0 in
|
||||
/// `merge_rows` because its auto-rebase is "transparent merge" semantics —
|
||||
/// wrong for an OCC contract — so retry is owned here instead, where each
|
||||
/// iteration re-runs `load_publish_state` and the expected-version pre-check.
|
||||
const PUBLISHER_RETRY_BUDGET: u32 = 5;
|
||||
|
||||
#[async_trait]
|
||||
pub(super) trait ManifestBatchPublisher: Send + Sync {
|
||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset>;
|
||||
async fn publish(
|
||||
&self,
|
||||
changes: &[ManifestChange],
|
||||
expected_table_versions: &HashMap<String, u64>,
|
||||
) -> Result<Dataset>;
|
||||
}
|
||||
|
||||
pub(super) struct GraphNamespacePublisher {
|
||||
|
|
@ -82,7 +95,11 @@ impl GraphNamespacePublisher {
|
|||
HashMap<(String, u64), SubTableEntry>,
|
||||
HashMap<(String, u64), ()>,
|
||||
)> {
|
||||
let dataset = self.dataset().await?;
|
||||
let mut dataset = self.dataset().await?;
|
||||
// Run pending internal-schema migrations exactly once per publish on
|
||||
// the open-for-write path; idempotent when the on-disk stamp already
|
||||
// matches this binary. See `db/manifest/migrations.rs`.
|
||||
migrate_internal_schema(&mut dataset).await?;
|
||||
let registered_tables = read_registered_table_locations(&dataset).await?;
|
||||
let existing_entries = read_manifest_entries(&dataset).await?;
|
||||
let existing_versions = existing_entries
|
||||
|
|
@ -279,6 +296,77 @@ impl GraphNamespacePublisher {
|
|||
)
|
||||
}
|
||||
|
||||
/// Reduce the loaded `(table_key, table_version) → entry` map and the
|
||||
/// tombstone set to "latest non-tombstoned version per table" — the same
|
||||
/// reduction performed by `read_manifest_state` on the visible snapshot.
|
||||
/// Tombstoned tables fall back to their highest tombstone version so that
|
||||
/// the resulting `actual` reported in `ExpectedVersionMismatch` is
|
||||
/// meaningful even when the caller's expected table no longer exists.
|
||||
fn latest_visible_per_table(
|
||||
existing_versions: &HashMap<(String, u64), SubTableEntry>,
|
||||
existing_tombstones: &HashMap<(String, u64), ()>,
|
||||
) -> HashMap<String, u64> {
|
||||
let mut max_tombstones = HashMap::<String, u64>::new();
|
||||
for (key, version) in existing_tombstones.keys() {
|
||||
max_tombstones
|
||||
.entry(key.clone())
|
||||
.and_modify(|v| {
|
||||
if *version > *v {
|
||||
*v = *version;
|
||||
}
|
||||
})
|
||||
.or_insert(*version);
|
||||
}
|
||||
|
||||
let mut latest = HashMap::<String, u64>::new();
|
||||
for (key, version) in existing_versions.keys() {
|
||||
let tombstoned = max_tombstones
|
||||
.get(key)
|
||||
.map(|t| *t >= *version)
|
||||
.unwrap_or(false);
|
||||
if tombstoned {
|
||||
continue;
|
||||
}
|
||||
latest
|
||||
.entry(key.clone())
|
||||
.and_modify(|v| {
|
||||
if *version > *v {
|
||||
*v = *version;
|
||||
}
|
||||
})
|
||||
.or_insert(*version);
|
||||
}
|
||||
|
||||
// For tables that have only tombstones (no visible entry), surface the
|
||||
// tombstone version so callers see a non-zero `actual`.
|
||||
for (key, tombstone) in &max_tombstones {
|
||||
latest.entry(key.clone()).or_insert(*tombstone);
|
||||
}
|
||||
|
||||
latest
|
||||
}
|
||||
|
||||
/// Compare each caller-supplied expectation against the manifest's current
|
||||
/// latest visible version per table. The first mismatch is returned as a
|
||||
/// typed `ExpectedVersionMismatch` (`actual = 0` if the table isn't in the
|
||||
/// manifest at all).
|
||||
fn check_expected_table_versions(
|
||||
latest_per_table: &HashMap<String, u64>,
|
||||
expected: &HashMap<String, u64>,
|
||||
) -> Result<()> {
|
||||
for (table_key, expected_version) in expected {
|
||||
let actual = latest_per_table.get(table_key).copied().unwrap_or(0);
|
||||
if actual != *expected_version {
|
||||
return Err(OmniError::manifest_expected_version_mismatch(
|
||||
table_key.clone(),
|
||||
*expected_version,
|
||||
actual,
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn merge_rows(&self, dataset: Dataset, rows: Vec<PendingVersionRow>) -> Result<Dataset> {
|
||||
let batch = Self::pending_rows_to_batch(rows)?;
|
||||
let reader = RecordBatchIterator::new(vec![Ok(batch)], manifest_schema());
|
||||
|
|
@ -287,14 +375,18 @@ impl GraphNamespacePublisher {
|
|||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
merge_builder.when_matched(WhenMatched::UpdateAll);
|
||||
merge_builder.when_not_matched(WhenNotMatched::InsertAll);
|
||||
merge_builder.conflict_retries(5);
|
||||
// 0 here is intentional: Lance's built-in retry uses transparent rebase,
|
||||
// which would let a concurrent writer's row land alongside ours and
|
||||
// silently break the OCC contract on `__manifest`. Retries are owned by
|
||||
// the publisher loop above, where each attempt re-runs the pre-check.
|
||||
merge_builder.conflict_retries(0);
|
||||
merge_builder.use_index(false);
|
||||
let (new_dataset, _stats) = merge_builder
|
||||
.try_build()
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
.execute_reader(Box::new(reader))
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
.map_err(map_lance_publish_error)?;
|
||||
Ok(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone()))
|
||||
}
|
||||
|
||||
|
|
@ -318,25 +410,90 @@ impl GraphNamespacePublisher {
|
|||
}))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
self.publish(&changes).await
|
||||
self.publish(&changes, &HashMap::new()).await
|
||||
}
|
||||
}
|
||||
|
||||
/// `Error::TooMuchWriteContention` from Lance's row-level CAS bubbles up here
|
||||
/// when a concurrent writer landed a row with the same `object_id` (the
|
||||
/// merge-insert join key, annotated as an unenforced primary key on
|
||||
/// `__manifest`). Translate it to a typed manifest conflict so callers can
|
||||
/// match without parsing strings; everything else is opaque storage.
|
||||
fn map_lance_publish_error(err: LanceError) -> OmniError {
|
||||
if matches!(err, LanceError::TooMuchWriteContention { .. }) {
|
||||
return OmniError::manifest_row_level_cas_contention(format!(
|
||||
"manifest publish lost a row-level CAS race: {}",
|
||||
err
|
||||
));
|
||||
}
|
||||
OmniError::Lance(err.to_string())
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ManifestBatchPublisher for GraphNamespacePublisher {
|
||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
|
||||
if changes.is_empty() {
|
||||
async fn publish(
|
||||
&self,
|
||||
changes: &[ManifestChange],
|
||||
expected_table_versions: &HashMap<String, u64>,
|
||||
) -> Result<Dataset> {
|
||||
if changes.is_empty() && expected_table_versions.is_empty() {
|
||||
return self.dataset().await;
|
||||
}
|
||||
|
||||
let (dataset, known_tables, existing_versions, existing_tombstones) =
|
||||
self.load_publish_state().await?;
|
||||
let rows = Self::build_pending_rows(
|
||||
changes,
|
||||
&known_tables,
|
||||
&existing_versions,
|
||||
&existing_tombstones,
|
||||
)?;
|
||||
self.merge_rows(dataset, rows).await
|
||||
for attempt in 0..=PUBLISHER_RETRY_BUDGET {
|
||||
let (dataset, known_tables, existing_versions, existing_tombstones) =
|
||||
self.load_publish_state().await?;
|
||||
|
||||
let latest_per_table =
|
||||
Self::latest_visible_per_table(&existing_versions, &existing_tombstones);
|
||||
// Pre-check on every attempt against freshly loaded state so a
|
||||
// concurrent commit that broke the caller's expectation is
|
||||
// surfaced as `ExpectedVersionMismatch` rather than retried.
|
||||
Self::check_expected_table_versions(&latest_per_table, expected_table_versions)?;
|
||||
|
||||
if changes.is_empty() {
|
||||
return Ok(dataset);
|
||||
}
|
||||
|
||||
let rows = Self::build_pending_rows(
|
||||
changes,
|
||||
&known_tables,
|
||||
&existing_versions,
|
||||
&existing_tombstones,
|
||||
)?;
|
||||
|
||||
match self.merge_rows(dataset, rows).await {
|
||||
Ok(new_dataset) => return Ok(new_dataset),
|
||||
Err(err) => {
|
||||
if attempt < PUBLISHER_RETRY_BUDGET && is_retryable_publish_conflict(&err) {
|
||||
continue;
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(OmniError::manifest_conflict(format!(
|
||||
"manifest publish exhausted {} retries against concurrent writers",
|
||||
PUBLISHER_RETRY_BUDGET
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// A retryable conflict here means: Lance's row-level CAS rejected our commit
|
||||
/// because someone else landed an `object_id` we were also inserting (mapped
|
||||
/// from `Error::TooMuchWriteContention` to
|
||||
/// `ManifestConflictDetails::RowLevelCasContention`). This is transparent
|
||||
/// contention; if the caller's `expected_table_versions` still holds against
|
||||
/// the new manifest state, we re-attempt. Other conflict variants (notably
|
||||
/// `ExpectedVersionMismatch`) propagate so the caller learns immediately.
|
||||
fn is_retryable_publish_conflict(err: &OmniError) -> bool {
|
||||
matches!(
|
||||
err,
|
||||
OmniError::Manifest(m)
|
||||
if matches!(
|
||||
m.details,
|
||||
Some(crate::error::ManifestConflictDetails::RowLevelCasContention)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ use crate::error::{OmniError, Result};
|
|||
use super::TABLE_VERSION_MANAGEMENT_KEY;
|
||||
use super::layout::{manifest_uri, open_manifest_dataset, type_name_hash};
|
||||
use super::metadata::TableVersionMetadata;
|
||||
use super::migrations::stamp_current_version;
|
||||
use super::state::{
|
||||
ManifestState, SubTableEntry, entries_to_batch, manifest_schema, read_manifest_state,
|
||||
};
|
||||
|
|
@ -40,6 +41,7 @@ pub(super) async fn init_manifest_repo(
|
|||
.update_config([(TABLE_VERSION_MANAGEMENT_KEY, Some("true"))])
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
stamp_current_version(&mut dataset).await?;
|
||||
|
||||
let known_state = read_manifest_state(&dataset).await?;
|
||||
Ok((dataset, known_state))
|
||||
|
|
|
|||
|
|
@ -42,8 +42,20 @@ struct ManifestScan {
|
|||
}
|
||||
|
||||
pub(super) fn manifest_schema() -> SchemaRef {
|
||||
// `object_id` is the merge-insert join key in the publisher; marking it as
|
||||
// Lance's unenforced primary key engages row-level CAS at commit time, so
|
||||
// two concurrent writers that try to land the same `object_id` row are
|
||||
// detected by Lance via bloom-filter intersection (see
|
||||
// `.context/merge-insert-cas-granularity.md`). Without this metadata,
|
||||
// Lance's conflict resolver would silently rebase both writers' new
|
||||
// fragments and admit duplicate rows.
|
||||
let object_id_metadata: HashMap<String, String> =
|
||||
[("lance-schema:unenforced-primary-key", "true")]
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect();
|
||||
Arc::new(Schema::new(vec![
|
||||
Field::new("object_id", DataType::Utf8, false),
|
||||
Field::new("object_id", DataType::Utf8, false).with_metadata(object_id_metadata),
|
||||
Field::new("object_type", DataType::Utf8, false),
|
||||
Field::new("location", DataType::Utf8, true),
|
||||
Field::new("metadata", DataType::Utf8, true),
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray, UInt64Array};
|
||||
|
|
@ -946,7 +947,11 @@ impl RecordingPublisher {
|
|||
|
||||
#[async_trait]
|
||||
impl ManifestBatchPublisher for RecordingPublisher {
|
||||
async fn publish(&self, changes: &[ManifestChange]) -> Result<Dataset> {
|
||||
async fn publish(
|
||||
&self,
|
||||
changes: &[ManifestChange],
|
||||
expected_table_versions: &HashMap<String, u64>,
|
||||
) -> Result<Dataset> {
|
||||
let requests: Vec<CreateTableVersionRequest> = changes
|
||||
.iter()
|
||||
.filter_map(|change| match change {
|
||||
|
|
@ -955,7 +960,7 @@ impl ManifestBatchPublisher for RecordingPublisher {
|
|||
})
|
||||
.collect();
|
||||
self.requests.lock().await.extend_from_slice(&requests);
|
||||
self.inner.publish(changes).await
|
||||
self.inner.publish(changes, expected_table_versions).await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -963,7 +968,11 @@ struct FailingPublisher;
|
|||
|
||||
#[async_trait]
|
||||
impl ManifestBatchPublisher for FailingPublisher {
|
||||
async fn publish(&self, _changes: &[ManifestChange]) -> Result<Dataset> {
|
||||
async fn publish(
|
||||
&self,
|
||||
_changes: &[ManifestChange],
|
||||
_expected_table_versions: &HashMap<String, u64>,
|
||||
) -> Result<Dataset> {
|
||||
Err(OmniError::manifest(
|
||||
"injected batch publisher failure".to_string(),
|
||||
))
|
||||
|
|
@ -1107,6 +1116,381 @@ async fn test_commit_failure_from_injected_batch_publisher_preserves_visible_sta
|
|||
);
|
||||
}
|
||||
|
||||
/// Drive Person to a fresh on-disk dataset version `v` (returns the new
|
||||
/// version number) and produce a `SubTableUpdate` ready to publish.
|
||||
async fn append_person_and_make_update(
|
||||
uri: &str,
|
||||
person_entry: &SubTableEntry,
|
||||
name: &str,
|
||||
) -> SubTableUpdate {
|
||||
let mut person_ds = Dataset::open(&format!("{}/{}", uri, person_entry.table_path))
|
||||
.await
|
||||
.unwrap();
|
||||
let person_schema = Arc::new(person_ds.schema().into());
|
||||
let row = RecordBatch::try_new(
|
||||
Arc::clone(&person_schema),
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec![format!("person-{name}")])),
|
||||
Arc::new(StringArray::from(vec![Some(name.to_string())])),
|
||||
Arc::new(Int32Array::from(vec![Some(30)])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader = RecordBatchIterator::new(vec![Ok(row)], person_schema);
|
||||
person_ds.append(reader, None).await.unwrap();
|
||||
let new_version = person_ds.version().version;
|
||||
let version_metadata =
|
||||
table_version_metadata_for_state(uri, &person_entry.table_path, None, new_version)
|
||||
.await
|
||||
.unwrap();
|
||||
SubTableUpdate {
|
||||
table_key: "node:Person".to_string(),
|
||||
table_version: new_version,
|
||||
table_branch: None,
|
||||
row_count: 1,
|
||||
version_metadata,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_expected_accepts_matching_versions() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
let snap = mc.snapshot();
|
||||
let person_entry = snap.entry("node:Person").unwrap().clone();
|
||||
|
||||
let update = append_person_and_make_update(uri, &person_entry, "Alice").await;
|
||||
let mut expected = HashMap::new();
|
||||
// After init, every table is at table_version=1 — assert that.
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
expected.insert("node:Company".to_string(), 1);
|
||||
|
||||
mc.commit_with_expected(&[update.clone()], &expected)
|
||||
.await
|
||||
.expect("matching expected versions should publish cleanly");
|
||||
|
||||
let after = mc.snapshot();
|
||||
assert_eq!(
|
||||
after.entry("node:Person").unwrap().table_version,
|
||||
update.table_version
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_expected_rejects_stale_with_typed_details() {
|
||||
use crate::error::ManifestConflictDetails;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
let snap = mc.snapshot();
|
||||
let person_entry = snap.entry("node:Person").unwrap().clone();
|
||||
|
||||
// Writer A advances Person.
|
||||
let update_a = append_person_and_make_update(uri, &person_entry, "Alice").await;
|
||||
let advanced_version = update_a.table_version;
|
||||
mc.commit(&[update_a]).await.unwrap();
|
||||
|
||||
// Writer B then tries to commit, asserting Person is still at v=1.
|
||||
let update_b = append_person_and_make_update(uri, &person_entry, "Bob").await;
|
||||
let mut stale_expected = HashMap::new();
|
||||
stale_expected.insert("node:Person".to_string(), 1);
|
||||
|
||||
let err = mc
|
||||
.commit_with_expected(&[update_b], &stale_expected)
|
||||
.await
|
||||
.expect_err("stale expected_table_versions should reject");
|
||||
|
||||
match err {
|
||||
OmniError::Manifest(m) => match m.details {
|
||||
Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||
table_key,
|
||||
expected,
|
||||
actual,
|
||||
}) => {
|
||||
assert_eq!(table_key, "node:Person");
|
||||
assert_eq!(expected, 1);
|
||||
assert_eq!(actual, advanced_version);
|
||||
}
|
||||
other => panic!("expected ExpectedVersionMismatch details, got {:?}", other),
|
||||
},
|
||||
other => panic!("expected OmniError::Manifest, got {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_expected_catches_drift_on_untouched_table() {
|
||||
use crate::error::ManifestConflictDetails;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
let snap = mc.snapshot();
|
||||
let person_entry = snap.entry("node:Person").unwrap().clone();
|
||||
let company_entry = snap.entry("node:Company").unwrap().clone();
|
||||
|
||||
// Writer A advances Company.
|
||||
let mut company_ds = Dataset::open(&format!("{}/{}", uri, company_entry.table_path))
|
||||
.await
|
||||
.unwrap();
|
||||
let company_schema = Arc::new(company_ds.schema().into());
|
||||
let row = RecordBatch::try_new(
|
||||
Arc::clone(&company_schema),
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["company-1"])),
|
||||
Arc::new(StringArray::from(vec!["Acme"])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let reader = RecordBatchIterator::new(vec![Ok(row)], company_schema);
|
||||
company_ds.append(reader, None).await.unwrap();
|
||||
let company_version = company_ds.version().version;
|
||||
let company_metadata =
|
||||
table_version_metadata_for_state(uri, &company_entry.table_path, None, company_version)
|
||||
.await
|
||||
.unwrap();
|
||||
mc.commit(&[SubTableUpdate {
|
||||
table_key: "node:Company".to_string(),
|
||||
table_version: company_version,
|
||||
table_branch: None,
|
||||
row_count: 1,
|
||||
version_metadata: company_metadata,
|
||||
}])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Writer B writes Person but asserts Company is still at v=1.
|
||||
let update_person = append_person_and_make_update(uri, &person_entry, "Bob").await;
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Company".to_string(), 1);
|
||||
|
||||
let err = mc
|
||||
.commit_with_expected(&[update_person], &expected)
|
||||
.await
|
||||
.expect_err("drift on an untouched expected table should reject");
|
||||
|
||||
let OmniError::Manifest(m) = err else {
|
||||
panic!("expected OmniError::Manifest");
|
||||
};
|
||||
match m.details {
|
||||
Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||
ref table_key,
|
||||
expected,
|
||||
actual,
|
||||
}) => {
|
||||
assert_eq!(table_key, "node:Company");
|
||||
assert_eq!(expected, 1);
|
||||
assert_eq!(actual, company_version);
|
||||
}
|
||||
other => panic!("expected ExpectedVersionMismatch, got {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_expected_unknown_table_reports_actual_zero() {
|
||||
use crate::error::ManifestConflictDetails;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:DoesNotExist".to_string(), 7);
|
||||
let err = mc
|
||||
.commit_with_expected(&[], &expected)
|
||||
.await
|
||||
.expect_err("unknown expected table should reject");
|
||||
|
||||
let OmniError::Manifest(m) = err else {
|
||||
panic!("expected OmniError::Manifest");
|
||||
};
|
||||
match m.details {
|
||||
Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||
table_key,
|
||||
expected,
|
||||
actual,
|
||||
}) => {
|
||||
assert_eq!(table_key, "node:DoesNotExist");
|
||||
assert_eq!(expected, 7);
|
||||
assert_eq!(actual, 0);
|
||||
}
|
||||
other => panic!("expected ExpectedVersionMismatch, got {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_concurrent_publish_with_overlapping_expected_versions_one_succeeds() {
|
||||
use crate::error::ManifestConflictDetails;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
let person_entry = mc.snapshot().entry("node:Person").unwrap().clone();
|
||||
|
||||
// Advance the Person dataset once so we have a real on-disk version 2 that
|
||||
// both publishers can target. Both attempt to land the *same*
|
||||
// `version:node:Person@v=2` row in `__manifest`, which is the row-level
|
||||
// CAS conflict the publisher must detect: load_publish_state at the same
|
||||
// baseline → pre-check passes for both → only one merge_insert can land
|
||||
// the unique `object_id`.
|
||||
let update = append_person_and_make_update(uri, &person_entry, "Alice").await;
|
||||
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
|
||||
let publisher_a = GraphNamespacePublisher::new(uri, None);
|
||||
let publisher_b = GraphNamespacePublisher::new(uri, None);
|
||||
let changes_a = vec![ManifestChange::Update(update.clone())];
|
||||
let changes_b = vec![ManifestChange::Update(update)];
|
||||
let expected_a = expected.clone();
|
||||
let expected_b = expected;
|
||||
|
||||
let (res_a, res_b) = tokio::join!(
|
||||
async { publisher_a.publish(&changes_a, &expected_a).await },
|
||||
async { publisher_b.publish(&changes_b, &expected_b).await }
|
||||
);
|
||||
|
||||
let (succeeded, err) = match (res_a, res_b) {
|
||||
(Ok(_), Err(e)) => (1, e),
|
||||
(Err(e), Ok(_)) => (1, e),
|
||||
(Ok(_), Ok(_)) => panic!("both writers committed -- OCC failed"),
|
||||
(Err(a), Err(b)) => panic!("both writers failed: {:?} / {:?}", a, b),
|
||||
};
|
||||
assert_eq!(succeeded, 1, "exactly one writer must succeed");
|
||||
|
||||
let OmniError::Manifest(m) = err else {
|
||||
panic!("expected OmniError::Manifest, got {:?}", err);
|
||||
};
|
||||
// The losing writer surfaces either ExpectedVersionMismatch (its retry's
|
||||
// pre-check observed the winner's advance) or a plain Conflict (Lance
|
||||
// row-level CAS rejected, retry exhausted before the pre-check fired).
|
||||
// Both are acceptable typed conflict signals; what matters is that the
|
||||
// failure is not silent.
|
||||
use crate::error::ManifestErrorKind;
|
||||
assert!(
|
||||
matches!(m.kind, ManifestErrorKind::Conflict),
|
||||
"expected Conflict-kind manifest error, got {:?}: {}",
|
||||
m.kind,
|
||||
m.message,
|
||||
);
|
||||
if let Some(ManifestConflictDetails::ExpectedVersionMismatch {
|
||||
ref table_key,
|
||||
expected,
|
||||
..
|
||||
}) = m.details
|
||||
{
|
||||
assert_eq!(table_key, "node:Person");
|
||||
assert_eq!(expected, 1);
|
||||
}
|
||||
|
||||
// Manifest must reflect exactly one new commit on Person at the requested
|
||||
// version (no duplicate version rows).
|
||||
let mc = ManifestCoordinator::open(uri).await.unwrap();
|
||||
let entry = mc.snapshot().entry("node:Person").unwrap().clone();
|
||||
assert!(entry.table_version > 1, "Person should have advanced past v=1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_init_stamps_internal_schema_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
let ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&ds),
|
||||
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
"init should stamp the manifest at the current internal schema version",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_publish_migrates_pre_stamp_manifest_to_current_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
// Simulate a v1 (pre-stamp) repo by removing the schema-level stamp on disk.
|
||||
{
|
||||
let mut ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
ds.update_schema_metadata([(
|
||||
"omnigraph:internal_schema_version".to_string(),
|
||||
None::<String>,
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
let post = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&post),
|
||||
1,
|
||||
"stamp removed ⇒ read_stamp falls back to v1",
|
||||
);
|
||||
}
|
||||
|
||||
// Publish a no-op (empty changes) but require state to be loaded by passing
|
||||
// an expected_table_versions that matches the initial state. This forces
|
||||
// the publisher's open-for-write path, which runs the migration.
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
GraphNamespacePublisher::new(uri, None)
|
||||
.publish(&[], &expected)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&post),
|
||||
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
"publish on a v1 repo should leave the manifest stamped at the current version",
|
||||
);
|
||||
|
||||
// Manifest should still serve correctly post-migration.
|
||||
drop(mc);
|
||||
let reopened = ManifestCoordinator::open(uri).await.unwrap();
|
||||
assert!(reopened.snapshot().entry("node:Person").is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_publish_rejects_manifest_stamped_at_future_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
// Stamp the manifest at a version higher than this binary knows about.
|
||||
let future = super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION + 99;
|
||||
{
|
||||
let mut ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
ds.update_schema_metadata([(
|
||||
"omnigraph:internal_schema_version".to_string(),
|
||||
Some(future.to_string()),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
let err = GraphNamespacePublisher::new(uri, None)
|
||||
.publish(&[], &expected)
|
||||
.await
|
||||
.expect_err("future-stamped manifest should reject open-for-write");
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
msg.contains("upgrade omnigraph") && msg.contains(&future.to_string()),
|
||||
"expected forward-version refusal, got: {}",
|
||||
msg,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn manifest_column_helpers_return_error_for_bad_schema() {
|
||||
let batch = RecordBatch::try_new(
|
||||
|
|
|
|||
|
|
@ -10,11 +10,31 @@ pub enum ManifestErrorKind {
|
|||
Internal,
|
||||
}
|
||||
|
||||
/// Structured details for a manifest-level conflict. Set on the `details`
|
||||
/// field of `ManifestError` when callers need to match on the specific
|
||||
/// concurrency-control failure rather than parse a string.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ManifestConflictDetails {
|
||||
/// A caller-supplied per-table expected version did not match the
|
||||
/// manifest's current latest non-tombstoned version for that table.
|
||||
ExpectedVersionMismatch {
|
||||
table_key: String,
|
||||
expected: u64,
|
||||
actual: u64,
|
||||
},
|
||||
/// Lance's row-level CAS rejected the publish because a concurrent writer
|
||||
/// landed a row with the same `object_id`. Distinct from
|
||||
/// `ExpectedVersionMismatch`: the caller's expectations (if any) still
|
||||
/// hold against the new manifest state, so the publisher will retry.
|
||||
RowLevelCasContention,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Error)]
|
||||
#[error("{message}")]
|
||||
pub struct ManifestError {
|
||||
pub kind: ManifestErrorKind,
|
||||
pub message: String,
|
||||
pub details: Option<ManifestConflictDetails>,
|
||||
}
|
||||
|
||||
impl ManifestError {
|
||||
|
|
@ -22,8 +42,14 @@ impl ManifestError {
|
|||
Self {
|
||||
kind,
|
||||
message: message.into(),
|
||||
details: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_details(mut self, details: ManifestConflictDetails) -> Self {
|
||||
self.details = Some(details);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -77,4 +103,32 @@ impl OmniError {
|
|||
pub fn manifest_internal(message: impl Into<String>) -> Self {
|
||||
Self::Manifest(ManifestError::new(ManifestErrorKind::Internal, message))
|
||||
}
|
||||
|
||||
pub fn manifest_expected_version_mismatch(
|
||||
table_key: impl Into<String>,
|
||||
expected: u64,
|
||||
actual: u64,
|
||||
) -> Self {
|
||||
let table_key = table_key.into();
|
||||
let message = format!(
|
||||
"stale view of '{}': expected manifest table version {} but current is {} — refresh and retry",
|
||||
table_key, expected, actual
|
||||
);
|
||||
Self::Manifest(
|
||||
ManifestError::new(ManifestErrorKind::Conflict, message).with_details(
|
||||
ManifestConflictDetails::ExpectedVersionMismatch {
|
||||
table_key,
|
||||
expected,
|
||||
actual,
|
||||
},
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn manifest_row_level_cas_contention(message: impl Into<String>) -> Self {
|
||||
Self::Manifest(
|
||||
ManifestError::new(ManifestErrorKind::Conflict, message)
|
||||
.with_details(ManifestConflictDetails::RowLevelCasContention),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@
|
|||
| Run registry dir | `_graph_runs.lance` | `db/run_registry.rs` |
|
||||
| Run branch prefix | `__run__` | `db/run_registry.rs` |
|
||||
| Schema apply lock | `__schema_apply_lock__` | `db/mod.rs` |
|
||||
| Manifest publisher retry budget | `PUBLISHER_RETRY_BUDGET = 5` | `db/manifest/publisher.rs` |
|
||||
| Internal manifest schema version | `INTERNAL_MANIFEST_SCHEMA_VERSION = 2` | `db/manifest/migrations.rs` |
|
||||
| Merge stage batch | `MERGE_STAGE_BATCH_ROWS = 8192` | `exec/merge.rs` |
|
||||
| Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` |
|
||||
| Graph index cache size | `8` (LRU) | `runtime_cache.rs` |
|
||||
|
|
|
|||
|
|
@ -6,7 +6,9 @@
|
|||
- `Lance(String)` — storage layer
|
||||
- `DataFusion(String)` — execution layer
|
||||
- `Io(io::Error)`
|
||||
- `Manifest(ManifestError { kind: BadRequest|NotFound|Conflict|Internal, … })`
|
||||
- `Manifest(ManifestError { kind: BadRequest|NotFound|Conflict|Internal, details: Option<ManifestConflictDetails>, … })`
|
||||
- `ManifestConflictDetails::ExpectedVersionMismatch { table_key, expected, actual }` — caller's `expected_table_versions` did not match the manifest's current latest non-tombstoned version (set by `OmniError::manifest_expected_version_mismatch`).
|
||||
- `ManifestConflictDetails::RowLevelCasContention` — Lance row-level CAS rejected the publish because a concurrent writer landed the same `object_id`. Retried internally by the publisher; only surfaces if the retry budget exhausts.
|
||||
- `MergeConflicts(Vec<MergeConflict>)`
|
||||
|
||||
Compiler-side `NanoError` covers parse / catalog / type / storage / plan / execution / arrow / lance / IO / manifest / unique-constraint, each with structured spans (`SourceSpan { start, end }`) for ariadne-style diagnostics.
|
||||
|
|
|
|||
|
|
@ -20,3 +20,9 @@
|
|||
## Tombstones
|
||||
|
||||
Logical sub-table delete markers in `__manifest`; `tombstone_object_id(table_key, version)` excludes a sub-table version from snapshot reconstruction.
|
||||
|
||||
## Internal schema migrations (`db/manifest/migrations.rs`)
|
||||
|
||||
Version evolutions of the on-disk `__manifest` shape are reconciled automatically on the first write under a new binary. `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape the binary expects; the on-disk stamp `omnigraph:internal_schema_version` (Lance schema-level metadata) records the on-disk shape. The publisher's open-for-write path calls `migrate_internal_schema` before reading state; reads are side-effect-free. No operator action is required for in-place upgrades. See [storage.md → Internal schema versioning](storage.md) for the full mechanism.
|
||||
|
||||
A binary opening a manifest stamped at a version *higher* than it knows about refuses to publish with a clear "upgrade omnigraph first" error — old binaries cannot clobber a newer schema.
|
||||
|
|
|
|||
|
|
@ -29,6 +29,24 @@ OmniGraph is **not** a single Lance dataset; it is a *graph* of datasets coordin
|
|||
- `table_branch` is `null` for the main lineage and the branch name otherwise
|
||||
- **Snapshot reconstruction**: latest visible `table_version` per `(table_key, table_branch)` minus tombstones — rows where `object_type = table_tombstone`, whose own `table_version` (acting as the tombstone version) is `>= the entry's table_version`.
|
||||
- **Atomic publish**: multi-dataset commits publish via a `ManifestBatchPublisher` so a single write to `__manifest` flips all the new sub-table versions visible at once.
|
||||
- **Row-level CAS on the merge-insert join key**: `object_id` carries `lance-schema:unenforced-primary-key=true` so Lance's bloom-filter conflict resolver rejects two concurrent commits that land the same `object_id` row. Without this annotation, Lance's transparent rebase would admit silent duplicates of `version:T@v=N` from racing publishers (see `.context/merge-insert-cas-granularity.md`).
|
||||
- **Optimistic concurrency control on publish**: `ManifestBatchPublisher::publish` accepts a `expected_table_versions: HashMap<table_key, u64>` map. Each entry asserts the manifest's current latest non-tombstoned version for that table is exactly what the caller observed; mismatches surface as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch { table_key, expected, actual }`. Empty map preserves the legacy "best-effort publish" semantics. The publisher uses `conflict_retries(0)` against Lance and owns retry itself (`PUBLISHER_RETRY_BUDGET = 5`), re-running the pre-check on each iteration so concurrent advances surface as `ExpectedVersionMismatch` rather than being silently rebased through.
|
||||
|
||||
### Internal schema versioning (`db/manifest/migrations.rs`)
|
||||
|
||||
The on-disk shape of `__manifest` is reconciled with the binary via a single stamp + dispatcher. `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape this binary writes; the on-disk stamp `omnigraph:internal_schema_version` lives in the manifest dataset's schema-level metadata (Lance `update_schema_metadata`).
|
||||
|
||||
- **`init_manifest_repo`** stamps the current version at creation, so newly initialized repos never need migration.
|
||||
- **Publisher open-for-write path** (`load_publish_state`) calls `migrate_internal_schema(&mut dataset)` before reading state. When the on-disk stamp matches the binary, this is a single metadata read with no writes; otherwise the dispatcher walks `match`-arm steps forward (1→2, 2→3, …) until the stamp matches, then proceeds with the publish. Reads stay side-effect-free.
|
||||
- **Forward-version protection**: a stamp *higher* than the binary's known version triggers a clear "upgrade omnigraph first" error. An old binary cannot clobber a newer schema by silently treating "unknown stamp" as "missing stamp".
|
||||
- **Idempotency**: each migration step is safe to re-run. A crash between two metadata updates inside a single step leaves the partial state; the next open re-runs the step and the second update lands. The dispatcher itself is a cheap stamp-read on the steady-state path.
|
||||
|
||||
Adding a new on-disk shape change is one constant bump (`INTERNAL_MANIFEST_SCHEMA_VERSION`), one match arm in `migrate_internal_schema`, and one test. No code outside this module branches on the stamp.
|
||||
|
||||
| Stamp | Shape change |
|
||||
|---|---|
|
||||
| v1 (implicit, pre-stamp) | `__manifest.object_id` had no PK annotation; publisher had no row-level CAS protection. |
|
||||
| v2 | `__manifest.object_id` carries `lance-schema:unenforced-primary-key=true`; row-level CAS engaged. Stamped as `omnigraph:internal_schema_version=2`. |
|
||||
|
||||
## URI scheme support (`storage.rs`)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue