diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 37db77f..08c1fab 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -3558,7 +3558,7 @@ async fn main() -> Result<()> { finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { - let output = apply_config_dir(config); + let output = apply_config_dir(config).await; finish_cluster_apply(&output, json)?; } ClusterCommand::Status { config, json } => { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index d47e13c..7ab7ca9 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1285,7 +1285,7 @@ node Person { /// Disaster input fails closed: a destroyed graph root drifts the ledger, /// the plan proposes deferred creates, and apply moves nothing. #[test] -fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { +fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); init_cluster_derived_graph(temp.path()); @@ -1327,15 +1327,20 @@ fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { let plan = cluster_json(temp.path(), "plan"); assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create"); - assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred"); - assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred"); + // Stage 4A: the re-create is executable and the plan says so — nothing + // hidden about converging a destroyed root back to an EMPTY graph (the + // data was already lost; this is declarative convergence, RFC-004 §D1). 
+ assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "applied"); + assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "applied"); // Converged-then-destroyed: query/policy are already in state at the // desired digests, so they are not changes at all. assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}"); - let disaster_apply = cluster_json(temp.path(), "apply"); - assert_eq!(disaster_apply["applied_count"], 0, "{disaster_apply}"); - assert_eq!(disaster_apply["converged"], false, "{disaster_apply}"); + let recreate = cluster_json(temp.path(), "apply"); + assert_eq!(recreate["ok"], true, "{recreate}"); + assert_eq!(recreate["converged"], true, "{recreate}"); + // The empty graph is back on disk; catalog state survived throughout. + assert!(temp.path().join("graphs/knowledge.omni").exists()); let state: serde_json::Value = serde_json::from_str( &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), ) @@ -1352,59 +1357,84 @@ fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { ); } -/// The disposition matrix as a system: one apply over two graphs (one live, -/// one not yet created) plus graph-spanning and cluster-scoped policies must -/// produce all four dispositions at once — then converge after the second -/// graph appears. +/// The disposition matrix as a system under Stage 4A: a fresh multi-graph +/// config converges in ONE apply (both graphs created, spanning and +/// cluster-scoped policies applied), and a later mixed run — schema update +/// (deferred), its dependent query (blocked), an independent query update +/// (applied), its composite (derived) — shows all four dispositions at once +/// before the graph-plane schema apply closes the loop. 
#[test] fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { let temp = tempdir().unwrap(); write_multi_graph_cluster_fixture(temp.path()); - init_cluster_derived_graph(temp.path()); // knowledge only + // No manual init: Stage 4A creates both graphs. let import = cluster_json(temp.path(), "import"); assert_eq!(import["ok"], true, "{import}"); let apply = cluster_json(temp.path(), "apply"); assert_eq!(apply["ok"], true, "{apply}"); - assert_eq!(apply["converged"], false, "{apply}"); - assert_eq!(apply["applied_count"], 2, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied"); assert_eq!( - change_for(&apply, "query.knowledge.find_person")["disposition"], - "applied" - ); - assert_eq!( - change_for(&apply, "policy.cluster_wide")["disposition"], + change_for(&apply, "graph.engineering")["disposition"], "applied" ); assert_eq!( change_for(&apply, "query.engineering.find_service")["disposition"], + "applied" + ); + // The graph-spanning and cluster-scoped policies ride the same run. + assert_eq!(change_for(&apply, "policy.shared")["disposition"], "applied"); + assert_eq!( + change_for(&apply, "policy.cluster_wide")["disposition"], + "applied" + ); + assert!(temp.path().join("graphs/knowledge.omni").exists()); + assert!(temp.path().join("graphs/engineering.omni").exists()); + + // Mixed run: a knowledge schema update (4B territory — deferred) gates + // its query update (blocked), while an engineering query update is + // independent (applied) and re-derives its composite. 
+ fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("services.gq"), + "\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n", + ) + .unwrap(); + + let mixed = cluster_json(temp.path(), "apply"); + assert_eq!(mixed["ok"], true, "{mixed}"); + assert_eq!(mixed["converged"], false, "{mixed}"); + assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred"); + assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred"); + assert_eq!( + change_for(&mixed, "query.knowledge.find_person")["disposition"], "blocked" ); assert_eq!( - change_for(&apply, "query.engineering.find_service")["reason"], - "dependency_missing" - ); - // One missing dependency graph blocks the whole spanning policy. - assert_eq!(change_for(&apply, "policy.shared")["disposition"], "blocked"); - assert_eq!( - change_for(&apply, "graph.engineering")["disposition"], - "deferred" + change_for(&mixed, "query.knowledge.find_person")["reason"], + "dependency_not_applied" ); assert_eq!( - change_for(&apply, "schema.engineering")["disposition"], - "deferred" + change_for(&mixed, "query.engineering.find_service")["disposition"], + "applied" ); assert_eq!( - change_for(&apply, "graph.knowledge")["disposition"], + change_for(&mixed, "graph.engineering")["disposition"], "derived" ); - assert_eq!( - apply["resource_statuses"]["policy.shared"]["status"], - "blocked" - ); // Deterministic ordering: changes sorted by resource address. 
- let order: Vec<&str> = apply["changes"] + let order: Vec<&str> = mixed["changes"] .as_array() .unwrap() .iter() @@ -1412,21 +1442,22 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { .collect(); let mut sorted = order.clone(); sorted.sort_unstable(); - assert_eq!(order, sorted, "{apply}"); + assert_eq!(order, sorted, "{mixed}"); - // The second graph appears; refresh observes it; apply converges. - init_named_cluster_graph(temp.path(), "engineering", "services.pg"); + // The graph-plane tool applies the schema; refresh observes; converge. + output_success( + cli() + .arg("schema") + .arg("apply") + .arg(temp.path().join("graphs/knowledge.omni")) + .arg("--schema") + .arg(temp.path().join("people.pg")) + .arg("--json"), + ); let refresh = cluster_json(temp.path(), "refresh"); assert_eq!(refresh["ok"], true, "{refresh}"); - let converge = cluster_json(temp.path(), "apply"); - assert_eq!(converge["ok"], true, "{converge}"); assert_eq!(converge["converged"], true, "{converge}"); - assert_eq!( - change_for(&converge, "query.engineering.find_service")["disposition"], - "applied" - ); - assert_eq!(change_for(&converge, "policy.shared")["disposition"], "applied"); let final_plan = cluster_json(temp.path(), "plan"); assert!( @@ -1435,6 +1466,39 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { ); } +/// Stage 4A headline: a declared graph is created by `cluster apply` itself — +/// no manual `omnigraph init` anywhere in the flow. 
+#[test] +fn cluster_e2e_declared_graph_created_by_apply() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["ok"], true, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied"); + assert!(temp.path().join("graphs/knowledge.omni").exists()); + + // The created graph is a real graph: the per-graph CLI can open it. + let snapshot = output_success( + cli() + .arg("snapshot") + .arg(temp.path().join("graphs/knowledge.omni")), + ); + assert!(!stdout_string(&snapshot).is_empty()); + + let plan = cluster_json(temp.path(), "plan"); + assert!(plan["changes"].as_array().unwrap().is_empty(), "{plan}"); + let status = cluster_json(temp.path(), "status"); + assert_eq!( + status["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); +} + /// Catalog payload drift self-heals across the command surface: status warns /// read-only, refresh persists the drift and drops the digest, apply /// republishes the blob, status comes back clean. diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 660f34c..863691c 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -24,6 +24,7 @@ pub const CLUSTER_STATE_DIR: &str = "__cluster"; pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; +pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries"; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -415,11 +416,53 @@ struct StateLockFile { pid: u32, } +/// Recovery-intent record for a graph-moving apply operation (RFC-004 §D2). 
+/// Written under the state lock before the engine call that can create or +/// move a graph manifest; deleted only after the cluster state CAS that +/// records the outcome lands. The sweep (§D3) classifies survivors. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct RecoverySidecar { + schema_version: u32, + operation_id: String, + started_at: String, + #[serde(default)] + actor: Option, + kind: RecoverySidecarKind, + graph_id: String, + graph_uri: String, + #[serde(default)] + observed_manifest_version: Option, + #[serde(default)] + expected_manifest_version: Option, + desired_schema_digest: String, + #[serde(default)] + state_cas_base: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +enum RecoverySidecarKind { + GraphCreate, + // SchemaApply and GraphDelete arrive with stages 4B/4C. +} + +#[derive(Debug, Default)] +struct SweepOutcome { + /// Graphs whose sidecar was kept (rows 5/6): graph-moving work for them + /// is blocked until the operator repairs and re-observes. + pending_graphs: BTreeSet, + /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the + /// command's state write lands, so a CAS failure re-sweeps them. + completed_sidecars: Vec, +} + #[derive(Debug)] struct LocalStateBackend { state_dir: PathBuf, state_path: PathBuf, lock_path: PathBuf, + recoveries_dir: PathBuf, } #[derive(Debug)] @@ -513,6 +556,10 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { None }; + // Plan is read-only: pending sidecars are reported, never acted on + // (RFC-004 open question 3 keeps read-only commands warn-only). 
+ warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics); + let mut prior_resources = BTreeMap::new(); if !has_errors(&diagnostics) { match backend.read_state(&mut observations) { @@ -530,7 +577,9 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } else { diff_resources(&prior_resources, &desired.resource_digests) }; - classify_changes(&mut changes, &desired.dependencies); + // Plan previews dispositions without sweeping; a pending recovery is + // surfaced as the cluster_recovery_pending warning above instead. + classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new()); let blast_radius = compute_blast_radius(&changes, &desired.dependencies); let approvals_required = compute_approvals(&changes); let ok = !has_errors(&diagnostics); @@ -561,7 +610,7 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { /// state is the publish point: a failure after payload writes leaves inert /// digest-named blobs and no success acknowledgement; re-running apply is the /// repair. -pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { +pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); @@ -656,7 +705,7 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } }; let expected_cas = snapshot.state_cas; - let Some(state) = snapshot.state else { + let Some(mut state) = snapshot.state else { diagnostics.push(Diagnostic::error( "state_missing", CLUSTER_STATE_FILE, @@ -672,9 +721,15 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ); }; + // Snapshot the as-read state BEFORE the sweep so sweep mutations count as + // changes for the final dirty check and get persisted by the state CAS. 
+ let before_value = + serde_json::to_value(&state).expect("cluster state must serialize deterministically"); + let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await; + let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); - classify_changes(&mut changes, &desired.dependencies); + classify_changes(&mut changes, &desired.dependencies, &sweep.pending_graphs); // Defensive invariant: nothing the approval gate covers may be executable. // Today approvals only cover graph/schema deletes (always deferred); this @@ -702,6 +757,169 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ); } + // Graph creates execute first (RFC-004 §D5), sequentially, sidecar-fenced: + // sidecar written before the init, rewritten with the post-init manifest + // version, deleted only after the final state CAS lands. A failure stops + // further graph-moving work and demotes that graph's dependents. + let source_paths: BTreeMap<&str, &str> = desired + .resources + .iter() + .filter_map(|resource| { + resource + .path + .as_deref() + .map(|path| (resource.address.as_str(), path)) + }) + .collect(); + let graph_creates_to_run: Vec = changes + .iter() + .filter(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && change.operation == PlanOperation::Create + && matches!(resource_kind(&change.resource), ResourceKind::Graph(_)) + }) + .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) + .collect(); + let mut completed_create_sidecars: Vec = Vec::new(); + let mut failed_graphs: BTreeSet = BTreeSet::new(); + let mut creates_aborted = false; + for graph_id in &graph_creates_to_run { + if creates_aborted { + // A prior create failed: stop graph-moving work (loud partials). 
+ diagnostics.push(Diagnostic::warning( + "graph_create_skipped", + graph_address(graph_id), + "skipped after an earlier graph create failed in this run", + )); + failed_graphs.insert(graph_id.clone()); + continue; + } + let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) + else { + continue; + }; + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + let mut sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: None, + kind: RecoverySidecarKind::GraphCreate, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version: None, + expected_manifest_version: None, + desired_schema_digest: desired_graph.schema_digest.clone(), + state_cas_base: expected_cas.clone(), + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_create") { + // Simulated crash before the init: the sidecar stays for the + // sweep (row 1: root absent -> intent removed next run). + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + // Re-read + re-verify the schema source under the lock — the same + // TOCTOU posture as write_resource_payload. 
+ let schema_source = source_paths + .get(schema_address(graph_id).as_str()) + .ok_or_else(|| { + Diagnostic::error( + "graph_create_failed", + graph_address(graph_id), + "no schema source recorded for graph", + ) + }) + .and_then(|path| { + fs::read_to_string(Path::new(path)).map_err(|err| { + Diagnostic::error( + "graph_create_failed", + graph_address(graph_id), + format!("could not read schema source '{path}': {err}"), + ) + }) + }) + .and_then(|source| { + if sha256_hex(source.as_bytes()) == desired_graph.schema_digest { + Ok(source) + } else { + Err(Diagnostic::error( + "resource_content_changed", + schema_address(graph_id), + "schema source changed while apply was running; re-run `cluster apply`", + )) + } + }); + let schema_source = match schema_source { + Ok(source) => source, + Err(diagnostic) => { + diagnostics.push(diagnostic); + let _ = fs::remove_file(&sidecar_path); // nothing moved + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + }; + match Omnigraph::init(&graph_uri, &schema_source).await { + Ok(_) => {} + Err(err) => { + diagnostics.push(Diagnostic::error( + "graph_create_failed", + graph_address(graph_id), + format!("could not initialize graph at '{graph_uri}': {err}"), + )); + // The sidecar stays: the sweep classifies whether the failed + // init left a partial root (row 5) or nothing (row 1). + failed_graphs.insert(graph_id.clone()); + creates_aborted = true; + continue; + } + } + // Record the post-init pin in the sidecar (best effort — a failure + // here leaves expected = null and the sweep classifies by digest). + if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await { + if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await { + sidecar.expected_manifest_version = Some(snapshot.version()); + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + diagnostics.push(diagnostic); + } + } + } + // Crash point: the graph exists, the cluster state does not record it + // yet. 
A failure here must acknowledge nothing; the next run's sweep + // rolls the ledger forward (row 4). + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + completed_create_sidecars.push(sidecar_path); + } + if !failed_graphs.is_empty() { + demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); + } + for change in &changes { match change.disposition { Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning( @@ -723,16 +941,9 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { // Payload phase: content-addressed writes before the state CAS. Any // failure aborts before state moves; blobs already written are inert. - let source_paths: BTreeMap<&str, &str> = desired - .resources - .iter() - .filter_map(|resource| { - resource - .path - .as_deref() - .map(|path| (resource.address.as_str(), path)) - }) - .collect(); + // Gate on payload-phase errors only — sweep errors (e.g. a kept row-5 + // sidecar) must not abort the run, or their statuses would never persist. + let errors_before_payloads = count_errors(&diagnostics); for change in &changes { if change.disposition != Some(ApplyDisposition::Applied) || change.operation == PlanOperation::Delete @@ -761,7 +972,7 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { diagnostics.push(diagnostic); } } - if has_errors(&diagnostics) { + if count_errors(&diagnostics) > errors_before_payloads { return early_return( display_path(&desired.config_dir), Some(desired.config_digest), @@ -788,9 +999,8 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } // State mutation. Apply owns query/policy statuses only; graph/schema - // statuses belong to refresh/import observation and must not be clobbered. 
- let before_value = - serde_json::to_value(&state).expect("cluster state must serialize deterministically"); + // statuses belong to refresh/import observation and must not be clobbered + // (the sweep above is the one exception: it owns recovery statuses). let mut new_state = state.clone(); for change in &changes { match change.disposition { @@ -813,13 +1023,17 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } }, Some(ApplyDisposition::Blocked) => { - set_resource_status( - &mut new_state, - &change.resource, - ResourceLifecycleStatus::Blocked, - change.reason.as_deref().unwrap_or("dependency_not_applied"), - "waiting on an unapplied or missing dependency", - ); + // The sweep owns recovery statuses (Drifted/Error with their + // conditions); a generic Blocked must not clobber them. + if change.reason.as_deref() != Some("cluster_recovery_pending") { + set_resource_status( + &mut new_state, + &change.resource, + ResourceLifecycleStatus::Blocked, + change.reason.as_deref().unwrap_or("dependency_not_applied"), + "waiting on an unapplied or missing dependency", + ); + } } _ => {} } @@ -855,6 +1069,17 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } } } + // Completed (rows 2/4) sweep sidecars are deleted only once their outcome + // is durably recorded; on a failed write they stay and re-sweep next run. 
+ if !state_write_failed { + for sidecar_path in sweep + .completed_sidecars + .iter() + .chain(completed_create_sidecars.iter()) + { + let _ = fs::remove_file(sidecar_path); + } + } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never // persisted — automation reading `resource_statuses` independently of `ok` @@ -902,6 +1127,7 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let backend = LocalStateBackend::new(&parsed.config_dir); let mut observations = backend.observations(); backend.observe_lock(&mut observations, &mut diagnostics); + warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); let mut resource_digests = BTreeMap::new(); let mut resource_statuses = BTreeMap::new(); @@ -1107,6 +1333,11 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St (StateSyncOperation::Import, None) => initial_import_state(&desired), }; + // Recovery sweep first (RFC-004 §D3): classify any interrupted graph + // operation before observation/verification so a rolled-forward outcome + // is what those passes see. + let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await; + // Catalog payload verification must run BEFORE graph observation: removing // a drifted query digest first means the live-graph composite recompute // below already excludes it, so the persisted graph. composite stays @@ -1177,7 +1408,13 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St } match backend.write_state(&state, expected_cas.as_deref(), &mut observations) { - Ok(()) => {} + Ok(()) => { + // Completed sweep sidecars are deleted only after their outcome + // is durably recorded; on failure they stay and re-sweep. 
+ for sidecar_path in &sweep.completed_sidecars { + let _ = fs::remove_file(sidecar_path); + } + } Err(diagnostic) => diagnostics.push(diagnostic), } @@ -1307,10 +1544,104 @@ impl LocalStateBackend { Self { state_path: config_dir.join(CLUSTER_STATE_FILE), lock_path: config_dir.join(CLUSTER_LOCK_FILE), + recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR), state_dir, } } + /// List recovery sidecars in ULID (filename) order. Unparseable files are + /// reported as warnings and skipped — they stay on disk for the operator. + fn list_recovery_sidecars( + &self, + diagnostics: &mut Vec, + ) -> Vec<(PathBuf, RecoverySidecar)> { + let mut paths = Vec::new(); + match fs::read_dir(&self.recoveries_dir) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == "json") { + paths.push(path); + } + } + } + Err(err) if err.kind() == ErrorKind::NotFound => {} + Err(err) => { + diagnostics.push(Diagnostic::warning( + "recovery_sidecar_read_error", + CLUSTER_RECOVERIES_DIR, + format!("could not list recovery sidecars: {err}"), + )); + } + } + paths.sort(); + let mut sidecars = Vec::new(); + for path in paths { + match fs::read_to_string(&path) + .map_err(|err| err.to_string()) + .and_then(|text| { + serde_json::from_str::(&text).map_err(|err| err.to_string()) + }) { + Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)), + Ok(sidecar) => diagnostics.push(Diagnostic::warning( + "unsupported_recovery_sidecar_version", + display_path(&path), + format!( + "unsupported recovery sidecar version {}; leaving it in place", + sidecar.schema_version + ), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + "invalid_recovery_sidecar", + display_path(&path), + format!("could not parse recovery sidecar ({err}); leaving it in place"), + )), + } + } + sidecars + } + + /// Atomically write (or rewrite) a recovery sidecar; returns its path. 
+ fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result { + fs::create_dir_all(&self.recoveries_dir).map_err(|err| { + Diagnostic::error( + "recovery_sidecar_write_error", + CLUSTER_RECOVERIES_DIR, + format!("could not create recoveries directory: {err}"), + ) + })?; + let target = self + .recoveries_dir + .join(format!("{}.json", sidecar.operation_id)); + let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| { + Diagnostic::error( + "recovery_sidecar_write_error", + display_path(&target), + format!("could not encode recovery sidecar: {err}"), + ) + })?; + payload.push('\n'); + let tmp_path = self + .recoveries_dir + .join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new())); + fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + Diagnostic::error( + "recovery_sidecar_write_error", + display_path(&tmp_path), + format!("could not write recovery sidecar: {err}"), + ) + })?; + if let Err(err) = fs::rename(&tmp_path, &target) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "recovery_sidecar_write_error", + display_path(&target), + format!("could not move recovery sidecar into place: {err}"), + )); + } + Ok(target) + } + fn observations(&self) -> StateObservations { StateObservations { state_path: display_path(&self.state_path), @@ -1701,6 +2032,169 @@ fn initial_import_state(desired: &DesiredCluster) -> ClusterState { } } +/// Recovery sweep (RFC-004 §D3): runs at the start of every state-mutating +/// cluster command, under the state lock, before the command's own work. +/// Roll-forward-only — the engine's own sidecars make each graph-level +/// operation atomic within the graph, so the cluster never rolls a graph +/// back; it converges the ledger to observable reality or refuses loudly. +/// Mutations ride the calling command's CAS-checked state write; completed +/// sidecars are deleted only after that write lands. 
+async fn sweep_recovery_sidecars( + backend: &LocalStateBackend, + state: &mut ClusterState, + diagnostics: &mut Vec, +) -> SweepOutcome { + let mut outcome = SweepOutcome::default(); + for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) { + match sidecar.kind { + RecoverySidecarKind::GraphCreate => { + sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + } + } + } + outcome +} + +async fn sweep_graph_create_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let schema_addr = schema_address(&sidecar.graph_id); + let graph_path = PathBuf::from(&sidecar.graph_uri); + + // Row 1: nothing moved — the init never landed. The sidecar is pure + // intent; remove it and let the command's own plan re-propose the create. + if !graph_path.exists() { + let _ = fs::remove_file(&path); + return; + } + + match Omnigraph::open_read_only(&sidecar.graph_uri).await { + Ok(db) => { + let live_digest = sha256_hex(db.schema_source().as_bytes()); + let recorded = state + .applied_revision + .resources + .get(&schema_addr) + .map(|resource| resource.digest.clone()); + if recorded.as_deref() == Some(live_digest.as_str()) { + // Row 2: crash fell between the state CAS and sidecar delete. + outcome.completed_sidecars.push(path); + } else if live_digest == sidecar.desired_schema_digest { + // Row 4: the create completed on the graph; roll the cluster + // state forward to observable reality. 
+ state.applied_revision.resources.insert( + schema_addr.clone(), + StateResource { + digest: live_digest.clone(), + }, + ); + let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); + let composite = + graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address.clone(), StateResource { digest: composite }); + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_addr); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "graph_create", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address.clone(), + "an interrupted graph create had completed on the graph; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); + } else { + // Row 6: the graph moved to something the sidecar did not + // intend. Refuse to guess; require refresh + operator re-plan. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + "an interrupted graph create left unexpected graph state; graph-moving work is blocked until repaired", + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } + } + Err(err) => { + // Row 5: partial root (the engine's documented init gap). 
Never + // auto-delete — reconciler deletes are the same data-loss class + // as human deletes; the operator removes the root explicitly. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Error, + "graph_create_incomplete", + "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Error, + "graph_create_incomplete", + "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", + ); + diagnostics.push(Diagnostic::error( + "graph_create_incomplete", + graph_address.clone(), + format!( + "graph root '{}' exists but cannot be opened ({err}); remove the graph root and re-run `cluster apply`", + sidecar.graph_uri + ), + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } + } +} + +/// Read-only commands report pending sidecars without acting on them. +fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { + let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + let Ok(entries) = fs::read_dir(&recoveries_dir) else { + return; + }; + let mut names: Vec = entries + .flatten() + .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "json")) + .map(|entry| entry.file_name().to_string_lossy().into_owned()) + .collect(); + names.sort(); + for name in names { + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + format!("{CLUSTER_RECOVERIES_DIR}/{name}"), + "a recovery sidecar from an interrupted apply is pending; the next state-mutating command will classify it", + )); + } +} + async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { let mut graph_error_count = 0; for graph in &desired.graphs { @@ -2337,15 +2831,27 @@ fn resource_kind(address: &str) -> ResourceKind { /// it. 
Stage 3A executes only query/policy catalog writes; graph/schema /// movement is a later phase, and `graph.` composite updates whose schema /// component is unchanged converge automatically once query digests land. -fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { - let mut schema_changed = BTreeSet::new(); +fn classify_changes( + changes: &mut [PlanChange], + dependencies: &[Dependency], + pending_recovery: &BTreeSet, +) { + let mut schema_creates = BTreeSet::new(); + let mut schema_pending = BTreeSet::new(); let mut graph_creates = BTreeSet::new(); let mut graph_deletes = BTreeSet::new(); for change in changes.iter() { match resource_kind(&change.resource) { - ResourceKind::Schema(graph) => { - schema_changed.insert(graph); - } + ResourceKind::Schema(graph) => match change.operation { + PlanOperation::Create => { + schema_creates.insert(graph); + } + // Schema updates (4B) and deletes (4C) are still pending in + // this stage and block dependents. + _ => { + schema_pending.insert(graph); + } + }, ResourceKind::Graph(graph) => match change.operation { PlanOperation::Create => { graph_creates.insert(graph); @@ -2358,12 +2864,38 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { _ => {} } } + // A schema Create is satisfied by its paired graph create (the init + // carries the schema); a standalone schema Create stays pending. + for graph in &schema_creates { + if !graph_creates.contains(graph) { + schema_pending.insert(graph.clone()); + } + } for change in changes.iter_mut() { let (disposition, reason) = match resource_kind(&change.resource) { - ResourceKind::Schema(_) => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + ResourceKind::Schema(graph) => match change.operation { + PlanOperation::Create + if graph_creates.contains(&graph) + && !pending_recovery.contains(&graph) => + { + // Applied with the graph create — the init carries it. 
+ (ApplyDisposition::Applied, None) + } + PlanOperation::Create if graph_creates.contains(&graph) => { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } + _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + }, ResourceKind::Graph(graph) => match change.operation { - PlanOperation::Update if !schema_changed.contains(&graph) => { + PlanOperation::Create => { + if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else { + (ApplyDisposition::Applied, None) + } + } + PlanOperation::Update if !schema_pending.contains(&graph) => { (ApplyDisposition::Derived, None) } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), @@ -2380,16 +2912,16 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { } } PlanOperation::Create | PlanOperation::Update => { - // A missing graph is the more fundamental blocker than a - // pending schema change, so check it first. - if graph_creates.contains(&graph) { - (ApplyDisposition::Blocked, Some("dependency_missing")) - } else if schema_changed.contains(&graph) { + if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else if schema_pending.contains(&graph) { ( ApplyDisposition::Blocked, Some("dependency_not_applied"), ) } else { + // A graph create in the same plan no longer blocks: + // creates execute first in the same apply run. 
(ApplyDisposition::Applied, None) } } @@ -2397,15 +2929,15 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { ResourceKind::Policy(_) => match change.operation { PlanOperation::Delete => (ApplyDisposition::Applied, None), PlanOperation::Create | PlanOperation::Update => { - let blocked_dep = dependencies.iter().any(|dep| { + let blocked_pending = dependencies.iter().any(|dep| { dep.from == change.resource && dep .to .strip_prefix("graph.") - .is_some_and(|graph| graph_creates.contains(graph)) + .is_some_and(|graph| pending_recovery.contains(graph)) }); - if blocked_dep { - (ApplyDisposition::Blocked, Some("dependency_missing")) + if blocked_pending { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } else { (ApplyDisposition::Applied, None) } @@ -2420,6 +2952,46 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { } } +/// After a graph create fails mid-run, every change that depended on that +/// graph (its schema, its queries, policies referencing it) flips from +/// Applied to Blocked so the output and the persisted statuses tell the +/// truth about what this run actually executed. +fn demote_dependents_of_failed_graphs( + changes: &mut [PlanChange], + failed: &BTreeSet, + dependencies: &[Dependency], +) { + for change in changes.iter_mut() { + if change.disposition != Some(ApplyDisposition::Applied) { + continue; + } + let demote_reason = match resource_kind(&change.resource) { + ResourceKind::Graph(graph) if failed.contains(&graph) => Some("graph_create_failed"), + ResourceKind::Schema(graph) if failed.contains(&graph) => { + Some("dependency_not_applied") + } + ResourceKind::Query { graph, .. 
} if failed.contains(&graph) => { + Some("dependency_not_applied") + } + ResourceKind::Policy(_) => { + let blocked = dependencies.iter().any(|dep| { + dep.from == change.resource + && dep + .to + .strip_prefix("graph.") + .is_some_and(|graph| failed.contains(graph)) + }); + blocked.then_some("dependency_not_applied") + } + _ => None, + }; + if let Some(reason) = demote_reason { + change.disposition = Some(ApplyDisposition::Blocked); + change.reason = Some(reason.to_string()); + } + } +} + /// Content-addressed catalog path for an applied resource payload. Extensions /// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, /// so the catalog layout cannot drift with operator file conventions. @@ -2868,6 +3440,13 @@ fn has_errors(diagnostics: &[Diagnostic]) -> bool { .any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) } +fn count_errors(diagnostics: &[Diagnostic]) -> usize { + diagnostics + .iter() + .filter(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) + .count() +} + fn display_path(path: &Path) -> String { path.display().to_string() } @@ -3932,10 +4511,10 @@ graphs: .join(format!("{digest}.yaml")) } - #[test] - fn apply_without_state_fails_with_state_missing() { + #[tokio::test] + async fn apply_without_state_fails_with_state_missing() { let dir = fixture(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!( out.diagnostics @@ -3948,8 +4527,8 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn apply_writes_payloads_state_and_statuses() { + #[tokio::test] + async fn apply_writes_payloads_state_and_statuses() { let dir = fixture(); write_applyable_state(dir.path()); let desired = validate_config_dir(dir.path()); @@ -3965,7 +4544,7 @@ graphs: .unwrap() .clone(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); 
assert_eq!(out.applied_count, 2); assert_eq!(out.deferred_count, 0); @@ -4011,8 +4590,8 @@ graphs: out.desired_revision.config_digest.clone().unwrap() } - #[test] - fn apply_update_changes_query_digest_and_keeps_old_blob() { + #[tokio::test] + async fn apply_update_changes_query_digest_and_keeps_old_blob() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -4035,7 +4614,7 @@ graphs: fs::create_dir_all(old_blob.parent().unwrap()).unwrap(); fs::write(&old_blob, "old query source").unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); let new_digest = desired .resource_digests @@ -4050,8 +4629,8 @@ graphs: assert!(query_payload_path(dir.path(), new_digest).exists()); } - #[test] - fn apply_deletes_removed_resources_but_keeps_blobs() { + #[tokio::test] + async fn apply_deletes_removed_resources_but_keeps_blobs() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -4080,7 +4659,7 @@ graphs: fs::create_dir_all(stale_blob.parent().unwrap()).unwrap(); fs::write(&stale_blob, "old policy").unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.converged); let state = read_state_json(dir.path()); @@ -4109,8 +4688,8 @@ graphs: assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); } - #[test] - fn apply_defers_schema_change_and_blocks_dependent_query() { + #[tokio::test] + async fn apply_defers_schema_change_and_blocks_dependent_query() { let dir = fixture(); write_applyable_state(dir.path()); // Change the schema after seeding state: schema.knowledge now differs. 
@@ -4120,7 +4699,7 @@ graphs: ) .unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.converged); let by_resource: BTreeMap<&str, &PlanChange> = out @@ -4185,50 +4764,122 @@ graphs: ); } - #[test] - fn apply_blocks_resources_of_uncreated_graph() { + #[tokio::test] + async fn apply_creates_graph_and_unblocks_dependents() { let dir = fixture(); write_state_resources(dir.path(), &[]); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); - assert_eq!(out.applied_count, 0); - assert!(!out.converged); + assert!(out.converged, "{out:?}"); let by_resource: BTreeMap<&str, &PlanChange> = out .changes .iter() .map(|change| (change.resource.as_str(), change)) .collect(); + // Stage 4A: the create executes, and its dependents apply in-run. assert_eq!( by_resource["graph.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Applied) + ); + // The graph exists on disk and opens; state records everything. 
+ let graph_uri = derived_graph_uri(dir.path(), "knowledge"); + let db = Omnigraph::open_read_only(&graph_uri).await.unwrap(); + let desired = validate_config_dir(dir.path()); + assert_eq!( + sha256_hex(db.schema_source().as_bytes()), + desired.resource_digests["schema.knowledge"] + ); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + desired.resource_digests["schema.knowledge"] + ); + assert_eq!( + state["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); + // The create's sidecar was retired after the state CAS landed. + assert!( + !dir.path().join(CLUSTER_RECOVERIES_DIR).exists() + || fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .unwrap() + .next() + .is_none() + ); + } + + #[tokio::test] + async fn apply_create_failure_blocks_dependents_and_keeps_sidecar() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + // Make the init fail its strict preflight: a junk _schema.pg already + // sits at the derived root (the engine refuses to overwrite it). + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "junk").unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_create_failed") + ); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + // Dependents are demoted: the run tells the truth about what executed. 
+ assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Blocked) ); assert_eq!( by_resource["query.knowledge.find_person"].reason.as_deref(), - Some("dependency_missing") + Some("dependency_not_applied") ); assert_eq!( - by_resource["policy.base"].reason.as_deref(), - Some("dependency_missing") + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Blocked) ); - // Statuses for blocked resources are recorded (state changed), but no - // resource digests moved. + assert!(!out.converged); + // The sidecar stays for the sweep to classify next run. + assert!( + fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .unwrap() + .next() + .is_some() + ); + // No graph digests moved. let state = read_state_json(dir.path()); - assert_eq!(state["state_revision"], 2); assert!( state["applied_revision"]["resources"] .as_object() .unwrap() .is_empty() ); - assert_eq!( - state["resource_statuses"]["policy.base"]["status"], - "blocked" - ); } - #[test] - fn apply_does_not_delete_subtree_of_deleted_graph() { + #[tokio::test] + async fn apply_does_not_delete_subtree_of_deleted_graph() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -4249,7 +4900,7 @@ graphs: ], ); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.converged); let by_resource: BTreeMap<&str, &PlanChange> = out @@ -4276,17 +4927,17 @@ graphs: assert_eq!(resources["query.old.q"]["digest"], "5555"); } - #[test] - fn apply_is_idempotent() { + #[tokio::test] + async fn apply_is_idempotent() { let dir = fixture(); write_applyable_state(dir.path()); - let first = apply_config_dir(dir.path()); + let first = apply_config_dir(dir.path()).await; assert!(first.ok, "{:?}", first.diagnostics); assert!(first.state_written); let 
state_after_first = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); - let second = apply_config_dir(dir.path()); + let second = apply_config_dir(dir.path()).await; assert!(second.ok, "{:?}", second.diagnostics); assert!(second.changes.is_empty()); assert_eq!(second.applied_count, 0); @@ -4297,13 +4948,13 @@ graphs: assert_eq!(second.state_observations.state_revision, 2); } - #[test] - fn apply_respects_held_lock() { + #[tokio::test] + async fn apply_respects_held_lock() { let dir = fixture(); write_applyable_state(dir.path()); write_lock_file(dir.path(), "held-lock", "plan"); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!( out.diagnostics @@ -4317,8 +4968,8 @@ graphs: assert_eq!(state["state_revision"], 1); } - #[test] - fn apply_state_lock_false_bypasses_with_warning() { + #[tokio::test] + async fn apply_state_lock_false_bypasses_with_warning() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), @@ -4338,7 +4989,7 @@ graphs: .unwrap(); write_applyable_state(dir.path()); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.state_written); assert!(!out.state_observations.lock_acquired); @@ -4350,8 +5001,8 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn apply_skips_existing_payload_blob() { + #[tokio::test] + async fn apply_skips_existing_payload_blob() { let dir = fixture(); write_applyable_state(dir.path()); let desired = validate_config_dir(dir.path()); @@ -4366,13 +5017,13 @@ graphs: fs::create_dir_all(blob.parent().unwrap()).unwrap(); fs::write(&blob, "pre-existing").unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(fs::read_to_string(&blob).unwrap(), "pre-existing"); } - #[test] - fn apply_invalid_config_fails_before_lock() { + 
#[tokio::test] + async fn apply_invalid_config_fails_before_lock() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), @@ -4380,7 +5031,7 @@ graphs: ) .unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); // Config errors bail before the lock or any state directory exists. assert!(!dir.path().join(CLUSTER_STATE_DIR).exists()); @@ -4391,8 +5042,8 @@ graphs: /// mutations (phantom `applied` entries would mislead automation that /// reads `resource_statuses` independently of `ok`). #[cfg(unix)] - #[test] - fn apply_state_write_failure_reports_persisted_statuses() { + #[tokio::test] + async fn apply_state_write_failure_reports_persisted_statuses() { use std::os::unix::fs::PermissionsExt; let dir = fixture(); @@ -4435,7 +5086,7 @@ graphs: return; } - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap(); assert!(!out.ok); @@ -4459,9 +5110,9 @@ graphs: // ---- catalog payload verification (Stage 3B) ---- /// Converge a fixture dir and return the query blob path. 
- fn converge_fixture(config_dir: &Path) -> std::path::PathBuf { + async fn converge_fixture(config_dir: &Path) -> std::path::PathBuf { write_applyable_state(config_dir); - let out = apply_config_dir(config_dir); + let out = apply_config_dir(config_dir).await; assert!(out.ok && out.converged, "{:?}", out.diagnostics); let desired = validate_config_dir(config_dir); query_payload_path( @@ -4473,10 +5124,10 @@ graphs: ) } - #[test] - fn status_reports_missing_payload_read_only() { + #[tokio::test] + async fn status_reports_missing_payload_read_only() { let dir = fixture(); - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); fs::remove_file(&blob).unwrap(); @@ -4501,7 +5152,7 @@ graphs: async fn refresh_removes_digest_and_drifts_on_missing_payload() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; fs::remove_file(&blob).unwrap(); let out = refresh_config_dir(dir.path()).await; @@ -4527,7 +5178,7 @@ graphs: async fn refresh_drifts_on_corrupted_payload() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; fs::write(&blob, "corrupted content").unwrap(); let out = refresh_config_dir(dir.path()).await; @@ -4547,7 +5198,7 @@ graphs: async fn refresh_flags_unreadable_payload_as_error() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; // A same-named directory yields a non-NotFound IO error portably. 
fs::remove_file(&blob).unwrap(); fs::create_dir(&blob).unwrap(); @@ -4575,7 +5226,7 @@ graphs: async fn payload_drift_self_heals_through_refresh_plan_apply() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; let original = fs::read_to_string(&blob).unwrap(); fs::remove_file(&blob).unwrap(); @@ -4591,7 +5242,7 @@ graphs: assert_eq!(query_change.operation, PlanOperation::Create); assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied)); - let apply = apply_config_dir(dir.path()); + let apply = apply_config_dir(dir.path()).await; assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); assert_eq!(fs::read_to_string(&blob).unwrap(), original); @@ -4621,6 +5272,250 @@ graphs: ); } + // ---- recovery sidecars + sweep (Stage 4A) ---- + + fn derived_graph_uri(config_dir: &Path, graph_id: &str) -> String { + display_path( + &config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ) + } + + fn write_create_sidecar( + config_dir: &Path, + graph_id: &str, + desired_schema_digest: &str, + operation_id: &str, + ) -> PathBuf { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("{operation_id}.json")); + fs::write( + &path, + serde_json::to_string_pretty(&json!({ + "schema_version": 1, + "operation_id": operation_id, + "started_at": "1970-01-01T00:00:00Z", + "kind": "graph_create", + "graph_id": graph_id, + "graph_uri": derived_graph_uri(config_dir, graph_id), + "desired_schema_digest": desired_schema_digest, + })) + .unwrap(), + ) + .unwrap(); + path + } + + #[tokio::test] + async fn sweep_removes_sidecar_when_root_absent() { + let dir = fixture(); + write_applyable_state(dir.path()); + let sidecar = write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01ROW1"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + // 
Row 1: nothing moved; intent removed, run proceeds normally. + assert!(!sidecar.exists()); + assert!(out.converged); + } + + #[tokio::test] + async fn sweep_rolls_forward_completed_create() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_state_resources(dir.path(), &[]); // state predates the create + let desired = validate_config_dir(dir.path()); + let schema_digest = desired.resource_digests["schema.knowledge"].clone(); + let sidecar = write_create_sidecar(dir.path(), "knowledge", &schema_digest, "01ROW4"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + // Row 4: ledger converged to observable reality, audit recorded, + // sidecar retired after the CAS landed. + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + schema_digest + ); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["outcome"] == "rolled_forward" + && record["graph_id"] == "knowledge") + ); + assert!(!sidecar.exists()); + // With the graph rolled forward, the same run converges the catalog. + assert!(out.converged, "{out:?}"); + } + + #[tokio::test] + async fn sweep_completes_already_recorded_create() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); // state already records graph+schema + let desired = validate_config_dir(dir.path()); + let sidecar = write_create_sidecar( + dir.path(), + "knowledge", + &desired.resource_digests["schema.knowledge"], + "01ROW2", + ); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + // Row 2: outcome was already durable; no audit entry, sidecar retired. 
+ assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + state["recovery_records"] + .as_object() + .is_none_or(|records| records.is_empty()), + "{state}" + ); + } + + #[tokio::test] + async fn sweep_keeps_sidecar_for_incomplete_root() { + let dir = fixture(); + write_applyable_state(dir.path()); + // A root that exists but cannot be opened: the engine's partial-init gap. + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "junk").unwrap(); + let sidecar = write_create_sidecar(dir.path(), "knowledge", "whatever", "01ROW5"); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_create_incomplete") + ); + // Row 5: never auto-delete; sidecar and root stay for the operator, + // and the Error status is persisted by the run's state write. + assert!(sidecar.exists()); + assert!(root.exists()); + let state = read_state_json(dir.path()); + assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error"); + assert!( + state["resource_statuses"]["graph.knowledge"]["conditions"] + .as_array() + .unwrap() + .iter() + .any(|condition| condition == "graph_create_incomplete") + ); + } + + #[tokio::test] + async fn sweep_flags_unexpected_schema_as_pending() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + // Live graph exists with a schema the sidecar never intended. 
+ let graph_dir = dir.path().join(CLUSTER_GRAPHS_DIR); + fs::create_dir_all(&graph_dir).unwrap(); + Omnigraph::init( + &derived_graph_uri(dir.path(), "knowledge"), + "\nnode Other {\n name: String @key\n}\n", + ) + .await + .unwrap(); + let desired = validate_config_dir(dir.path()); + let sidecar = write_create_sidecar( + dir.path(), + "knowledge", + &desired.resource_digests["schema.knowledge"], + "01ROW6", + ); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); // warning, not error + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending") + ); + // Row 6: refuse to guess; sidecar kept, Drifted persisted. + assert!(sidecar.exists()); + let state = read_state_json(dir.path()); + assert_eq!( + state["resource_statuses"]["graph.knowledge"]["status"], + "drifted" + ); + assert!( + state["resource_statuses"]["graph.knowledge"]["conditions"] + .as_array() + .unwrap() + .iter() + .any(|condition| condition == "actual_applied_state_pending") + ); + } + + #[tokio::test] + async fn apply_blocks_create_while_recovery_pending() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + // A kept (row 5) sidecar: partial root that cannot be opened. + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "junk").unwrap(); + let sidecar = write_create_sidecar(dir.path(), "knowledge", "whatever", "01PEND"); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); // row 5 is an error condition + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + // The pending recovery blocks the create and its dependents; the + // executor never attempts the init. 
+ assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["graph.knowledge"].reason.as_deref(), + Some("cluster_recovery_pending") + ); + assert_eq!( + by_resource["query.knowledge.find_person"].reason.as_deref(), + Some("cluster_recovery_pending") + ); + assert_eq!( + by_resource["policy.base"].reason.as_deref(), + Some("cluster_recovery_pending") + ); + assert!(sidecar.exists()); + // The sweep's Error status is what persists — not a generic Blocked. + let state = read_state_json(dir.path()); + assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error"); + } + + #[test] + fn status_warns_on_pending_recovery_sidecar() { + let dir = fixture(); + write_applyable_state(dir.path()); + write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01STATUS"); + + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending" + && diagnostic.severity == DiagnosticSeverity::Warning) + ); + } + #[test] fn plan_annotates_apply_dispositions() { let dir = fixture(); @@ -4631,23 +5526,23 @@ graphs: .iter() .map(|change| (change.resource.as_str(), change)) .collect(); - // Empty state: graph/schema creates are deferred, query/policy blocked - // on the uncreated graph — and plan says so before apply runs. + // Stage 4A: graph/schema creates are executable, and dependents ride + // the same run — plan previews exactly that. 
assert_eq!( by_resource["graph.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) ); assert_eq!( by_resource["schema.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) ); assert_eq!( by_resource["query.knowledge.find_person"].disposition, - Some(ApplyDisposition::Blocked) + Some(ApplyDisposition::Applied) ); assert_eq!( - by_resource["policy.base"].reason.as_deref(), - Some("dependency_missing") + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Applied) ); } } diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index db7b82d..ec8ddfb 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -99,14 +99,14 @@ fn query_blob(config_dir: &Path, digests: &BTreeMap) -> PathBuf .join(format!("{}.gq", digests["query.knowledge.find_person"])) } -#[test] -fn failpoint_wiring_returns_injected_diagnostic() { +#[tokio::test] +async fn failpoint_wiring_returns_injected_diagnostic() { let scenario = FailScenario::setup(); let dir = fixture(); seed_applyable_state(dir.path()); let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.diagnostics.iter().any(|diagnostic| { diagnostic.code == "injected_failpoint" @@ -121,8 +121,8 @@ fn failpoint_wiring_returns_injected_diagnostic() { /// Crash between the payload phase and the state write: blobs are on disk, /// state.json is byte-identical, nothing is acknowledged — and a plain re-run /// repairs by trusting the existing content-addressed blobs. 
-#[test] -fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { +#[tokio::test] +async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { let scenario = FailScenario::setup(); let dir = fixture(); let digests = seed_applyable_state(dir.path()); @@ -130,7 +130,7 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { { let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(!out.state_written); assert!(!out.converged); @@ -149,7 +149,7 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { } // The repair is a plain re-run: existing blobs are trusted by digest. - let recovered = apply_config_dir(dir.path()); + let recovered = apply_config_dir(dir.path()).await; assert!(recovered.ok, "{:?}", recovered.diagnostics); assert!(recovered.converged); assert!(recovered.state_written); @@ -163,8 +163,8 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { /// A concurrent writer mutating state.json between apply's read and its write /// (possible under `state.lock: false`) must surface `state_cas_mismatch`, /// acknowledge nothing, and leave the concurrent writer's state on disk. 
-#[test] -fn apply_cas_race_surfaces_state_cas_mismatch() { +#[tokio::test] +async fn apply_cas_race_surfaces_state_cas_mismatch() { let scenario = FailScenario::setup(); let dir = fixture(); let digests = seed_applyable_state(dir.path()); @@ -182,7 +182,7 @@ fn apply_cas_race_surfaces_state_cas_mismatch() { fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); }); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; drop(failpoint); assert!(!out.ok); @@ -212,8 +212,136 @@ fn apply_cas_race_surfaces_state_cas_mismatch() { assert!(query_blob(dir.path(), &digests).exists()); // Recovery is a plain re-run against the rewritten state. - let recovered = apply_config_dir(dir.path()); + let recovered = apply_config_dir(dir.path()).await; assert!(recovered.ok, "{:?}", recovered.diagnostics); assert!(recovered.converged); scenario.teardown(); } + +fn seed_empty_state(config_dir: &Path) { + let state_dir = config_dir.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{ + "version": 1, + "state_revision": 1, + "applied_revision": { "resources": {} } +} +"#, + ) + .unwrap(); +} + +fn recovery_sidecars(config_dir: &Path) -> Vec { + match fs::read_dir(config_dir.join("__cluster/recoveries")) { + Ok(entries) => { + let mut paths: Vec = entries + .flatten() + .map(|entry| entry.path()) + .filter(|path| path.extension().is_some_and(|ext| ext == "json")) + .collect(); + paths.sort(); + paths + } + Err(_) => Vec::new(), + } +} + +/// Crash before the init: the create-intent sidecar survives, nothing moved. +/// The next run's sweep removes the intent (row 1) and the same run creates +/// the graph and converges. 
+#[tokio::test] +async fn create_crash_before_init_recovers_via_sweep() { + let scenario = FailScenario::setup(); + let dir = fixture(); + seed_empty_state(dir.path()); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "injected_failpoint" + && diagnostic + .message + .contains("cluster_apply.before_graph_create") + })); + assert_eq!(recovery_sidecars(dir.path()).len(), 1); + assert!(!dir.path().join("graphs/knowledge.omni").exists()); + // No resource digest moved. + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert!( + state["applied_revision"]["resources"] + .as_object() + .unwrap() + .is_empty() + ); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + assert!(dir.path().join("graphs/knowledge.omni").exists()); + assert!(recovery_sidecars(dir.path()).is_empty()); + scenario.teardown(); +} + +/// Crash after the init but before the state CAS: the graph exists, the +/// ledger is stale, nothing was acknowledged. The next run's sweep rolls the +/// ledger forward (row 4) with an audit entry, and the run converges. +#[tokio::test] +async fn create_crash_after_init_rolls_state_forward() { + let scenario = FailScenario::setup(); + let dir = fixture(); + seed_empty_state(dir.path()); + let state_before = fs::read(dir.path().join("__cluster/state.json")).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(!out.state_written); + // The graph exists; the cluster state is byte-identical (no ack). 
+ assert!(dir.path().join("graphs/knowledge.omni").exists()); + assert_eq!( + fs::read(dir.path().join("__cluster/state.json")).unwrap(), + state_before + ); + // The sidecar carries the post-init manifest pin. + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert!( + sidecar["expected_manifest_version"].is_number(), + "{sidecar}" + ); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["outcome"] == "rolled_forward") + ); + scenario.teardown(); +} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 5c88a37..c171f53 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. 
**Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), and failpoint crash-mid-apply / CAS-race coverage | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, and Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 9a2597b..7ff49e8 100644 --- 
a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,15 +1,17 @@ # Cluster Config -**Status:** Stage 3A config-only apply preview. +**Status:** Stage 4A graph-create apply preview. Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local `cluster.yaml` folder, produce a deterministic read-only plan, inspect the local JSON state ledger, explicitly refresh/import graph observations into that ledger, manually remove a held local state lock by exact lock id, and -**apply the config-only subset of the plan** — stored-query and policy-bundle -catalog writes. It does not move graph manifests, change schemas, start -servers, or serve anything it applies: the server still boots from +**apply the executable subset of the plan** — stored-query and policy-bundle +catalog writes, and **graph creation**: a declared graph that does not exist +yet is initialized by apply itself at the derived root. It does not change +existing schemas (deferred to a later stage), move existing graph manifests, +start servers, or serve anything it applies: the server still boots from `omnigraph.yaml`. ## Commands @@ -153,8 +155,8 @@ condition in `reason`). ## Apply -`cluster apply` executes the config-only subset of the plan — stored-query and -policy-bundle changes. There is no confirm flag: `cluster plan` is the preview, +`cluster apply` executes the executable subset of the plan — stored-query and +policy-bundle changes, and graph creates. There is no confirm flag: `cluster plan` is the preview, and apply recomputes the same diff under the state lock before executing, so a stale preview can never be applied. Apply requires an existing `state.json` (`state_missing` directs you to `cluster import` first). @@ -180,9 +182,39 @@ still boots from `omnigraph.yaml`; no query or policy applied here serves traffic until the server-boot stage ships, as an explicit per-deployment mode switch. 
-Graph and schema changes are never executed by this stage. They are reported -as `deferred` (warning `apply_unsupported_change`), and query/policy changes -that depend on them are `blocked` (warning `apply_dependency_blocked`, status +### Graph creation + +A `graph.<id>` create (the graph is declared but no root exists) is executed +by apply: the graph is initialized at the derived root + +```text +<config-dir>/graphs/<id>.omni +``` + +with the declared schema, before any catalog writes, so queries and policies +that depend on the new graph apply **in the same run**. Each create is fenced +by a recovery sidecar under `__cluster/recoveries/{ulid}.json`, written before +the init and removed only after the state update lands. If apply crashes in +between, the next state-mutating command (`apply`, `refresh`, `import`) runs a +**recovery sweep** that classifies the survivor by observation: an absent root +removes the stale intent; a completed create rolls the cluster state forward +(recorded in the state's `recovery_records`); a partial root reports +`graph_create_incomplete` (status `error` — remove the root and re-run apply; +nothing is auto-deleted); unexpected graph content reports +`actual_applied_state_pending` (status `drifted` — run `cluster refresh` and +re-plan). While a kept sidecar is pending, that graph's create and its +dependents are blocked with `cluster_recovery_pending`. Read-only commands +(`status`, `plan`) warn about pending sidecars without acting on them. + +**Re-creation is convergence.** If a graph root disappears out-of-band, +`refresh` records the drift and the next `plan` proposes a create — and apply +will execute it, producing an **empty** graph at the root. The data was +already lost when the root vanished; the create is visible in the plan +(disposition `applied`) before anything runs. + +Schema changes to existing graphs are never executed by this stage.
They are +reported as `deferred` (warning `apply_unsupported_change`), and query/policy +changes that depend on them are `blocked` (warning `apply_dependency_blocked`, status `blocked` in state). A partially-applicable plan still exits 0 with warnings; the JSON `converged` field is the automation signal for "state now matches the desired revision". The applied `config_digest` is only recorded when apply