mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Add internal-schema versioning + auto-migration for __manifest
The on-disk shape of `__manifest` is reconciled with the binary via a single stamp + dispatcher in `db/manifest/migrations.rs`:

- `INTERNAL_MANIFEST_SCHEMA_VERSION = 2` declares the shape this binary writes.
- The on-disk stamp `omnigraph:internal_schema_version` lives in the manifest dataset's schema-level metadata (Lance `update_schema_metadata`).
- `migrate_internal_schema(&mut dataset)` walks `match`-arm steps forward from the on-disk stamp until it matches the binary, then returns. Idempotent.
- `init_manifest_repo` stamps the current version at creation; the publisher's open-for-write path runs pending migrations before reading state. Reads stay side-effect-free.
- Forward-version protection: a stamp higher than the binary's known version triggers a clear "upgrade omnigraph first" error so an old binary cannot clobber a newer schema.

Self-heals existing pre-MR-766 deployments by auto-applying the v1→v2 step: the `lance-schema:unenforced-primary-key` annotation on `__manifest.object_id` that engages Lance's row-level CAS at commit time. New repos created via `init` are stamped at v2 immediately and don't need migration.

Adding a future on-disk shape change is one constant bump, one match arm in `migrate_internal_schema`, and one test; it adds no new branches in unrelated code paths. Code outside the migration module never inspects the stamp.

New tests in `manifest/tests.rs`:

- `test_init_stamps_internal_schema_version`
- `test_publish_migrates_pre_stamp_manifest_to_current_version`
- `test_publish_rejects_manifest_stamped_at_future_version`

Docs: `docs/storage.md`, `docs/maintenance.md`, `docs/constants.md` updated per the AGENTS.md maintenance contract.
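The stamp + dispatcher pattern described in this message can be sketched without Lance. The block below is a minimal illustration, not the code in `db/manifest/migrations.rs`: `FakeManifest` is a hypothetical in-memory stand-in for the dataset's schema-level and field-level metadata, errors are plain `String`s rather than `OmniError`, and everything is synchronous.

```rust
use std::collections::HashMap;

/// Shape this sketch's "binary" writes (mirrors INTERNAL_MANIFEST_SCHEMA_VERSION).
const CURRENT_VERSION: u32 = 2;
const STAMP_KEY: &str = "omnigraph:internal_schema_version";
const PK_KEY: &str = "lance-schema:unenforced-primary-key";

/// Hypothetical in-memory stand-in for the manifest dataset (not a Lance type).
#[derive(Default, Debug)]
struct FakeManifest {
    schema_metadata: HashMap<String, String>,
    object_id_metadata: HashMap<String, String>,
}

/// An absent (or unparseable) stamp falls back to v1, the pre-stamp world.
fn read_stamp(m: &FakeManifest) -> u32 {
    m.schema_metadata
        .get(STAMP_KEY)
        .and_then(|s| s.parse().ok())
        .unwrap_or(1)
}

fn set_stamp(m: &mut FakeManifest, version: u32) {
    m.schema_metadata
        .insert(STAMP_KEY.to_string(), version.to_string());
}

/// v1 -> v2: annotate `object_id`, then bump the stamp.
fn migrate_v1_to_v2(m: &mut FakeManifest) {
    m.object_id_metadata
        .insert(PK_KEY.to_string(), "true".to_string());
    set_stamp(m, 2);
}

/// Walk the stamp forward one match arm at a time until it equals CURRENT_VERSION.
fn migrate(m: &mut FakeManifest) -> Result<(), String> {
    let mut current = read_stamp(m);

    // Forward-version protection: refuse to touch a newer shape.
    if current > CURRENT_VERSION {
        return Err(format!(
            "manifest is stamped at v{} but this binary expects v{}; upgrade omnigraph first",
            current, CURRENT_VERSION
        ));
    }

    while current < CURRENT_VERSION {
        match current {
            1 => {
                migrate_v1_to_v2(m);
                current = 2;
            }
            other => {
                return Err(format!(
                    "no migration registered for v{} -> v{}",
                    other,
                    other + 1
                ));
            }
        }
    }
    Ok(())
}
```

Calling `migrate` on a default `FakeManifest` applies the single v1→v2 step; calling it again finds the stamp already current and performs no writes, which is the steady-state path the message calls idempotent.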
parent
5eb47b8c13
commit
243c0c3464
8 changed files with 259 additions and 1 deletions
|
|
@ -10,6 +10,8 @@ use omnigraph_compiler::catalog::Catalog;
|
|||
mod layout;
|
||||
#[path = "manifest/metadata.rs"]
|
||||
mod metadata;
|
||||
#[path = "manifest/migrations.rs"]
|
||||
mod migrations;
|
||||
#[path = "manifest/namespace.rs"]
|
||||
mod namespace;
|
||||
#[path = "manifest/publisher.rs"]
|
||||
|
|
|
|||
131
crates/omnigraph/src/db/manifest/migrations.rs
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
//! Internal schema migrations for the `__manifest` Lance dataset.
|
||||
//!
|
||||
//! ## Why this exists
|
||||
//!
|
||||
//! The on-disk shape of `__manifest` evolves alongside the engine. We do not
|
||||
//! want healing hooks scattered through every read/write path that ask
|
||||
//! "is this an old shape? am I supposed to upgrade it?" — that pattern
|
||||
//! accrues liability with every change. Instead this module is the *single*
|
||||
//! place where on-disk shape is reconciled with what the binary expects:
|
||||
//!
|
||||
//! - One constant `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape this
|
||||
//! binary writes.
|
||||
//! - One stamp `omnigraph:internal_schema_version` in the manifest dataset's
|
||||
//! schema-level metadata records the on-disk shape.
|
||||
//! - One dispatcher `migrate_internal_schema` walks the on-disk stamp forward
|
||||
//! to the expected version via `match`-arm steps. Each future change adds
|
||||
//! one arm + one test, never a new branch in unrelated code paths.
|
||||
//!
|
||||
//! After the dispatcher runs, the rest of the engine assumes current shape.
|
||||
//! No code outside this module should ever inspect the stamp.
|
||||
//!
|
||||
//! ## When it runs
|
||||
//!
|
||||
//! Only on open-for-write paths (the publisher's `load_publish_state`).
|
||||
//! Reads are side-effect-free by contract; an old-shape `__manifest` reads
|
||||
//! fine, it just lacks the protections introduced by later versions.
|
||||
//! `init_manifest_repo` stamps the current version at creation, so newly
|
||||
//! initialized repos never need migration.
|
||||
//!
|
||||
//! ## Forward-version protection
|
||||
//!
|
||||
//! A stamp *higher* than this binary's known version triggers a clear
|
||||
//! "upgrade omnigraph first" error. An old binary cannot clobber a newer
|
||||
//! schema by silently treating "unknown stamp" as "missing stamp".
|
||||
|
||||
use lance::Dataset;
|
||||
|
||||
use crate::error::{OmniError, Result};
|
||||
|
||||
/// Current internal schema version this binary expects to find on disk.
|
||||
///
|
||||
/// History:
|
||||
/// - v1 — implicit (pre-stamp). `__manifest.object_id` carried no
|
||||
/// `lance-schema:unenforced-primary-key` annotation; the publisher had
|
||||
/// no row-level CAS protection (see `.context/merge-insert-cas-granularity.md`).
|
||||
/// - v2 — `__manifest.object_id` carries the unenforced-PK annotation,
|
||||
/// engaging Lance's bloom-filter conflict resolver at commit time. Added
|
||||
/// alongside `expected_table_versions` OCC on `ManifestBatchPublisher::publish`.
|
||||
pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 2;
|
||||
|
||||
const INTERNAL_SCHEMA_VERSION_KEY: &str = "omnigraph:internal_schema_version";
|
||||
const OBJECT_ID_PK_KEY: &str = "lance-schema:unenforced-primary-key";
|
||||
|
||||
/// Read the on-disk stamp from `__manifest`'s schema-level metadata.
|
||||
/// Absent ⇒ v1 (pre-stamp world).
|
||||
pub(super) fn read_stamp(dataset: &Dataset) -> u32 {
|
||||
dataset
|
||||
.schema()
|
||||
.metadata
|
||||
.get(INTERNAL_SCHEMA_VERSION_KEY)
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(1)
|
||||
}
|
||||
|
||||
/// Stamp a freshly-initialized manifest with the current internal schema
|
||||
/// version. Idempotent — safe to call on an already-stamped dataset.
|
||||
pub(super) async fn stamp_current_version(dataset: &mut Dataset) -> Result<()> {
|
||||
set_stamp(dataset, INTERNAL_MANIFEST_SCHEMA_VERSION).await
|
||||
}
|
||||
|
||||
/// Apply any pending internal-schema migrations to the manifest dataset.
|
||||
///
|
||||
/// Idempotent: when the on-disk stamp matches the binary, this is a single
|
||||
/// metadata read with no writes.
|
||||
pub(super) async fn migrate_internal_schema(dataset: &mut Dataset) -> Result<()> {
|
||||
let mut current = read_stamp(dataset);
|
||||
|
||||
if current > INTERNAL_MANIFEST_SCHEMA_VERSION {
|
||||
return Err(OmniError::manifest(format!(
|
||||
"__manifest is stamped at internal schema v{} but this binary expects v{} \
|
||||
— upgrade omnigraph before opening this repo for writes",
|
||||
current, INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
)));
|
||||
}
|
||||
|
||||
while current < INTERNAL_MANIFEST_SCHEMA_VERSION {
|
||||
match current {
|
||||
1 => {
|
||||
migrate_v1_to_v2(dataset).await?;
|
||||
current = 2;
|
||||
}
|
||||
other => {
|
||||
return Err(OmniError::manifest_internal(format!(
|
||||
"no internal-schema migration registered for v{} → v{}",
|
||||
other,
|
||||
other + 1,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v1 → v2: annotate `__manifest.object_id` as Lance's unenforced primary key
|
||||
/// so the merge-insert conflict resolver enforces row-level CAS at commit
|
||||
/// time, then bump the stamp.
|
||||
///
|
||||
/// Both steps are idempotent under retry: re-applying the field annotation
|
||||
/// at its current value is a no-op-ish bump in Lance, and the stamp is a
|
||||
/// simple key-value write. A crash between the two leaves the field set
|
||||
/// without a stamp; the next open re-runs this fn and only the stamp lands.
|
||||
async fn migrate_v1_to_v2(dataset: &mut Dataset) -> Result<()> {
|
||||
dataset
|
||||
.update_field_metadata()
|
||||
.update("object_id", [(OBJECT_ID_PK_KEY.to_string(), "true".to_string())])
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
set_stamp(dataset, 2).await
|
||||
}
|
||||
|
||||
async fn set_stamp(dataset: &mut Dataset, version: u32) -> Result<()> {
|
||||
dataset
|
||||
.update_schema_metadata([(
|
||||
INTERNAL_SCHEMA_VERSION_KEY.to_string(),
|
||||
version.to_string(),
|
||||
)])
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -30,6 +30,7 @@ use crate::error::{OmniError, Result};
|
|||
|
||||
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
|
||||
use super::metadata::parse_namespace_version_request;
|
||||
use super::migrations::migrate_internal_schema;
|
||||
use super::state::{
|
||||
manifest_rows_batch, manifest_schema, read_manifest_entries, read_registered_table_locations,
|
||||
read_tombstone_versions,
|
||||
|
|
@ -94,7 +95,11 @@ impl GraphNamespacePublisher {
|
|||
HashMap<(String, u64), SubTableEntry>,
|
||||
HashMap<(String, u64), ()>,
|
||||
)> {
|
||||
let dataset = self.dataset().await?;
|
||||
let mut dataset = self.dataset().await?;
|
||||
// Run pending internal-schema migrations exactly once per publish on
|
||||
// the open-for-write path; idempotent when the on-disk stamp already
|
||||
// matches this binary. See `db/manifest/migrations.rs`.
|
||||
migrate_internal_schema(&mut dataset).await?;
|
||||
let registered_tables = read_registered_table_locations(&dataset).await?;
|
||||
let existing_entries = read_manifest_entries(&dataset).await?;
|
||||
let existing_versions = existing_entries
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ use crate::error::{OmniError, Result};
|
|||
use super::TABLE_VERSION_MANAGEMENT_KEY;
|
||||
use super::layout::{manifest_uri, open_manifest_dataset, type_name_hash};
|
||||
use super::metadata::TableVersionMetadata;
|
||||
use super::migrations::stamp_current_version;
|
||||
use super::state::{
|
||||
ManifestState, SubTableEntry, entries_to_batch, manifest_schema, read_manifest_state,
|
||||
};
|
||||
|
|
@ -40,6 +41,7 @@ pub(super) async fn init_manifest_repo(
|
|||
.update_config([(TABLE_VERSION_MANAGEMENT_KEY, Some("true"))])
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
stamp_current_version(&mut dataset).await?;
|
||||
|
||||
let known_state = read_manifest_state(&dataset).await?;
|
||||
Ok((dataset, known_state))
|
||||
|
|
|
|||
|
|
@ -1396,6 +1396,101 @@ async fn test_concurrent_publish_with_overlapping_expected_versions_one_succeeds
|
|||
assert!(entry.table_version > 1, "Person should have advanced past v=1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_init_stamps_internal_schema_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
let ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&ds),
|
||||
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
"init should stamp the manifest at the current internal schema version",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_publish_migrates_pre_stamp_manifest_to_current_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
// Simulate a v1 (pre-stamp) repo by removing the schema-level stamp on disk.
|
||||
{
|
||||
let mut ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
ds.update_schema_metadata([(
|
||||
"omnigraph:internal_schema_version".to_string(),
|
||||
None::<String>,
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
let post = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&post),
|
||||
1,
|
||||
"stamp removed ⇒ read_stamp falls back to v1",
|
||||
);
|
||||
}
|
||||
|
||||
// Publish a no-op (empty changes) but require state to be loaded by passing
|
||||
// an expected_table_versions that matches the initial state. This forces
|
||||
// the publisher's open-for-write path, which runs the migration.
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
GraphNamespacePublisher::new(uri, None)
|
||||
.publish(&[], &expected)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&post),
|
||||
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
"publish on a v1 repo should leave the manifest stamped at the current version",
|
||||
);
|
||||
|
||||
// Manifest should still serve correctly post-migration.
|
||||
drop(mc);
|
||||
let reopened = ManifestCoordinator::open(uri).await.unwrap();
|
||||
assert!(reopened.snapshot().entry("node:Person").is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_publish_rejects_manifest_stamped_at_future_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
// Stamp the manifest at a version higher than this binary knows about.
|
||||
let future = super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION + 99;
|
||||
{
|
||||
let mut ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
ds.update_schema_metadata([(
|
||||
"omnigraph:internal_schema_version".to_string(),
|
||||
Some(future.to_string()),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
let err = GraphNamespacePublisher::new(uri, None)
|
||||
.publish(&[], &expected)
|
||||
.await
|
||||
.expect_err("future-stamped manifest should reject open-for-write");
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
msg.contains("upgrade omnigraph") && msg.contains(&future.to_string()),
|
||||
"expected forward-version refusal, got: {}",
|
||||
msg,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn manifest_column_helpers_return_error_for_bad_schema() {
|
||||
let batch = RecordBatch::try_new(
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
| Run branch prefix | `__run__` | `db/run_registry.rs` |
|
||||
| Schema apply lock | `__schema_apply_lock__` | `db/mod.rs` |
|
||||
| Manifest publisher retry budget | `PUBLISHER_RETRY_BUDGET = 5` | `db/manifest/publisher.rs` |
|
||||
| Internal manifest schema version | `INTERNAL_MANIFEST_SCHEMA_VERSION = 2` | `db/manifest/migrations.rs` |
|
||||
| Merge stage batch | `MERGE_STAGE_BATCH_ROWS = 8192` | `exec/merge.rs` |
|
||||
| Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` |
|
||||
| Graph index cache size | `8` (LRU) | `runtime_cache.rs` |
|
||||
|
|
|
|||
|
|
@ -20,3 +20,9 @@
|
|||
## Tombstones
|
||||
|
||||
Logical sub-table delete markers in `__manifest`; `tombstone_object_id(table_key, version)` excludes a sub-table version from snapshot reconstruction.
|
||||
|
||||
## Internal schema migrations (`db/manifest/migrations.rs`)
|
||||
|
||||
Changes to the on-disk `__manifest` shape are reconciled automatically on the first write under a new binary. `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape the binary expects; the on-disk stamp `omnigraph:internal_schema_version` (Lance schema-level metadata) records the on-disk shape. The publisher's open-for-write path calls `migrate_internal_schema` before reading state; reads are side-effect-free. No operator action is required for in-place upgrades. See [storage.md → Internal schema versioning](storage.md) for the full mechanism.
|
||||
|
||||
A binary opening a manifest stamped at a version *higher* than it knows about refuses to publish with a clear "upgrade omnigraph first" error — old binaries cannot clobber a newer schema.
|
||||
|
|
|
|||
|
|
@ -32,6 +32,22 @@ OmniGraph is **not** a single Lance dataset; it is a *graph* of datasets coordin
|
|||
- **Row-level CAS on the merge-insert join key**: `object_id` carries `lance-schema:unenforced-primary-key=true` so Lance's bloom-filter conflict resolver rejects two concurrent commits that land the same `object_id` row. Without this annotation, Lance's transparent rebase would admit silent duplicates of `version:T@v=N` from racing publishers (see `.context/merge-insert-cas-granularity.md`).
|
||||
- **Optimistic concurrency control on publish**: `ManifestBatchPublisher::publish` accepts an `expected_table_versions: HashMap<table_key, u64>` map. Each entry asserts the manifest's current latest non-tombstoned version for that table is exactly what the caller observed; mismatches surface as `OmniError::Manifest` with `ManifestConflictDetails::ExpectedVersionMismatch { table_key, expected, actual }`. An empty map preserves the legacy "best-effort publish" semantics. The publisher uses `conflict_retries(0)` against Lance and owns retry itself (`PUBLISHER_RETRY_BUDGET = 5`), re-running the pre-check on each iteration so concurrent advances surface as `ExpectedVersionMismatch` rather than being silently rebased through.
|
||||
|
||||
### Internal schema versioning (`db/manifest/migrations.rs`)
|
||||
|
||||
The on-disk shape of `__manifest` is reconciled with the binary via a single stamp + dispatcher. `INTERNAL_MANIFEST_SCHEMA_VERSION` declares the shape this binary writes; the on-disk stamp `omnigraph:internal_schema_version` lives in the manifest dataset's schema-level metadata (Lance `update_schema_metadata`).
|
||||
|
||||
- **`init_manifest_repo`** stamps the current version at creation, so newly initialized repos never need migration.
|
||||
- **Publisher open-for-write path** (`load_publish_state`) calls `migrate_internal_schema(&mut dataset)` before reading state. When the on-disk stamp matches the binary, this is a single metadata read with no writes; otherwise the dispatcher walks `match`-arm steps forward (1→2, 2→3, …) until the stamp matches, then proceeds with the publish. Reads stay side-effect-free.
|
||||
- **Forward-version protection**: a stamp *higher* than the binary's known version triggers a clear "upgrade omnigraph first" error. An old binary cannot clobber a newer schema by silently treating "unknown stamp" as "missing stamp".
|
||||
- **Idempotency**: each migration step is safe to re-run. A crash between two metadata updates inside a single step leaves a partial state on disk (for v1→v2, the field annotation without the stamp); the next open-for-write re-runs the step and the second update lands. The dispatcher itself is a cheap stamp-read on the steady-state path.
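
The crash-and-retry behaviour in the last bullet can be illustrated with a small simulation. This is a sketch under stated assumptions: `FakeManifest` and the `crash_before_stamp` fault-injection flag are hypothetical and exist only for this example, and plain `HashMap` inserts stand in for the two Lance metadata updates.

```rust
use std::collections::HashMap;

const STAMP_KEY: &str = "omnigraph:internal_schema_version";
const PK_KEY: &str = "lance-schema:unenforced-primary-key";

/// Hypothetical in-memory stand-in for the manifest dataset (not a Lance type).
#[derive(Default)]
struct FakeManifest {
    schema_metadata: HashMap<String, String>,
    object_id_metadata: HashMap<String, String>,
}

/// An absent stamp reads as v1.
fn read_stamp(m: &FakeManifest) -> u32 {
    m.schema_metadata
        .get(STAMP_KEY)
        .and_then(|s| s.parse().ok())
        .unwrap_or(1)
}

/// v1 -> v2 as two separate writes. `crash_before_stamp` is a test-only
/// fault-injection flag (an assumption of this sketch) that aborts between them.
fn migrate_v1_to_v2(m: &mut FakeManifest, crash_before_stamp: bool) -> Result<(), String> {
    // Write 1: overwriting the annotation with the same value is harmless on retry.
    m.object_id_metadata
        .insert(PK_KEY.to_string(), "true".to_string());

    if crash_before_stamp {
        return Err("simulated crash between the two metadata updates".to_string());
    }

    // Write 2: the stamp lands only after the annotation is in place.
    m.schema_metadata
        .insert(STAMP_KEY.to_string(), "2".to_string());
    Ok(())
}
```

Because the stamp is written last, a partially applied step still reads as v1, so the dispatcher selects the same step again on the next write and the retry converges on the v2 state.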
|
||||
|
||||
Adding a new on-disk shape change is one constant bump (`INTERNAL_MANIFEST_SCHEMA_VERSION`), one match arm in `migrate_internal_schema`, and one test. No code outside this module branches on the stamp.
|
||||
|
||||
| Stamp | Shape change |
|
||||
|---|---|
|
||||
| v1 (implicit, pre-stamp) | `__manifest.object_id` had no PK annotation; publisher had no row-level CAS protection. |
|
||||
| v2 | `__manifest.object_id` carries `lance-schema:unenforced-primary-key=true`; row-level CAS engaged. Stamped as `omnigraph:internal_schema_version=2`. |
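
As an illustration of the "one constant bump, one match arm" recipe, the sketch below adds a purely hypothetical v3 to a simplified dispatcher. No v3 exists in this commit; the step descriptions, the bare `u32` stamp, and the returned list of applied steps are assumptions made so the example stays self-contained.

```rust
/// Hypothetical: bumped from 2 to 3 for illustration only; this commit ships v2.
const CURRENT_VERSION: u32 = 3;

/// Walk `stamp` forward to CURRENT_VERSION and return the steps that were applied.
fn migrate(stamp: &mut u32) -> Result<Vec<&'static str>, String> {
    // Forward-version protection comes first, before any step runs.
    if *stamp > CURRENT_VERSION {
        return Err(format!(
            "stamp v{} is newer than this binary's v{}; upgrade omnigraph first",
            *stamp, CURRENT_VERSION
        ));
    }

    let mut applied = Vec::new();
    while *stamp < CURRENT_VERSION {
        match *stamp {
            1 => {
                applied.push("v1->v2: annotate object_id as unenforced primary key");
                *stamp = 2;
            }
            // The single new arm a hypothetical v3 would add.
            2 => {
                applied.push("v2->v3: hypothetical shape change");
                *stamp = 3;
            }
            other => {
                return Err(format!(
                    "no migration registered for v{} -> v{}",
                    other,
                    other + 1
                ));
            }
        }
    }
    Ok(applied)
}
```

A pre-stamp repo (stamp 1) runs both steps in order, a v2 repo runs only the new arm, and a repo already at v3 returns an empty list, so existing arms are never edited when a new version is added.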
|
||||
|
||||
## URI scheme support (`storage.rs`)
|
||||
|
||||
| Scheme | Backend | Notes |
|
||||
|
|
|
|||