From 243c0c3464182ddc14a23f10aa587252b70dc1e6 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 29 Apr 2026 11:44:14 +0000 Subject: [PATCH] Add internal-schema versioning + auto-migration for __manifest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The on-disk shape of `__manifest` is reconciled with the binary via a single stamp + dispatcher in `db/manifest/migrations.rs`: - `INTERNAL_MANIFEST_SCHEMA_VERSION = 2` declares the shape this binary writes. - The on-disk stamp `omnigraph:internal_schema_version` lives in the manifest dataset's schema-level metadata (Lance `update_schema_metadata`). - `migrate_internal_schema(&mut dataset)` walks `match`-arm steps forward from the on-disk stamp until it matches the binary, then returns. Idempotent. - `init_manifest_repo` stamps the current version at creation; the publisher's open-for-write path runs pending migrations before reading state. Reads stay side-effect-free. - Forward-version protection: a stamp higher than the binary's known version triggers a clear "upgrade omnigraph first" error so an old binary cannot clobber a newer schema. Self-heals existing pre-MR-766 deployments by auto-applying the v1→v2 step: the `lance-schema:unenforced-primary-key` annotation on `__manifest.object_id` that engages Lance's row-level CAS at commit time. New repos created via `init` are stamped at v2 immediately and don't need migration. Adding a future on-disk shape change is one constant bump, one match arm in `migrate_internal_schema`, and one test — no new branches in unrelated code paths. Code outside the migration module never inspects the stamp. New tests in `manifest/tests.rs`: - `test_init_stamps_internal_schema_version` - `test_publish_migrates_pre_stamp_manifest_to_current_version` - `test_publish_rejects_manifest_stamped_at_future_version` Docs: `docs/storage.md`, `docs/maintenance.md`, `docs/constants.md` updated per the AGENTS.md maintenance contract. --- crates/omnigraph/src/db/manifest.rs | 2 + .../omnigraph/src/db/manifest/migrations.rs | 131 ++++++++++++++++++ crates/omnigraph/src/db/manifest/publisher.rs | 7 +- crates/omnigraph/src/db/manifest/repo.rs | 2 + crates/omnigraph/src/db/manifest/tests.rs | 95 +++++++++++++ docs/constants.md | 1 + docs/maintenance.md | 6 + docs/storage.md | 16 +++ 8 files changed, 259 insertions(+), 1 deletion(-) create mode 100644 crates/omnigraph/src/db/manifest/migrations.rs diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 651c10f..b20433a 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -10,6 +10,8 @@ use omnigraph_compiler::catalog::Catalog; mod layout; #[path = "manifest/metadata.rs"] mod metadata; +#[path = "manifest/migrations.rs"] +mod migrations; #[path = "manifest/namespace.rs"] mod namespace; #[path = "manifest/publisher.rs"] diff --git a/crates/omnigraph/src/db/manifest/migrations.rs b/crates/omnigraph/src/db/manifest/migrations.rs new file mode 100644 index 0000000..c568bef --- /dev/null +++ b/crates/omnigraph/src/db/manifest/migrations.rs @@ -0,0 +1,131 @@ +//! Internal schema migrations for the `__manifest` Lance dataset. +//! +//! ## Why this exists +//! +//! The on-disk shape of `__manifest` evolves alongside the engine. We do not +//! want healing hooks scattered through every read/write path that ask +//! "is this an old shape? am I supposed to upgrade it?" — that pattern +//! accrues liability with every change. Instead this module is the *single* +//! place where on-disk shape is reconciled with what the binary expects: +//! +//! - One constant `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape this +//! binary writes. +//! - One stamp `omnigraph:internal_schema_version` in the manifest dataset's +//! schema-level metadata records the on-disk shape. +//! - One dispatcher `migrate_internal_schema` walks the on-disk stamp forward +//! to the expected version via `match`-arm steps. Each future change adds +//! one arm + one test, never a new branch in unrelated code paths. +//! +//! After the dispatcher runs, the rest of the engine assumes current shape. +//! No code outside this module should ever inspect the stamp. +//! +//! ## When it runs +//! +//! Only on open-for-write paths (the publisher's `load_publish_state`). +//! Reads are side-effect-free by contract; an old-shape `__manifest` reads +//! fine, it just lacks the protections introduced by later versions. +//! `init_manifest_repo` stamps the current version at creation, so newly +//! initialized repos never need migration. +//! +//! ## Forward-version protection +//! +//! A stamp *higher* than this binary's known version triggers a clear +//! "upgrade omnigraph first" error. An old binary cannot clobber a newer +//! schema by silently treating "unknown stamp" as "missing stamp". + +use lance::Dataset; + +use crate::error::{OmniError, Result}; + +/// Current internal schema version this binary expects to find on disk. +/// +/// History: +/// - v1 — implicit (pre-stamp). `__manifest.object_id` carried no +/// `lance-schema:unenforced-primary-key` annotation; the publisher had +/// no row-level CAS protection (see `.context/merge-insert-cas-granularity.md`). +/// - v2 — `__manifest.object_id` carries the unenforced-PK annotation, +/// engaging Lance's bloom-filter conflict resolver at commit time. Added +/// alongside `expected_table_versions` OCC on `ManifestBatchPublisher::publish`. +pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 2; + +const INTERNAL_SCHEMA_VERSION_KEY: &str = "omnigraph:internal_schema_version"; +const OBJECT_ID_PK_KEY: &str = "lance-schema:unenforced-primary-key"; + +/// Read the on-disk stamp from `__manifest`'s schema-level metadata. +/// Absent ⇒ v1 (pre-stamp world). +pub(super) fn read_stamp(dataset: &Dataset) -> u32 { + dataset + .schema() + .metadata + .get(INTERNAL_SCHEMA_VERSION_KEY) + .and_then(|s| s.parse().ok()) + .unwrap_or(1) +} + +/// Stamp a freshly-initialized manifest with the current internal schema +/// version. Idempotent — safe to call on an already-stamped dataset. +pub(super) async fn stamp_current_version(dataset: &mut Dataset) -> Result<()> { + set_stamp(dataset, INTERNAL_MANIFEST_SCHEMA_VERSION).await +} + +/// Apply any pending internal-schema migrations to the manifest dataset. +/// +/// Idempotent: when the on-disk stamp matches the binary, this is a single +/// metadata read with no writes. +pub(super) async fn migrate_internal_schema(dataset: &mut Dataset) -> Result<()> { + let mut current = read_stamp(dataset); + + if current > INTERNAL_MANIFEST_SCHEMA_VERSION { + return Err(OmniError::manifest(format!( + "__manifest is stamped at internal schema v{} but this binary expects v{} \ + — upgrade omnigraph before opening this repo for writes", + current, INTERNAL_MANIFEST_SCHEMA_VERSION, + ))); + } + + while current < INTERNAL_MANIFEST_SCHEMA_VERSION { + match current { + 1 => { + migrate_v1_to_v2(dataset).await?; + current = 2; + } + other => { + return Err(OmniError::manifest_internal(format!( + "no internal-schema migration registered for v{} → v{}", + other, + other + 1, + ))); + } + } + } + Ok(()) +} + +/// v1 → v2: annotate `__manifest.object_id` as Lance's unenforced primary key +/// so the merge-insert conflict resolver enforces row-level CAS at commit +/// time, then bump the stamp. +/// +/// Both steps are idempotent under retry: re-applying the field annotation +/// at its current value is a no-op-ish bump in Lance, and the stamp is a +/// simple key-value write. A crash between the two leaves the field set +/// without a stamp; the next open re-runs this fn and only the stamp lands. +async fn migrate_v1_to_v2(dataset: &mut Dataset) -> Result<()> { + dataset + .update_field_metadata() + .update("object_id", [(OBJECT_ID_PK_KEY.to_string(), "true".to_string())]) + .map_err(|e| OmniError::Lance(e.to_string()))? + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + set_stamp(dataset, 2).await +} + +async fn set_stamp(dataset: &mut Dataset, version: u32) -> Result<()> { + dataset + .update_schema_metadata([( + INTERNAL_SCHEMA_VERSION_KEY.to_string(), + version.to_string(), + )]) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(()) +} diff --git a/crates/omnigraph/src/db/manifest/publisher.rs b/crates/omnigraph/src/db/manifest/publisher.rs index 0de87bb..d13dd08 100644 --- a/crates/omnigraph/src/db/manifest/publisher.rs +++ b/crates/omnigraph/src/db/manifest/publisher.rs @@ -30,6 +30,7 @@ use crate::error::{OmniError, Result}; use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id}; use super::metadata::parse_namespace_version_request; +use super::migrations::migrate_internal_schema; use super::state::{ manifest_rows_batch, manifest_schema, read_manifest_entries, read_registered_table_locations, read_tombstone_versions, @@ -94,7 +95,11 @@ impl GraphNamespacePublisher { HashMap<(String, u64), SubTableEntry>, HashMap<(String, u64), ()>, )> { - let dataset = self.dataset().await?; + let mut dataset = self.dataset().await?; + // Run pending internal-schema migrations exactly once per publish on + // the open-for-write path; idempotent when the on-disk stamp already + // matches this binary. See `db/manifest/migrations.rs`. + migrate_internal_schema(&mut dataset).await?; let registered_tables = read_registered_table_locations(&dataset).await?; let existing_entries = read_manifest_entries(&dataset).await?; let existing_versions = existing_entries diff --git a/crates/omnigraph/src/db/manifest/repo.rs b/crates/omnigraph/src/db/manifest/repo.rs index 1133be2..90a958b 100644 --- a/crates/omnigraph/src/db/manifest/repo.rs +++ b/crates/omnigraph/src/db/manifest/repo.rs @@ -12,6 +12,7 @@ use crate::error::{OmniError, Result}; use super::TABLE_VERSION_MANAGEMENT_KEY; use super::layout::{manifest_uri, open_manifest_dataset, type_name_hash}; use super::metadata::TableVersionMetadata; +use super::migrations::stamp_current_version; use super::state::{ ManifestState, SubTableEntry, entries_to_batch, manifest_schema, read_manifest_state, }; @@ -40,6 +41,7 @@ pub(super) async fn init_manifest_repo( .update_config([(TABLE_VERSION_MANAGEMENT_KEY, Some("true"))]) .await .map_err(|e| OmniError::Lance(e.to_string()))?; + stamp_current_version(&mut dataset).await?; let known_state = read_manifest_state(&dataset).await?; Ok((dataset, known_state)) diff --git a/crates/omnigraph/src/db/manifest/tests.rs b/crates/omnigraph/src/db/manifest/tests.rs index 72738ea..d51a882 100644 --- a/crates/omnigraph/src/db/manifest/tests.rs +++ b/crates/omnigraph/src/db/manifest/tests.rs @@ -1396,6 +1396,101 @@ async fn test_concurrent_publish_with_overlapping_expected_versions_one_succeeds assert!(entry.table_version > 1, "Person should have advanced past v=1"); } +#[tokio::test] +async fn test_init_stamps_internal_schema_version() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + ManifestCoordinator::init(uri, &catalog).await.unwrap(); + + let ds = open_manifest_dataset(uri, None).await.unwrap(); + assert_eq!( + super::migrations::read_stamp(&ds), + super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION, + "init should stamp the manifest at the current internal schema version", + ); +} + +#[tokio::test] +async fn test_publish_migrates_pre_stamp_manifest_to_current_version() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + let mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + + // Simulate a v1 (pre-stamp) repo by removing the schema-level stamp on disk. + { + let mut ds = open_manifest_dataset(uri, None).await.unwrap(); + ds.update_schema_metadata([( + "omnigraph:internal_schema_version".to_string(), + None::, + )]) + .await + .unwrap(); + let post = open_manifest_dataset(uri, None).await.unwrap(); + assert_eq!( + super::migrations::read_stamp(&post), + 1, + "stamp removed ⇒ read_stamp falls back to v1", + ); + } + + // Publish a no-op (empty changes) but require state to be loaded by passing + // an expected_table_versions that matches the initial state. This forces + // the publisher's open-for-write path, which runs the migration. + let mut expected = HashMap::new(); + expected.insert("node:Person".to_string(), 1); + GraphNamespacePublisher::new(uri, None) + .publish(&[], &expected) + .await + .unwrap(); + + let post = open_manifest_dataset(uri, None).await.unwrap(); + assert_eq!( + super::migrations::read_stamp(&post), + super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION, + "publish on a v1 repo should leave the manifest stamped at the current version", + ); + + // Manifest should still serve correctly post-migration. + drop(mc); + let reopened = ManifestCoordinator::open(uri).await.unwrap(); + assert!(reopened.snapshot().entry("node:Person").is_some()); +} + +#[tokio::test] +async fn test_publish_rejects_manifest_stamped_at_future_version() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + ManifestCoordinator::init(uri, &catalog).await.unwrap(); + + // Stamp the manifest at a version higher than this binary knows about. + let future = super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION + 99; + { + let mut ds = open_manifest_dataset(uri, None).await.unwrap(); + ds.update_schema_metadata([( + "omnigraph:internal_schema_version".to_string(), + Some(future.to_string()), + )]) + .await + .unwrap(); + } + + let mut expected = HashMap::new(); + expected.insert("node:Person".to_string(), 1); + let err = GraphNamespacePublisher::new(uri, None) + .publish(&[], &expected) + .await + .expect_err("future-stamped manifest should reject open-for-write"); + let msg = err.to_string(); + assert!( + msg.contains("upgrade omnigraph") && msg.contains(&future.to_string()), + "expected forward-version refusal, got: {}", + msg, + ); +} + #[test] fn manifest_column_helpers_return_error_for_bad_schema() { let batch = RecordBatch::try_new( diff --git a/docs/constants.md b/docs/constants.md index 8c329b1..198b77e 100644 --- a/docs/constants.md +++ b/docs/constants.md @@ -8,6 +8,7 @@ | Run branch prefix | `__run__` | `db/run_registry.rs` | | Schema apply lock | `__schema_apply_lock__` | `db/mod.rs` | | Manifest publisher retry budget | `PUBLISHER_RETRY_BUDGET = 5` | `db/manifest/publisher.rs` | +| Internal manifest schema version | `INTERNAL_MANIFEST_SCHEMA_VERSION = 2` | `db/manifest/migrations.rs` | | Merge stage batch | `MERGE_STAGE_BATCH_ROWS = 8192` | `exec/merge.rs` | | Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` | | Graph index cache size | `8` (LRU) | `runtime_cache.rs` | diff --git a/docs/maintenance.md b/docs/maintenance.md index a222014..aaab37f 100644 --- a/docs/maintenance.md +++ b/docs/maintenance.md @@ -20,3 +20,9 @@ ## Tombstones Logical sub-table delete markers in `__manifest`; `tombstone_object_id(table_key, version)` excludes a sub-table version from snapshot reconstruction. + +## Internal schema migrations (`db/manifest/migrations.rs`) + +Version evolutions of the on-disk `__manifest` shape are reconciled automatically on the first write under a new binary. `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape the binary expects; the on-disk stamp `omnigraph:internal_schema_version` (Lance schema-level metadata) records the on-disk shape. The publisher's open-for-write path calls `migrate_internal_schema` before reading state; reads are side-effect-free. No operator action is required for in-place upgrades. See [storage.md → Internal schema versioning](storage.md) for the full mechanism. + +A binary opening a manifest stamped at a version *higher* than it knows about refuses to publish with a clear "upgrade omnigraph first" error — old binaries cannot clobber a newer schema. diff --git a/docs/storage.md b/docs/storage.md index db58de4..b7d1b92 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -32,6 +32,22 @@ OmniGraph is **not** a single Lance dataset; it is a *graph* of datasets coordin - **Row-level CAS on the merge-insert join key**: `object_id` carries `lance-schema:unenforced-primary-key=true` so Lance's bloom-filter conflict resolver rejects two concurrent commits that land the same `object_id` row. Without this annotation, Lance's transparent rebase would admit silent duplicates of `version:T@v=N` from racing publishers (see `.context/merge-insert-cas-granularity.md`). - **Optimistic concurrency control on publish**: `ManifestBatchPublisher::publish` accepts a `expected_table_versions: HashMap` map. Each entry asserts the manifest's current latest non-tombstoned version for that table is exactly what the caller observed; mismatches surface as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch { table_key, expected, actual }`. Empty map preserves the legacy "best-effort publish" semantics. The publisher uses `conflict_retries(0)` against Lance and owns retry itself (`PUBLISHER_RETRY_BUDGET = 5`), re-running the pre-check on each iteration so concurrent advances surface as `ExpectedVersionMismatch` rather than being silently rebased through. +### Internal schema versioning (`db/manifest/migrations.rs`) + +The on-disk shape of `__manifest` is reconciled with the binary via a single stamp + dispatcher. `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape this binary writes; the on-disk stamp `omnigraph:internal_schema_version` lives in the manifest dataset's schema-level metadata (Lance `update_schema_metadata`). + +- **`init_manifest_repo`** stamps the current version at creation, so newly initialized repos never need migration. +- **Publisher open-for-write path** (`load_publish_state`) calls `migrate_internal_schema(&mut dataset)` before reading state. When the on-disk stamp matches the binary, this is a single metadata read with no writes; otherwise the dispatcher walks `match`-arm steps forward (1→2, 2→3, …) until the stamp matches, then proceeds with the publish. Reads stay side-effect-free. +- **Forward-version protection**: a stamp *higher* than the binary's known version triggers a clear "upgrade omnigraph first" error. An old binary cannot clobber a newer schema by silently treating "unknown stamp" as "missing stamp". +- **Idempotency**: each migration step is safe to re-run. A crash between two metadata updates inside a single step leaves the partial state; the next open re-runs the step and the second update lands. The dispatcher itself is a cheap stamp-read on the steady-state path. + +Adding a new on-disk shape change is one constant bump (`INTERNAL_MANIFEST_SCHEMA_VERSION`), one match arm in `migrate_internal_schema`, and one test. No code outside this module branches on the stamp. + +| Stamp | Shape change | +|---|---| +| v1 (implicit, pre-stamp) | `__manifest.object_id` had no PK annotation; publisher had no row-level CAS protection. | +| v2 | `__manifest.object_id` carries `lance-schema:unenforced-primary-key=true`; row-level CAS engaged. Stamped as `omnigraph:internal_schema_version=2`. | + ## URI scheme support (`storage.rs`) | Scheme | Backend | Notes |