mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
feat(engine): sweep legacy __run__ branches via v2→v3 manifest migration
Pre-v0.4.0 graphs can carry stale `__run__<id>` staging branches on the `__manifest` dataset, left by the Run state machine removed in MR-771. Lance's `list_branches` still enumerates them, so they leak into `branch_list()` and count as blocking branches at schema-apply time. Add a one-time `migrate_v2_to_v3` arm to the internal-schema dispatcher: on the first read-write open it enumerates `__manifest` branches, deletes every `__run__*` ref, and bumps the stamp to 3. Idempotent under retry (re-enumerates fresh each run). The `"__run__"` prefix is inlined so the migration does not depend on the run_registry guard that MR-770 removes next. This is the prerequisite sweep; the guard removal follows in the next commit.
This commit is contained in:
parent
2d5c4b1202
commit
1aab951c25
2 changed files with 117 additions and 1 deletions
|
|
@ -46,7 +46,11 @@ use crate::error::{OmniError, Result};
|
|||
/// - v2 — `__manifest.object_id` carries the unenforced-PK annotation,
|
||||
/// engaging Lance's bloom-filter conflict resolver at commit time. Added
|
||||
/// alongside `expected_table_versions` OCC on `ManifestBatchPublisher::publish`.
|
||||
pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 2;
|
||||
/// - v3 — one-time sweep of legacy `__run__<id>` staging branches left on the
|
||||
/// `__manifest` dataset by the pre-v0.4.0 Run state machine (removed in
|
||||
/// MR-771). Once swept, the `is_internal_run_branch` defense-in-depth guard
|
||||
/// is no longer needed (MR-770).
|
||||
pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 3;
|
||||
|
||||
const INTERNAL_SCHEMA_VERSION_KEY: &str = "omnigraph:internal_schema_version";
|
||||
const OBJECT_ID_PK_KEY: &str = "lance-schema:unenforced-primary-key";
|
||||
|
|
@ -89,6 +93,10 @@ pub(super) async fn migrate_internal_schema(dataset: &mut Dataset) -> Result<()>
|
|||
migrate_v1_to_v2(dataset).await?;
|
||||
current = 2;
|
||||
}
|
||||
2 => {
|
||||
migrate_v2_to_v3(dataset).await?;
|
||||
current = 3;
|
||||
}
|
||||
other => {
|
||||
return Err(OmniError::manifest_internal(format!(
|
||||
"no internal-schema migration registered for v{} → v{}",
|
||||
|
|
@ -122,6 +130,45 @@ async fn migrate_v1_to_v2(dataset: &mut Dataset) -> Result<()> {
|
|||
set_stamp(dataset, 2).await
|
||||
}
|
||||
|
||||
/// v2 → v3: sweep legacy `__run__<id>` staging branches off the `__manifest`
|
||||
/// dataset, then bump the stamp.
|
||||
///
|
||||
/// The pre-v0.4.0 Run state machine (removed in MR-771) created graph-level
|
||||
/// staging branches named `__run__<ulid>` on `__manifest`. MR-771 stopped
|
||||
/// creating them but left any pre-existing ones in place; Lance's
|
||||
/// `list_branches` still enumerates them, so they leak into `branch_list()`
|
||||
/// and count as blocking branches at schema-apply time. This one-time sweep
|
||||
/// removes them so the `is_internal_run_branch` guard can retire (MR-770).
|
||||
///
|
||||
/// The `"__run__"` prefix is inlined here on purpose: this migration must keep
|
||||
/// working after the `run_registry` module (the guard) is deleted, so it does
|
||||
/// not depend on it.
|
||||
///
|
||||
/// Idempotent under retry: each run re-enumerates `list_branches` fresh, so a
|
||||
/// crash before the stamp bump simply re-deletes whatever `__run__*` branches
|
||||
/// still remain on the next open.
|
||||
async fn migrate_v2_to_v3(dataset: &mut Dataset) -> Result<()> {
|
||||
const LEGACY_RUN_BRANCH_PREFIX: &str = "__run__";
|
||||
let branches = dataset
|
||||
.list_branches()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
let run_branches: Vec<String> = branches
|
||||
.into_keys()
|
||||
.filter(|name| {
|
||||
name.trim_start_matches('/')
|
||||
.starts_with(LEGACY_RUN_BRANCH_PREFIX)
|
||||
})
|
||||
.collect();
|
||||
for name in run_branches {
|
||||
dataset
|
||||
.delete_branch(&name)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
}
|
||||
set_stamp(dataset, 3).await
|
||||
}
|
||||
|
||||
async fn set_stamp(dataset: &mut Dataset, version: u32) -> Result<()> {
|
||||
dataset
|
||||
.update_schema_metadata([(INTERNAL_SCHEMA_VERSION_KEY.to_string(), version.to_string())])
|
||||
|
|
|
|||
|
|
@ -1461,6 +1461,75 @@ async fn test_publish_migrates_pre_stamp_manifest_to_current_version() {
|
|||
assert!(reopened.snapshot().entry("node:Person").is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_v2_to_v3_sweeps_legacy_run_branches_on_write_open() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let catalog = build_test_catalog();
|
||||
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
|
||||
|
||||
// Synthesize a pre-MR-770 graph: a stale `__run__` staging branch left on
|
||||
// `__manifest`, alongside a real user branch that must survive the sweep.
|
||||
mc.create_branch("__run__01J9LEGACY").await.unwrap();
|
||||
mc.create_branch("feature").await.unwrap();
|
||||
let before = mc.list_branches().await.unwrap();
|
||||
assert!(
|
||||
before.iter().any(|b| b == "__run__01J9LEGACY"),
|
||||
"precondition: legacy run branch exists on __manifest; got {before:?}",
|
||||
);
|
||||
|
||||
// Rewind the internal-schema stamp to v2 so the next write-open runs the
|
||||
// v2 → v3 sweep arm (init stamps at the current version, which is past it).
|
||||
{
|
||||
let mut ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
ds.update_schema_metadata([(
|
||||
"omnigraph:internal_schema_version".to_string(),
|
||||
Some("2".to_string()),
|
||||
)])
|
||||
.await
|
||||
.unwrap();
|
||||
let post = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(super::migrations::read_stamp(&post), 2, "stamp rewound to v2");
|
||||
}
|
||||
|
||||
// A no-op publish forces the open-for-write path, which runs the migration.
|
||||
let mut expected = HashMap::new();
|
||||
expected.insert("node:Person".to_string(), 1);
|
||||
GraphNamespacePublisher::new(uri, None)
|
||||
.publish(&[], &expected)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Stamp advanced to current; the legacy run branch is physically gone from
|
||||
// `__manifest` (checked via the raw, unfiltered manifest list — not the
|
||||
// guard-filtered `branch_list`), and the real branch + `main` survive.
|
||||
let post = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&post),
|
||||
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
);
|
||||
let reopened = ManifestCoordinator::open(uri).await.unwrap();
|
||||
let after = reopened.list_branches().await.unwrap();
|
||||
assert!(
|
||||
!after.iter().any(|b| b.starts_with("__run__")),
|
||||
"legacy run branch must be swept; got {after:?}",
|
||||
);
|
||||
assert!(after.iter().any(|b| b == "feature"), "user branch must survive");
|
||||
assert!(after.iter().any(|b| b == "main"), "main must survive");
|
||||
|
||||
// Idempotent: a second write-open finds the stamp at current and does not
|
||||
// re-run the sweep or error.
|
||||
GraphNamespacePublisher::new(uri, None)
|
||||
.publish(&[], &expected)
|
||||
.await
|
||||
.unwrap();
|
||||
let final_ds = open_manifest_dataset(uri, None).await.unwrap();
|
||||
assert_eq!(
|
||||
super::migrations::read_stamp(&final_ds),
|
||||
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_publish_rejects_manifest_stamped_at_future_version() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue