feat(cli): resolve cluster actor via the per-operator config cascade

Cluster FACTS stay unlayered (cluster.yaml only), but the operator's
identity is a per-operator fact — exactly the per-operator omnigraph.yaml's
permanent job, and the cascade every data-plane write already uses. cluster
apply/approve now resolve: --as flag wins and skips any config read
entirely (containers and CI stay config-free); without it, the standard cwd
search supplies cli.actor, with a malformed config failing loudly and
actionably ('pass --as to skip this lookup') rather than silently dropping
attribution. approve's no-actor error now names both sources.

Tests pin the contract from both sides: cli.actor is the no-flag default
for apply (echoed actor) and approve (approved_by), the flag overrides it,
a malformed omnigraph.yaml in cwd breaks nothing except the no-flag actor
lookup, and a conflicting well-formed one leaks nothing into cluster
outputs.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 22:29:49 +03:00
parent b8300736be
commit f3374ac6dc
2 changed files with 254 additions and 14 deletions

View file

@ -6,7 +6,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use clap::{Arg, ArgAction, Args, CommandFactory, FromArgMatches, Parser, Subcommand, ValueEnum};
use color_eyre::eyre::{Result, bail};
use color_eyre::eyre::{Result, WrapErr, bail};
use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
@ -1257,6 +1257,22 @@ async fn open_local_db_with_policy(graph: &ResolvedCliGraph) -> Result<Omnigraph
/// policy is configured and this returns `None`, the engine-layer
/// footgun guard intentionally denies — silent bypass via "I forgot the
/// actor" is what the guard prevents.
/// Actor resolution for cluster operations. Cluster FACTS stay unlayered
/// (cluster.yaml only), but the operator's identity is a per-operator fact —
/// the per-operator config's permanent job. An explicit --as never touches
/// any config (containers and CI stay config-free); without it, the standard
/// cwd omnigraph.yaml search supplies `cli.actor`, and a malformed config
/// fails loudly rather than silently dropping attribution.
fn resolve_cluster_actor(cli_as: Option<&str>) -> Result<Option<String>> {
if let Some(actor) = cli_as {
return Ok(Some(actor.to_string()));
}
let config = load_cli_config(None).wrap_err(
"resolving the default actor from the per-operator omnigraph.yaml (pass --as <ACTOR> to skip this lookup)",
)?;
Ok(config.cli.actor.clone())
}
fn resolve_cli_actor<'a>(cli_as: Option<&'a str>, config: &'a OmnigraphConfig) -> Option<&'a str> {
cli_as.or(config.cli.actor.as_deref())
}
@ -3610,16 +3626,12 @@ async fn main() -> Result<()> {
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Apply { config, json } => {
// The global --as actor attributes graph-moving operations
// (sidecars, audit entries, engine schema-apply commits).
// Cluster config stays unlayered: no omnigraph.yaml fallback.
let output = apply_config_dir_with_options(
config,
ApplyOptions {
actor: cli.as_actor.clone(),
},
)
.await;
// The actor attributes graph-moving operations (sidecars,
// audit entries, engine schema-apply commits). Cluster FACTS
// stay unlayered; the operator's identity resolves --as flag
// first, then the per-operator omnigraph.yaml `cli.actor`.
let actor = resolve_cluster_actor(cli.as_actor.as_deref())?;
let output = apply_config_dir_with_options(config, ApplyOptions { actor }).await;
finish_cluster_apply(&output, json)?;
}
ClusterCommand::Approve {
@ -3627,12 +3639,12 @@ async fn main() -> Result<()> {
config,
json,
} => {
let Some(approver) = cli.as_actor.as_deref() else {
let Some(approver) = resolve_cluster_actor(cli.as_actor.as_deref())? else {
bail!(
"`cluster approve` requires the global --as <ACTOR> flag: an approval without an approver is meaningless"
"`cluster approve` requires an approver: pass the global --as <ACTOR> flag or set `cli.actor` in your omnigraph.yaml — an approval without an approver is meaningless"
);
};
let output = approve_config_dir(config, &resource, approver).await;
let output = approve_config_dir(config, &resource, &approver).await;
finish_cluster_approve(&output, json)?;
}
ClusterCommand::Status { config, json } => {

View file

@ -4325,3 +4325,231 @@ fn queries_validate_positional_uri_ignores_default_graph() {
"positional URI must validate the top-level registry, not the cli.graph default; stdout:\n{stdout}"
);
}
// ---- per-operator local config (omnigraph.yaml) vs the cluster surfaces ----
/// Cluster ops resolve operator identity per-operator: --as wins, and
/// without it the cwd omnigraph.yaml's `cli.actor` is the default.
#[test]
fn cluster_apply_uses_cli_actor_from_local_config() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
fs::write(
temp.path().join("omnigraph.yaml"),
"cli:\n actor: act-local\n",
)
.unwrap();
let run = |extra: &[&str]| {
let mut command = cli();
command.current_dir(temp.path());
for arg in extra {
command.arg(arg);
}
let output = command
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
assert!(output.status.success(), "{output:?}");
// apply, capturing the echoed actor
let mut command = cli();
command.current_dir(temp.path());
for arg in extra {
command.arg(arg);
}
let output = command
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
let json: serde_json::Value =
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
json["actor"].clone()
};
assert_eq!(run(&[]), "act-local", "cli.actor is the no-flag default");
// A fresh dir (state already imported above): the flag wins over config.
let mut command = cli();
command.current_dir(temp.path());
let output = command
.arg("--as")
.arg("andrew")
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
let json: serde_json::Value =
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
assert_eq!(json["actor"], "andrew", "--as overrides cli.actor");
}
#[test]
fn cluster_approve_uses_cli_actor_fallback() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
fs::write(
temp.path().join("omnigraph.yaml"),
"cli:\n actor: act-local\n",
)
.unwrap();
// Converge, then remove the graph so a gated delete is pending.
for command in ["import", "apply"] {
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg(command)
.arg("--config")
.arg(temp.path())
.output()
.unwrap();
assert!(output.status.success(), "cluster {command} failed");
}
fs::write(temp.path().join("cluster.yaml"), "version: 1\ngraphs: {}\n").unwrap();
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg("approve")
.arg("graph.knowledge")
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
assert!(output.status.success(), "{output:?}");
let json: serde_json::Value =
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
assert_eq!(json["approved_by"], "act-local");
// With neither flag nor config: refused with the actionable message.
let bare = tempdir().unwrap();
write_cluster_config_fixture(bare.path());
let output = output_failure(
cli()
.current_dir(bare.path())
.arg("cluster")
.arg("approve")
.arg("graph.knowledge")
.arg("--config")
.arg(bare.path()),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stderr.contains("--as"), "{stderr}");
assert!(stderr.contains("cli.actor"), "{stderr}");
}
/// A malformed omnigraph.yaml in the cwd must never break cluster commands;
/// it is read for exactly one thing (the actor default when --as is absent),
/// and only that path fails loudly.
#[test]
fn cluster_commands_ignore_malformed_local_config() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
fs::write(temp.path().join("omnigraph.yaml"), "{{{{ not yaml").unwrap();
for command in ["validate", "plan", "status"] {
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg(command)
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
assert!(
output.status.success() || command == "plan", // plan warns state-missing pre-import; still must not config-error
"cluster {command} affected by malformed omnigraph.yaml: {output:?}"
);
assert!(
!String::from_utf8_lossy(&output.stderr).contains("omnigraph.yaml"),
"cluster {command} touched omnigraph.yaml"
);
}
// import + apply with an explicit --as: the config is never loaded.
for (command, args) in [("import", vec![]), ("apply", vec!["--as", "andrew"])] {
let mut invocation = cli();
invocation.current_dir(temp.path());
for arg in &args {
invocation.arg(arg);
}
let output = invocation
.arg("cluster")
.arg(command)
.arg("--config")
.arg(temp.path())
.output()
.unwrap();
assert!(
output.status.success(),
"cluster {command} affected by malformed omnigraph.yaml: {}",
String::from_utf8_lossy(&output.stderr)
);
}
// Only the no-flag actor lookup is allowed to fail, and loudly.
let output = output_failure(
cli()
.current_dir(temp.path())
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path()),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("omnigraph.yaml") && stderr.contains("--as"),
"the actor-default config read must fail loudly and actionably: {stderr}"
);
}
/// A well-formed omnigraph.yaml with a CONFLICTING world view (different
/// graphs, server bind) leaks nothing into cluster outputs.
#[test]
fn cluster_commands_ignore_conflicting_local_config() {
let baseline = tempdir().unwrap();
write_cluster_config_fixture(baseline.path());
let with_config = tempdir().unwrap();
write_cluster_config_fixture(with_config.path());
fs::write(
with_config.path().join("omnigraph.yaml"),
r#"
server:
bind: 0.0.0.0:9999
graphs:
phantom:
uri: ./phantom.omni
"#,
)
.unwrap();
let validate = |dir: &std::path::Path| {
let output = cli()
.current_dir(dir)
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(dir)
.arg("--json")
.output()
.unwrap();
assert!(output.status.success(), "{output:?}");
serde_json::from_str::<serde_json::Value>(String::from_utf8_lossy(&output.stdout).trim())
.unwrap()
};
let (a, b) = (validate(baseline.path()), validate(with_config.path()));
// Compare the path-free invariants (paths embed each tempdir).
for key in ["ok", "diagnostics", "resource_digests", "dependencies"] {
assert_eq!(a[key], b[key], "conflicting omnigraph.yaml leaked into cluster validate ({key})");
}
let leaked = b.to_string();
assert!(!leaked.contains("phantom") && !leaked.contains("9999"), "{leaked}");
}