mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
feat(cluster): execute approved graph deletes in cluster apply
Stage 4C execution half (RFC-004 §D5/§D6 + sweep rows 7/7b/8): an approved graph.<id> delete — and its riding schema/query deletes — classifies Applied and executes LAST in the run, sidecar-fenced: pre-op manifest pin (best effort; partial roots still delete), approval_id carried in the sidecar, recursive root removal (NotFound tolerated), subtree tombstoned out of the ledger with a tombstone observation, the approval consumed in the same state CAS (ledger summary) and its artifact file rewritten with consumed_at only after the CAS lands — a failed run consumes nothing and the approval stays valid for the retry. Sweep rows: already-tombstoned intents retire (7); a completed delete with a stale ledger rolls forward — tombstone + approval consumption + audit entry (7b, idempotent); a still-present root retires the stale intent with a graph_delete_incomplete warning and the still-approved delete re-executes in the same run (8) — prefix removal is idempotent, so retry IS the repair. The multi-graph mixed e2e gets its conclusion: blocked without approval, cluster approve graph.engineering --as andrew, converge, tombstone visible in status. Phase 4's disposition matrix is now fully executable. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
f4e9105272
commit
d1d04217ab
2 changed files with 513 additions and 9 deletions
|
|
@ -1358,7 +1358,7 @@ fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph()
|
|||
/// (applied), its composite (derived) — shows all four dispositions at once
|
||||
/// before the graph-plane schema apply closes the loop.
|
||||
#[test]
|
||||
fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
|
||||
fn cluster_e2e_multi_graph_mixed_dispositions_then_approve_and_converge() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_multi_graph_cluster_fixture(temp.path());
|
||||
// No manual init: Stage 4A creates both graphs.
|
||||
|
|
@ -1468,7 +1468,55 @@ policies:
|
|||
let mut sorted = order.clone();
|
||||
sorted.sort_unstable();
|
||||
assert_eq!(order, sorted, "{mixed}");
|
||||
// Conclusion (approve + converge) extends below once the delete executor lands.
|
||||
// The conclusion: an apply without approval stays blocked; the approved
|
||||
// delete converges the cluster, tombstoning the removed graph.
|
||||
let still_blocked = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(still_blocked["converged"], false, "{still_blocked}");
|
||||
|
||||
let approve = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("--as")
|
||||
.arg("andrew")
|
||||
.arg("cluster")
|
||||
.arg("approve")
|
||||
.arg("graph.engineering")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(approve["ok"], true, "{approve}");
|
||||
assert_eq!(approve["approved_by"], "andrew");
|
||||
|
||||
let converge = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(converge["ok"], true, "{converge}");
|
||||
assert_eq!(converge["converged"], true, "{converge}");
|
||||
assert!(!temp.path().join("graphs/engineering.omni").exists());
|
||||
|
||||
let status = cluster_json(temp.path(), "status");
|
||||
assert_eq!(status["observations"]["graph.engineering"]["kind"], "tombstone");
|
||||
let final_plan = cluster_json(temp.path(), "plan");
|
||||
assert!(
|
||||
final_plan["changes"].as_array().unwrap().is_empty(),
|
||||
"{final_plan}"
|
||||
);
|
||||
}
|
||||
|
||||
/// An approval without an approver is meaningless: approve requires --as.
|
||||
#[test]
|
||||
fn cluster_e2e_approve_requires_actor() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("approve")
|
||||
.arg("graph.knowledge")
|
||||
.arg("--config")
|
||||
.arg(temp.path()),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(stderr.contains("--as"), "{stderr}");
|
||||
}
|
||||
|
||||
/// Stage 4A headline: a declared graph is created by `cluster apply` itself —
|
||||
|
|
|
|||
|
|
@ -491,6 +491,10 @@ struct RecoverySidecar {
|
|||
desired_schema_digest: String,
|
||||
#[serde(default)]
|
||||
state_cas_base: Option<String>,
|
||||
/// For graph_delete: the approval this operation consumes; lets a sweep
|
||||
/// roll-forward consume it too.
|
||||
#[serde(default)]
|
||||
approval_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
|
|
@ -498,7 +502,7 @@ struct RecoverySidecar {
|
|||
enum RecoverySidecarKind {
|
||||
GraphCreate,
|
||||
SchemaApply,
|
||||
// GraphDelete arrives with stage 4C.
|
||||
GraphDelete,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
|
@ -509,6 +513,9 @@ struct SweepOutcome {
|
|||
/// Sidecars whose outcome is recorded (rows 2/4): deleted only after the
|
||||
/// command's state write lands, so a CAS failure re-sweeps them.
|
||||
completed_sidecars: Vec<PathBuf>,
|
||||
/// Approval artifacts consumed by a roll-forward (delete row 7b): their
|
||||
/// files are rewritten with consumed_at only after the state write lands.
|
||||
consumed_approvals: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -942,6 +949,7 @@ pub async fn apply_config_dir_with_options(
|
|||
expected_manifest_version: None,
|
||||
desired_schema_digest: desired_graph.schema_digest.clone(),
|
||||
state_cas_base: expected_cas.clone(),
|
||||
approval_id: None,
|
||||
};
|
||||
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
|
||||
Ok(path) => path,
|
||||
|
|
@ -1105,6 +1113,7 @@ pub async fn apply_config_dir_with_options(
|
|||
expected_manifest_version: None,
|
||||
desired_schema_digest: desired_graph.schema_digest.clone(),
|
||||
state_cas_base: expected_cas.clone(),
|
||||
approval_id: None,
|
||||
};
|
||||
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
|
||||
Ok(path) => path,
|
||||
|
|
@ -1290,6 +1299,121 @@ pub async fn apply_config_dir_with_options(
|
|||
);
|
||||
}
|
||||
|
||||
// Approved graph deletes execute LAST (RFC-004 §D5): catalog writes for
|
||||
// surviving resources land first, then the irreversible work.
|
||||
let graph_deletes_to_run: Vec<String> = changes
|
||||
.iter()
|
||||
.filter(|change| {
|
||||
change.disposition == Some(ApplyDisposition::Applied)
|
||||
&& change.operation == PlanOperation::Delete
|
||||
&& matches!(resource_kind(&change.resource), ResourceKind::Graph(_))
|
||||
})
|
||||
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
|
||||
.collect();
|
||||
let mut executed_deletes: Vec<(String, Option<String>)> = Vec::new(); // (graph_id, approval_id)
|
||||
let mut consumed_approval_ids: Vec<String> = Vec::new();
|
||||
for graph_id in &graph_deletes_to_run {
|
||||
if graph_moving_aborted {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"graph_delete_skipped",
|
||||
graph_address(graph_id),
|
||||
"skipped after an earlier graph-moving operation failed in this run",
|
||||
));
|
||||
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
|
||||
continue;
|
||||
}
|
||||
let graph_addr = graph_address(graph_id);
|
||||
// Re-locate the consumable approval (classification verified one exists).
|
||||
let approval_id = approval_artifacts
|
||||
.iter()
|
||||
.map(|(_, artifact)| artifact)
|
||||
.find(|artifact| {
|
||||
artifact.consumed_at.is_none()
|
||||
&& artifact.resource == graph_addr
|
||||
&& artifact.bound_config_digest == desired.config_digest
|
||||
})
|
||||
.map(|artifact| artifact.approval_id.clone());
|
||||
let graph_uri = display_path(
|
||||
&desired
|
||||
.config_dir
|
||||
.join(CLUSTER_GRAPHS_DIR)
|
||||
.join(format!("{graph_id}.omni")),
|
||||
);
|
||||
let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await {
|
||||
Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await {
|
||||
Ok(snapshot) => Some(snapshot.version()),
|
||||
Err(_) => None,
|
||||
},
|
||||
Err(_) => None, // partial/unopenable roots still get deleted
|
||||
};
|
||||
let sidecar = RecoverySidecar {
|
||||
schema_version: 1,
|
||||
operation_id: Ulid::new().to_string(),
|
||||
started_at: now_rfc3339(),
|
||||
actor: options.actor.clone(),
|
||||
kind: RecoverySidecarKind::GraphDelete,
|
||||
graph_id: graph_id.clone(),
|
||||
graph_uri: graph_uri.clone(),
|
||||
observed_manifest_version,
|
||||
expected_manifest_version: None, // no post-op manifest exists
|
||||
desired_schema_digest: String::new(),
|
||||
state_cas_base: expected_cas.clone(),
|
||||
approval_id: approval_id.clone(),
|
||||
};
|
||||
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
|
||||
Ok(path) => path,
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
|
||||
graph_moving_aborted = true;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") {
|
||||
// Simulated crash before removal: row 8 retires the intent and
|
||||
// the still-valid approval lets a later run retry.
|
||||
diagnostics.push(diagnostic);
|
||||
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
|
||||
graph_moving_aborted = true;
|
||||
continue;
|
||||
}
|
||||
match fs::remove_dir_all(PathBuf::from(&graph_uri)) {
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone
|
||||
Err(err) => {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"graph_delete_failed",
|
||||
graph_addr.clone(),
|
||||
format!("could not remove graph root '{graph_uri}': {err}"),
|
||||
));
|
||||
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
|
||||
graph_moving_aborted = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// Crash point: the root is gone, the ledger does not record it yet.
|
||||
// The sweep rolls forward (row 7b) and consumes the approval.
|
||||
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") {
|
||||
diagnostics.push(diagnostic);
|
||||
return early_return(
|
||||
display_path(&desired.config_dir),
|
||||
Some(desired.config_digest),
|
||||
observations,
|
||||
changes,
|
||||
state.resource_statuses,
|
||||
diagnostics,
|
||||
);
|
||||
}
|
||||
executed_deletes.push((graph_id.clone(), approval_id.clone()));
|
||||
if let Some(approval_id) = approval_id {
|
||||
consumed_approval_ids.push(approval_id);
|
||||
}
|
||||
completed_op_sidecars.push(sidecar_path);
|
||||
}
|
||||
if !failed_graphs.is_empty() {
|
||||
demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
|
||||
}
|
||||
|
||||
// State mutation. Apply owns query/policy statuses only; graph/schema
|
||||
// statuses belong to refresh/import observation and must not be clobbered
|
||||
// (the sweep above is the one exception: it owns recovery statuses).
|
||||
|
|
@ -1330,6 +1454,17 @@ pub async fn apply_config_dir_with_options(
|
|||
_ => {}
|
||||
}
|
||||
}
|
||||
for (graph_id, approval_id) in &executed_deletes {
|
||||
tombstone_graph_subtree(
|
||||
&mut new_state,
|
||||
graph_id,
|
||||
approval_id.as_deref(),
|
||||
options.actor.as_deref(),
|
||||
);
|
||||
if let Some(approval_id) = approval_id {
|
||||
record_approval_consumed(&mut new_state, approval_id, "apply");
|
||||
}
|
||||
}
|
||||
recompute_state_graph_digests(&mut new_state, &desired);
|
||||
|
||||
let residual = diff_resources(
|
||||
|
|
@ -1371,6 +1506,9 @@ pub async fn apply_config_dir_with_options(
|
|||
{
|
||||
let _ = fs::remove_file(sidecar_path);
|
||||
}
|
||||
let mut all_consumed = sweep.consumed_approvals.clone();
|
||||
all_consumed.extend(consumed_approval_ids.iter().cloned());
|
||||
mark_approvals_consumed(&backend, &all_consumed);
|
||||
}
|
||||
// On a failed state write, report the statuses that are actually on disk
|
||||
// (the pre-apply snapshot), not the in-memory mutations that were never
|
||||
|
|
@ -1827,6 +1965,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
|
|||
for sidecar_path in &sweep.completed_sidecars {
|
||||
let _ = fs::remove_file(sidecar_path);
|
||||
}
|
||||
mark_approvals_consumed(&backend, &sweep.consumed_approvals);
|
||||
}
|
||||
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||
}
|
||||
|
|
@ -2558,6 +2697,9 @@ async fn sweep_recovery_sidecars(
|
|||
RecoverySidecarKind::SchemaApply => {
|
||||
sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
|
||||
}
|
||||
RecoverySidecarKind::GraphDelete => {
|
||||
sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome);
|
||||
}
|
||||
}
|
||||
}
|
||||
outcome
|
||||
|
|
@ -2778,6 +2920,121 @@ async fn sweep_schema_apply_sidecar(
|
|||
}
|
||||
}
|
||||
|
||||
fn sweep_graph_delete_sidecar(
|
||||
path: PathBuf,
|
||||
sidecar: RecoverySidecar,
|
||||
state: &mut ClusterState,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
outcome: &mut SweepOutcome,
|
||||
) {
|
||||
let graph_address = graph_address(&sidecar.graph_id);
|
||||
let root = PathBuf::from(&sidecar.graph_uri);
|
||||
|
||||
if root.exists() {
|
||||
// Row 8: the delete never completed. Prefix removal is idempotent and
|
||||
// works on partial roots, so the repair is simply the re-proposed,
|
||||
// still-approved delete on a later run — retire the stale intent.
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"graph_delete_incomplete",
|
||||
graph_address,
|
||||
"a previous graph delete did not complete; it will be re-proposed by plan and can be retried under its approval",
|
||||
));
|
||||
outcome.completed_sidecars.push(path);
|
||||
return;
|
||||
}
|
||||
|
||||
if !state.applied_revision.resources.contains_key(&graph_address) {
|
||||
// Row 7: already tombstoned (or never recorded); crash fell between
|
||||
// the state CAS and sidecar delete.
|
||||
outcome.completed_sidecars.push(path);
|
||||
return;
|
||||
}
|
||||
|
||||
// Row 7b: the root is gone, the ledger is stale — roll forward the
|
||||
// tombstone, consume the approval the sidecar carries, audit.
|
||||
tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref());
|
||||
state.recovery_records.insert(
|
||||
sidecar.operation_id.clone(),
|
||||
json!({
|
||||
"kind": "graph_delete",
|
||||
"graph_id": sidecar.graph_id,
|
||||
"outcome": "rolled_forward",
|
||||
"recovered_at": now_rfc3339(),
|
||||
"actor": sidecar.actor,
|
||||
}),
|
||||
);
|
||||
if let Some(approval_id) = &sidecar.approval_id {
|
||||
record_approval_consumed(state, approval_id, &sidecar.operation_id);
|
||||
outcome.consumed_approvals.push(approval_id.clone());
|
||||
}
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"cluster_recovery_rolled_forward",
|
||||
graph_address,
|
||||
"an interrupted graph delete had completed on disk; cluster state was rolled forward to match",
|
||||
));
|
||||
outcome.completed_sidecars.push(path);
|
||||
}
|
||||
|
||||
/// Remove a graph's subtree (graph, schema, queries) from the ledger and
|
||||
/// leave a tombstone observation. Idempotent.
|
||||
fn tombstone_graph_subtree(
|
||||
state: &mut ClusterState,
|
||||
graph_id: &str,
|
||||
approval_id: Option<&str>,
|
||||
actor: Option<&str>,
|
||||
) {
|
||||
let graph_addr = graph_address(graph_id);
|
||||
let schema_addr = schema_address(graph_id);
|
||||
let query_prefix = format!("query.{graph_id}.");
|
||||
state.applied_revision.resources.remove(&graph_addr);
|
||||
state.applied_revision.resources.remove(&schema_addr);
|
||||
state
|
||||
.applied_revision
|
||||
.resources
|
||||
.retain(|address, _| !address.starts_with(&query_prefix));
|
||||
state.resource_statuses.remove(&graph_addr);
|
||||
state.resource_statuses.remove(&schema_addr);
|
||||
state
|
||||
.resource_statuses
|
||||
.retain(|address, _| !address.starts_with(&query_prefix));
|
||||
state.observations.insert(
|
||||
graph_addr,
|
||||
json!({
|
||||
"kind": "tombstone",
|
||||
"deleted_at": now_rfc3339(),
|
||||
"approval_id": approval_id,
|
||||
"actor": actor,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/// Record approval consumption in the state ledger. The artifact FILE is
|
||||
/// rewritten with consumed_at only after the state write lands, so a failed
|
||||
/// CAS leaves the approval valid for the retry.
|
||||
fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) {
|
||||
state.approval_records.insert(
|
||||
approval_id.to_string(),
|
||||
json!({
|
||||
"consumed_at": now_rfc3339(),
|
||||
"consumed_by_operation": operation_id,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/// Mark approval artifact files consumed on disk (post-CAS).
|
||||
fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) {
|
||||
if approval_ids.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut sink = Vec::new();
|
||||
for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) {
|
||||
if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() {
|
||||
artifact.consumed_at = Some(now_rfc3339());
|
||||
let _ = backend.write_approval_artifact(&artifact);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read-only commands report pending sidecars without acting on them.
|
||||
fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec<Diagnostic>) {
|
||||
let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
|
||||
|
|
@ -3547,11 +3804,12 @@ fn classify_changes(
|
|||
schema_pending.insert(graph.clone());
|
||||
}
|
||||
}
|
||||
// Subtree deletes ride the approved graph delete. NOTE: execution lands
|
||||
// with the delete executor; until then approved deletes stay blocked so a
|
||||
// gate-only build can never strip state without removing the root.
|
||||
let rides_approved_delete = |_graph: &str| false;
|
||||
let _ = approved;
|
||||
// Subtree deletes ride the approved graph delete.
|
||||
let rides_approved_delete = |graph: &str| {
|
||||
graph_deletes.contains(graph)
|
||||
&& approved.contains(&graph_address(graph))
|
||||
&& !pending_recovery.contains(graph)
|
||||
};
|
||||
|
||||
for change in changes.iter_mut() {
|
||||
let (disposition, reason) = match resource_kind(&change.resource) {
|
||||
|
|
@ -3662,6 +3920,7 @@ fn classify_changes(
|
|||
enum FailedGraphOrigin {
|
||||
GraphCreate,
|
||||
SchemaApply,
|
||||
GraphDelete,
|
||||
}
|
||||
|
||||
/// After a graph-moving operation fails mid-run, every change that depended
|
||||
|
|
@ -3681,12 +3940,15 @@ fn demote_dependents_of_failed_graphs(
|
|||
let demote_reason = match resource_kind(&change.resource) {
|
||||
ResourceKind::Graph(graph) => match failed.get(&graph) {
|
||||
Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"),
|
||||
Some(FailedGraphOrigin::GraphDelete) => Some("graph_delete_failed"),
|
||||
Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"),
|
||||
None => None,
|
||||
},
|
||||
ResourceKind::Schema(graph) => match failed.get(&graph) {
|
||||
Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"),
|
||||
Some(FailedGraphOrigin::GraphCreate) => Some("dependency_not_applied"),
|
||||
Some(FailedGraphOrigin::GraphCreate) | Some(FailedGraphOrigin::GraphDelete) => {
|
||||
Some("dependency_not_applied")
|
||||
}
|
||||
None => None,
|
||||
},
|
||||
ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => {
|
||||
|
|
@ -6606,6 +6868,200 @@ graphs:
|
|||
assert!(sidecar.exists());
|
||||
}
|
||||
|
||||
/// Seed: converged knowledge subtree + a stale `old` graph subtree with a
|
||||
/// real directory on disk.
|
||||
fn seed_deletable_state(config_dir: &Path) {
|
||||
write_applyable_state(config_dir);
|
||||
let state = read_state_json(config_dir);
|
||||
let g = state["applied_revision"]["resources"]["graph.knowledge"]["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let sc = state["applied_revision"]["resources"]["schema.knowledge"]["digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
write_state_resources(
|
||||
config_dir,
|
||||
&[
|
||||
("graph.knowledge", g.as_str()),
|
||||
("schema.knowledge", sc.as_str()),
|
||||
("graph.old", "3333"),
|
||||
("schema.old", "4444"),
|
||||
("query.old.q", "5555"),
|
||||
],
|
||||
);
|
||||
let root = config_dir.join(CLUSTER_GRAPHS_DIR).join("old.omni");
|
||||
fs::create_dir_all(&root).unwrap();
|
||||
fs::write(root.join("_schema.pg"), "stale").unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_executes_approved_graph_delete() {
|
||||
let dir = fixture();
|
||||
seed_deletable_state(dir.path());
|
||||
let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await;
|
||||
assert!(approved.ok, "{:?}", approved.diagnostics);
|
||||
let approval_id = approved.approval_id.clone().unwrap();
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(out.converged, "{out:?}");
|
||||
let by_resource: BTreeMap<&str, &PlanChange> = out
|
||||
.changes
|
||||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
assert_eq!(by_resource["graph.old"].disposition, Some(ApplyDisposition::Applied));
|
||||
assert_eq!(by_resource["schema.old"].disposition, Some(ApplyDisposition::Applied));
|
||||
assert_eq!(by_resource["query.old.q"].disposition, Some(ApplyDisposition::Applied));
|
||||
// The root is gone; the subtree is tombstoned out of the ledger.
|
||||
assert!(!dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni").exists());
|
||||
let state = read_state_json(dir.path());
|
||||
let resources = state["applied_revision"]["resources"].as_object().unwrap();
|
||||
assert!(!resources.contains_key("graph.old"));
|
||||
assert!(!resources.contains_key("schema.old"));
|
||||
assert!(!resources.contains_key("query.old.q"));
|
||||
assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone");
|
||||
assert_eq!(state["observations"]["graph.old"]["approval_id"], approval_id);
|
||||
// Approval consumed in BOTH stores: ledger summary + artifact file.
|
||||
assert!(state["approval_records"][&approval_id]["consumed_at"].is_string());
|
||||
let artifact: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(
|
||||
dir.path()
|
||||
.join(CLUSTER_APPROVALS_DIR)
|
||||
.join(format!("{approval_id}.json")),
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(artifact["consumed_at"].is_string(), "{artifact}");
|
||||
// Sidecar retired.
|
||||
assert!(
|
||||
fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR))
|
||||
.map(|mut entries| entries.next().is_none())
|
||||
.unwrap_or(true)
|
||||
);
|
||||
// A consumed approval authorizes nothing further (idempotent re-apply).
|
||||
let again = apply_config_dir(dir.path()).await;
|
||||
assert!(again.ok && again.converged && !again.state_written, "{again:?}");
|
||||
}
|
||||
|
||||
fn write_delete_sidecar(
|
||||
config_dir: &Path,
|
||||
graph_id: &str,
|
||||
approval_id: Option<&str>,
|
||||
operation_id: &str,
|
||||
) -> PathBuf {
|
||||
let dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
|
||||
fs::create_dir_all(&dir).unwrap();
|
||||
let path = dir.join(format!("{operation_id}.json"));
|
||||
fs::write(
|
||||
&path,
|
||||
serde_json::to_string_pretty(&json!({
|
||||
"schema_version": 1,
|
||||
"operation_id": operation_id,
|
||||
"started_at": "1970-01-01T00:00:00Z",
|
||||
"kind": "graph_delete",
|
||||
"graph_id": graph_id,
|
||||
"graph_uri": derived_graph_uri(config_dir, graph_id),
|
||||
"desired_schema_digest": "",
|
||||
"approval_id": approval_id,
|
||||
}))
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
path
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_retires_delete_sidecar_when_tombstoned() {
|
||||
let dir = fixture();
|
||||
write_applyable_state(dir.path()); // no graph.old in state, no root
|
||||
let sidecar = write_delete_sidecar(dir.path(), "old", None, "01DROW7");
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(!sidecar.exists());
|
||||
let state = read_state_json(dir.path());
|
||||
assert!(
|
||||
state["recovery_records"]
|
||||
.as_object()
|
||||
.is_none_or(|records| records.is_empty())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_rolls_forward_completed_delete() {
|
||||
let dir = fixture();
|
||||
seed_deletable_state(dir.path());
|
||||
// Approve, then simulate: root removed, state stale, sidecar present.
|
||||
let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await;
|
||||
let approval_id = approved.approval_id.unwrap();
|
||||
fs::remove_dir_all(dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni")).unwrap();
|
||||
let sidecar = write_delete_sidecar(dir.path(), "old", Some(&approval_id), "01DROW7B");
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
|
||||
);
|
||||
assert!(!sidecar.exists());
|
||||
let state = read_state_json(dir.path());
|
||||
assert!(
|
||||
!state["applied_revision"]["resources"]
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.contains_key("graph.old")
|
||||
);
|
||||
assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone");
|
||||
assert!(state["approval_records"][&approval_id]["consumed_at"].is_string());
|
||||
assert!(
|
||||
state["recovery_records"]
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.values()
|
||||
.any(|record| record["kind"] == "graph_delete"
|
||||
&& record["outcome"] == "rolled_forward")
|
||||
);
|
||||
// The artifact file is marked consumed post-CAS.
|
||||
let artifact: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(
|
||||
dir.path()
|
||||
.join(CLUSTER_APPROVALS_DIR)
|
||||
.join(format!("{approval_id}.json")),
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(artifact["consumed_at"].is_string());
|
||||
assert!(out.converged, "{out:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_reproposes_incomplete_delete() {
|
||||
let dir = fixture();
|
||||
seed_deletable_state(dir.path()); // root present
|
||||
let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await;
|
||||
assert!(approved.ok);
|
||||
let sidecar = write_delete_sidecar(dir.path(), "old", approved.approval_id.as_deref(), "01DROW8");
|
||||
|
||||
// Row 8: the stale intent is retired with a warning, and the same run
|
||||
// re-executes the still-approved delete to completion.
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "graph_delete_incomplete")
|
||||
);
|
||||
assert!(!sidecar.exists());
|
||||
assert!(!dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni").exists());
|
||||
assert!(out.converged, "{out:?}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_warns_on_pending_recovery_sidecar() {
|
||||
let dir = fixture();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue