From fc95eb8ff3315862865390ba817d77831e0756e6 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Wed, 27 May 2026 12:02:14 +0200 Subject: [PATCH] mr-668: consolidate AppState single-mode constructors; delete with_policy_engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prior `with_policy_engine` constructor reused the engine `Arc` from the existing handle (`engine: Arc::clone(&existing.engine)`) without re-applying `Omnigraph::with_policy`. Combined with `new_with_workload`, the documented composition pattern was `AppState::new_with_workload(...).with_policy_engine(p)` — which produced an `AppState` whose HTTP layer enforced Cedar but whose underlying engine had no `PolicyChecker` installed. Any caller reaching the engine via `state.registry().list()[i].engine` could bypass policy entirely. The doc comment named this gap; the type system didn't. Make composition impossible to get wrong: * Add `AppState::new_single(uri, db, tokens, Option, WorkloadController)` — canonical single-mode constructor that takes every option together and routes through `build_single_mode` (which applies `db.with_policy(checker)` to the engine itself). * `new`, `new_with_bearer_token`, `new_with_bearer_tokens`, `new_with_bearer_tokens_and_policy`, `new_with_workload` all become thin wrappers around `new_single`. * Delete `with_policy_engine`. There is no post-construction policy install path any more; the single linear construction forces HTTP-layer and engine-layer policy to install together or not at all. Regression test `engine_layer_policy_fires_via_direct_arc_omnigraph_from_new_single` constructs an `AppState::new_single` with a deny-all policy, pulls the `Arc` from the registry handle (the same path an embedded SDK consumer would take), and asserts a direct `mutate_as` call returns `OmniError::Policy`. Pre-fix this test would have succeeded the mutation. Test caller in `ingest_per_actor_admission_cap_returns_429` migrates from `.with_policy_engine(...)` to `new_single(..., Some(policy_engine), workload)`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/src/lib.rs | 93 ++++++++++++------------- crates/omnigraph-server/tests/server.rs | 75 +++++++++++++++++++- 2 files changed, 117 insertions(+), 51 deletions(-) diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index f8deb0c..5e64b07 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -259,8 +259,34 @@ pub struct ApiError { } impl AppState { + /// Canonical single-mode constructor. Every other `new_*` / `open_*` + /// helper is a thin convenience wrapper around this one. Builds the + /// engine + per-graph policy through `build_single_mode`, which + /// applies `Omnigraph::with_policy` so HTTP-layer and engine-layer + /// policy can never diverge — there is no "policy installed on HTTP + /// but not on engine" representable state (closes the prior + /// `with_policy_engine` footgun that reused the engine `Arc` + /// without re-applying `with_policy`). + pub fn new_single( + uri: String, + db: Omnigraph, + bearer_tokens: Vec<(String, String)>, + policy_engine: Option, + workload: workload::WorkloadController, + ) -> Self { + let bearer_tokens = hash_bearer_tokens(bearer_tokens); + let per_graph_policy = policy_engine.map(Arc::new); + Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload)) + } + pub fn new(uri: String, db: Omnigraph) -> Self { - Self::new_with_bearer_tokens(uri, db, Vec::new()) + Self::new_single( + uri, + db, + Vec::new(), + None, + workload::WorkloadController::from_env(), + ) } pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option) -> Self { @@ -276,7 +302,13 @@ impl AppState { db: Omnigraph, bearer_tokens: Vec<(String, String)>, ) -> Self { - Self::new_with_bearer_tokens_and_policy(uri, db, bearer_tokens, None) + Self::new_single( + uri, + db, + bearer_tokens, + None, + workload::WorkloadController::from_env(), + ) } pub fn new_with_bearer_tokens_and_policy( @@ -285,62 +317,27 @@ impl AppState { bearer_tokens: Vec<(String, String)>, policy_engine: Option, ) -> Self { - let bearer_tokens = hash_bearer_tokens(bearer_tokens); - let per_graph_policy: Option> = policy_engine.map(Arc::new); - let workload = Arc::new(workload::WorkloadController::from_env()); - Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, workload) + Self::new_single( + uri, + db, + bearer_tokens, + policy_engine, + workload::WorkloadController::from_env(), + ) } /// Construct with a caller-provided [`workload::WorkloadController`]. /// Tests and benches use this to override per-actor caps without - /// mutating global env vars (which is unsafe in Rust 2024 once the - /// async runtime is up — `setenv` isn't thread-safe). + /// mutating global env vars (unsafe in Rust 2024 once the async + /// runtime is up — `setenv` isn't thread-safe). For tests that also + /// need a custom `PolicyEngine`, use [`new_single`] directly. pub fn new_with_workload( uri: String, db: Omnigraph, bearer_tokens: Vec<(String, String)>, workload: workload::WorkloadController, ) -> Self { - let bearer_tokens = hash_bearer_tokens(bearer_tokens); - Self::build_single_mode(uri, db, bearer_tokens, None, Arc::new(workload)) - } - - /// Install a `PolicyEngine` post-construction (MR-723). Used by - /// integration tests that need to thread custom workload limits - /// alongside a permit-all policy — the existing `new_with_*` and - /// `new_with_workload` constructors don't compose. Production - /// callers should use `open_with_bearer_tokens_and_policy` which - /// also installs the policy on the engine. - /// - /// PR 4a: rebuilds the single-mode handle with the policy attached - /// on the HTTP-layer (`handle.policy`). The engine inside the handle - /// is reused as-is — engine-layer policy enforcement is NOT - /// reinstalled by this path (the old single-field `policy_engine` - /// API had the same semantics — engine-layer enforcement was only - /// applied by constructors that took `policy_engine: Option<...>` - /// at build time). Tests that depend on engine-layer enforcement - /// should use `open_with_bearer_tokens_and_policy` instead. - pub fn with_policy_engine(self, engine: PolicyEngine) -> Self { - let policy_arc: Arc = Arc::new(engine); - let existing = single_mode_handle(&self.registry) - .expect("with_policy_engine called on a non-single-mode AppState"); - let new_handle = Arc::new(GraphHandle { - key: existing.key.clone(), - uri: existing.uri.clone(), - engine: Arc::clone(&existing.engine), - policy: Some(policy_arc), - }); - let registry = Arc::new( - GraphRegistry::from_handles(vec![new_handle]) - .expect("rebuilt single-mode registry must accept one handle"), - ); - Self { - mode: self.mode, - registry, - workload: self.workload, - bearer_tokens: self.bearer_tokens, - server_policy: self.server_policy, - } + Self::new_single(uri, db, bearer_tokens, None, workload) } pub async fn open(uri: impl Into) -> Result { diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 4599250..9845fb3 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -3594,13 +3594,13 @@ async fn ingest_per_actor_admission_cap_returns_429() { let policy_engine = omnigraph_server::PolicyEngine::load(&policy_path, graph.to_string_lossy().as_ref()) .unwrap(); - let state = AppState::new_with_workload( + let state = AppState::new_single( graph.to_string_lossy().to_string(), db, vec![("act-flooder".to_string(), "flooder-token".to_string())], + Some(policy_engine), workload, - ) - .with_policy_engine(policy_engine); + ); let app = build_app(state); let _temp = temp; @@ -3693,6 +3693,75 @@ async fn ingest_per_actor_admission_cap_returns_429() { } } +/// Regression for B2 (MR-668): when an `AppState` is built with a +/// per-graph policy and a custom workload, the engine inside the +/// registry's `GraphHandle` MUST have the same policy applied via +/// `Omnigraph::with_policy`. Pre-fix, `new_with_workload(...).with_policy_engine(p)` +/// installed the policy only on the HTTP-layer `handle.policy`; the +/// underlying `Arc` was reused without `with_policy`, so any +/// caller reaching through `state.registry().list()` could bypass Cedar. +/// +/// This test reaches the engine the same way an embedded SDK consumer +/// or future routing code path would, and asserts the policy still +/// fires. The deny path is "act-blocked has a valid bearer but isn't in +/// the policy's allowed group" — i.e., authenticated-but-unauthorised. +#[tokio::test(flavor = "multi_thread")] +async fn engine_layer_policy_fires_via_direct_arc_omnigraph_from_new_single() { + let temp = init_loaded_graph().await; + let graph = graph_path(temp.path()); + let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap(); + + // Permit `act-allowed` for change actions; `act-blocked` is not in + // any allowed group — every change request from them must deny. + let policy_path = temp.path().join("policy.yaml"); + fs::write(&policy_path, permit_all_policy_yaml(&["act-allowed"])).unwrap(); + let policy_engine = + omnigraph_server::PolicyEngine::load(&policy_path, graph.to_string_lossy().as_ref()) + .unwrap(); + + let workload = omnigraph_server::workload::WorkloadController::new(100, 1_000_000_000); + let state = AppState::new_single( + graph.to_string_lossy().to_string(), + db, + vec![("act-blocked".to_string(), "block-token".to_string())], + Some(policy_engine), + workload, + ); + + // Reach into the registry and pull the engine the same way an + // embedded consumer holding `Arc` would. If `new_single` + // failed to apply `with_policy` to the engine, this `mutate_as` + // would succeed — the HTTP-layer is bypassed entirely. + let handle = state.registry().list().into_iter().next().expect("handle"); + let engine = Arc::clone(&handle.engine); + + let mut params: omnigraph_compiler::ParamMap = Default::default(); + params.insert( + "name".to_string(), + omnigraph_compiler::Literal::String("EngineLayerBlocked".to_string()), + ); + params.insert("age".to_string(), omnigraph_compiler::Literal::Integer(30)); + let result = engine + .mutate_as( + "main", + MUTATION_QUERIES, + "insert_person", + ¶ms, + Some("act-blocked"), + ) + .await; + match result { + Err(OmniError::Policy(_)) => { /* expected — engine-layer gate fired */ } + Ok(_) => panic!( + "engine-layer policy did NOT fire — act-blocked successfully ran mutate_as via \ + the engine pulled from the registry handle. AppState::new_single failed to apply \ + with_policy to the underlying Omnigraph engine. This is the B2 footgun the \ + with_policy_engine deletion was supposed to close." + ), + Err(other) => panic!("expected OmniError::Policy, got: {other:?}"), + } +} + #[tokio::test(flavor = "multi_thread")] async fn oversized_request_body_returns_payload_too_large() { let (_temp, app) = app_for_loaded_graph().await;