mr-668: consolidate AppState single-mode constructors; delete with_policy_engine

The prior `with_policy_engine` constructor reused the engine `Arc`
from the existing handle (`engine: Arc::clone(&existing.engine)`)
without re-applying `Omnigraph::with_policy`. Combined with
`new_with_workload`, the documented composition pattern was
`AppState::new_with_workload(...).with_policy_engine(p)` — which
produced an `AppState` whose HTTP layer enforced Cedar but whose
underlying engine had no `PolicyChecker` installed. Any caller
reaching the engine via `state.registry().list()[i].engine` could
bypass policy entirely. The doc comment named this gap; the type
system didn't.

Make composition impossible to get wrong:

* Add `AppState::new_single(uri, db, tokens, Option<PolicyEngine>,
  WorkloadController)` — canonical single-mode constructor that
  takes every option together and routes through `build_single_mode`
  (which applies `db.with_policy(checker)` to the engine itself).
* `new`, `new_with_bearer_token`, `new_with_bearer_tokens`,
  `new_with_bearer_tokens_and_policy`, `new_with_workload` all become
  thin wrappers around `new_single`.
* Delete `with_policy_engine`. There is no post-construction policy
  install path any more; the single linear construction forces
  HTTP-layer and engine-layer policy to install together or not at all.

Regression test `engine_layer_policy_fires_via_direct_arc_omnigraph_from_new_single`
constructs an `AppState::new_single` with a deny-all policy, pulls
the `Arc<Omnigraph>` from the registry handle (the same path an
embedded SDK consumer would take), and asserts a direct `mutate_as`
call returns `OmniError::Policy`. Pre-fix this test would have
succeeded the mutation.

Test caller in `ingest_per_actor_admission_cap_returns_429`
migrates from `.with_policy_engine(...)` to `new_single(...,
Some(policy_engine), workload)`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-27 12:02:14 +02:00
parent 76ee061cac
commit fc95eb8ff3
No known key found for this signature in database
2 changed files with 117 additions and 51 deletions

View file

@ -259,8 +259,34 @@ pub struct ApiError {
}
impl AppState {
/// Canonical single-mode constructor. Every other `new_*` / `open_*`
/// helper is a thin convenience wrapper around this one. Builds the
/// engine + per-graph policy through `build_single_mode`, which
/// applies `Omnigraph::with_policy` so HTTP-layer and engine-layer
/// policy can never diverge — there is no "policy installed on HTTP
/// but not on engine" representable state (closes the prior
/// `with_policy_engine` footgun that reused the engine `Arc`
/// without re-applying `with_policy`).
pub fn new_single(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
policy_engine: Option<PolicyEngine>,
workload: workload::WorkloadController,
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy = policy_engine.map(Arc::new);
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload))
}
pub fn new(uri: String, db: Omnigraph) -> Self {
Self::new_with_bearer_tokens(uri, db, Vec::new())
Self::new_single(
uri,
db,
Vec::new(),
None,
workload::WorkloadController::from_env(),
)
}
pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
@ -276,7 +302,13 @@ impl AppState {
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
) -> Self {
Self::new_with_bearer_tokens_and_policy(uri, db, bearer_tokens, None)
Self::new_single(
uri,
db,
bearer_tokens,
None,
workload::WorkloadController::from_env(),
)
}
pub fn new_with_bearer_tokens_and_policy(
@ -285,62 +317,27 @@ impl AppState {
bearer_tokens: Vec<(String, String)>,
policy_engine: Option<PolicyEngine>,
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy: Option<Arc<PolicyEngine>> = policy_engine.map(Arc::new);
let workload = Arc::new(workload::WorkloadController::from_env());
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, workload)
Self::new_single(
uri,
db,
bearer_tokens,
policy_engine,
workload::WorkloadController::from_env(),
)
}
/// Construct with a caller-provided [`workload::WorkloadController`].
/// Tests and benches use this to override per-actor caps without
/// mutating global env vars (which is unsafe in Rust 2024 once the
/// async runtime is up — `setenv` isn't thread-safe).
/// mutating global env vars (unsafe in Rust 2024 once the async
/// runtime is up — `setenv` isn't thread-safe). For tests that also
/// need a custom `PolicyEngine`, use [`new_single`] directly.
pub fn new_with_workload(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
workload: workload::WorkloadController,
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
Self::build_single_mode(uri, db, bearer_tokens, None, Arc::new(workload))
}
/// Install a `PolicyEngine` post-construction (MR-723). Used by
/// integration tests that need to thread custom workload limits
/// alongside a permit-all policy — the existing `new_with_*` and
/// `new_with_workload` constructors don't compose. Production
/// callers should use `open_with_bearer_tokens_and_policy` which
/// also installs the policy on the engine.
///
/// PR 4a: rebuilds the single-mode handle with the policy attached
/// on the HTTP-layer (`handle.policy`). The engine inside the handle
/// is reused as-is — engine-layer policy enforcement is NOT
/// reinstalled by this path (the old single-field `policy_engine`
/// API had the same semantics — engine-layer enforcement was only
/// applied by constructors that took `policy_engine: Option<...>`
/// at build time). Tests that depend on engine-layer enforcement
/// should use `open_with_bearer_tokens_and_policy` instead.
pub fn with_policy_engine(self, engine: PolicyEngine) -> Self {
let policy_arc: Arc<PolicyEngine> = Arc::new(engine);
let existing = single_mode_handle(&self.registry)
.expect("with_policy_engine called on a non-single-mode AppState");
let new_handle = Arc::new(GraphHandle {
key: existing.key.clone(),
uri: existing.uri.clone(),
engine: Arc::clone(&existing.engine),
policy: Some(policy_arc),
});
let registry = Arc::new(
GraphRegistry::from_handles(vec![new_handle])
.expect("rebuilt single-mode registry must accept one handle"),
);
Self {
mode: self.mode,
registry,
workload: self.workload,
bearer_tokens: self.bearer_tokens,
server_policy: self.server_policy,
}
Self::new_single(uri, db, bearer_tokens, None, workload)
}
pub async fn open(uri: impl Into<String>) -> Result<Self> {

View file

@ -3594,13 +3594,13 @@ async fn ingest_per_actor_admission_cap_returns_429() {
let policy_engine =
omnigraph_server::PolicyEngine::load(&policy_path, graph.to_string_lossy().as_ref())
.unwrap();
let state = AppState::new_with_workload(
let state = AppState::new_single(
graph.to_string_lossy().to_string(),
db,
vec![("act-flooder".to_string(), "flooder-token".to_string())],
Some(policy_engine),
workload,
)
.with_policy_engine(policy_engine);
);
let app = build_app(state);
let _temp = temp;
@ -3693,6 +3693,75 @@ async fn ingest_per_actor_admission_cap_returns_429() {
}
}
/// Regression for B2 (MR-668): when an `AppState` is built with a
/// per-graph policy and a custom workload, the engine inside the
/// registry's `GraphHandle` MUST have the same policy applied via
/// `Omnigraph::with_policy`. Pre-fix, `new_with_workload(...).with_policy_engine(p)`
/// installed the policy only on the HTTP-layer `handle.policy`; the
/// underlying `Arc<Omnigraph>` was reused without `with_policy`, so any
/// caller reaching through `state.registry().list()` could bypass Cedar.
///
/// This test reaches the engine the same way an embedded SDK consumer
/// or future routing code path would, and asserts the policy still
/// fires. The deny path is "act-blocked has a valid bearer but isn't in
/// the policy's allowed group" — i.e., authenticated-but-unauthorised.
#[tokio::test(flavor = "multi_thread")]
async fn engine_layer_policy_fires_via_direct_arc_omnigraph_from_new_single() {
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
// Permit `act-allowed` for change actions; `act-blocked` is not in
// any allowed group — every change request from them must deny.
let policy_path = temp.path().join("policy.yaml");
fs::write(&policy_path, permit_all_policy_yaml(&["act-allowed"])).unwrap();
let policy_engine =
omnigraph_server::PolicyEngine::load(&policy_path, graph.to_string_lossy().as_ref())
.unwrap();
let workload = omnigraph_server::workload::WorkloadController::new(100, 1_000_000_000);
let state = AppState::new_single(
graph.to_string_lossy().to_string(),
db,
vec![("act-blocked".to_string(), "block-token".to_string())],
Some(policy_engine),
workload,
);
// Reach into the registry and pull the engine the same way an
// embedded consumer holding `Arc<Omnigraph>` would. If `new_single`
// failed to apply `with_policy` to the engine, this `mutate_as`
// would succeed — the HTTP-layer is bypassed entirely.
let handle = state.registry().list().into_iter().next().expect("handle");
let engine = Arc::clone(&handle.engine);
let mut params: omnigraph_compiler::ParamMap = Default::default();
params.insert(
"name".to_string(),
omnigraph_compiler::Literal::String("EngineLayerBlocked".to_string()),
);
params.insert("age".to_string(), omnigraph_compiler::Literal::Integer(30));
let result = engine
.mutate_as(
"main",
MUTATION_QUERIES,
"insert_person",
&params,
Some("act-blocked"),
)
.await;
match result {
Err(OmniError::Policy(_)) => { /* expected — engine-layer gate fired */ }
Ok(_) => panic!(
"engine-layer policy did NOT fire — act-blocked successfully ran mutate_as via \
the engine pulled from the registry handle. AppState::new_single failed to apply \
with_policy to the underlying Omnigraph engine. This is the B2 footgun the \
with_policy_engine deletion was supposed to close."
),
Err(other) => panic!("expected OmniError::Policy, got: {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn oversized_request_body_returns_payload_too_large() {
let (_temp, app) = app_for_loaded_graph().await;