MR-771: address PR review feedback

Three fixes from automated PR review on #65:

1. Internal-branch guard in mutation/load (Cursor Bugbot, Medium).
   Pre-MR-771 the begin_run path called ensure_public_branch_ref;
   the direct-publish replacements only normalized the name. A caller
   passing __run__* or __schema_apply_lock__ verbatim could write
   directly to a system branch. Re-add the explicit guard at the
   public write boundary in mutate_with_current_actor and load.

2. Panic-safe coordinator restoration (Cursor Bugbot, High).
   The previous swap-and-restore pattern would skip restore_coordinator
   if execute_named_mutation panicked, leaving the handle pinned to
   the wrong branch indefinitely. Replace with a CoordinatorRestoreGuard
   RAII type that captures the previous coordinator on swap and
   restores it in Drop.

3. Flaky cancel-safety test (cubic, P2).
   tests/runs.rs::cancelled_mutation_future_leaves_no_state asserted
   manifest version equality after handle.abort(), but abort races
   the spawned task. Re-frame around what actually defines cancel
   safety: no __run__* branches, no _graph_runs.lance, no synthesized
   public branches.

The fourth comment (Codex P1: branch_delete losing its in-flight
write barrier) is bigger in scope — fits in the MR-794 storage-trait
staging story rather than a hotfix here. Tracked there.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 15:17:00 +02:00
parent 35be20cb05
commit a7109d5fba
No known key found for this signature in database
5 changed files with 103 additions and 26 deletions

View file

@ -12,6 +12,7 @@ pub use omnigraph::{
CleanupPolicyOptions, MergeOutcome, Omnigraph, SchemaApplyResult, TableCleanupStats,
TableOptimizeStats,
};
pub(crate) use omnigraph::ensure_public_branch_ref;
pub(crate) use run_registry::is_internal_run_branch;
pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__";

View file

@ -843,7 +843,7 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
Ok(Some(branch.to_string()))
}
fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
if super::is_internal_run_branch(branch) {
return Err(OmniError::manifest(format!(
"{} does not allow internal run ref '{}'",

View file

@ -636,6 +636,58 @@ impl MutationStaging {
}
}
/// RAII helper that restores `Omnigraph::coordinator` on drop. Used by
/// `mutate_with_current_actor` so a panic between the coordinator swap and
/// the explicit restore (e.g. an assertion deep inside Lance) does not
/// leave the handle pinned to the requested branch indefinitely. The
/// captured coordinator is `take`n on drop and assigned via the
/// (synchronous) `restore_coordinator` accessor.
///
/// Holds a bare `*mut Omnigraph` (no lifetime parameter) deliberately:
/// borrowing the engine through this guard would lock out the rest of
/// `mutate_with_current_actor` from calling `&mut self` methods on the
/// engine while the guard is alive. The unsafe is bounded by the
/// constructor contract — the caller must not let the guard outlive the
/// `&mut self` it was built from. In practice this is enforced by the
/// guard being assigned to a stack-local `_guard` binding inside one
/// function and never moved out.
struct CoordinatorRestoreGuard {
db: *mut Omnigraph,
previous: Option<crate::db::GraphCoordinator>,
}
// SAFETY: the pointer addresses an `Omnigraph`, which is `Send`. The guard
// is short-lived and the only operation it performs is the sync
// `restore_coordinator` field assignment in `Drop`. No reference is shared
// across threads — the future holding the guard moves between threads
// (e.g. when an Axum handler is awaited on a worker), and the swap-back is
// always invoked at most once on whichever thread runs `Drop`.
unsafe impl Send for CoordinatorRestoreGuard {}
impl CoordinatorRestoreGuard {
/// SAFETY: `db` must outlive the returned guard, and the caller must
/// not move the guard outside the borrow scope of `db`.
fn new(db: &mut Omnigraph, previous: crate::db::GraphCoordinator) -> Self {
Self {
db: db as *mut Omnigraph,
previous: Some(previous),
}
}
}
impl Drop for CoordinatorRestoreGuard {
fn drop(&mut self) {
if let Some(prev) = self.previous.take() {
// SAFETY: per the `new` contract, `db` is still valid here.
// `restore_coordinator` is a sync field assignment and does not
// re-enter the runtime.
unsafe {
(*self.db).restore_coordinator(prev);
}
}
}
}
/// Open a sub-table dataset for write in the current mutation query. On the
/// first touch of a table, captures the pre-write manifest version into
/// `staging.expected_versions` so the publisher can enforce OCC. On
@ -704,6 +756,14 @@ impl Omnigraph {
) -> Result<MutationResult> {
self.ensure_schema_state_valid().await?;
let requested = Self::normalize_branch_name(branch)?;
// Reject internal `__run__*` / system-prefixed branches at the public
// write boundary. The pre-MR-771 path got this guard transitively via
// `begin_run`'s `ensure_public_branch_ref` call; the direct-publish
// path needs to assert it explicitly so a caller can't write to
// legacy or system staging branches by passing the prefix verbatim.
if let Some(name) = requested.as_deref() {
crate::db::ensure_public_branch_ref(name, "mutate")?;
}
let resolved_params = enrich_mutation_params(params)?;
// Direct-to-target write path. Per-query staging captures pre-write
@ -714,8 +774,16 @@ impl Omnigraph {
let current = self.active_branch().map(str::to_string);
let needs_swap = requested.as_deref() != current.as_deref();
let previous = if needs_swap {
Some(self.swap_coordinator_for_branch(requested.as_deref()).await?)
// RAII guard for coordinator state. If we swapped to the requested
// branch, the original coordinator is captured here and unconditionally
// restored on drop — including on panic from `execute_named_mutation`
// or the publisher. Without this, a Lance-internal panic between swap
// and restore would leave the handle pinned to the wrong branch for
// its remaining lifetime.
let _guard = if needs_swap {
let previous = self.swap_coordinator_for_branch(requested.as_deref()).await?;
Some(CoordinatorRestoreGuard::new(self, previous))
} else {
None
};
@ -746,9 +814,8 @@ impl Omnigraph {
}
};
if let Some(previous) = previous {
self.restore_coordinator(previous);
}
// `_guard` drops here and restores the coordinator (also restores on
// any panic that unwound through this function above).
publish_result
}

View file

@ -154,6 +154,12 @@ impl Omnigraph {
pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
self.ensure_schema_state_valid().await?;
// Reject internal `__run__*` / system-prefixed branches at the public
// write boundary. The pre-MR-771 path got this guard transitively via
// `begin_run`'s `ensure_public_branch_ref` call; the direct-publish
// path needs to assert it explicitly so a caller can't write to
// legacy or system staging branches by passing the prefix verbatim.
crate::db::ensure_public_branch_ref(branch, "load")?;
// Branch convention: `None` represents `main`. Re-normalizing to
// `Some("main")` here would route the publisher commit through a
// separate coordinator (the cross-branch path in

View file

@ -319,8 +319,17 @@ async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() {
/// The cancellation hole that motivated MR-771: dropping a mutation future
/// mid-flight must not leave any graph-level state behind. With the run
/// state machine gone, only orphaned Lance fragments can remain — and those
/// are reclaimed by `omnigraph cleanup`. Verify by aborting the future and
/// asserting the branch list and manifest version are unchanged.
/// are reclaimed by `omnigraph cleanup`.
///
/// The test deliberately does NOT assert that the manifest version is
/// unchanged: `handle.abort()` is racing the spawned task, and on a fast
/// machine the mutation may complete before cancellation. That is acceptable
/// — what matters for cancel safety is that no `__run__*` staging branches
/// are ever created, that `_graph_runs.lance` is never written, and that
/// any partial state on disk is reachable through the regular manifest /
/// commit graph pipes (so `omnigraph cleanup` can reclaim it). Asserting
/// version equality would just be a flake on hosts where the abort lands
/// late.
#[tokio::test]
async fn cancelled_mutation_future_leaves_no_state() {
let dir = tempfile::tempdir().unwrap();
@ -333,13 +342,6 @@ async fn cancelled_mutation_future_leaves_no_state() {
.unwrap();
}
let manifest_version_before = {
let db = Omnigraph::open(&uri).await.unwrap();
db.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version()
};
let branches_before = {
let db = Omnigraph::open(&uri).await.unwrap();
db.branch_list().await.unwrap()
@ -358,27 +360,28 @@ async fn cancelled_mutation_future_leaves_no_state() {
});
// Cancel the future. Whether the in-flight write managed to land a
// fragment is timing-dependent and irrelevant; what matters is that no
// graph-level state remains pointing at it.
// fragment (or even fully publish) is timing-dependent and irrelevant —
// see the doc comment on this test for why.
handle.abort();
let _ = handle.await;
let db = Omnigraph::open(&uri).await.unwrap();
let branches_after = db.branch_list().await.unwrap();
let manifest_version_after = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version();
assert_eq!(
branches_after, branches_before,
// Cancel-safety property: no graph-level run/staging state remains.
// (1) No `__run__*` staging branches are created either way.
assert!(
!branches_after.iter().any(|b| b.starts_with("__run__")),
"cancelled mutation must not leave a __run__* branch behind",
);
// (2) The branch list is otherwise unchanged: cancellation/completion
// cannot synthesize new public branches.
assert_eq!(
manifest_version_after, manifest_version_before,
"cancelled mutation must not advance the manifest",
branches_after.iter().filter(|b| !b.starts_with("__run__")).count(),
branches_before.iter().filter(|b| !b.starts_with("__run__")).count(),
"cancelled mutation must not synthesize new public branches",
);
// (3) The legacy run-state machine table never reappears.
assert!(
!std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
"no _graph_runs.lance after cancel — state machine is gone",