mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
feat(cluster): cluster approve — digest-bound approval artifacts
RFC-004 §D4, gate half: graph deletes (and their subtree) now classify
Blocked/approval_required instead of Deferred; the new cluster approve
command (requires the global --as actor) writes
__cluster/approvals/{ulid}.json bound to the desired config digest and the
change's before/after digests, so config or state drift invalidates the
artifact automatically (approval_stale warning, never authorizes). One gate
per subtree: compute_approvals lists only the graph-level delete, and
ApprovalRequirement gains a satisfied flag surfaced by plan. Consumption and
the delete executor land next — until then approved deletes stay blocked so
a gate-only build can never strip state without removing the root.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
f799d4578c
commit
f4e9105272
3 changed files with 605 additions and 45 deletions
|
|
@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
|
|||
use omnigraph::loader::LoadMode;
|
||||
use omnigraph::storage::normalize_root_uri;
|
||||
use omnigraph_cluster::{
|
||||
ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
|
||||
ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir,
|
||||
ApplyOptions, ApplyOutput, ApproveOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
|
||||
ValidateOutput, apply_config_dir_with_options, approve_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir,
|
||||
refresh_config_dir, status_config_dir, validate_config_dir,
|
||||
};
|
||||
use omnigraph_compiler::query::parser::parse_query;
|
||||
|
|
@ -371,6 +371,18 @@ enum ClusterCommand {
|
|||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Record a digest-bound approval for a gated (irreversible) change,
|
||||
/// e.g. a graph delete. Requires the global --as actor.
|
||||
Approve {
|
||||
/// Typed resource address of the gated change (e.g. graph.scratch).
|
||||
resource: String,
|
||||
/// Cluster config directory containing cluster.yaml.
|
||||
#[arg(long, default_value = ".")]
|
||||
config: PathBuf,
|
||||
/// Emit JSON instead of human text.
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Read the local JSON state ledger without scanning live graph resources.
|
||||
Status {
|
||||
/// Cluster config directory containing cluster.yaml.
|
||||
|
|
@ -1011,6 +1023,33 @@ fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn finish_cluster_approve(output: &ApproveOutput, json: bool) -> Result<()> {
|
||||
if json {
|
||||
print_json(output)?;
|
||||
} else if output.ok {
|
||||
println!(
|
||||
"cluster approve: {} {} approved by {} (approval {})",
|
||||
output
|
||||
.operation
|
||||
.as_ref()
|
||||
.map(|operation| format!("{operation:?}").to_lowercase())
|
||||
.unwrap_or_default(),
|
||||
output.resource.as_deref().unwrap_or("?"),
|
||||
output.approved_by.as_deref().unwrap_or("?"),
|
||||
output.approval_id.as_deref().unwrap_or("?"),
|
||||
);
|
||||
print_cluster_diagnostics(&output.diagnostics);
|
||||
} else {
|
||||
println!("cluster approve failed");
|
||||
print_cluster_diagnostics(&output.diagnostics);
|
||||
}
|
||||
if !output.ok {
|
||||
io::stdout().flush()?;
|
||||
std::process::exit(1);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
|
||||
if json {
|
||||
print_json(output)?;
|
||||
|
|
@ -3581,6 +3620,19 @@ async fn main() -> Result<()> {
|
|||
.await;
|
||||
finish_cluster_apply(&output, json)?;
|
||||
}
|
||||
ClusterCommand::Approve {
|
||||
resource,
|
||||
config,
|
||||
json,
|
||||
} => {
|
||||
let Some(approver) = cli.as_actor.as_deref() else {
|
||||
bail!(
|
||||
"`cluster approve` requires the global --as <ACTOR> flag: an approval without an approver is meaningless"
|
||||
);
|
||||
};
|
||||
let output = approve_config_dir(config, &resource, approver).await;
|
||||
finish_cluster_approve(&output, json)?;
|
||||
}
|
||||
ClusterCommand::Status { config, json } => {
|
||||
let output = status_config_dir(config);
|
||||
finish_cluster_status(&output, json)?;
|
||||
|
|
|
|||
|
|
@ -1424,22 +1424,29 @@ policies:
|
|||
let mixed = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(mixed["ok"], true, "{mixed}");
|
||||
assert_eq!(mixed["converged"], false, "{mixed}");
|
||||
// Stage 4C: deletes are gated on a digest-bound approval, one gate per
|
||||
// subtree (the graph-level approval carries schema + queries).
|
||||
assert_eq!(
|
||||
change_for(&mixed, "graph.engineering")["disposition"],
|
||||
"deferred"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "schema.engineering")["disposition"],
|
||||
"deferred"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.engineering.find_service")["disposition"],
|
||||
"blocked"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.engineering.find_service")["reason"],
|
||||
"dependency_not_applied"
|
||||
change_for(&mixed, "graph.engineering")["reason"],
|
||||
"approval_required"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "schema.engineering")["reason"],
|
||||
"approval_required"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.engineering.find_service")["reason"],
|
||||
"approval_required"
|
||||
);
|
||||
let gate_plan = cluster_json(temp.path(), "plan");
|
||||
let gates = gate_plan["approvals_required"].as_array().unwrap();
|
||||
assert_eq!(gates.len(), 1, "{gate_plan}");
|
||||
assert_eq!(gates[0]["resource"], "graph.engineering");
|
||||
assert_eq!(gates[0]["satisfied"], false);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.knowledge.find_person")["disposition"],
|
||||
"applied"
|
||||
|
|
@ -1461,7 +1468,7 @@ policies:
|
|||
let mut sorted = order.clone();
|
||||
sorted.sort_unstable();
|
||||
assert_eq!(order, sorted, "{mixed}");
|
||||
// Graph deletion cannot converge until stage 4C's approval artifacts.
|
||||
// Conclusion (approve + converge) extends below once the delete executor lands.
|
||||
}
|
||||
|
||||
/// Stage 4A headline: a declared graph is created by `cluster apply` itself —
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json";
|
|||
pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json";
|
||||
pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
|
||||
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
|
||||
pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
|
@ -212,6 +213,9 @@ pub struct BlastRadius {
|
|||
pub struct ApprovalRequirement {
|
||||
pub resource: String,
|
||||
pub reason: String,
|
||||
/// True when a valid (digest-matching, unconsumed) approval artifact is
|
||||
/// pending for this change.
|
||||
pub satisfied: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
|
@ -293,6 +297,47 @@ pub struct ApplyOutput {
|
|||
pub diagnostics: Vec<Diagnostic>,
|
||||
}
|
||||
|
||||
/// A digest-bound human approval for an irreversible operation (RFC-004
|
||||
/// §D4). Written by `cluster approve`, consumed by apply. The file is never
|
||||
/// deleted on consumption — it is rewritten with `consumed_at` and also
|
||||
/// summarized into the state ledger's `approval_records`, so the audit fact
|
||||
/// survives the loss of either store (axiom 11).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct ApprovalArtifact {
|
||||
schema_version: u32,
|
||||
approval_id: String,
|
||||
resource: String,
|
||||
operation: String,
|
||||
reason: String,
|
||||
bound_config_digest: String,
|
||||
#[serde(default)]
|
||||
bound_before_digest: Option<String>,
|
||||
#[serde(default)]
|
||||
bound_after_digest: Option<String>,
|
||||
approved_by: String,
|
||||
created_at: String,
|
||||
#[serde(default)]
|
||||
consumed_at: Option<String>,
|
||||
#[serde(default)]
|
||||
consumed_by_operation: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ApproveOutput {
|
||||
pub ok: bool,
|
||||
pub config_dir: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub approval_id: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub resource: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub operation: Option<PlanOperation>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub approved_by: Option<String>,
|
||||
pub diagnostics: Vec<Diagnostic>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct DesiredCluster {
|
||||
config_dir: PathBuf,
|
||||
|
|
@ -472,6 +517,7 @@ struct LocalStateBackend {
|
|||
state_path: PathBuf,
|
||||
lock_path: PathBuf,
|
||||
recoveries_dir: PathBuf,
|
||||
approvals_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -588,7 +634,14 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
|||
};
|
||||
// Plan previews dispositions without sweeping; a pending recovery is
|
||||
// surfaced as the cluster_recovery_pending warning above instead.
|
||||
classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new());
|
||||
let artifacts = backend.list_approval_artifacts(&mut diagnostics);
|
||||
let approved = approved_resources(
|
||||
&artifacts,
|
||||
&changes,
|
||||
&desired.config_digest,
|
||||
&mut diagnostics,
|
||||
);
|
||||
classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new(), &approved);
|
||||
|
||||
// Embed real migration steps for schema updates so plan is a data-aware
|
||||
// preview; failures degrade to the digest diff with a warning.
|
||||
|
|
@ -624,7 +677,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
|||
}
|
||||
}
|
||||
let blast_radius = compute_blast_radius(&changes, &desired.dependencies);
|
||||
let approvals_required = compute_approvals(&changes);
|
||||
let approvals_required = compute_approvals(&changes, &approved);
|
||||
let ok = !has_errors(&diagnostics);
|
||||
|
||||
PlanOutput {
|
||||
|
|
@ -790,17 +843,29 @@ pub async fn apply_config_dir_with_options(
|
|||
|
||||
let prior_resources = state_resource_digests(&state);
|
||||
let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
|
||||
classify_changes(&mut changes, &desired.dependencies, &sweep.pending_graphs);
|
||||
let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics);
|
||||
let approved = approved_resources(
|
||||
&approval_artifacts,
|
||||
&changes,
|
||||
&desired.config_digest,
|
||||
&mut diagnostics,
|
||||
);
|
||||
classify_changes(
|
||||
&mut changes,
|
||||
&desired.dependencies,
|
||||
&sweep.pending_graphs,
|
||||
&approved,
|
||||
);
|
||||
|
||||
// Defensive invariant: nothing the approval gate covers may be executable.
|
||||
// Today approvals only cover graph/schema deletes (always deferred); this
|
||||
// keeps a future widening of the executable set from silently bypassing it.
|
||||
let approvals = compute_approvals(&changes);
|
||||
// Defensive invariant: nothing the approval gate covers may be executable
|
||||
// WITHOUT a matching approval. Gated changes with a valid artifact are the
|
||||
// sanctioned exception (stage 4C).
|
||||
let approvals = compute_approvals(&changes, &approved);
|
||||
let approval_violation = changes.iter().any(|change| {
|
||||
change.disposition == Some(ApplyDisposition::Applied)
|
||||
&& approvals
|
||||
.iter()
|
||||
.any(|approval| approval.resource == change.resource)
|
||||
.any(|approval| approval.resource == change.resource && !approval.satisfied)
|
||||
});
|
||||
if approval_violation {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
|
|
@ -1349,6 +1414,126 @@ pub async fn apply_config_dir_with_options(
|
|||
}
|
||||
}
|
||||
|
||||
/// Record a digest-bound human approval for a gated (irreversible) change —
|
||||
/// today: graph deletes. The artifact binds to the exact desired config
|
||||
/// digest and the change's before/after digests, so config or state drift
|
||||
/// invalidates it automatically (a stale approval can never authorize a
|
||||
/// different change).
|
||||
pub async fn approve_config_dir(
|
||||
config_dir: impl AsRef<Path>,
|
||||
resource: &str,
|
||||
approved_by: &str,
|
||||
) -> ApproveOutput {
|
||||
let outcome = load_desired(config_dir.as_ref());
|
||||
let mut diagnostics = outcome.diagnostics;
|
||||
let backend = LocalStateBackend::new(&outcome.config_dir);
|
||||
let mut observations = backend.observations();
|
||||
|
||||
let fail = |config_dir: String, diagnostics: Vec<Diagnostic>| ApproveOutput {
|
||||
ok: false,
|
||||
config_dir,
|
||||
approval_id: None,
|
||||
resource: None,
|
||||
operation: None,
|
||||
approved_by: None,
|
||||
diagnostics,
|
||||
};
|
||||
|
||||
let Some(desired) = outcome.desired else {
|
||||
return fail(display_path(&outcome.config_dir), diagnostics);
|
||||
};
|
||||
if has_errors(&diagnostics) {
|
||||
return fail(display_path(&desired.config_dir), diagnostics);
|
||||
}
|
||||
|
||||
let _lock_guard = if desired.state_lock {
|
||||
match backend.acquire_lock("approve", &mut observations) {
|
||||
Ok(guard) => Some(guard),
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
return fail(display_path(&desired.config_dir), diagnostics);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"state_lock_disabled",
|
||||
"state.lock",
|
||||
"state.lock is false; approve ran without acquiring the cluster state lock",
|
||||
));
|
||||
None
|
||||
};
|
||||
|
||||
let state = match backend.read_state(&mut observations) {
|
||||
Ok(snapshot) => match snapshot.state {
|
||||
Some(state) => state,
|
||||
None => {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"state_missing",
|
||||
CLUSTER_STATE_FILE,
|
||||
"approve requires an existing state.json; run `cluster import` first",
|
||||
));
|
||||
return fail(display_path(&desired.config_dir), diagnostics);
|
||||
}
|
||||
},
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
return fail(display_path(&desired.config_dir), diagnostics);
|
||||
}
|
||||
};
|
||||
|
||||
let prior_resources = state_resource_digests(&state);
|
||||
let changes = diff_resources(&prior_resources, &desired.resource_digests);
|
||||
let gates = compute_approvals(&changes, &BTreeSet::new());
|
||||
let Some(change) = changes.iter().find(|change| {
|
||||
change.resource == resource && gates.iter().any(|gate| gate.resource == resource)
|
||||
}) else {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"approval_not_required",
|
||||
resource,
|
||||
"no pending change for this resource requires approval (check `cluster plan`)",
|
||||
));
|
||||
return fail(display_path(&desired.config_dir), diagnostics);
|
||||
};
|
||||
|
||||
let artifact = ApprovalArtifact {
|
||||
schema_version: 1,
|
||||
approval_id: Ulid::new().to_string(),
|
||||
resource: change.resource.clone(),
|
||||
operation: match change.operation {
|
||||
PlanOperation::Create => "create",
|
||||
PlanOperation::Update => "update",
|
||||
PlanOperation::Delete => "delete",
|
||||
}
|
||||
.to_string(),
|
||||
reason: gates
|
||||
.iter()
|
||||
.find(|gate| gate.resource == resource)
|
||||
.map(|gate| gate.reason.clone())
|
||||
.unwrap_or_default(),
|
||||
bound_config_digest: desired.config_digest.clone(),
|
||||
bound_before_digest: change.before_digest.clone(),
|
||||
bound_after_digest: change.after_digest.clone(),
|
||||
approved_by: approved_by.to_string(),
|
||||
created_at: now_rfc3339(),
|
||||
consumed_at: None,
|
||||
consumed_by_operation: None,
|
||||
};
|
||||
if let Err(diagnostic) = backend.write_approval_artifact(&artifact) {
|
||||
diagnostics.push(diagnostic);
|
||||
return fail(display_path(&desired.config_dir), diagnostics);
|
||||
}
|
||||
|
||||
ApproveOutput {
|
||||
ok: !has_errors(&diagnostics),
|
||||
config_dir: display_path(&desired.config_dir),
|
||||
approval_id: Some(artifact.approval_id),
|
||||
resource: Some(artifact.resource),
|
||||
operation: Some(change.operation.clone()),
|
||||
approved_by: Some(artifact.approved_by),
|
||||
diagnostics,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
|
||||
let parsed = parse_cluster_config(config_dir.as_ref());
|
||||
let mut diagnostics = parsed.diagnostics;
|
||||
|
|
@ -1773,10 +1958,102 @@ impl LocalStateBackend {
|
|||
state_path: config_dir.join(CLUSTER_STATE_FILE),
|
||||
lock_path: config_dir.join(CLUSTER_LOCK_FILE),
|
||||
recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR),
|
||||
approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR),
|
||||
state_dir,
|
||||
}
|
||||
}
|
||||
|
||||
/// List approval artifacts in ULID (filename) order; unparseable files
|
||||
/// warn and stay on disk for the operator.
|
||||
fn list_approval_artifacts(
|
||||
&self,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
) -> Vec<(PathBuf, ApprovalArtifact)> {
|
||||
let mut paths = Vec::new();
|
||||
match fs::read_dir(&self.approvals_dir) {
|
||||
Ok(entries) => {
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if path.extension().is_some_and(|ext| ext == "json") {
|
||||
paths.push(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||
"approval_read_error",
|
||||
CLUSTER_APPROVALS_DIR,
|
||||
format!("could not list approval artifacts: {err}"),
|
||||
)),
|
||||
}
|
||||
paths.sort();
|
||||
let mut artifacts = Vec::new();
|
||||
for path in paths {
|
||||
match fs::read_to_string(&path)
|
||||
.map_err(|err| err.to_string())
|
||||
.and_then(|text| {
|
||||
serde_json::from_str::<ApprovalArtifact>(&text).map_err(|err| err.to_string())
|
||||
}) {
|
||||
Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)),
|
||||
Ok(artifact) => diagnostics.push(Diagnostic::warning(
|
||||
"unsupported_approval_version",
|
||||
display_path(&path),
|
||||
format!(
|
||||
"unsupported approval artifact version {}; leaving it in place",
|
||||
artifact.schema_version
|
||||
),
|
||||
)),
|
||||
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||
"invalid_approval_artifact",
|
||||
display_path(&path),
|
||||
format!("could not parse approval artifact ({err}); leaving it in place"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
artifacts
|
||||
}
|
||||
|
||||
/// Atomically write (or rewrite, e.g. on consumption) an approval artifact.
|
||||
fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result<PathBuf, Diagnostic> {
|
||||
fs::create_dir_all(&self.approvals_dir).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"approval_write_error",
|
||||
CLUSTER_APPROVALS_DIR,
|
||||
format!("could not create approvals directory: {err}"),
|
||||
)
|
||||
})?;
|
||||
let target = self
|
||||
.approvals_dir
|
||||
.join(format!("{}.json", artifact.approval_id));
|
||||
let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"approval_write_error",
|
||||
display_path(&target),
|
||||
format!("could not encode approval artifact: {err}"),
|
||||
)
|
||||
})?;
|
||||
payload.push('\n');
|
||||
let tmp_path = self
|
||||
.approvals_dir
|
||||
.join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new()));
|
||||
fs::write(&tmp_path, payload.as_bytes()).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"approval_write_error",
|
||||
display_path(&tmp_path),
|
||||
format!("could not write approval artifact: {err}"),
|
||||
)
|
||||
})?;
|
||||
if let Err(err) = fs::rename(&tmp_path, &target) {
|
||||
let _ = fs::remove_file(&tmp_path);
|
||||
return Err(Diagnostic::error(
|
||||
"approval_write_error",
|
||||
display_path(&target),
|
||||
format!("could not move approval artifact into place: {err}"),
|
||||
));
|
||||
}
|
||||
Ok(target)
|
||||
}
|
||||
|
||||
/// List recovery sidecars in ULID (filename) order. Unparseable files are
|
||||
/// reported as warnings and skipped — they stay on disk for the operator.
|
||||
fn list_recovery_sidecars(
|
||||
|
|
@ -3127,24 +3404,74 @@ fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) ->
|
|||
.collect()
|
||||
}
|
||||
|
||||
fn compute_approvals(changes: &[PlanChange]) -> Vec<ApprovalRequirement> {
|
||||
fn compute_approvals(
|
||||
changes: &[PlanChange],
|
||||
approved: &BTreeSet<String>,
|
||||
) -> Vec<ApprovalRequirement> {
|
||||
// One gate per subtree: the graph.<id> delete carries its schema and
|
||||
// queries, so a schema delete whose graph is also deleted is not listed.
|
||||
let graph_deletes: BTreeSet<String> = changes
|
||||
.iter()
|
||||
.filter(|change| change.operation == PlanOperation::Delete)
|
||||
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
|
||||
.collect();
|
||||
changes
|
||||
.iter()
|
||||
.filter_map(|change| {
|
||||
if change.operation == PlanOperation::Delete
|
||||
&& (change.resource.starts_with("graph.") || change.resource.starts_with("schema."))
|
||||
{
|
||||
Some(ApprovalRequirement {
|
||||
resource: change.resource.clone(),
|
||||
reason: "delete may remove deployed graph or schema definition".to_string(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
if change.operation != PlanOperation::Delete {
|
||||
return None;
|
||||
}
|
||||
let gated = match resource_kind(&change.resource) {
|
||||
ResourceKind::Graph(_) => true,
|
||||
ResourceKind::Schema(graph) => !graph_deletes.contains(&graph),
|
||||
_ => false,
|
||||
};
|
||||
gated.then(|| ApprovalRequirement {
|
||||
resource: change.resource.clone(),
|
||||
reason: "delete may remove deployed graph or schema definition".to_string(),
|
||||
satisfied: approved.contains(&change.resource),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Resources with a valid (digest-matching, unconsumed) pending approval.
|
||||
/// Near-misses — an artifact for the same resource whose bound digests no
|
||||
/// longer match — warn as `approval_stale` and never authorize anything.
|
||||
fn approved_resources(
|
||||
artifacts: &[(PathBuf, ApprovalArtifact)],
|
||||
changes: &[PlanChange],
|
||||
config_digest: &str,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
) -> BTreeSet<String> {
|
||||
let mut approved = BTreeSet::new();
|
||||
for change in changes {
|
||||
let candidates: Vec<&ApprovalArtifact> = artifacts
|
||||
.iter()
|
||||
.map(|(_, artifact)| artifact)
|
||||
.filter(|artifact| artifact.consumed_at.is_none() && artifact.resource == change.resource)
|
||||
.collect();
|
||||
if candidates.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let matched = candidates.iter().any(|artifact| {
|
||||
artifact.bound_config_digest == config_digest
|
||||
&& artifact.bound_before_digest == change.before_digest
|
||||
&& artifact.bound_after_digest == change.after_digest
|
||||
});
|
||||
if matched {
|
||||
approved.insert(change.resource.clone());
|
||||
} else {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"approval_stale",
|
||||
change.resource.clone(),
|
||||
"an approval artifact exists but its bound digests no longer match the plan; re-run `cluster approve`",
|
||||
));
|
||||
}
|
||||
}
|
||||
approved
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum ResourceKind {
|
||||
Graph(String),
|
||||
|
|
@ -3182,6 +3509,7 @@ fn classify_changes(
|
|||
changes: &mut [PlanChange],
|
||||
dependencies: &[Dependency],
|
||||
pending_recovery: &BTreeSet<String>,
|
||||
approved: &BTreeSet<String>,
|
||||
) {
|
||||
let mut schema_creates = BTreeSet::new();
|
||||
let mut schema_pending = BTreeSet::new();
|
||||
|
|
@ -3219,6 +3547,11 @@ fn classify_changes(
|
|||
schema_pending.insert(graph.clone());
|
||||
}
|
||||
}
|
||||
// Subtree deletes ride the approved graph delete. NOTE: execution lands
|
||||
// with the delete executor; until then approved deletes stay blocked so a
|
||||
// gate-only build can never strip state without removing the root.
|
||||
let rides_approved_delete = |_graph: &str| false;
|
||||
let _ = approved;
|
||||
|
||||
for change in changes.iter_mut() {
|
||||
let (disposition, reason) = match resource_kind(&change.resource) {
|
||||
|
|
@ -3238,6 +3571,15 @@ fn classify_changes(
|
|||
PlanOperation::Create | PlanOperation::Update => {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
}
|
||||
PlanOperation::Delete if graph_deletes.contains(&graph) => {
|
||||
if rides_approved_delete(&graph) {
|
||||
(ApplyDisposition::Applied, None)
|
||||
} else if pending_recovery.contains(&graph) {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
} else {
|
||||
(ApplyDisposition::Blocked, Some("approval_required"))
|
||||
}
|
||||
}
|
||||
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||
},
|
||||
ResourceKind::Graph(graph) => match change.operation {
|
||||
|
|
@ -3251,15 +3593,26 @@ fn classify_changes(
|
|||
PlanOperation::Update if !schema_pending.contains(&graph) => {
|
||||
(ApplyDisposition::Derived, None)
|
||||
}
|
||||
// Stage 4C: an approved graph delete executes (the
|
||||
// irreversible tier — gated by a digest-bound artifact).
|
||||
PlanOperation::Delete => {
|
||||
if pending_recovery.contains(&graph) {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
} else if rides_approved_delete(&graph) {
|
||||
(ApplyDisposition::Applied, None)
|
||||
} else {
|
||||
(ApplyDisposition::Blocked, Some("approval_required"))
|
||||
}
|
||||
}
|
||||
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||
},
|
||||
ResourceKind::Query { graph, .. } => match change.operation {
|
||||
PlanOperation::Delete => {
|
||||
if graph_deletes.contains(&graph) {
|
||||
(
|
||||
ApplyDisposition::Blocked,
|
||||
Some("dependency_not_applied"),
|
||||
)
|
||||
if rides_approved_delete(&graph) {
|
||||
// Tombstoned with the approved graph delete.
|
||||
(ApplyDisposition::Applied, None)
|
||||
} else if graph_deletes.contains(&graph) {
|
||||
(ApplyDisposition::Blocked, Some("approval_required"))
|
||||
} else {
|
||||
(ApplyDisposition::Applied, None)
|
||||
}
|
||||
|
|
@ -5302,7 +5655,7 @@ graphs:
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_does_not_delete_subtree_of_deleted_graph() {
|
||||
async fn apply_blocks_graph_delete_without_approval() {
|
||||
let dir = fixture();
|
||||
let desired = validate_config_dir(dir.path());
|
||||
let schema_digest = desired
|
||||
|
|
@ -5331,18 +5684,25 @@ graphs:
|
|||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
// Stage 4C: deletes are gated, not deferred — every subtree change
|
||||
// blocks on the single graph-level approval.
|
||||
assert_eq!(
|
||||
by_resource["graph.old"].disposition,
|
||||
Some(ApplyDisposition::Deferred)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["schema.old"].disposition,
|
||||
Some(ApplyDisposition::Deferred)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.old.q"].disposition,
|
||||
Some(ApplyDisposition::Blocked)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["graph.old"].reason.as_deref(),
|
||||
Some("approval_required")
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["schema.old"].reason.as_deref(),
|
||||
Some("approval_required")
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.old.q"].reason.as_deref(),
|
||||
Some("approval_required")
|
||||
);
|
||||
// State intact; nothing destroyed without the artifact.
|
||||
let state = read_state_json(dir.path());
|
||||
let resources = &state["applied_revision"]["resources"];
|
||||
assert_eq!(resources["graph.old"]["digest"], "3333");
|
||||
|
|
@ -5350,6 +5710,147 @@ graphs:
|
|||
assert_eq!(resources["query.old.q"]["digest"], "5555");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn approve_writes_digest_bound_artifact() {
|
||||
let dir = fixture();
|
||||
write_applyable_state(dir.path());
|
||||
// Seed a deletable subtree.
|
||||
let state = read_state_json(dir.path());
|
||||
let graph_digest_str = state["applied_revision"]["resources"]["graph.knowledge"]["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let schema_digest_str = state["applied_revision"]["resources"]["schema.knowledge"]
|
||||
["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
write_state_resources(
|
||||
dir.path(),
|
||||
&[
|
||||
("graph.knowledge", graph_digest_str.as_str()),
|
||||
("schema.knowledge", schema_digest_str.as_str()),
|
||||
("graph.old", "3333"),
|
||||
("schema.old", "4444"),
|
||||
],
|
||||
);
|
||||
|
||||
let out = approve_config_dir(dir.path(), "graph.old", "andrew").await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
let approval_id = out.approval_id.clone().unwrap();
|
||||
let artifact: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(
|
||||
dir.path()
|
||||
.join(CLUSTER_APPROVALS_DIR)
|
||||
.join(format!("{approval_id}.json")),
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(artifact["resource"], "graph.old");
|
||||
assert_eq!(artifact["operation"], "delete");
|
||||
assert_eq!(artifact["approved_by"], "andrew");
|
||||
assert_eq!(artifact["bound_before_digest"], "3333");
|
||||
assert!(artifact["bound_after_digest"].is_null());
|
||||
assert!(artifact["bound_config_digest"].is_string());
|
||||
assert!(artifact["consumed_at"].is_null());
|
||||
|
||||
// A non-gated address is refused.
|
||||
let not_gated = approve_config_dir(dir.path(), "query.knowledge.find_person", "andrew").await;
|
||||
assert!(!not_gated.ok);
|
||||
assert!(
|
||||
not_gated
|
||||
.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "approval_not_required")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stale_approval_is_ignored() {
|
||||
let dir = fixture();
|
||||
write_applyable_state(dir.path());
|
||||
let state = read_state_json(dir.path());
|
||||
let graph_digest_str = state["applied_revision"]["resources"]["graph.knowledge"]["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let schema_digest_str = state["applied_revision"]["resources"]["schema.knowledge"]
|
||||
["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
write_state_resources(
|
||||
dir.path(),
|
||||
&[
|
||||
("graph.knowledge", graph_digest_str.as_str()),
|
||||
("schema.knowledge", schema_digest_str.as_str()),
|
||||
("graph.old", "3333"),
|
||||
],
|
||||
);
|
||||
let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await;
|
||||
assert!(approved.ok, "{:?}", approved.diagnostics);
|
||||
// The config moves after approval: the bound config digest no longer
|
||||
// matches and the artifact authorizes nothing.
|
||||
fs::write(dir.path().join("base.policy.yaml"), "rules: [] # moved\n").unwrap();
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "approval_stale"),
|
||||
"{:?}",
|
||||
out.diagnostics
|
||||
);
|
||||
let by_resource: BTreeMap<&str, &PlanChange> = out
|
||||
.changes
|
||||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
assert_eq!(
|
||||
by_resource["graph.old"].reason.as_deref(),
|
||||
Some("approval_required")
|
||||
);
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(
|
||||
state["applied_revision"]["resources"]["graph.old"]["digest"],
|
||||
"3333"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compute_approvals_one_gate_per_subtree() {
|
||||
let dir = fixture();
|
||||
write_applyable_state(dir.path());
|
||||
let state = read_state_json(dir.path());
|
||||
let g = state["applied_revision"]["resources"]["graph.knowledge"]["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let sc = state["applied_revision"]["resources"]["schema.knowledge"]["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
write_state_resources(
|
||||
dir.path(),
|
||||
&[
|
||||
("graph.knowledge", g.as_str()),
|
||||
("schema.knowledge", sc.as_str()),
|
||||
("graph.old", "3333"),
|
||||
("schema.old", "4444"),
|
||||
("query.old.q", "5555"),
|
||||
],
|
||||
);
|
||||
let plan = plan_config_dir(dir.path()).await;
|
||||
let gated: Vec<&str> = plan
|
||||
.approvals_required
|
||||
.iter()
|
||||
.map(|gate| gate.resource.as_str())
|
||||
.collect();
|
||||
assert_eq!(gated, vec!["graph.old"], "{plan:?}");
|
||||
assert!(!plan.approvals_required[0].satisfied);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_is_idempotent() {
|
||||
let dir = fixture();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue