feat(engine): sweep & remove legacy __run__ branch guard (MR-770) (#132)

* feat(engine): sweep legacy __run__ branches via v2→v3 manifest migration

Pre-v0.4.0 graphs can carry stale `__run__<id>` staging branches on the
`__manifest` dataset, left by the Run state machine removed in MR-771. Lance's
`list_branches` still enumerates them, so they leak into `branch_list()` and
count as blocking branches at schema-apply time.

Add a one-time `migrate_v2_to_v3` arm to the internal-schema dispatcher: on the
first read-write open it enumerates `__manifest` branches, deletes every
`__run__*` ref, and bumps the stamp to 3. Idempotent under retry (re-enumerates
fresh each run). The `"__run__"` prefix is inlined so the migration does not
depend on the run_registry guard that MR-770 removes next.

This is the prerequisite sweep; the guard removal follows in the next commit.

* refactor(engine): remove the legacy __run__ branch guard (MR-770)

With the v2→v3 migration sweeping stale `__run__*` branches off `__manifest`
on first read-write open, the defense-in-depth `is_internal_run_branch` guard
is no longer needed.

- delete `db/run_registry.rs`; drop the module + re-export from `db/mod.rs`
- collapse `is_internal_system_branch` to the schema-apply-lock check only
- `ensure_public_branch_ref`: drop the run-ref rejection; `__run__*` is now an
  ordinary branch name
- `branch_merge`: reject `is_internal_system_branch` (was run-only) so the
  schema-apply lock is rejected consistently with create/delete — a small,
  deliberate tightening
- update the inline schema-apply test + the writes integration tests
  (`public_branch_apis_reject_internal_run_refs` →
  `public_branch_apis_reject_internal_system_refs`, which also asserts
  `__run__*` now creates successfully)
- docs: flip the "pending production sweep / defense-in-depth" notes to
  "auto-swept by the v2→v3 migration"; document the read-only-open limitation

Known residual: the inert `_graph_runs.lance` / `_graph_run_actors.lance` bytes
remain until a `StorageAdapter::delete_prefix` primitive lands.

* fix(engine): run __run__ sweep at Omnigraph::open, not only on publish

Review (PR #132) caught a regression: removing __run__ from
`is_internal_system_branch` exposed legacy `__run__*` branches to the
schema-apply blocking-branch checks (schema_apply.rs:104 and :778) and to
`branch_list()`, but the v2→v3 sweep ran only inside the publisher's
`load_publish_state`. On a pre-v0.4.0 graph whose first write is a schema
apply, the blocking-branch check fires before any publish, so apply failed
with "found non-main branches: __run__…". The same lazy timing also created a
reverse hazard: a user-created `__run__*` branch on a still-v2 graph could be
deleted by the first publish's sweep.

Fix: run the internal-schema migration in `Omnigraph::open(ReadWrite)` (new
`manifest::migrate_on_open`), before the coordinator reads branch state. The
sweep now lands before any branch-observing code, and a graph is stamped v3 at
open — so the one-time sweep can never catch a legitimately-created branch.
Both checks and `branch_list` see the swept graph; correct by construction for
every write path.

Accepted residual: a read-only open of an unmigrated legacy graph still lists
`__run__*` (read-only opens must not write, so they can't sweep). Documented.

Regression test `legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply`
confirmed RED before the fix (panicked on the branch_list leak assertion) and
GREEN after. Also updates the stale schema_apply.rs comment, the writes.md
"Migration code" section, and adds the v3 row to storage.md's migration table.

* test(engine): sweep multiple legacy __run__ branches; doc nit

Strengthen the v2→v3 migration test to synthesize three `__run__*` branches
(a real legacy graph accumulates one per run) so the migration's delete loop
is exercised on a single reused dataset handle, not just a single branch.
Confirms multi-branch deletion is safe.

Also drop a stale "active runs" reference from the branch_delete doc line.

* fix(engine): force-delete in __run__ sweep for concurrency safety

`migrate_v2_to_v3` ran `Dataset::delete_branch` (= `branches().delete(.., false)`),
which errors "BranchContents not found" if the branch is already gone. Since the
sweep now runs in `Omnigraph::open(ReadWrite)`, two processes opening the same
legacy v2 graph concurrently would race: one wins each delete, the other's open
fails. The migration only claimed idempotency under *sequential* retry.

Switch to `Dataset::force_delete_branch` (= `delete(.., true)`), Lance's
documented path for cleaning up zombie branches, which tolerates an
already-absent branch. The sweep is now idempotent under concurrent runners and
robust to partial/zombie state. Found in self-review; no behavior change for the
common single-open path.

* docs(release): note MR-770 __run__ cleanup in v0.6.1

* docs(branches): reconcile branch cleanup semantics
This commit is contained in:
Ragnor Comerford 2026-06-07 17:33:14 +02:00 committed by GitHub
parent fd8e078a77
commit 54842808db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 269 additions and 72 deletions

View file

@ -48,6 +48,22 @@ const OBJECT_TYPE_TABLE_VERSION: &str = "table_version";
const OBJECT_TYPE_TABLE_TOMBSTONE: &str = "table_tombstone";
const TABLE_VERSION_MANAGEMENT_KEY: &str = "table_version_management";
/// Apply pending internal-schema migrations against `__manifest` on the
/// open-for-write path, independent of a publish.
///
/// `Omnigraph::open(ReadWrite)` calls this before the coordinator reads branch
/// state, so branch-observing code (`branch_list`, the schema-apply
/// blocking-branch checks) sees the post-migration graph. In particular the
/// v2→v3 step sweeps legacy `__run__*` staging branches off `__manifest`
/// (MR-770); running it here closes the window where those branches would
/// otherwise block schema apply before the first publish runs the migration.
///
/// Idempotent: a no-op stamp read when the on-disk version already matches.
pub(crate) async fn migrate_on_open(root_uri: &str) -> Result<()> {
let mut dataset = open_manifest_dataset(root_uri, None).await?;
migrations::migrate_internal_schema(&mut dataset).await
}
/// Immutable point-in-time view of the database.
///
/// Cheap to create (no storage I/O). All reads within a query go through one

View file

@ -46,7 +46,11 @@ use crate::error::{OmniError, Result};
/// - v2 — `__manifest.object_id` carries the unenforced-PK annotation,
/// engaging Lance's bloom-filter conflict resolver at commit time. Added
/// alongside `expected_table_versions` OCC on `ManifestBatchPublisher::publish`.
pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 2;
/// - v3 — one-time sweep of legacy `__run__<id>` staging branches left on the
/// `__manifest` dataset by the pre-v0.4.0 Run state machine (removed in
/// MR-771). Once swept, the `is_internal_run_branch` defense-in-depth guard
/// is no longer needed (MR-770).
pub(super) const INTERNAL_MANIFEST_SCHEMA_VERSION: u32 = 3;
const INTERNAL_SCHEMA_VERSION_KEY: &str = "omnigraph:internal_schema_version";
const OBJECT_ID_PK_KEY: &str = "lance-schema:unenforced-primary-key";
@ -89,6 +93,10 @@ pub(super) async fn migrate_internal_schema(dataset: &mut Dataset) -> Result<()>
migrate_v1_to_v2(dataset).await?;
current = 2;
}
2 => {
migrate_v2_to_v3(dataset).await?;
current = 3;
}
other => {
return Err(OmniError::manifest_internal(format!(
"no internal-schema migration registered for v{} → v{}",
@ -122,6 +130,51 @@ async fn migrate_v1_to_v2(dataset: &mut Dataset) -> Result<()> {
set_stamp(dataset, 2).await
}
/// v2 → v3: sweep legacy `__run__<id>` staging branches off the `__manifest`
/// dataset, then bump the stamp.
///
/// The pre-v0.4.0 Run state machine (removed in MR-771) created graph-level
/// staging branches named `__run__<ulid>` on `__manifest`. MR-771 stopped
/// creating them but left any pre-existing ones in place; Lance's
/// `list_branches` still enumerates them, so they leak into `branch_list()`
/// and count as blocking branches at schema-apply time. This one-time sweep
/// removes them so the `is_internal_run_branch` guard can retire (MR-770).
///
/// The `"__run__"` prefix is inlined here on purpose: this migration must keep
/// working after the `run_registry` module (the guard) is deleted, so it does
/// not depend on it.
///
/// Idempotent under both sequential retry and concurrent runners: each run
/// re-enumerates `list_branches` fresh, and `force_delete_branch` tolerates a
/// branch that is already gone — so a crash before the stamp bump, or a second
/// process opening the same legacy graph at the same time, never errors out.
async fn migrate_v2_to_v3(dataset: &mut Dataset) -> Result<()> {
const LEGACY_RUN_BRANCH_PREFIX: &str = "__run__";
let branches = dataset
.list_branches()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let run_branches: Vec<String> = branches
.into_keys()
.filter(|name| {
name.trim_start_matches('/')
.starts_with(LEGACY_RUN_BRANCH_PREFIX)
})
.collect();
for name in run_branches {
// `force_delete_branch` deletes even when the `BranchContents` is
// already gone. Plain `delete_branch` errors "BranchContents not
// found", which would fail a second concurrent open (or a retry that
// raced another runner) after the first one swept the branch. Force is
// exactly Lance's documented path for cleaning up zombie branches.
dataset
.force_delete_branch(&name)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
set_stamp(dataset, 3).await
}
async fn set_stamp(dataset: &mut Dataset, version: u32) -> Result<()> {
dataset
.update_schema_metadata([(INTERNAL_SCHEMA_VERSION_KEY.to_string(), version.to_string())])

View file

@ -1461,6 +1461,80 @@ async fn test_publish_migrates_pre_stamp_manifest_to_current_version() {
assert!(reopened.snapshot().entry("node:Person").is_some());
}
#[tokio::test]
async fn test_v2_to_v3_sweeps_legacy_run_branches_on_write_open() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let catalog = build_test_catalog();
let mut mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
// Synthesize a pre-MR-770 graph: several stale `__run__` staging branches
// left on `__manifest` (a real legacy graph accumulates one per run), plus
// a real user branch that must survive the sweep. Multiple run branches
// exercise the migration's delete loop on a single reused dataset handle.
mc.create_branch("__run__01J9LEGACY").await.unwrap();
mc.create_branch("__run__01J9SECOND").await.unwrap();
mc.create_branch("__run__01J9THIRD").await.unwrap();
mc.create_branch("feature").await.unwrap();
let before = mc.list_branches().await.unwrap();
assert_eq!(
before.iter().filter(|b| b.starts_with("__run__")).count(),
3,
"precondition: three legacy run branches exist on __manifest; got {before:?}",
);
// Rewind the internal-schema stamp to v2 so the next write-open runs the
// v2 → v3 sweep arm (init stamps at the current version, which is past it).
{
let mut ds = open_manifest_dataset(uri, None).await.unwrap();
ds.update_schema_metadata([(
"omnigraph:internal_schema_version".to_string(),
Some("2".to_string()),
)])
.await
.unwrap();
let post = open_manifest_dataset(uri, None).await.unwrap();
assert_eq!(super::migrations::read_stamp(&post), 2, "stamp rewound to v2");
}
// A no-op publish forces the open-for-write path, which runs the migration.
let mut expected = HashMap::new();
expected.insert("node:Person".to_string(), 1);
GraphNamespacePublisher::new(uri, None)
.publish(&[], &expected)
.await
.unwrap();
// Stamp advanced to current; the legacy run branch is physically gone from
// `__manifest` (checked via the raw, unfiltered manifest list — not the
// guard-filtered `branch_list`), and the real branch + `main` survive.
let post = open_manifest_dataset(uri, None).await.unwrap();
assert_eq!(
super::migrations::read_stamp(&post),
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
);
let reopened = ManifestCoordinator::open(uri).await.unwrap();
let after = reopened.list_branches().await.unwrap();
assert!(
!after.iter().any(|b| b.starts_with("__run__")),
"legacy run branch must be swept; got {after:?}",
);
assert!(after.iter().any(|b| b == "feature"), "user branch must survive");
assert!(after.iter().any(|b| b == "main"), "main must survive");
// Idempotent: a second write-open finds the stamp at current and does not
// re-run the sweep or error.
GraphNamespacePublisher::new(uri, None)
.publish(&[], &expected)
.await
.unwrap();
let final_ds = open_manifest_dataset(uri, None).await.unwrap();
assert_eq!(
super::migrations::read_stamp(&final_ds),
super::migrations::INTERNAL_MANIFEST_SCHEMA_VERSION,
);
}
#[tokio::test]
async fn test_publish_rejects_manifest_stamped_at_future_version() {
let dir = tempfile::tempdir().unwrap();

View file

@ -3,7 +3,6 @@ pub mod graph_coordinator;
pub mod manifest;
mod omnigraph;
mod recovery_audit;
mod run_registry;
mod schema_state;
pub(crate) mod write_queue;
@ -15,7 +14,6 @@ pub use omnigraph::{
CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions,
SchemaApplyResult, SkipReason, TableCleanupStats, TableOptimizeStats,
};
pub(crate) use run_registry::is_internal_run_branch;
pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__";
@ -69,5 +67,8 @@ pub(crate) fn is_schema_apply_lock_branch(name: &str) -> bool {
}
pub(crate) fn is_internal_system_branch(name: &str) -> bool {
is_internal_run_branch(name) || is_schema_apply_lock_branch(name)
// Legacy `__run__*` staging branches (Run state machine, removed MR-771)
// are swept off `__manifest` by the v2→v3 internal-schema migration, so the
// only internal branch the engine still creates is the schema-apply lock.
is_schema_apply_lock_branch(name)
}

View file

@ -346,6 +346,16 @@ impl Omnigraph {
mode: OpenMode,
) -> Result<Self> {
let root = normalize_root_uri(uri)?;
// Apply pending internal-schema migrations before the coordinator reads
// branch state, so `branch_list` and the schema-apply blocking-branch
// checks observe the post-migration graph — notably the v2→v3 sweep of
// legacy `__run__*` staging branches (MR-770). ReadWrite only: a
// read-only open must not trigger object-store writes, so a read-only
// open of an unmigrated legacy graph still lists `__run__*` until its
// first read-write open (an accepted, documented limitation).
if matches!(mode, OpenMode::ReadWrite) {
crate::db::manifest::migrate_on_open(&root).await?;
}
// Open the coordinator first so the schema-staging recovery sweep can
// compare its snapshot against any leftover staging files.
let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
@ -1491,12 +1501,6 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
}
pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
if super::is_internal_run_branch(branch) {
return Err(OmniError::manifest(format!(
"{} does not allow internal run ref '{}'",
operation, branch
)));
}
if is_internal_system_branch(branch) {
return Err(OmniError::manifest(format!(
"{} does not allow internal system ref '{}'",
@ -1900,7 +1904,6 @@ fn json_value_from_array(array: &dyn Array, row: usize) -> Result<serde_json::Va
#[cfg(test)]
mod tests {
use super::*;
use crate::db::is_internal_run_branch;
use crate::db::manifest::ManifestCoordinator;
use async_trait::async_trait;
use serde_json::Value;
@ -2238,11 +2241,11 @@ edge WorksAt: Person -> Company
#[tokio::test]
async fn test_apply_schema_succeeds_after_load() {
// Historical: schema apply used to be blocked by leftover
// `__run__` branches. A defense-in-depth filter now skips
// internal system branches, and run branches were made
// ephemeral on every terminal state — so in practice no
// `__run__` branch survives publish. The filter still guards
// the invariant.
// `__run__` branches. The Run state machine was removed in
// MR-771, so a fresh graph never creates a `__run__` branch;
// legacy ones are swept by the v2→v3 manifest migration. This
// asserts the invariant a current graph upholds: publish leaves
// no `__run__` branch behind, so schema apply proceeds.
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
@ -2257,8 +2260,8 @@ edge WorksAt: Person -> Company
let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
assert!(
!all_branches.iter().any(|b| is_internal_run_branch(b)),
"run branch should be deleted after publish, got: {:?}",
!all_branches.iter().any(|b| b.starts_with("__run__")),
"no __run__ branch should exist after publish, got: {:?}",
all_branches
);
@ -2270,6 +2273,56 @@ edge WorksAt: Person -> Company
assert!(result.applied, "schema apply should have applied");
}
/// Regression (MR-770): a pre-v0.4.0 graph that still carries a stale
/// `__run__*` branch on `__manifest` must not block schema apply. The
/// v2→v3 sweep runs in `Omnigraph::open(ReadWrite)` — before the
/// schema-apply blocking-branch check — so apply succeeds with no
/// intervening publish.
///
/// Confirmed to fail before the open-time migration landed: the reopened
/// graph still listed `__run__legacy`, and `apply_schema` returned
/// "found non-main branches: __run__legacy".
#[tokio::test]
async fn legacy_run_branch_is_swept_on_open_and_does_not_block_schema_apply() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
// Synthesize a legacy graph: a stale `__run__` branch on `__manifest`
// plus the manifest stamp rewound to v2 (pre-sweep).
db.branch_create("__run__legacy").await.unwrap();
drop(db);
{
let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
.await
.unwrap();
ds.update_schema_metadata([(
"omnigraph:internal_schema_version".to_string(),
Some("2".to_string()),
)])
.await
.unwrap();
}
// Reopen (ReadWrite): the open-time migration must sweep `__run__legacy`
// before any branch-observing code runs.
let db = Omnigraph::open(uri).await.unwrap();
let branches = db.branch_list().await.unwrap();
assert!(
!branches.iter().any(|b| b.starts_with("__run__")),
"open-time migration must sweep legacy __run__ branches; got {branches:?}",
);
// Schema apply must proceed with no intervening publish — the
// blocking-branch check no longer sees `__run__legacy`.
let desired = TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
let result = db.apply_schema(&desired).await.unwrap();
assert!(result.applied, "schema apply should have applied");
}
#[tokio::test]
async fn test_apply_schema_adds_index_for_existing_property() {
let dir = tempfile::tempdir().unwrap();

View file

@ -61,11 +61,11 @@ async fn plan_schema_for_apply(
) -> Result<PlannedSchemaApply> {
db.ensure_schema_state_valid().await?;
let branches = db.coordinator.read().await.all_branches().await?;
// Skip `main` and internal system branches. The schema-apply lock branch
// is excluded because it is the cluster-wide schema-apply serializer.
// `__run__*` branches are no longer created; the filter remains as
// defense-in-depth for legacy graphs with leftover staging branches.
// A future production sweep will let this guard go.
// Skip `main` and internal system branches (the schema-apply lock branch,
// the cluster-wide schema-apply serializer). Legacy `__run__*` staging
// branches were swept off `__manifest` by the v2→v3 migration that runs in
// `Omnigraph::open(ReadWrite)` before this check (MR-770), so they no
// longer appear here.
let blocking_branches = branches
.into_iter()
.filter(|branch| branch != "main" && !is_internal_system_branch(branch))

View file

@ -1,16 +0,0 @@
// The Run state machine has been removed. Mutations now write directly
// to target tables and use the publisher's `expected_table_versions`
// CAS for cross-table OCC; `__run__<id>` staging branches and the
// `_graph_runs.lance` state machine no longer exist.
//
// What remains is the branch-name predicate, kept as a defense-in-depth
// guard against users naming a public branch `__run__*`. A future
// production sweep of legacy `_graph_runs.lance` rows and stale
// `__run__*` branches will let this predicate (and this file) go too.
pub(crate) const INTERNAL_RUN_BRANCH_PREFIX: &str = "__run__";
pub(crate) fn is_internal_run_branch(name: &str) -> bool {
name.trim_start_matches('/')
.starts_with(INTERNAL_RUN_BRANCH_PREFIX)
}

View file

@ -1087,9 +1087,9 @@ impl Omnigraph {
target: &str,
actor_id: Option<&str>,
) -> Result<MergeOutcome> {
if is_internal_run_branch(source) || is_internal_run_branch(target) {
if is_internal_system_branch(source) || is_internal_system_branch(target) {
return Err(OmniError::manifest(format!(
"branch_merge does not allow internal run refs ('{}' -> '{}')",
"branch_merge does not allow internal system refs ('{}' -> '{}')",
source, target
)));
}

View file

@ -35,7 +35,7 @@ use time::format_description::well_known::Rfc3339;
use crate::db::commit_graph::CommitGraph;
use crate::db::manifest::ManifestCoordinator;
use crate::db::{MergeOutcome, Omnigraph, is_internal_run_branch};
use crate::db::{MergeOutcome, Omnigraph, is_internal_system_branch};
use crate::db::{ReadTarget, Snapshot};
use crate::embedding::EmbeddingClient;
use crate::error::{MergeConflict, MergeConflictKind, OmniError, Result};

View file

@ -371,11 +371,10 @@ async fn cancelled_mutation_future_leaves_no_state() {
// Cancel-safety property: no graph-level run/staging state remains.
//
// Note: `branch_list()` already filters `__run__*` via
// `is_internal_system_branch`, so a runtime "no `__run__` branches" check
// would be vacuous. The structural property that no `__run__` branches
// can ever be created is enforced by deletion of `begin_run` etc. in
// (verified by the build itself — those symbols no longer exist).
// No `__run__` branches can ever be created: the Run state machine
// (`begin_run` etc.) was deleted in MR-771 — verified by the build itself,
// those symbols no longer exist. Any legacy `__run__*` branch on an
// upgraded graph is swept by the v2→v3 manifest migration.
//
// (1) The branch list is unchanged: cancellation/completion cannot
// synthesize new public branches.
@ -442,34 +441,40 @@ async fn repeated_loads_do_not_accumulate_branches() {
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
}
/// User code must not be able to write to internal `__run__*` names.
/// The branch-name guard predicate is kept as defense-in-depth; it
/// will be removed once a future production sweep retires the legacy
/// branches.
/// After MR-770, `__run__*` is an ordinary branch name — the Run state machine
/// and its `is_internal_run_branch` guard are gone. The surviving internal-ref
/// guard still rejects the active `__schema_apply_lock__` branch on the public
/// create/merge APIs.
#[tokio::test]
async fn public_branch_apis_reject_internal_run_refs() {
async fn public_branch_apis_reject_internal_system_refs() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let create_err = db.branch_create("__run__synthetic").await.unwrap_err();
// `__run__*` is no longer reserved — creating it now succeeds.
db.branch_create("__run__formerly_reserved")
.await
.expect("__run__ prefix is a normal branch name post-MR-770");
// The schema-apply lock branch is still rejected on public branch APIs.
let create_err = db.branch_create("__schema_apply_lock__").await.unwrap_err();
let OmniError::Manifest(err) = create_err else {
panic!("expected Manifest error");
};
assert!(
err.message.contains("internal run ref"),
err.message.contains("internal system ref"),
"unexpected error: {}",
err.message
);
let merge_err = db
.branch_merge("__run__synthetic", "main")
.branch_merge("__schema_apply_lock__", "main")
.await
.unwrap_err();
let OmniError::Manifest(err) = merge_err else {
panic!("expected Manifest error");
};
assert!(
err.message.contains("internal run refs"),
err.message.contains("internal system refs"),
"unexpected error: {}",
err.message
);