diff --git a/crates/omnigraph/tests/composite_flow.rs b/crates/omnigraph/tests/composite_flow.rs index 00f4d49..63ec8b2 100644 --- a/crates/omnigraph/tests/composite_flow.rs +++ b/crates/omnigraph/tests/composite_flow.rs @@ -396,6 +396,137 @@ async fn composite_flow_canonical_lifecycle() { assert!(!final_total.batches().is_empty()); } +/// Cross-handle sequence that exercises operations after a schema_apply +/// invalidates a peer handle's cached `_schema.pg`. The narrow load-bearing +/// pin is that `Omnigraph::refresh()` must not deadlock when its +/// `reload_schema_if_source_changed()` step needs to acquire a read on the +/// coordinator's `RwLock`. The broader sequencing — schema_apply → +/// branch_create → branch_delete → branch_merge → mutate (using the new +/// schema's added property) → reopen — pins that the fix doesn't regress +/// any of the related call sites. +/// +/// Pre-fix bug class: `Omnigraph::refresh()` held +/// `coordinator.write().await` from start to finish, including across the +/// `self.reload_schema_if_source_changed()` call. That helper's +/// `self.coordinator.read().await` (only reached when the on-disk schema +/// source differs from the in-memory cache) deadlocks against the outer +/// write guard because tokio's `RwLock` is not reentrant. Reachable from +/// every public refresh-using API: `branch_delete` (`omnigraph.rs:910`), +/// `branch_merge` (post-merge refresh on bound target), and any caller +/// that calls `Omnigraph::refresh` directly. +/// +/// The cross-handle setup is the realistic trigger: handle A applies a +/// schema, advancing `_schema.pg` on disk; handle B has stale in-memory +/// schema_source. B's next `refresh()` (via branch_delete here) hits the +/// read-after-write reload path. Single-handle is unreachable because +/// `apply_schema` updates the local ArcSwap cache in-line. +/// +/// Post-fix invariant: `refresh()` scopes its write guard to the recovery +/// section only, releasing it before `reload_schema_if_source_changed()`. +/// The reload's read acquisition is uncontested. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + // Step 1: init + load on handle A. + let mut db_a = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + load_jsonl(&mut db_a, TEST_DATA, LoadMode::Append).await.unwrap(); + assert_eq!(count_rows(&db_a, "node:Person").await, 4); + + // Step 2: open handle B on the same repo. B's in-memory schema_source + // cache is now a snapshot of `_schema.pg` at open time. + let db_b = Omnigraph::open(uri).await.unwrap(); + + // Step 3: A applies a schema that adds a nullable property to Person. + // A's on-disk `_schema.pg` is rewritten; A's in-memory cache is updated + // in-line by `apply_schema`. B's in-memory cache is now STALE relative + // to disk. + const TEST_SCHEMA_V2: &str = "node Person {\n name: String @key\n age: I32?\n nickname: String?\n}\n\nnode Company {\n name: String @key\n}\n\nedge Knows: Person -> Person {\n since: Date?\n}\n\nedge WorksAt: Person -> Company\n"; + let plan = db_a.apply_schema(TEST_SCHEMA_V2).await.unwrap(); + assert!(plan.applied, "apply_schema must succeed on a clean repo"); + assert!( + !plan.steps.is_empty(), + "apply_schema must record the AddProperty step" + ); + + // Step 4: deadlock vector. B.branch_delete calls B.refresh() internally + // (omnigraph.rs:910). refresh() pre-fix holds the coord write guard + // across reload_schema_if_source_changed; with B's cache stale, that + // helper takes the not-early-return branch and tries + // self.coordinator.read().await — deadlocks against the outer write. + // + // Wrap in tokio::time::timeout so a deadlock surfaces as a clean test + // panic instead of a stuck CI job. 15s is well above natural completion + // on local FS (sub-second under normal conditions). + db_b.branch_create("post-schema-apply-test").await.unwrap(); + let delete_result = tokio::time::timeout( + std::time::Duration::from_secs(15), + db_b.branch_delete("post-schema-apply-test"), + ) + .await; + assert!( + delete_result.is_ok(), + "branch_delete deadlocked in refresh() with stale schema cache. \ + Pre-fix symptom: Omnigraph::refresh() holds coordinator.write().await \ + across reload_schema_if_source_changed(), which acquires \ + coordinator.read().await on the same non-reentrant RwLock when the \ + on-disk schema source differs from the in-memory cache.", + ); + delete_result + .unwrap() + .expect("branch_delete must succeed once refresh() releases its write guard"); + + // Step 5: continuing operations on B post-refresh — verify the broader + // sequence works. B's catalog should now reflect the new schema (the + // refresh path includes reload_schema_if_source_changed which calls + // store_catalog). + db_b.branch_create("feature-after-apply").await.unwrap(); + + // Step 6: branch_merge from B exercises the post-merge refresh() path + // (merge.rs:1100-1107) — same deadlock surface as branch_delete, + // sanity-pinned by reusing the same handle whose cache was just + // refreshed. + let _outcome = tokio::time::timeout( + std::time::Duration::from_secs(15), + db_b.branch_merge("feature-after-apply", "main"), + ) + .await + .expect("branch_merge deadlocked in refresh() post-schema-apply") + .expect("branch_merge must succeed"); + + // Step 7: mutation on main using the new schema's added property — + // verifies the catalog reload completed and the engine accepts a + // mutation referencing `nickname`. + const NICKNAME_QUERY: &str = "query set_nickname($name: String, $nickname: String) {\n update Person set { nickname: $nickname } where name = $name\n}"; + db_b.mutate_as( + "main", + NICKNAME_QUERY, + "set_nickname", + &mixed_params(&[("$name", "Alice"), ("$nickname", "Ali")], &[]), + None, + ) + .await + .expect("update using post-apply schema property must succeed"); + + // Step 8: reopen — final integration check that the post-deadlock-fix + // state persists across handle drop/open. + drop(db_a); + drop(db_b); + let db_c = Omnigraph::open(uri).await.unwrap(); + assert_eq!( + count_rows(&db_c, "node:Person").await, + 4, + "Person count consistent across reopen post-schema-apply", + ); + let branches = db_c.branch_list().await.unwrap(); + assert!( + !branches.iter().any(|b| b == "post-schema-apply-test"), + "deleted branch must stay deleted across reopen; got {:?}", + branches, + ); +} + /// Multi-branch sequential merges with main writes interleaved between /// every diverge point. Catches compositional regressions that single- /// merge tests can't see: