[codex] fix RFC-011 follow-up regressions (#258)

* fix rfc-011 follow-up regressions

* test(cli): remove served schema-apply tests obsoleted by the cluster 409

This PR disables server-side schema apply for cluster-backed serving (409 →
`omnigraph cluster apply`). Two system_local tests still drove *served* schema
apply against a spawned `--cluster` server and asserted the pre-409 behavior, so
they failed under `cargo test --workspace`:

- `local_cli_schema_apply_enforces_engine_layer_policy` — expected a per-actor
  policy `denied`/allow on the served route; the route now 409s for everyone
  before policy runs.
- `local_cli_schema_apply_rejects_stored_query_breakage_before_publish` —
  expected a served apply to reject a stored-query breakage; the route now 409s
  before any apply.

Both exercise a path the PR intentionally removed. Their surviving coverage:
the 409 itself is pinned by `schema_routes::schema_apply_route_refuses_cluster_backed_server_mode`
(asserts 409 + no mutation); stored-query-breakage-before-publish stays covered
by `schema_routes::schema_apply_route_rejects_stored_query_breakage_before_publish`
(single-mode); engine-layer schema_apply Cedar enforcement stays covered by
`policy_engine_chassis`. Remove the obsolete served versions.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(server): report the cluster-backed schema-apply 409 after the Cedar gate

The 409 ("schema apply is disabled for cluster-backed serving") fired at the top
of `server_schema_apply`, before `authorize_request`. An authenticated-but-
unauthorized actor therefore learned the server is cluster-backed (409) instead
of getting a normal 403 — leaking topology before authorization, against the
same posture that keeps `GET /graphs` default-deny.

Move the 409 below the Cedar gate so the route reports 401 → 403 → 409: an
unauthorized actor gets 403, and only an actor authorized for `schema_apply`
sees the actionable "use `omnigraph cluster apply`" 409. (An open/unauthenticated
server still 409s, as it has no topology to protect.)

Regression: `schema_apply_route_cluster_backed_denies_unauthorized_actor_before_409`
(POLICY_YAML grants no schema_apply → act-ragnor gets 403, not 409). Addresses the
bot-review finding on #258.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-06-16 03:11:43 +03:00 committed by GitHub
parent 9513b076d2
commit b5658dc696
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 429 additions and 261 deletions

View file

@ -21,7 +21,7 @@ direct — direct storage access; reject --server (init, optimize, repair, clean
schema plan, lint).\n \
control manage or inspect a cluster (cluster via --config; policy & queries via \
--cluster).\n \
local no graph; local config & tooling: embed, login, logout, profile, version.\n\
local no explicit graph scope; local config & tooling: alias, embed, login, logout, profile, version.\n\
See the 'Command capabilities' section of the CLI reference for which flags apply where.")]
pub(crate) struct Cli {
/// Actor id for direct-engine writes; overrides `cli.actor`. No effect on
@ -325,8 +325,7 @@ pub(crate) enum Command {
command: ClusterCommand,
},
// ── Session / config ── no graph addressing; local tooling.
/// Policy administration and diagnostics
/// Policy administration and diagnostics against a cluster's applied bundles
Policy {
#[command(subcommand)]
command: PolicyCommand,

View file

@ -367,12 +367,16 @@ pub(crate) async fn execute_operator_alias(
}
}
let body = (!params.is_empty()).then(|| serde_json::json!({ "params": params }));
let mut body = serde_json::Map::new();
body.insert("expect_mutation".to_string(), Value::Bool(false));
if !params.is_empty() {
body.insert("params".to_string(), Value::Object(params));
}
remote_json(
client,
Method::POST,
remote_url(&uri, &["queries", &alias.query], &[])?,
body,
Some(Value::Object(body)),
bearer_token.as_deref(),
)
.await

View file

@ -106,15 +106,44 @@ async fn main() -> Result<()> {
.profiles
.iter()
.map(|(name, profile)| {
let binding = match profile.binding(name) {
Ok(ScopeBinding::Server(s)) => format!("server: {s}"),
Ok(ScopeBinding::Cluster(c)) => format!("cluster: {c}"),
Ok(ScopeBinding::Store(u)) => format!("store: {u}"),
Err(e) => format!("invalid: {e}"),
};
let (binding, scope_kind, target, valid, error) =
match profile.binding(name) {
Ok(ScopeBinding::Server(s)) => (
format!("server: {s}"),
"server".to_string(),
Some(s),
true,
None,
),
Ok(ScopeBinding::Cluster(c)) => (
format!("cluster: {c}"),
"cluster".to_string(),
Some(c),
true,
None,
),
Ok(ScopeBinding::Store(u)) => (
format!("store: {u}"),
"store".to_string(),
Some(u),
true,
None,
),
Err(e) => (
format!("invalid: {e}"),
"invalid".to_string(),
None,
false,
Some(e.to_string()),
),
};
ProfileListItem {
name: name.clone(),
binding,
scope_kind,
target,
valid,
error,
default_graph: profile.default_graph.clone(),
active: active.as_deref() == Some(name.as_str()),
}

View file

@ -892,6 +892,12 @@ pub(crate) struct ProfileListItem {
pub(crate) name: String,
/// `server: <n>` / `cluster: <n>` / `store: <uri>` / `invalid: <reason>`.
pub(crate) binding: String,
/// `server` | `cluster` | `store` | `invalid`.
pub(crate) scope_kind: String,
/// The bound server/cluster name, or the store URI. `None` when invalid.
pub(crate) target: Option<String>,
pub(crate) valid: bool,
pub(crate) error: Option<String>,
pub(crate) default_graph: Option<String>,
pub(crate) active: bool,
}

View file

@ -98,13 +98,11 @@ pub(crate) fn command_capability(cmd: &Command) -> Capability {
/// The plane a subcommand belongs to. Exhaustive — a new `Command` variant
/// will not compile until classified. Descends into the nested enums where
/// the plane differs per subcommand (`schema plan` is storage while `schema
/// show`/`apply` are data; `queries validate` opens the graph while `queries
/// list` only reads config).
/// show`/`apply` are data; `queries`/`policy` read cluster applied state).
pub(crate) fn command_plane(cmd: &Command) -> Plane {
match cmd {
Command::Query { .. }
| Command::Mutate { .. }
| Command::Alias { .. }
| Command::Load { .. }
| Command::Ingest { .. }
| Command::Branch { .. }
@ -129,7 +127,8 @@ pub(crate) fn command_plane(cmd: &Command) -> Plane {
| Command::Cleanup { .. }
| Command::Lint { .. } => Plane::Storage,
Command::Cluster { .. } => Plane::Control,
Command::Embed(_)
Command::Alias { .. }
| Command::Embed(_)
| Command::Login { .. }
| Command::Logout { .. }
| Command::Profile { .. }
@ -175,12 +174,13 @@ pub(crate) fn command_label(cmd: &Command) -> &'static str {
}
}
/// The verbs that address an existing graph through a cluster scope
/// (`--cluster <root> --graph <id>`): the storage-maintenance commands.
/// `init` is storage-plane too but *creates* a graph (cluster graphs are born
/// from `cluster apply`, not `init`), and `schema plan` / `lint` take a
/// positional URI — none consume cluster addressing, so the guard rejects
/// `--cluster`/`--graph` on them rather than silently dropping the flag.
/// The verbs that consume a cluster scope. Maintenance/lint select a graph with
/// `--cluster <root> --graph <id>`; policy/queries inspect the cluster's
/// applied control-plane state and may optionally use `--graph` to select one
/// bundle/registry. `init` is storage-plane too but *creates* a graph (cluster
/// graphs are born from `cluster apply`, not `init`), and `schema plan` takes a
/// positional URI, so the guard rejects `--cluster`/`--graph` there rather than
/// silently dropping the flag.
pub(crate) fn accepts_cluster_addressing(cmd: &Command) -> bool {
matches!(
cmd,
@ -201,12 +201,43 @@ pub(crate) fn accepts_cluster_addressing(cmd: &Command) -> bool {
/// Reject a scope-addressing flag (`--server`/`--cluster`/`--graph`) on a verb
/// that cannot consume it, rather than silently dropping it (the old behavior:
/// e.g. `optimize --server prod` dropped `--server` and failed later with an
/// unrelated message). Each flag has a distinct valid surface:
/// unrelated message). `alias` gets an extra guard because its binding owns all
/// addressing and several ignored globals sit outside this three-flag guard.
/// Each flag has a distinct valid surface:
/// - `--server` → served-graph scopes (`any`/`served`);
/// - `--cluster` → the cluster-maintenance verbs (optimize/repair/cleanup);
/// - `--cluster` → cluster-scoped direct/control verbs;
/// - `--graph` → any multi-graph scope: a served scope *or* a cluster one.
/// RFC-010 Slice 1, generalized for RFC-011 cluster addressing.
pub(crate) fn guard_addressing(cli: &Cli) -> Result<()> {
if let Command::Alias { .. } = &cli.command {
let mut flags = Vec::new();
if cli.server.is_some() {
flags.push("--server");
}
if cli.graph.is_some() {
flags.push("--graph");
}
if cli.store.is_some() {
flags.push("--store");
}
if cli.cluster.is_some() {
flags.push("--cluster");
}
if cli.profile.is_some() {
flags.push("--profile");
}
if cli.as_actor.is_some() {
flags.push("--as");
}
if !flags.is_empty() {
bail!(
"`alias` uses the server, graph, and stored query declared in \
`aliases.<name>` in ~/.omnigraph/config.yaml; remove global scope \
flag(s): {}",
flags.join(", ")
);
}
}
if cli.server.is_none() && cli.cluster.is_none() && cli.graph.is_none() {
return Ok(());
}
@ -223,8 +254,8 @@ pub(crate) fn guard_addressing(cli: &Cli) -> Result<()> {
}
if cli.cluster.is_some() && !cluster_ok {
bail!(
"`{label}` is a {} command; --cluster addresses a cluster-managed graph for \
maintenance (optimize/repair/cleanup) and does not apply.{}",
"`{label}` is a {} command; --cluster addresses a cluster-scoped command \
and does not apply.{}",
capability.describe(),
remediation(capability, &cli.command),
);
@ -253,7 +284,15 @@ fn remediation(capability: Capability, cmd: &Command) -> &'static str {
}
_ => " Pass a storage URI.",
},
Capability::Control => " It operates on a cluster (pass --config <dir>).",
Capability::Control => match cmd {
Command::Cluster { .. } => {
" It operates on a cluster config directory (pass --config <dir>)."
}
Command::Policy { .. } | Command::Queries { .. } => {
" It operates on a cluster (pass --cluster <dir|uri>, or select a cluster profile)."
}
_ => " It operates on a cluster.",
},
Capability::Local => " It does not address a graph.",
Capability::Any | Capability::Served => "",
}
@ -286,6 +325,7 @@ mod tests {
// The one Data→Served refinement — if the `graphs` guard were deleted,
// every other assertion here would still pass.
assert_eq!(cap(&["omnigraph", "graphs", "list"]), Capability::Served);
assert_eq!(cap(&["omnigraph", "alias", "who"]), Capability::Local);
assert_eq!(cap(&["omnigraph", "optimize", "graph.omni"]), Capability::Direct);
assert_eq!(cap(&["omnigraph", "schema", "plan", "--schema", "s.pg", "graph.omni"]), Capability::Direct);
assert_eq!(cap(&["omnigraph", "cluster", "status", "--config", "."]), Capability::Control);

View file

@ -186,9 +186,9 @@ fn scope_from_binding(
ScopeBinding::Cluster(cluster) => {
if capability == Capability::Any {
bail!(
"{source} resolves a cluster scope, which is maintenance-only; run \
data commands through a server, or use --store <uri> for ad-hoc \
direct access"
"{source} resolves a cluster scope, which is not valid for graph data \
commands; run data commands through a server, or use --store <uri> \
for ad-hoc direct access"
);
}
// A cluster value is a config name (resolved against `clusters:`)
@ -501,7 +501,7 @@ mod tests {
)
.unwrap_err()
.to_string();
assert!(err.contains("maintenance-only"), "{err}");
assert!(err.contains("not valid for graph data commands"), "{err}");
}
#[test]

View file

@ -2127,7 +2127,26 @@ fn profile_list_json_shape() {
.find(|p| p["name"] == "brain-admin")
.unwrap();
assert_eq!(brain["binding"], "cluster: brain");
assert_eq!(brain["scope_kind"], "cluster");
assert_eq!(brain["target"], "brain");
assert_eq!(brain["valid"], true);
assert!(brain["error"].is_null());
assert_eq!(brain["active"], false);
let broken = items
.as_array()
.unwrap()
.iter()
.find(|p| p["name"] == "broken")
.unwrap();
assert_eq!(broken["scope_kind"], "invalid");
assert_eq!(broken["valid"], false);
assert!(broken["target"].is_null());
assert!(
broken["error"]
.as_str()
.unwrap()
.contains("profile 'broken'")
);
}
#[test]

View file

@ -94,6 +94,49 @@ fn alias_unknown_name_errors_listing_defined() {
);
}
#[test]
fn alias_rejects_global_scope_flags_that_the_binding_owns() {
for (flag, value) in [
("--server", "dev"),
("--graph", "local"),
("--store", "file:///tmp/graph.omni"),
("--cluster", "."),
("--profile", "prod"),
("--as", "act-op"),
] {
let output = output_failure(cli().arg(flag).arg(value).arg("alias").arg("who"));
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("`alias` uses the server, graph, and stored query")
&& stderr.contains(flag),
"expected {flag} to be rejected by the alias binding guard; got: {stderr}"
);
}
}
#[test]
fn queries_and_policy_wrong_server_scope_points_at_cluster_scope() {
let output = output_failure(cli().arg("--server").arg("prod").arg("queries").arg("list"));
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("pass --cluster <dir|uri>") && !stderr.contains("pass --config <dir>"),
"queries should point at --cluster, not --config; got: {stderr}"
);
let output = output_failure(
cli()
.arg("--server")
.arg("prod")
.arg("policy")
.arg("validate"),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("pass --cluster <dir|uri>") && !stderr.contains("pass --config <dir>"),
"policy should point at --cluster, not --config; got: {stderr}"
);
}
// RFC-011: `queries validate`/`list` source the registry + schemas from a
// converged cluster's applied state (`--cluster <dir>`), not omnigraph.yaml.

View file

@ -1398,154 +1398,6 @@ fn local_cli_ingest_enforces_engine_layer_policy() {
assert_eq!(allowed["branch_created"], true);
}
#[test]
fn local_cli_schema_apply_enforces_engine_layer_policy() {
// RFC-011 served re-point: the server enforces schema_apply against the
// graph-bound bundle. Bruno has no schema_apply rule → denied; ragnor
// has admins-schema-apply → allowed. The schema is additive (a nullable
// property), SDK-compatible with the fixture.
if skip_system_e2e("local_cli_schema_apply_enforces_engine_layer_policy") {
return;
}
let cluster = converged_loaded_cluster("knowledge", Some(POLICY_E2E_YAML));
let server = spawn_server_with_cluster_env(
cluster.path(),
&[("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", POLICY_TOKENS_JSON)],
);
let new_schema = std::fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
let temp = tempfile::tempdir().unwrap();
let schema_path = temp.path().join("policy-additive.pg");
std::fs::write(&schema_path, &new_schema).unwrap();
let denied = cli()
.env("OMNIGRAPH_BEARER_TOKEN", "bruno-tok")
.arg("schema")
.arg("apply")
.arg("--server")
.arg(&server.base_url)
.arg("--graph")
.arg("knowledge")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.output()
.unwrap();
assert!(!denied.status.success(), "bruno schema apply must be denied");
let stderr = String::from_utf8_lossy(&denied.stderr);
assert!(
stderr.contains("denied"),
"expected 'denied' for bruno schema apply, got: {stderr}"
);
let allowed = parse_stdout_json(&output_success(
cli()
.env("OMNIGRAPH_BEARER_TOKEN", "ragnor-tok")
.arg("schema")
.arg("apply")
.arg("--server")
.arg(&server.base_url)
.arg("--graph")
.arg("knowledge")
.arg("--schema")
.arg(&schema_path)
.arg("--json"),
));
assert_eq!(allowed["applied"], true);
}
#[test]
fn local_cli_schema_apply_rejects_stored_query_breakage_before_publish() {
// RFC-011: stored queries live in the cluster catalog, not omnigraph.yaml.
// The served `schema apply` runs the server's catalog check against the
// applied stored queries; renaming `age`→`years` breaks the bundled
// `find_person` (which projects `$p.age`), so the apply is rejected before
// publish — the schema stays unchanged.
if skip_system_e2e("local_cli_schema_apply_rejects_stored_query_breakage_before_publish") {
return;
}
// A graph-bound bundle that lets ragnor apply schema, plus a stored query
// `find_person` projecting $p.age (the catalog the server checks against).
let cluster = tempfile::tempdir().unwrap();
let dir = cluster.path();
fs::copy(fixture("test.pg"), dir.join("graph.pg")).unwrap();
fs::write(
dir.join("find-person.gq"),
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
)
.unwrap();
fs::write(dir.join("graph.policy.yaml"), POLICY_E2E_YAML).unwrap();
fs::write(
dir.join("cluster.yaml"),
"version: 1\nmetadata:\n name: sys\nstate:\n backend: cluster\n lock: true\ngraphs:\n knowledge:\n schema: ./graph.pg\n queries:\n find_person:\n file: ./find-person.gq\npolicies:\n graph:\n file: ./graph.policy.yaml\n applies_to: [knowledge]\n",
)
.unwrap();
output_success(cli().arg("cluster").arg("import").arg("--config").arg(dir));
output_success(cli().arg("cluster").arg("apply").arg("--config").arg(dir));
output_success(
cli()
.arg("load")
.arg("--data")
.arg(fixture("test.jsonl"))
.arg("--mode")
.arg("overwrite")
.arg(dir.join("graphs").join("knowledge.omni")),
);
let server = spawn_server_with_cluster_env(
dir,
&[("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", POLICY_TOKENS_JSON)],
);
let renamed_schema = std::fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("age: I32?", "years: I32? @rename_from(\"age\")");
let temp = tempfile::tempdir().unwrap();
let schema_path = temp.path().join("stored-query-breaks.pg");
fs::write(&schema_path, &renamed_schema).unwrap();
let rejected = cli()
.env("OMNIGRAPH_BEARER_TOKEN", "ragnor-tok")
.arg("schema")
.arg("apply")
.arg("--server")
.arg(&server.base_url)
.arg("--graph")
.arg("knowledge")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.output()
.unwrap();
assert!(
!rejected.status.success(),
"schema apply that breaks a stored query must be rejected"
);
let stderr = String::from_utf8_lossy(&rejected.stderr);
assert!(
stderr.contains("find_person") && stderr.contains("schema check"),
"schema apply should reject the stored-query breakage before publish; stderr: {stderr}"
);
// The schema stayed unchanged (read it back via the served graph as the
// bruno reader, who holds `team-read`).
let schema = stdout_string(&output_success(
cli()
.env("OMNIGRAPH_BEARER_TOKEN", "bruno-tok")
.arg("schema")
.arg("show")
.arg("--server")
.arg(&server.base_url)
.arg("--graph")
.arg("knowledge"),
));
assert!(schema.contains("age: I32?"));
assert!(!schema.contains("years: I32?"));
}
#[test]
fn local_cli_branch_create_enforces_engine_layer_policy() {
// RFC-011 served re-point: bruno has no branch-ops rule → denied;
@ -2482,14 +2334,19 @@ fn local_cli_operator_alias_and_server_flag_invoke_stored_query() {
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.name } }",
)
.unwrap();
fs::write(
cluster.path().join("insert-person.gq"),
"query insert_person($name: String) { insert Person { name: $name, age: 41 } }",
)
.unwrap();
fs::write(
cluster.path().join("graph.policy.yaml"),
"version: 1\ngroups:\n ops: [\"act-op\"]\nprotected_branches: [main]\nrules:\n - id: allow-invoke\n allow:\n actors: { group: ops }\n actions: [invoke_query]\n - id: allow-read\n allow:\n actors: { group: ops }\n actions: [read]\n branch_scope: any\n",
"version: 1\ngroups:\n ops: [\"act-op\"]\nprotected_branches: [main]\nrules:\n - id: allow-invoke\n allow:\n actors: { group: ops }\n actions: [invoke_query]\n - id: allow-read\n allow:\n actors: { group: ops }\n actions: [read]\n branch_scope: any\n - id: allow-change\n allow:\n actors: { group: ops }\n actions: [change]\n branch_scope: any\n",
)
.unwrap();
fs::write(
cluster.path().join("cluster.yaml"),
"version: 1\nmetadata:\n name: alias-sys\nstate:\n backend: cluster\n lock: true\ngraphs:\n local:\n schema: ./local.pg\n queries:\n find_person:\n file: ./find-person.gq\npolicies:\n graph:\n file: ./graph.policy.yaml\n applies_to: [local]\n",
"version: 1\nmetadata:\n name: alias-sys\nstate:\n backend: cluster\n lock: true\ngraphs:\n local:\n schema: ./local.pg\n queries:\n find_person:\n file: ./find-person.gq\n insert_person:\n file: ./insert-person.gq\npolicies:\n graph:\n file: ./graph.policy.yaml\n applies_to: [local]\n",
)
.unwrap();
output_success(cli().arg("cluster").arg("import").arg("--config").arg(cluster.path()));
@ -2515,7 +2372,7 @@ fn local_cli_operator_alias_and_server_flag_invoke_stored_query() {
fs::write(
operator_home.path().join("config.yaml"),
format!(
"servers:\n dev:\n url: {}\naliases:\n who:\n server: dev\n graph: local\n query: find_person\n args: [name]\n",
"servers:\n dev:\n url: {}\naliases:\n who:\n server: dev\n graph: local\n query: find_person\n args: [name]\n create_person:\n server: dev\n graph: local\n query: insert_person\n args: [name]\n",
server.base_url
),
)
@ -2552,6 +2409,46 @@ fn local_cli_operator_alias_and_server_flag_invoke_stored_query() {
let payload: serde_json::Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["rows"][0]["p.name"], "Alice", "{payload}");
// Operator aliases are read-only conveniences: a binding to a stored
// mutation must be rejected before the server executes it.
let output = cli()
.env("OMNIGRAPH_HOME", operator_home.path())
.arg("alias")
.arg("create_person")
.arg("AliasGuardPerson")
.output()
.unwrap();
assert!(!output.status.success(), "mutation alias must fail");
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("'insert_person' is a mutation")
&& stderr.contains("omnigraph mutate insert_person"),
"expected mutation-kind mismatch; got: {stderr}"
);
let output = cli()
.env("OMNIGRAPH_HOME", operator_home.path())
.arg("query")
.arg("find_person")
.arg("--server")
.arg("dev")
.arg("--graph")
.arg("local")
.arg("--params")
.arg(r#"{"name":"AliasGuardPerson"}"#)
.arg("--json")
.output()
.unwrap();
assert!(
output.status.success(),
"post-alias read should succeed: {output:?}"
);
let payload: serde_json::Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(
payload["rows"].as_array().unwrap().len(),
0,
"mutation alias must not insert AliasGuardPerson: {payload}"
);
// --server/--graph: the same stored query via explicit targeting.
let output = cli()
.env("OMNIGRAPH_HOME", operator_home.path())