From a7109d5fba504246c6ce0c75793a65e312ef16fa Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Thu, 30 Apr 2026 15:17:00 +0200 Subject: [PATCH] MR-771: address PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes from automated PR review on #65: 1. Internal-branch guard in mutation/load (Cursor Bugbot, Medium). Pre-MR-771 the begin_run path called ensure_public_branch_ref; the direct-publish replacements only normalized the name. A caller passing __run__* or __schema_apply_lock__ verbatim could write directly to a system branch. Re-add the explicit guard at the public write boundary in mutate_with_current_actor and load. 2. Panic-safe coordinator restoration (Cursor Bugbot, High). The previous swap-and-restore pattern would skip restore_coordinator if execute_named_mutation panicked, leaving the handle pinned to the wrong branch indefinitely. Replace with a CoordinatorRestoreGuard RAII type that captures the previous coordinator on swap and restores it in Drop. 3. Flaky cancel-safety test (cubic, P2). tests/runs.rs::cancelled_mutation_future_leaves_no_state asserted manifest version equality after handle.abort(), but abort races the spawned task. Re-frame around what actually defines cancel safety: no __run__* branches, no _graph_runs.lance, no synthesized public branches. The fourth comment (Codex P1: branch_delete losing its in-flight write barrier) is bigger in scope — fits in the MR-794 storage-trait staging story rather than a hotfix here. Tracked there. Co-Authored-By: Claude Opus 4.7 --- crates/omnigraph/src/db/mod.rs | 1 + crates/omnigraph/src/db/omnigraph.rs | 2 +- crates/omnigraph/src/exec/mutation.rs | 77 +++++++++++++++++++++++++-- crates/omnigraph/src/loader/mod.rs | 6 +++ crates/omnigraph/tests/runs.rs | 43 ++++++++------- 5 files changed, 103 insertions(+), 26 deletions(-) diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index cc73315..8ce5576 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -12,6 +12,7 @@ pub use omnigraph::{ CleanupPolicyOptions, MergeOutcome, Omnigraph, SchemaApplyResult, TableCleanupStats, TableOptimizeStats, }; +pub(crate) use omnigraph::ensure_public_branch_ref; pub(crate) use run_registry::is_internal_run_branch; pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__"; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 7af22be..01cf551 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -843,7 +843,7 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result> { Ok(Some(branch.to_string())) } -fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> { +pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> { if super::is_internal_run_branch(branch) { return Err(OmniError::manifest(format!( "{} does not allow internal run ref '{}'", diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index c79af3f..ab7df62 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -636,6 +636,58 @@ impl MutationStaging { } } +/// RAII helper that restores `Omnigraph::coordinator` on drop. Used by +/// `mutate_with_current_actor` so a panic between the coordinator swap and +/// the explicit restore (e.g. an assertion deep inside Lance) does not +/// leave the handle pinned to the requested branch indefinitely. The +/// captured coordinator is `take`n on drop and assigned via the +/// (synchronous) `restore_coordinator` accessor. +/// +/// Holds a bare `*mut Omnigraph` (no lifetime parameter) deliberately: +/// borrowing the engine through this guard would lock out the rest of +/// `mutate_with_current_actor` from calling `&mut self` methods on the +/// engine while the guard is alive. The unsafe is bounded by the +/// constructor contract — the caller must not let the guard outlive the +/// `&mut self` it was built from. In practice this is enforced by the +/// guard being assigned to a stack-local `_guard` binding inside one +/// function and never moved out. +struct CoordinatorRestoreGuard { + db: *mut Omnigraph, + previous: Option, +} + +// SAFETY: the pointer addresses an `Omnigraph`, which is `Send`. The guard +// is short-lived and the only operation it performs is the sync +// `restore_coordinator` field assignment in `Drop`. No reference is shared +// across threads — the future holding the guard moves between threads +// (e.g. when an Axum handler is awaited on a worker), and the swap-back is +// always invoked at most once on whichever thread runs `Drop`. +unsafe impl Send for CoordinatorRestoreGuard {} + +impl CoordinatorRestoreGuard { + /// SAFETY: `db` must outlive the returned guard, and the caller must + /// not move the guard outside the borrow scope of `db`. + fn new(db: &mut Omnigraph, previous: crate::db::GraphCoordinator) -> Self { + Self { + db: db as *mut Omnigraph, + previous: Some(previous), + } + } +} + +impl Drop for CoordinatorRestoreGuard { + fn drop(&mut self) { + if let Some(prev) = self.previous.take() { + // SAFETY: per the `new` contract, `db` is still valid here. + // `restore_coordinator` is a sync field assignment and does not + // re-enter the runtime. + unsafe { + (*self.db).restore_coordinator(prev); + } + } + } +} + /// Open a sub-table dataset for write in the current mutation query. On the /// first touch of a table, captures the pre-write manifest version into /// `staging.expected_versions` so the publisher can enforce OCC. On @@ -704,6 +756,14 @@ impl Omnigraph { ) -> Result { self.ensure_schema_state_valid().await?; let requested = Self::normalize_branch_name(branch)?; + // Reject internal `__run__*` / system-prefixed branches at the public + // write boundary. The pre-MR-771 path got this guard transitively via + // `begin_run`'s `ensure_public_branch_ref` call; the direct-publish + // path needs to assert it explicitly so a caller can't write to + // legacy or system staging branches by passing the prefix verbatim. + if let Some(name) = requested.as_deref() { + crate::db::ensure_public_branch_ref(name, "mutate")?; + } let resolved_params = enrich_mutation_params(params)?; // Direct-to-target write path. Per-query staging captures pre-write @@ -714,8 +774,16 @@ impl Omnigraph { let current = self.active_branch().map(str::to_string); let needs_swap = requested.as_deref() != current.as_deref(); - let previous = if needs_swap { - Some(self.swap_coordinator_for_branch(requested.as_deref()).await?) + + // RAII guard for coordinator state. If we swapped to the requested + // branch, the original coordinator is captured here and unconditionally + // restored on drop — including on panic from `execute_named_mutation` + // or the publisher. Without this, a Lance-internal panic between swap + // and restore would leave the handle pinned to the wrong branch for + // its remaining lifetime. + let _guard = if needs_swap { + let previous = self.swap_coordinator_for_branch(requested.as_deref()).await?; + Some(CoordinatorRestoreGuard::new(self, previous)) } else { None }; @@ -746,9 +814,8 @@ impl Omnigraph { } }; - if let Some(previous) = previous { - self.restore_coordinator(previous); - } + // `_guard` drops here and restores the coordinator (also restores on + // any panic that unwound through this function above). publish_result } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 7fc0737..405f560 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -154,6 +154,12 @@ impl Omnigraph { pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result { self.ensure_schema_state_valid().await?; + // Reject internal `__run__*` / system-prefixed branches at the public + // write boundary. The pre-MR-771 path got this guard transitively via + // `begin_run`'s `ensure_public_branch_ref` call; the direct-publish + // path needs to assert it explicitly so a caller can't write to + // legacy or system staging branches by passing the prefix verbatim. + crate::db::ensure_public_branch_ref(branch, "load")?; // Branch convention: `None` represents `main`. Re-normalizing to // `Some("main")` here would route the publisher commit through a // separate coordinator (the cross-branch path in diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index d418f84..4581264 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -319,8 +319,17 @@ async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() { /// The cancellation hole that motivated MR-771: dropping a mutation future /// mid-flight must not leave any graph-level state behind. With the run /// state machine gone, only orphaned Lance fragments can remain — and those -/// are reclaimed by `omnigraph cleanup`. Verify by aborting the future and -/// asserting the branch list and manifest version are unchanged. +/// are reclaimed by `omnigraph cleanup`. +/// +/// The test deliberately does NOT assert that the manifest version is +/// unchanged: `handle.abort()` is racing the spawned task, and on a fast +/// machine the mutation may complete before cancellation. That is acceptable +/// — what matters for cancel safety is that no `__run__*` staging branches +/// are ever created, that `_graph_runs.lance` is never written, and that +/// any partial state on disk is reachable through the regular manifest / +/// commit graph pipes (so `omnigraph cleanup` can reclaim it). Asserting +/// version equality would just be a flake on hosts where the abort lands +/// late. #[tokio::test] async fn cancelled_mutation_future_leaves_no_state() { let dir = tempfile::tempdir().unwrap(); @@ -333,13 +342,6 @@ async fn cancelled_mutation_future_leaves_no_state() { .unwrap(); } - let manifest_version_before = { - let db = Omnigraph::open(&uri).await.unwrap(); - db.snapshot_of(ReadTarget::branch("main")) - .await - .unwrap() - .version() - }; let branches_before = { let db = Omnigraph::open(&uri).await.unwrap(); db.branch_list().await.unwrap() @@ -358,27 +360,28 @@ async fn cancelled_mutation_future_leaves_no_state() { }); // Cancel the future. Whether the in-flight write managed to land a - // fragment is timing-dependent and irrelevant; what matters is that no - // graph-level state remains pointing at it. + // fragment (or even fully publish) is timing-dependent and irrelevant — + // see the doc comment on this test for why. handle.abort(); let _ = handle.await; let db = Omnigraph::open(&uri).await.unwrap(); let branches_after = db.branch_list().await.unwrap(); - let manifest_version_after = db - .snapshot_of(ReadTarget::branch("main")) - .await - .unwrap() - .version(); - assert_eq!( - branches_after, branches_before, + // Cancel-safety property: no graph-level run/staging state remains. + // (1) No `__run__*` staging branches are created either way. + assert!( + !branches_after.iter().any(|b| b.starts_with("__run__")), "cancelled mutation must not leave a __run__* branch behind", ); + // (2) The branch list is otherwise unchanged: cancellation/completion + // cannot synthesize new public branches. assert_eq!( - manifest_version_after, manifest_version_before, - "cancelled mutation must not advance the manifest", + branches_after.iter().filter(|b| !b.starts_with("__run__")).count(), + branches_before.iter().filter(|b| !b.starts_with("__run__")).count(), + "cancelled mutation must not synthesize new public branches", ); + // (3) The legacy run-state machine table never reappears. assert!( !std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(), "no _graph_runs.lance after cancel — state machine is gone",