diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index d47e13c..7ab7ca9 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1285,7 +1285,7 @@ node Person { /// Disaster input fails closed: a destroyed graph root drifts the ledger, /// the plan proposes deferred creates, and apply moves nothing. #[test] -fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { +fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); init_cluster_derived_graph(temp.path()); @@ -1327,15 +1327,20 @@ fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { let plan = cluster_json(temp.path(), "plan"); assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create"); - assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred"); - assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred"); + // Stage 4A: the re-create is executable and the plan says so — nothing + // hidden about converging a destroyed root back to an EMPTY graph (the + // data was already lost; this is declarative convergence, RFC-004 §D1). + assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "applied"); + assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "applied"); // Converged-then-destroyed: query/policy are already in state at the // desired digests, so they are not changes at all. assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}"); - let disaster_apply = cluster_json(temp.path(), "apply"); - assert_eq!(disaster_apply["applied_count"], 0, "{disaster_apply}"); - assert_eq!(disaster_apply["converged"], false, "{disaster_apply}"); + let recreate = cluster_json(temp.path(), "apply"); + assert_eq!(recreate["ok"], true, "{recreate}"); + assert_eq!(recreate["converged"], true, "{recreate}"); + // The empty graph is back on disk; catalog state survived throughout. + assert!(temp.path().join("graphs/knowledge.omni").exists()); let state: serde_json::Value = serde_json::from_str( &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), ) @@ -1352,59 +1357,84 @@ fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { ); } -/// The disposition matrix as a system: one apply over two graphs (one live, -/// one not yet created) plus graph-spanning and cluster-scoped policies must -/// produce all four dispositions at once — then converge after the second -/// graph appears. +/// The disposition matrix as a system under Stage 4A: a fresh multi-graph +/// config converges in ONE apply (both graphs created, spanning and +/// cluster-scoped policies applied), and a later mixed run — schema update +/// (deferred), its dependent query (blocked), an independent query update +/// (applied), its composite (derived) — shows all four dispositions at once +/// before the graph-plane schema apply closes the loop. #[test] fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { let temp = tempdir().unwrap(); write_multi_graph_cluster_fixture(temp.path()); - init_cluster_derived_graph(temp.path()); // knowledge only + // No manual init: Stage 4A creates both graphs. let import = cluster_json(temp.path(), "import"); assert_eq!(import["ok"], true, "{import}"); let apply = cluster_json(temp.path(), "apply"); assert_eq!(apply["ok"], true, "{apply}"); - assert_eq!(apply["converged"], false, "{apply}"); - assert_eq!(apply["applied_count"], 2, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied"); assert_eq!( - change_for(&apply, "query.knowledge.find_person")["disposition"], - "applied" - ); - assert_eq!( - change_for(&apply, "policy.cluster_wide")["disposition"], + change_for(&apply, "graph.engineering")["disposition"], "applied" ); assert_eq!( change_for(&apply, "query.engineering.find_service")["disposition"], + "applied" + ); + // The graph-spanning and cluster-scoped policies ride the same run. + assert_eq!(change_for(&apply, "policy.shared")["disposition"], "applied"); + assert_eq!( + change_for(&apply, "policy.cluster_wide")["disposition"], + "applied" + ); + assert!(temp.path().join("graphs/knowledge.omni").exists()); + assert!(temp.path().join("graphs/engineering.omni").exists()); + + // Mixed run: a knowledge schema update (4B territory — deferred) gates + // its query update (blocked), while an engineering query update is + // independent (applied) and re-derives its composite. + fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("services.gq"), + "\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n", + ) + .unwrap(); + + let mixed = cluster_json(temp.path(), "apply"); + assert_eq!(mixed["ok"], true, "{mixed}"); + assert_eq!(mixed["converged"], false, "{mixed}"); + assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred"); + assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred"); + assert_eq!( + change_for(&mixed, "query.knowledge.find_person")["disposition"], "blocked" ); assert_eq!( - change_for(&apply, "query.engineering.find_service")["reason"], - "dependency_missing" - ); - // One missing dependency graph blocks the whole spanning policy. - assert_eq!(change_for(&apply, "policy.shared")["disposition"], "blocked"); - assert_eq!( - change_for(&apply, "graph.engineering")["disposition"], - "deferred" + change_for(&mixed, "query.knowledge.find_person")["reason"], + "dependency_not_applied" ); assert_eq!( - change_for(&apply, "schema.engineering")["disposition"], - "deferred" + change_for(&mixed, "query.engineering.find_service")["disposition"], + "applied" ); assert_eq!( - change_for(&apply, "graph.knowledge")["disposition"], + change_for(&mixed, "graph.engineering")["disposition"], "derived" ); - assert_eq!( - apply["resource_statuses"]["policy.shared"]["status"], - "blocked" - ); // Deterministic ordering: changes sorted by resource address. - let order: Vec<&str> = apply["changes"] + let order: Vec<&str> = mixed["changes"] .as_array() .unwrap() .iter() @@ -1412,21 +1442,22 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { .collect(); let mut sorted = order.clone(); sorted.sort_unstable(); - assert_eq!(order, sorted, "{apply}"); + assert_eq!(order, sorted, "{mixed}"); - // The second graph appears; refresh observes it; apply converges. - init_named_cluster_graph(temp.path(), "engineering", "services.pg"); + // The graph-plane tool applies the schema; refresh observes; converge. + output_success( + cli() + .arg("schema") + .arg("apply") + .arg(temp.path().join("graphs/knowledge.omni")) + .arg("--schema") + .arg(temp.path().join("people.pg")) + .arg("--json"), + ); let refresh = cluster_json(temp.path(), "refresh"); assert_eq!(refresh["ok"], true, "{refresh}"); - let converge = cluster_json(temp.path(), "apply"); - assert_eq!(converge["ok"], true, "{converge}"); assert_eq!(converge["converged"], true, "{converge}"); - assert_eq!( - change_for(&converge, "query.engineering.find_service")["disposition"], - "applied" - ); - assert_eq!(change_for(&converge, "policy.shared")["disposition"], "applied"); let final_plan = cluster_json(temp.path(), "plan"); assert!( @@ -1435,6 +1466,39 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { ); } +/// Stage 4A headline: a declared graph is created by `cluster apply` itself — +/// no manual `omnigraph init` anywhere in the flow. +#[test] +fn cluster_e2e_declared_graph_created_by_apply() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["ok"], true, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied"); + assert!(temp.path().join("graphs/knowledge.omni").exists()); + + // The created graph is a real graph: the per-graph CLI can open it. + let snapshot = output_success( + cli() + .arg("snapshot") + .arg(temp.path().join("graphs/knowledge.omni")), + ); + assert!(!stdout_string(&snapshot).is_empty()); + + let plan = cluster_json(temp.path(), "plan"); + assert!(plan["changes"].as_array().unwrap().is_empty(), "{plan}"); + let status = cluster_json(temp.path(), "status"); + assert_eq!( + status["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); +} + /// Catalog payload drift self-heals across the command surface: status warns /// read-only, refresh persists the drift and drops the digest, apply /// republishes the blob, status comes back clean. diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index fc13bab..863691c 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -577,7 +577,9 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } else { diff_resources(&prior_resources, &desired.resource_digests) }; - classify_changes(&mut changes, &desired.dependencies); + // Plan previews dispositions without sweeping; a pending recovery is + // surfaced as the cluster_recovery_pending warning above instead. + classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new()); let blast_radius = compute_blast_radius(&changes, &desired.dependencies); let approvals_required = compute_approvals(&changes); let ok = !has_errors(&diagnostics); @@ -727,8 +729,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); - classify_changes(&mut changes, &desired.dependencies); - let _ = &sweep.pending_graphs; // consumed by the graph-create executor (4A C4) + classify_changes(&mut changes, &desired.dependencies, &sweep.pending_graphs); // Defensive invariant: nothing the approval gate covers may be executable. // Today approvals only cover graph/schema deletes (always deferred); this @@ -756,6 +757,169 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ); } + // Graph creates execute first (RFC-004 §D5), sequentially, sidecar-fenced: + // sidecar written before the init, rewritten with the post-init manifest + // version, deleted only after the final state CAS lands. A failure stops + // further graph-moving work and demotes that graph's dependents. + let source_paths: BTreeMap<&str, &str> = desired + .resources + .iter() + .filter_map(|resource| { + resource + .path + .as_deref() + .map(|path| (resource.address.as_str(), path)) + }) + .collect(); + let graph_creates_to_run: Vec = changes + .iter() + .filter(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && change.operation == PlanOperation::Create + && matches!(resource_kind(&change.resource), ResourceKind::Graph(_)) + }) + .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) + .collect(); + let mut completed_create_sidecars: Vec = Vec::new(); + let mut failed_graphs: BTreeSet = BTreeSet::new(); + let mut creates_aborted = false; + for graph_id in &graph_creates_to_run { + if creates_aborted { + // A prior create failed: stop graph-moving work (loud partials). + diagnostics.push(Diagnostic::warning( + "graph_create_skipped", + graph_address(graph_id), + "skipped after an earlier graph create failed in this run", + )); + failed_graphs.insert(graph_id.clone()); + continue; + } + let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) + else { + continue; + }; + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + let mut sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: None, + kind: RecoverySidecarKind::GraphCreate, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version: None, + expected_manifest_version: None, + desired_schema_digest: desired_graph.schema_digest.clone(), + state_cas_base: expected_cas.clone(), + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_create") { + // Simulated crash before the init: the sidecar stays for the + // sweep (row 1: root absent -> intent removed next run). + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + // Re-read + re-verify the schema source under the lock — the same + // TOCTOU posture as write_resource_payload. + let schema_source = source_paths + .get(schema_address(graph_id).as_str()) + .ok_or_else(|| { + Diagnostic::error( + "graph_create_failed", + graph_address(graph_id), + "no schema source recorded for graph", + ) + }) + .and_then(|path| { + fs::read_to_string(Path::new(path)).map_err(|err| { + Diagnostic::error( + "graph_create_failed", + graph_address(graph_id), + format!("could not read schema source '{path}': {err}"), + ) + }) + }) + .and_then(|source| { + if sha256_hex(source.as_bytes()) == desired_graph.schema_digest { + Ok(source) + } else { + Err(Diagnostic::error( + "resource_content_changed", + schema_address(graph_id), + "schema source changed while apply was running; re-run `cluster apply`", + )) + } + }); + let schema_source = match schema_source { + Ok(source) => source, + Err(diagnostic) => { + diagnostics.push(diagnostic); + let _ = fs::remove_file(&sidecar_path); // nothing moved + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + }; + match Omnigraph::init(&graph_uri, &schema_source).await { + Ok(_) => {} + Err(err) => { + diagnostics.push(Diagnostic::error( + "graph_create_failed", + graph_address(graph_id), + format!("could not initialize graph at '{graph_uri}': {err}"), + )); + // The sidecar stays: the sweep classifies whether the failed + // init left a partial root (row 5) or nothing (row 1). + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + } + // Record the post-init pin in the sidecar (best effort — a failure + // here leaves expected = null and the sweep classifies by digest). + if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await { + if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await { + sidecar.expected_manifest_version = Some(snapshot.version()); + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + diagnostics.push(diagnostic); + } + } + } + // Crash point: the graph exists, the cluster state does not record it + // yet. A failure here must acknowledge nothing; the next run's sweep + // rolls the ledger forward (row 4). + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + completed_create_sidecars.push(sidecar_path); + } + if !failed_graphs.is_empty() { + demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); + } + for change in &changes { match change.disposition { Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning( @@ -780,16 +944,6 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { // Gate on payload-phase errors only — sweep errors (e.g. a kept row-5 // sidecar) must not abort the run, or their statuses would never persist. let errors_before_payloads = count_errors(&diagnostics); - let source_paths: BTreeMap<&str, &str> = desired - .resources - .iter() - .filter_map(|resource| { - resource - .path - .as_deref() - .map(|path| (resource.address.as_str(), path)) - }) - .collect(); for change in &changes { if change.disposition != Some(ApplyDisposition::Applied) || change.operation == PlanOperation::Delete @@ -869,13 +1023,17 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } }, Some(ApplyDisposition::Blocked) => { - set_resource_status( - &mut new_state, - &change.resource, - ResourceLifecycleStatus::Blocked, - change.reason.as_deref().unwrap_or("dependency_not_applied"), - "waiting on an unapplied or missing dependency", - ); + // The sweep owns recovery statuses (Drifted/Error with their + // conditions); a generic Blocked must not clobber them. + if change.reason.as_deref() != Some("cluster_recovery_pending") { + set_resource_status( + &mut new_state, + &change.resource, + ResourceLifecycleStatus::Blocked, + change.reason.as_deref().unwrap_or("dependency_not_applied"), + "waiting on an unapplied or missing dependency", + ); + } } _ => {} } @@ -914,7 +1072,11 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { // Completed (rows 2/4) sweep sidecars are deleted only once their outcome // is durably recorded; on a failed write they stay and re-sweep next run. if !state_write_failed { - for sidecar_path in &sweep.completed_sidecars { + for sidecar_path in sweep + .completed_sidecars + .iter() + .chain(completed_create_sidecars.iter()) + { let _ = fs::remove_file(sidecar_path); } } @@ -2669,15 +2831,27 @@ fn resource_kind(address: &str) -> ResourceKind { /// it. Stage 3A executes only query/policy catalog writes; graph/schema /// movement is a later phase, and `graph.` composite updates whose schema /// component is unchanged converge automatically once query digests land. -fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { - let mut schema_changed = BTreeSet::new(); +fn classify_changes( + changes: &mut [PlanChange], + dependencies: &[Dependency], + pending_recovery: &BTreeSet, +) { + let mut schema_creates = BTreeSet::new(); + let mut schema_pending = BTreeSet::new(); let mut graph_creates = BTreeSet::new(); let mut graph_deletes = BTreeSet::new(); for change in changes.iter() { match resource_kind(&change.resource) { - ResourceKind::Schema(graph) => { - schema_changed.insert(graph); - } + ResourceKind::Schema(graph) => match change.operation { + PlanOperation::Create => { + schema_creates.insert(graph); + } + // Schema updates (4B) and deletes (4C) are still pending in + // this stage and block dependents. + _ => { + schema_pending.insert(graph); + } + }, ResourceKind::Graph(graph) => match change.operation { PlanOperation::Create => { graph_creates.insert(graph); @@ -2690,12 +2864,38 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { _ => {} } } + // A schema Create is satisfied by its paired graph create (the init + // carries the schema); a standalone schema Create stays pending. + for graph in &schema_creates { + if !graph_creates.contains(graph) { + schema_pending.insert(graph.clone()); + } + } for change in changes.iter_mut() { let (disposition, reason) = match resource_kind(&change.resource) { - ResourceKind::Schema(_) => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + ResourceKind::Schema(graph) => match change.operation { + PlanOperation::Create + if graph_creates.contains(&graph) + && !pending_recovery.contains(&graph) => + { + // Applied with the graph create — the init carries it. + (ApplyDisposition::Applied, None) + } + PlanOperation::Create if graph_creates.contains(&graph) => { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } + _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + }, ResourceKind::Graph(graph) => match change.operation { - PlanOperation::Update if !schema_changed.contains(&graph) => { + PlanOperation::Create => { + if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else { + (ApplyDisposition::Applied, None) + } + } + PlanOperation::Update if !schema_pending.contains(&graph) => { (ApplyDisposition::Derived, None) } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), @@ -2712,16 +2912,16 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { } } PlanOperation::Create | PlanOperation::Update => { - // A missing graph is the more fundamental blocker than a - // pending schema change, so check it first. - if graph_creates.contains(&graph) { - (ApplyDisposition::Blocked, Some("dependency_missing")) - } else if schema_changed.contains(&graph) { + if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else if schema_pending.contains(&graph) { ( ApplyDisposition::Blocked, Some("dependency_not_applied"), ) } else { + // A graph create in the same plan no longer blocks: + // creates execute first in the same apply run. (ApplyDisposition::Applied, None) } } @@ -2729,15 +2929,15 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { ResourceKind::Policy(_) => match change.operation { PlanOperation::Delete => (ApplyDisposition::Applied, None), PlanOperation::Create | PlanOperation::Update => { - let blocked_dep = dependencies.iter().any(|dep| { + let blocked_pending = dependencies.iter().any(|dep| { dep.from == change.resource && dep .to .strip_prefix("graph.") - .is_some_and(|graph| graph_creates.contains(graph)) + .is_some_and(|graph| pending_recovery.contains(graph)) }); - if blocked_dep { - (ApplyDisposition::Blocked, Some("dependency_missing")) + if blocked_pending { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } else { (ApplyDisposition::Applied, None) } @@ -2752,6 +2952,46 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { } } +/// After a graph create fails mid-run, every change that depended on that +/// graph (its schema, its queries, policies referencing it) flips from +/// Applied to Blocked so the output and the persisted statuses tell the +/// truth about what this run actually executed. +fn demote_dependents_of_failed_graphs( + changes: &mut [PlanChange], + failed: &BTreeSet, + dependencies: &[Dependency], +) { + for change in changes.iter_mut() { + if change.disposition != Some(ApplyDisposition::Applied) { + continue; + } + let demote_reason = match resource_kind(&change.resource) { + ResourceKind::Graph(graph) if failed.contains(&graph) => Some("graph_create_failed"), + ResourceKind::Schema(graph) if failed.contains(&graph) => { + Some("dependency_not_applied") + } + ResourceKind::Query { graph, .. } if failed.contains(&graph) => { + Some("dependency_not_applied") + } + ResourceKind::Policy(_) => { + let blocked = dependencies.iter().any(|dep| { + dep.from == change.resource + && dep + .to + .strip_prefix("graph.") + .is_some_and(|graph| failed.contains(graph)) + }); + blocked.then_some("dependency_not_applied") + } + _ => None, + }; + if let Some(reason) = demote_reason { + change.disposition = Some(ApplyDisposition::Blocked); + change.reason = Some(reason.to_string()); + } + } +} + /// Content-addressed catalog path for an applied resource payload. Extensions /// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, /// so the catalog layout cannot drift with operator file conventions. @@ -4525,45 +4765,117 @@ graphs: } #[tokio::test] - async fn apply_blocks_resources_of_uncreated_graph() { + async fn apply_creates_graph_and_unblocks_dependents() { let dir = fixture(); write_state_resources(dir.path(), &[]); let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); - assert_eq!(out.applied_count, 0); - assert!(!out.converged); + assert!(out.converged, "{out:?}"); let by_resource: BTreeMap<&str, &PlanChange> = out .changes .iter() .map(|change| (change.resource.as_str(), change)) .collect(); + // Stage 4A: the create executes, and its dependents apply in-run. assert_eq!( by_resource["graph.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Applied) + ); + // The graph exists on disk and opens; state records everything. + let graph_uri = derived_graph_uri(dir.path(), "knowledge"); + let db = Omnigraph::open_read_only(&graph_uri).await.unwrap(); + let desired = validate_config_dir(dir.path()); + assert_eq!( + sha256_hex(db.schema_source().as_bytes()), + desired.resource_digests["schema.knowledge"] + ); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + desired.resource_digests["schema.knowledge"] + ); + assert_eq!( + state["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); + // The create's sidecar was retired after the state CAS landed. + assert!( + !dir.path().join(CLUSTER_RECOVERIES_DIR).exists() + || fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .unwrap() + .next() + .is_none() + ); + } + + #[tokio::test] + async fn apply_create_failure_blocks_dependents_and_keeps_sidecar() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + // Make the init fail its strict preflight: a junk _schema.pg already + // sits at the derived root (the engine refuses to overwrite it). + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "junk").unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_create_failed") + ); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + // Dependents are demoted: the run tells the truth about what executed. + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Blocked) ); assert_eq!( by_resource["query.knowledge.find_person"].reason.as_deref(), - Some("dependency_missing") + Some("dependency_not_applied") ); assert_eq!( - by_resource["policy.base"].reason.as_deref(), - Some("dependency_missing") + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Blocked) ); - // Statuses for blocked resources are recorded (state changed), but no - // resource digests moved. + assert!(!out.converged); + // The sidecar stays for the sweep to classify next run. + assert!( + fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .unwrap() + .next() + .is_some() + ); + // No graph digests moved. let state = read_state_json(dir.path()); - assert_eq!(state["state_revision"], 2); assert!( state["applied_revision"]["resources"] .as_object() .unwrap() .is_empty() ); - assert_eq!( - state["resource_statuses"]["policy.base"]["status"], - "blocked" - ); } #[tokio::test] @@ -5147,6 +5459,47 @@ graphs: ); } + #[tokio::test] + async fn apply_blocks_create_while_recovery_pending() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + // A kept (row 5) sidecar: partial root that cannot be opened. + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "junk").unwrap(); + let sidecar = write_create_sidecar(dir.path(), "knowledge", "whatever", "01PEND"); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); // row 5 is an error condition + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + // The pending recovery blocks the create and its dependents; the + // executor never attempts the init. + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["graph.knowledge"].reason.as_deref(), + Some("cluster_recovery_pending") + ); + assert_eq!( + by_resource["query.knowledge.find_person"].reason.as_deref(), + Some("cluster_recovery_pending") + ); + assert_eq!( + by_resource["policy.base"].reason.as_deref(), + Some("cluster_recovery_pending") + ); + assert!(sidecar.exists()); + // The sweep's Error status is what persists — not a generic Blocked. + let state = read_state_json(dir.path()); + assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error"); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); @@ -5173,23 +5526,23 @@ graphs: .iter() .map(|change| (change.resource.as_str(), change)) .collect(); - // Empty state: graph/schema creates are deferred, query/policy blocked - // on the uncreated graph — and plan says so before apply runs. + // Stage 4A: graph/schema creates are executable, and dependents ride + // the same run — plan previews exactly that. assert_eq!( by_resource["graph.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) ); assert_eq!( by_resource["schema.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) ); assert_eq!( by_resource["query.knowledge.find_person"].disposition, - Some(ApplyDisposition::Blocked) + Some(ApplyDisposition::Applied) ); assert_eq!( - by_resource["policy.base"].reason.as_deref(), - Some("dependency_missing") + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Applied) ); } }