From f3374ac6dcf36db1512ffe8b79070b15b5fdc35c Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 22:29:49 +0300 Subject: [PATCH] feat(cli): resolve cluster actor via the per-operator config cascade MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cluster FACTS stay unlayered (cluster.yaml only), but the operator's identity is a per-operator fact — exactly the per-operator omnigraph.yaml's permanent job, and the cascade every data-plane write already uses. cluster apply/approve now resolve: --as flag wins and skips any config read entirely (containers and CI stay config-free); without it, the standard cwd search supplies cli.actor, with a malformed config failing loudly and actionably ('pass --as to skip this lookup') rather than silently dropping attribution. approve's no-actor error now names both sources. Tests pin the contract from both sides: cli.actor is the no-flag default for apply (echoed actor) and approve (approved_by), the flag overrides it, a malformed omnigraph.yaml in cwd breaks nothing except the no-flag actor lookup, and a conflicting well-formed one leaks nothing into cluster outputs. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 40 ++++-- crates/omnigraph-cli/tests/cli.rs | 228 ++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+), 14 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index da4f8e8..dab83d1 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::sync::Arc; use clap::{Arg, ArgAction, Args, CommandFactory, FromArgMatches, Parser, Subcommand, ValueEnum}; -use color_eyre::eyre::{Result, bail}; +use color_eyre::eyre::{Result, WrapErr, bail}; use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; @@ -1257,6 +1257,22 @@ async fn open_local_db_with_policy(graph: &ResolvedCliGraph) -> Result) -> Result> { + if let Some(actor) = cli_as { + return Ok(Some(actor.to_string())); + } + let config = load_cli_config(None).wrap_err( + "resolving the default actor from the per-operator omnigraph.yaml (pass --as to skip this lookup)", + )?; + Ok(config.cli.actor.clone()) +} + fn resolve_cli_actor<'a>(cli_as: Option<&'a str>, config: &'a OmnigraphConfig) -> Option<&'a str> { cli_as.or(config.cli.actor.as_deref()) } @@ -3610,16 +3626,12 @@ async fn main() -> Result<()> { finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { - // The global --as actor attributes graph-moving operations - // (sidecars, audit entries, engine schema-apply commits). - // Cluster config stays unlayered: no omnigraph.yaml fallback. - let output = apply_config_dir_with_options( - config, - ApplyOptions { - actor: cli.as_actor.clone(), - }, - ) - .await; + // The actor attributes graph-moving operations (sidecars, + // audit entries, engine schema-apply commits). Cluster FACTS + // stay unlayered; the operator's identity resolves --as flag + // first, then the per-operator omnigraph.yaml `cli.actor`. + let actor = resolve_cluster_actor(cli.as_actor.as_deref())?; + let output = apply_config_dir_with_options(config, ApplyOptions { actor }).await; finish_cluster_apply(&output, json)?; } ClusterCommand::Approve { @@ -3627,12 +3639,12 @@ async fn main() -> Result<()> { config, json, } => { - let Some(approver) = cli.as_actor.as_deref() else { + let Some(approver) = resolve_cluster_actor(cli.as_actor.as_deref())? else { bail!( - "`cluster approve` requires the global --as flag: an approval without an approver is meaningless" + "`cluster approve` requires an approver: pass the global --as flag or set `cli.actor` in your omnigraph.yaml — an approval without an approver is meaningless" ); }; - let output = approve_config_dir(config, &resource, approver).await; + let output = approve_config_dir(config, &resource, &approver).await; finish_cluster_approve(&output, json)?; } ClusterCommand::Status { config, json } => { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index e4590f6..00582a7 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -4325,3 +4325,231 @@ fn queries_validate_positional_uri_ignores_default_graph() { "positional URI must validate the top-level registry, not the cli.graph default; stdout:\n{stdout}" ); } + +// ---- per-operator local config (omnigraph.yaml) vs the cluster surfaces ---- + +/// Cluster ops resolve operator identity per-operator: --as wins, and +/// without it the cwd omnigraph.yaml's `cli.actor` is the default. +#[test] +fn cluster_apply_uses_cli_actor_from_local_config() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + fs::write( + temp.path().join("omnigraph.yaml"), + "cli:\n actor: act-local\n", + ) + .unwrap(); + let run = |extra: &[&str]| { + let mut command = cli(); + command.current_dir(temp.path()); + for arg in extra { + command.arg(arg); + } + let output = command + .arg("cluster") + .arg("import") + .arg("--config") + .arg(temp.path()) + .arg("--json") + .output() + .unwrap(); + assert!(output.status.success(), "{output:?}"); + // apply, capturing the echoed actor + let mut command = cli(); + command.current_dir(temp.path()); + for arg in extra { + command.arg(arg); + } + let output = command + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json") + .output() + .unwrap(); + let json: serde_json::Value = + serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap(); + json["actor"].clone() + }; + assert_eq!(run(&[]), "act-local", "cli.actor is the no-flag default"); + + // A fresh dir (state already imported above): the flag wins over config. + let mut command = cli(); + command.current_dir(temp.path()); + let output = command + .arg("--as") + .arg("andrew") + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json") + .output() + .unwrap(); + let json: serde_json::Value = + serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap(); + assert_eq!(json["actor"], "andrew", "--as overrides cli.actor"); +} + +#[test] +fn cluster_approve_uses_cli_actor_fallback() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + fs::write( + temp.path().join("omnigraph.yaml"), + "cli:\n actor: act-local\n", + ) + .unwrap(); + // Converge, then remove the graph so a gated delete is pending. + for command in ["import", "apply"] { + let output = cli() + .current_dir(temp.path()) + .arg("cluster") + .arg(command) + .arg("--config") + .arg(temp.path()) + .output() + .unwrap(); + assert!(output.status.success(), "cluster {command} failed"); + } + fs::write(temp.path().join("cluster.yaml"), "version: 1\ngraphs: {}\n").unwrap(); + + let output = cli() + .current_dir(temp.path()) + .arg("cluster") + .arg("approve") + .arg("graph.knowledge") + .arg("--config") + .arg(temp.path()) + .arg("--json") + .output() + .unwrap(); + assert!(output.status.success(), "{output:?}"); + let json: serde_json::Value = + serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap(); + assert_eq!(json["approved_by"], "act-local"); + + // With neither flag nor config: refused with the actionable message. + let bare = tempdir().unwrap(); + write_cluster_config_fixture(bare.path()); + let output = output_failure( + cli() + .current_dir(bare.path()) + .arg("cluster") + .arg("approve") + .arg("graph.knowledge") + .arg("--config") + .arg(bare.path()), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("--as"), "{stderr}"); + assert!(stderr.contains("cli.actor"), "{stderr}"); +} + +/// A malformed omnigraph.yaml in the cwd must never break cluster commands; +/// it is read for exactly one thing (the actor default when --as is absent), +/// and only that path fails loudly. +#[test] +fn cluster_commands_ignore_malformed_local_config() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + fs::write(temp.path().join("omnigraph.yaml"), "{{{{ not yaml").unwrap(); + + for command in ["validate", "plan", "status"] { + let output = cli() + .current_dir(temp.path()) + .arg("cluster") + .arg(command) + .arg("--config") + .arg(temp.path()) + .arg("--json") + .output() + .unwrap(); + assert!( + output.status.success() || command == "plan", // plan warns state-missing pre-import; still must not config-error + "cluster {command} affected by malformed omnigraph.yaml: {output:?}" + ); + assert!( + !String::from_utf8_lossy(&output.stderr).contains("omnigraph.yaml"), + "cluster {command} touched omnigraph.yaml" + ); + } + // import + apply with an explicit --as: the config is never loaded. + for (command, args) in [("import", vec![]), ("apply", vec!["--as", "andrew"])] { + let mut invocation = cli(); + invocation.current_dir(temp.path()); + for arg in &args { + invocation.arg(arg); + } + let output = invocation + .arg("cluster") + .arg(command) + .arg("--config") + .arg(temp.path()) + .output() + .unwrap(); + assert!( + output.status.success(), + "cluster {command} affected by malformed omnigraph.yaml: {}", + String::from_utf8_lossy(&output.stderr) + ); + } + // Only the no-flag actor lookup is allowed to fail, and loudly. + let output = output_failure( + cli() + .current_dir(temp.path()) + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("omnigraph.yaml") && stderr.contains("--as"), + "the actor-default config read must fail loudly and actionably: {stderr}" + ); +} + +/// A well-formed omnigraph.yaml with a CONFLICTING world view (different +/// graphs, server bind) leaks nothing into cluster outputs. +#[test] +fn cluster_commands_ignore_conflicting_local_config() { + let baseline = tempdir().unwrap(); + write_cluster_config_fixture(baseline.path()); + let with_config = tempdir().unwrap(); + write_cluster_config_fixture(with_config.path()); + fs::write( + with_config.path().join("omnigraph.yaml"), + r#" +server: + bind: 0.0.0.0:9999 +graphs: + phantom: + uri: ./phantom.omni +"#, + ) + .unwrap(); + + let validate = |dir: &std::path::Path| { + let output = cli() + .current_dir(dir) + .arg("cluster") + .arg("validate") + .arg("--config") + .arg(dir) + .arg("--json") + .output() + .unwrap(); + assert!(output.status.success(), "{output:?}"); + serde_json::from_str::(String::from_utf8_lossy(&output.stdout).trim()) + .unwrap() + }; + let (a, b) = (validate(baseline.path()), validate(with_config.path())); + // Compare the path-free invariants (paths embed each tempdir). + for key in ["ok", "diagnostics", "resource_digests", "dependencies"] { + assert_eq!(a[key], b[key], "conflicting omnigraph.yaml leaked into cluster validate ({key})"); + } + let leaked = b.to_string(); + assert!(!leaked.contains("phantom") && !leaked.contains("9999"), "{leaked}"); +}