use std::collections::{BTreeMap, BTreeSet};
use std::fs::{self, OpenOptions};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::process;
use omnigraph::db::{Omnigraph, ReadTarget, SchemaApplyOptions};
use omnigraph_compiler::SchemaMigrationPlan;
use omnigraph_compiler::build_catalog;
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::query::typecheck::typecheck_query_decl;
use omnigraph_compiler::schema::parser::parse_schema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use ulid::Ulid;

// `failpoints` is public: apply calls `failpoints::maybe_fail(...)` at named
// crash points. The remaining submodules are private helpers of the cluster
// commands.
pub mod failpoints;
mod diff;
mod serve;
mod sweep;
mod store;

use store::{LocalStateBackend, StateLockGuard, StateSnapshot};
pub use serve::{
    ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot,
};
use serve::read_verified_payload;
use diff::{
    FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources,
    classify_changes, compute_approvals, compute_blast_radius,
    demote_dependents_of_failed_graphs, diff_resources, resource_kind,
};
use sweep::{
    mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars,
    tombstone_graph_subtree, warn_pending_recovery_sidecars,
};

/// File name of the cluster config.
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
/// Directory joined onto the config dir by plan/apply; each declared graph
/// lives at `graphs/<graph_id>.omni` inside it.
pub const CLUSTER_GRAPHS_DIR: &str = "graphs";
/// Root of the cluster's own bookkeeping; the `__cluster/*` paths below live
/// under it (presumably also relative to the config dir — resolved in `store`).
pub const CLUSTER_STATE_DIR: &str = "__cluster";
/// The state ledger (`state.json`, required by `cluster apply`).
pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json";
/// The state lock file (presumably serialized `StateLockFile`).
pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json";
/// Resource payload directory (presumably the content-addressed payloads apply
/// writes before the state CAS).
pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
/// Recovery sidecar directory (presumably where `RecoverySidecar`s are written).
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
/// Approval artifact directory (presumably where `ApprovalArtifact`s are written).
pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";

/// Severity of a [`Diagnostic`]; serialized as `"error"` / `"warning"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DiagnosticSeverity {
    Error,
    Warning,
}

/// One machine-readable finding reported by a cluster command.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct Diagnostic {
    /// Stable identifier, e.g. `state_lock_disabled` or `query_parse_error`.
    pub code: String,
    pub severity: DiagnosticSeverity,
    /// What the finding is about: a config key (`state.lock`,
    /// `graphs.<id>.queries`), a resource address, or a state file path.
    pub path: String,
    /// Human-readable explanation.
    pub message: String,
}

impl Diagnostic {
    /// Build an `Error`-severity diagnostic.
    fn error(code: impl Into, path: impl Into, message: impl Into) -> Self {
        Self {
            code: code.into(),
            severity: DiagnosticSeverity::Error,
            path: path.into(),
            message: message.into(),
        }
    }

    /// Build a `Warning`-severity diagnostic.
    fn warning(
        code: impl Into,
        path: impl Into,
        message: impl Into,
    ) -> Self {
        Self {
            code: code.into(),
            severity: DiagnosticSeverity::Warning,
            path: path.into(),
            message: message.into(),
        }
    }
}

/// One desired resource: its address, kind, digest and optional source file.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct ResourceSummary {
    /// Resource address, e.g. `graph.<id>` or `schema.<id>`.
    pub address: String,
    pub kind: String,
    /// Digest of the resource's content.
    pub digest: String,
    /// Source file backing the resource, if any (plan/apply read schema
    /// sources from it).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option,
}

/// One edge of the desired resource dependency graph, between two resource
/// addresses (which end depends on which is defined in `diff` — TODO confirm).
#[derive(Debug, Clone, Serialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct Dependency {
    pub from: String,
    pub to: String,
}

/// Result of [`validate_config_dir`]. Digests, resources and dependencies are
/// empty when the config could not be loaded at all.
#[derive(Debug, Clone, Serialize)]
pub struct ValidateOutput {
    pub ok: bool,
    pub config_dir: String,
    pub config_file: String,
    pub resource_digests: BTreeMap,
    pub resources: Vec,
    pub dependencies: Vec,
    pub diagnostics: Vec,
}

/// The desired (config-side) revision a command worked against.
#[derive(Debug, Clone, Serialize)]
pub struct DesiredRevision {
    /// Digest of the desired config; `None` when it could not be loaded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config_digest: Option,
}

/// What a command observed about the cluster state ledger and its lock.
#[derive(Debug, Clone, Serialize)]
pub struct StateObservations {
    pub state_path: String,
    pub lock_path: String,
    pub state_found: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub applied_config_digest: Option,
    pub state_revision: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub state_cas: Option,
    pub resource_count: usize,
    /// Set, together with the `lock_*` metadata below, by
    /// `observe_lock_metadata` when a lock file is present.
    pub locked: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lock_id: Option,
    /// Presumably true when this command itself took the lock — set in `store`.
    pub lock_acquired: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub acquired_lock_id: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lock_operation: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lock_created_at: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lock_pid: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lock_age_seconds: Option,
}

impl StateObservations {
    /// Record that a lock file is present and copy its metadata (id,
    /// operation, creation time, pid) into the observations; the age is
    /// derived from `created_at` by `lock_age_seconds`. Leaves
    /// `lock_acquired` / `acquired_lock_id` untouched.
    fn observe_lock_metadata(&mut self, lock: &StateLockFile) {
        self.locked = true;
        self.lock_id = Some(lock.lock_id.clone());
        self.lock_operation = Some(lock.operation.clone());
        self.lock_created_at = Some(lock.created_at.clone());
        self.lock_pid = Some(lock.pid);
        self.lock_age_seconds = lock_age_seconds(&lock.created_at);
    }
}

/// Lifecycle status of a resource; serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ResourceLifecycleStatus {
    Pending,
    Planned,
    Applying,
    Applied,
    Drifted,
    Blocked,
    Error,
}

/// Status entry for one resource (presumably the value type of the
/// `resource_statuses` maps). Unknown fields are rejected on read.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct ResourceStatusRecord {
    pub status: ResourceLifecycleStatus,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub conditions: Vec,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub message: Option,
}

/// Kind of planned change for a resource; serialized in snake_case.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PlanOperation {
    Create,
    Update,
    Delete,
}

/// How `cluster apply` treats a planned change in the current stage.
///
/// `Applied` changes execute: config-only query/policy catalog writes, plus
/// graph creates and schema updates, which apply runs sidecar-fenced against
/// the graph store.
/// `Derived` marks a `graph.<id>` composite-digest update that converges
/// automatically once its applied query digests land in state. `Deferred`
/// changes need a later phase (graph/schema lifecycle or schema content).
/// `Blocked` query/policy changes are gated by an unapplied or missing
/// dependency.
///
/// NOTE(review): the `Deferred` examples predate graph-create/schema-apply
/// execution; confirm which graph/schema changes `classify_changes` still
/// defers.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApplyDisposition {
    Applied,
    Derived,
    Deferred,
    Blocked,
}

/// One planned change to a resource: produced by the diff, then annotated by
/// `classify_changes` (disposition/reason) and, for schema updates, by the
/// plan's migration preview.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct PlanChange {
    /// Address of the changed resource (e.g. `schema.<id>`).
    pub resource: String,
    pub operation: PlanOperation,
    /// Applied digest before the change, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub before_digest: Option,
    /// Desired digest after the change, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after_digest: Option,
    /// How apply treats this change; apply only executes `Applied` ones.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub disposition: Option,
    /// Human-readable explanation accompanying the disposition.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option,
    /// True for a policy change whose file digest is unchanged but whose
    /// `applies_to` bindings differ from the applied revision (including the
    /// pre-5A backfill case).
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub binding_change: bool,
    /// For schema updates: the engine's migration plan against the live
    /// graph (RFC-004 §D7's data-aware preview). Absent when the preview is
    /// unavailable (warning `schema_preview_unavailable`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub migration: Option,
}

/// Resources affected by a change to `resource` (from `compute_blast_radius`).
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct BlastRadius {
    pub resource: String,
    pub affected: Vec,
}

/// A planned change that needs a human approval (from `compute_approvals`).
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct ApprovalRequirement {
    pub resource: String,
    /// Why the change needs approval.
    pub reason: String,
    /// True when a valid (digest-matching, unconsumed) approval artifact is
    /// pending for this change.
    pub satisfied: bool,
}

/// Result of [`plan_config_dir`]. `changes`, `blast_radius` and
/// `approvals_required` are empty when planning stopped on errors.
#[derive(Debug, Clone, Serialize)]
pub struct PlanOutput {
    pub ok: bool,
    pub config_dir: String,
    pub desired_revision: DesiredRevision,
    pub resource_digests: BTreeMap,
    pub dependencies: Vec,
    pub state_observations: StateObservations,
    pub changes: Vec,
    pub blast_radius: Vec,
    pub approvals_required: Vec,
    pub diagnostics: Vec,
}

/// Output of the status command (producer not in view).
#[derive(Debug, Clone, Serialize)]
pub struct StatusOutput {
    pub ok: bool,
    pub config_dir: String,
    pub state_observations: StateObservations,
    pub resource_digests: BTreeMap,
    pub resource_statuses: BTreeMap,
    pub observations: BTreeMap,
    pub diagnostics: Vec,
}

/// Which state-sync command produced a [`StateSyncOutput`].
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StateSyncOperation {
    Refresh,
    Import,
}

/// Output shared by the state refresh and import commands.
#[derive(Debug, Clone, Serialize)]
pub struct StateSyncOutput {
    pub ok: bool,
    pub operation: StateSyncOperation,
    pub config_dir: String,
    pub state_observations: StateObservations,
    pub resource_digests: BTreeMap,
    pub resource_statuses: BTreeMap,
    pub observations: BTreeMap,
    pub diagnostics: Vec,
}

/// Output of the force-unlock command; `lock_removed` presumably reports
/// whether a lock file was deleted.
#[derive(Debug, Clone, Serialize)]
pub struct ForceUnlockOutput {
    pub ok: bool,
    pub config_dir: String,
    pub state_observations: StateObservations,
    pub lock_removed: bool,
    pub diagnostics: Vec,
}

/// Output of config-only `cluster apply`. "Applied" means recorded in the
/// local cluster catalog (`__cluster/`); nothing applied here serves traffic —
/// the server still boots from `omnigraph.yaml` until the server-boot stage.
///
/// NOTE(review): apply also executes graph creates and schema applies against
/// `graphs/<id>.omni`, which lives outside `__cluster/`; confirm the
/// "config-only" wording above is still accurate.
#[derive(Debug, Clone, Serialize)]
pub struct ApplyOutput {
    pub ok: bool,
    pub config_dir: String,
    /// Actor passed via `ApplyOptions`, echoed back.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub actor: Option,
    pub desired_revision: DesiredRevision,
    pub state_observations: StateObservations,
    /// Every planned change, with `disposition`/`reason` always populated.
    pub changes: Vec,
    pub applied_count: usize,
    /// Deferred + Blocked changes (Derived composite updates count as neither).
    pub deferred_count: usize,
    /// True when state matches the desired revision after this apply.
    pub converged: bool,
    /// False for a no-op re-apply: state bytes (and revision) were left untouched.
    pub state_written: bool,
    /// The statuses as persisted: post-apply on success, the pre-apply on-disk
    /// snapshot when the state write fails (never unpersisted in-memory state).
    ///
    /// NOTE(review): some early exits in apply (approval-invariant violation,
    /// the `after_graph_create` failpoint) return the in-memory
    /// `state.resource_statuses` after `sweep_recovery_sidecars` had mutable
    /// access to `state`; confirm the sweep cannot change them, or this
    /// promise does not hold on those paths.
    pub resource_statuses: BTreeMap,
    pub diagnostics: Vec,
}

/// A digest-bound human approval for an irreversible operation (RFC-004
/// §D4). Written by `cluster approve`, consumed by apply. The file is never
/// deleted on consumption — it is rewritten with `consumed_at` and also
/// summarized into the state ledger's `approval_records`, so the audit fact
/// survives the loss of either store (axiom 11).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ApprovalArtifact {
    schema_version: u32,
    approval_id: String,
    resource: String,
    operation: String,
    reason: String,
    /// Config digest (and below, resource digests) the approval is bound to;
    /// it only satisfies a change whose digests match.
    bound_config_digest: String,
    #[serde(default)]
    bound_before_digest: Option,
    #[serde(default)]
    bound_after_digest: Option,
    approved_by: String,
    created_at: String,
    /// Set when apply consumes the approval (the file is rewritten, not deleted).
    #[serde(default)]
    consumed_at: Option,
    #[serde(default)]
    consumed_by_operation: Option,
}

/// Result of `cluster approve`; identification fields are skipped when unset.
#[derive(Debug, Clone, Serialize)]
pub struct ApproveOutput {
    pub ok: bool,
    pub config_dir: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approval_id: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation: Option,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approved_by: Option,
    pub diagnostics: Vec,
}

/// The loaded desired cluster: config location, digests, resources and the
/// dependency graph that plan/apply diff against state.
#[derive(Debug, Clone)]
struct DesiredCluster {
    config_dir: PathBuf,
    config_digest: String,
    /// Whether plan/apply take the state lock (`state.lock`); when false they
    /// warn `state_lock_disabled` instead.
    state_lock: bool,
    graphs: Vec,
    /// Resource address -> desired digest; diffed against the applied revision.
    resource_digests: BTreeMap,
    resources: Vec,
    dependencies: Vec,
    /// `policy.<id>` address -> normalized applies_to refs.
    policy_bindings: BTreeMap>,
}

/// A declared graph and the digest of its schema source (apply re-hashes the
/// source and compares against `schema_digest` before using it).
#[derive(Debug, Clone)]
struct DesiredGraph {
    id: String,
    schema_digest: String,
}

/// Raw config parse result plus its diagnostics and resolved paths.
#[derive(Debug)]
struct ParsedConfig {
    raw: Option,
    diagnostics: Vec,
    config_dir: PathBuf,
    config_file: PathBuf,
}

/// Cluster-wide settings derived from the config.
#[derive(Debug, Clone, Copy)]
struct ClusterSettings {
    state_lock: bool,
}

/// Result of `load_desired`: `desired` is `None` when the config could not be
/// loaded; `diagnostics` explain why.
#[derive(Debug)]
struct LoadOutcome {
    desired: Option,
    diagnostics: Vec,
    config_dir: PathBuf,
    config_file: PathBuf,
}

/// Serde model of the cluster config file; unknown keys are rejected.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct RawClusterConfig {
    version: u32,
    #[serde(default)]
    metadata: Metadata,
    #[serde(default)]
    state: StateConfig,
    #[serde(default)]
    graphs: BTreeMap,
    #[serde(default)]
    policies: BTreeMap,
}

/// `metadata:` section of the cluster config.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct Metadata {
    name: Option,
}

/// `state:` section of the cluster config (`state.backend`, `state.lock`).
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct StateConfig {
    backend: Option,
    lock: Option,
}

/// One `graphs.<id>` entry: schema source path plus the query declaration
/// (defaults to an empty explicit registry).
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct GraphConfig {
    schema: PathBuf,
    #[serde(default)]
    queries: QueriesDecl,
}

/// How a graph declares its stored queries. Terraform-style: the `.gq`
/// files ARE the declaration — point at them (or a directory) and every
/// `query <name>` they contain is discovered. The explicit name->file map
/// remains for fine-grained control.
///
/// `#[serde(untagged)]`: variants are tried in declaration order, so a YAML
/// string selects `Discover`, a list `DiscoverMany`, and a map `Explicit`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum QueriesDecl {
    /// `queries: ./queries/` — a directory (top-level `*.gq`, sorted) or a
    /// single `.gq` file; every declaration inside is registered.
    Discover(PathBuf),
    /// `queries: [./queries/, ./extra.gq]` — several directories/files.
    DiscoverMany(Vec),
    /// `queries: { name: { file: ... } }` — explicit registry.
    Explicit(BTreeMap),
}

impl Default for QueriesDecl {
    /// No queries: an empty explicit registry.
    fn default() -> Self {
        QueriesDecl::Explicit(BTreeMap::new())
    }
}

/// Expand a graph's query declaration into the canonical name->file map.
/// Discovery reads and parses each `.gq`; unreadable or unparseable files /// and duplicate query names are loud validation errors — a declaration the /// tool cannot enumerate is broken, not partially usable. fn resolve_query_decls( config_dir: &Path, graph_id: &str, decl: &QueriesDecl, diagnostics: &mut Vec, ) -> (BTreeMap, BTreeMap) { let paths: Vec = match decl { QueriesDecl::Explicit(map) => { return ( map.iter() .map(|(name, config)| { (name.clone(), QueryConfig { file: config.file.clone() }) }) .collect(), BTreeMap::new(), ); } QueriesDecl::Discover(path) => vec![path.clone()], QueriesDecl::DiscoverMany(paths) => paths.clone(), }; let mut files: Vec<(PathBuf, PathBuf)> = Vec::new(); // (declared-relative, resolved) for declared in &paths { let resolved = resolve_config_path(config_dir, declared); if resolved.is_dir() { let mut entries: Vec = match fs::read_dir(&resolved) { Ok(read) => read .flatten() .map(|entry| entry.path()) .filter(|path| path.extension().is_some_and(|ext| ext == "gq")) .collect(), Err(err) => { diagnostics.push(Diagnostic::error( "query_dir_unreadable", format!("graphs.{graph_id}.queries"), format!("could not list query directory '{}': {err}", resolved.display()), )); continue; } }; entries.sort(); if entries.is_empty() { diagnostics.push(Diagnostic::warning( "query_dir_empty", format!("graphs.{graph_id}.queries"), format!("query directory '{}' contains no .gq files", resolved.display()), )); } for path in entries { let relative = declared.join(path.file_name().expect("dir entries have names")); files.push((relative, path)); } } else { files.push((declared.clone(), resolved)); } } let mut registry: BTreeMap = BTreeMap::new(); let mut origin: BTreeMap = BTreeMap::new(); // Content read once at discovery and handed to the caller — the per-query // digest/typecheck pass reuses it instead of re-reading (no N+1 reads, no // window for the file to change between enumeration and validation). 
let mut contents: BTreeMap = BTreeMap::new(); for (declared, resolved) in files { let source = match fs::read_to_string(&resolved) { Ok(source) => source, Err(err) => { diagnostics.push(Diagnostic::error( "query_file_missing", format!("graphs.{graph_id}.queries"), format!("could not read query file '{}': {err}", resolved.display()), )); continue; } }; let parsed = match parse_query(&source) { Ok(parsed) => parsed, Err(err) => { diagnostics.push(Diagnostic::error( "query_parse_error", format!("graphs.{graph_id}.queries"), format!("'{}' does not parse: {err}", resolved.display()), )); continue; } }; for query_decl in &parsed.queries { let name = query_decl.name.clone(); if let Some(previous) = origin.get(&name) { diagnostics.push(Diagnostic::error( "duplicate_query_name", format!("graphs.{graph_id}.queries.{name}"), format!( "query '{name}' is declared in both '{}' and '{}'", previous.display(), declared.display() ), )); continue; } origin.insert(name.clone(), declared.clone()); registry.insert(name, QueryConfig { file: declared.clone() }); } contents.insert(declared, source); } (registry, contents) } #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct QueryConfig { file: PathBuf, } #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct PolicyConfig { file: PathBuf, applies_to: Vec, } // Stage 2A/2B accept these forward-compatible state sections so existing // ledgers won't churn while approval/recovery semantics are staged later. 
/// The cluster state ledger: the applied revision plus per-resource statuses
/// and the approval/recovery/observation sections. Unknown fields are rejected.
// `dead_code` is allowed because these forward-compatible sections are
// deserialized and carried even where the current stage does not read them.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ClusterState {
    version: u32,
    /// Revision counter of the ledger; 0 when absent.
    #[serde(default)]
    state_revision: u64,
    /// What has been applied: config digest and per-resource entries.
    applied_revision: AppliedRevisionState,
    #[serde(default)]
    resource_statuses: BTreeMap,
    /// Summaries of consumed approvals (see `ApprovalArtifact`).
    #[serde(default)]
    approval_records: BTreeMap,
    #[serde(default)]
    recovery_records: BTreeMap,
    #[serde(default)]
    observations: BTreeMap,
}

/// The last applied revision: its config digest and resource address ->
/// applied resource entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct AppliedRevisionState {
    #[serde(default)]
    config_digest: Option,
    #[serde(default)]
    resources: BTreeMap,
}

/// One applied resource as recorded in the ledger.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct StateResource {
    digest: String,
    /// Policy resources only: the applied `applies_to` bindings, normalized
    /// to typed refs (`cluster` | `graph.<id>`). Recorded so the state
    /// ledger is serving-sufficient for the Phase-5 server boot (RFC-005
    /// §D3). Absent on pre-5A entries (backfilled by the next apply) and on
    /// non-policy resources.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    applies_to: Option>,
}

/// Contents of the state lock file; its metadata is surfaced through
/// `StateObservations::observe_lock_metadata`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct StateLockFile {
    version: u32,
    lock_id: String,
    /// Command that holds the lock (e.g. `"plan"`, `"apply"`).
    operation: String,
    /// Creation timestamp; `lock_age_seconds` derives the lock age from it.
    created_at: String,
    /// Process id of the lock holder.
    pid: u32,
}

/// Recovery-intent record for a graph-moving apply operation (RFC-004 §D2).
/// Written before the engine call that can create or move a graph manifest —
/// under the state lock when `state.lock` is enabled; deleted only after the
/// cluster state CAS that records the outcome lands. The sweep (§D3)
/// classifies survivors.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct RecoverySidecar {
    /// Sidecar format version (apply writes 1).
    schema_version: u32,
    /// Unique id of this operation (a fresh ULID when apply creates it).
    operation_id: String,
    /// When the operation started (`now_rfc3339()` in apply).
    started_at: String,
    /// Who requested the operation (`ApplyOptions::actor`), if known.
    #[serde(default)]
    actor: Option,
    kind: RecoverySidecarKind,
    graph_id: String,
    /// Graph location; apply uses `<config_dir>/graphs/<graph_id>.omni`.
    graph_uri: String,
    /// `main` manifest version observed before the engine call (schema
    /// applies); `None` for graph creates.
    #[serde(default)]
    observed_manifest_version: Option,
    /// `main` manifest version after the engine call; the sidecar rewrite that
    /// records it is best effort, so `None` means it was never recorded.
    #[serde(default)]
    expected_manifest_version: Option,
    /// Digest of the schema source the operation applies.
    desired_schema_digest: String,
    /// State CAS value read when the apply started.
    #[serde(default)]
    state_cas_base: Option,
    /// For graph_delete: the approval this operation consumes; lets a sweep
    /// roll-forward consume it too.
    #[serde(default)]
    approval_id: Option,
}

/// The graph-moving operation a [`RecoverySidecar`] fences.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum RecoverySidecarKind {
    GraphCreate,
    SchemaApply,
    GraphDelete,
}

/// What a recovery sweep decided; the command that ran the sweep acts on it
/// after its own state write. Row numbers refer to the sweep's classification
/// table (see the `sweep` module / RFC-004 §D3).
#[derive(Debug, Default)]
struct SweepOutcome {
    /// Graphs whose sidecar was kept (rows 5/6): graph-moving work for them
    /// is blocked until the operator repairs and re-observes.
    pending_graphs: BTreeSet,
    /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the
    /// command's state write lands, so a CAS failure re-sweeps them.
    completed_sidecars: Vec,
    /// Approval artifacts consumed by a roll-forward (delete row 7b): their
    /// files are rewritten with consumed_at only after the state write lands.
    consumed_approvals: Vec,
}

/// Validate the cluster config in `config_dir`.
///
/// Loads the desired config and reports its digests, resources, dependencies
/// and diagnostics; `ok` is false when any error diagnostic was produced. It
/// does not open the state backend or take the state lock. Digests, resources
/// and dependencies are empty when the config could not be loaded.
pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput {
    let outcome = load_desired(config_dir.as_ref());
    let (resource_digests, resources, dependencies) = match outcome.desired {
        Some(desired) => (
            desired.resource_digests,
            desired.resources,
            desired.dependencies,
        ),
        None => (BTreeMap::new(), Vec::new(), Vec::new()),
    };
    let ok = !has_errors(&outcome.diagnostics);
    ValidateOutput {
        ok,
        config_dir: display_path(&outcome.config_dir),
        config_file: display_path(&outcome.config_file),
        resource_digests,
        resources,
        dependencies,
        diagnostics: outcome.diagnostics,
    }
}

/// Compute the plan for `config_dir`: diff the desired config against the
/// applied revision in state and annotate every change.
///
/// Plan never writes state and never sweeps recovery sidecars; pending ones
/// are only reported as warnings. It takes the state lock while reading when
/// `state.lock` is enabled (otherwise it warns `state_lock_disabled`).
/// Changes are classified against the approval artifacts on disk. Schema
/// updates also get a migration preview against the live graph, and a failed
/// preview is a warning rather than an error. Blast radius and required
/// approvals are derived from the classified changes. `ok` is false when any
/// error diagnostic was produced.
pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput {
    let outcome = load_desired(config_dir.as_ref());
    let mut diagnostics = outcome.diagnostics;
    let backend = LocalStateBackend::new(&outcome.config_dir);
    let mut observations = backend.observations();
    // Config could not be loaded: nothing to diff.
    let Some(desired) = outcome.desired else {
        return PlanOutput {
            ok: false,
            config_dir: display_path(&outcome.config_dir),
            desired_revision: DesiredRevision {
                config_digest: None,
            },
            resource_digests: BTreeMap::new(),
            dependencies: Vec::new(),
            state_observations: observations,
            changes: Vec::new(),
            blast_radius: Vec::new(),
            approvals_required: Vec::new(),
            diagnostics,
        };
    };
    // Config loaded but invalid: report what was computed, plan nothing.
    if has_errors(&diagnostics) {
        return PlanOutput {
            ok: false,
            config_dir: display_path(&desired.config_dir),
            desired_revision: DesiredRevision {
                config_digest: Some(desired.config_digest),
            },
            resource_digests: desired.resource_digests,
            dependencies: desired.dependencies,
            state_observations: observations,
            changes: Vec::new(),
            blast_radius: Vec::new(),
            approvals_required: Vec::new(),
            diagnostics,
        };
    }
    // Named binding (not `_`) so the guard lives until the function returns.
    // A lock failure becomes an error diagnostic, which gates the state read
    // and the diff below.
    let _lock_guard = if desired.state_lock {
        match backend.acquire_lock("plan", &mut observations) {
            Ok(guard) => Some(guard),
            Err(diagnostic) => {
                diagnostics.push(diagnostic);
                None
            }
        }
    } else {
        diagnostics.push(Diagnostic::warning(
            "state_lock_disabled",
            "state.lock",
            "state.lock is false; plan read state without acquiring the cluster state lock",
        ));
        None
    };
    // Plan is read-only: pending sidecars are reported, never acted on
    // (RFC-004 open question 3 keeps read-only commands warn-only).
    warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics);
    // Missing state means "nothing applied yet": every desired resource diffs
    // against an empty prior map.
    let mut prior_resources = BTreeMap::new();
    let mut prior_state: Option = None;
    if !has_errors(&diagnostics) {
        match backend.read_state(&mut observations) {
            Ok(snapshot) => {
                if let Some(state) = snapshot.state {
                    prior_resources = state_resource_digests(&state);
                    prior_state = Some(state);
                }
            }
            Err(diagnostic) => diagnostics.push(diagnostic),
        }
    }
    let mut changes = if has_errors(&diagnostics) {
        Vec::new()
    } else {
        diff_resources(&prior_resources, &desired.resource_digests)
    };
    if !has_errors(&diagnostics) {
        append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired);
    }
    // Plan previews dispositions without sweeping; a pending recovery is
    // surfaced as the cluster_recovery_pending warning above instead.
    // NOTE(review): artifacts are listed even when earlier steps failed; with
    // no changes, nothing ends up approved.
    let artifacts = backend.list_approval_artifacts(&mut diagnostics);
    let approved = approved_resources(
        &artifacts,
        &changes,
        &desired.config_digest,
        &mut diagnostics,
    );
    // Empty pending-graph set: plan does not sweep, so no graph is treated as
    // recovery-pending here (apply passes the sweep's set instead).
    classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new(), &approved);
    // Embed real migration steps for schema updates so plan is a data-aware
    // preview; failures degrade to the digest diff with a warning.
    for change in &mut changes {
        if change.operation != PlanOperation::Update {
            continue;
        }
        let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else {
            continue;
        };
        // Same graph location apply uses: <config_dir>/graphs/<graph_id>.omni.
        let graph_uri = display_path(
            &desired
                .config_dir
                .join(CLUSTER_GRAPHS_DIR)
                .join(format!("{graph_id}.omni")),
        );
        let source_path = desired
            .resources
            .iter()
            .find(|resource| resource.address == change.resource)
            .and_then(|resource| resource.path.clone());
        let preview = match source_path {
            Some(path) => preview_schema_migration(&graph_uri, &path).await,
            None => Err("no schema source recorded".to_string()),
        };
        match preview {
            Ok(migration) => change.migration = Some(migration),
            Err(err) => diagnostics.push(Diagnostic::warning(
                "schema_preview_unavailable",
                change.resource.clone(),
                format!("could not preview the schema migration: {err}"),
            )),
        }
    }
    let blast_radius = compute_blast_radius(&changes, &desired.dependencies);
    let approvals_required = compute_approvals(&changes, &approved);
    let ok = !has_errors(&diagnostics);
    PlanOutput {
        ok,
        config_dir: display_path(&desired.config_dir),
        desired_revision: DesiredRevision {
            config_digest: Some(desired.config_digest),
        },
        resource_digests: desired.resource_digests,
        dependencies: desired.dependencies,
        state_observations: observations,
        changes,
        blast_radius,
        approvals_required,
        diagnostics,
    }
}

// NOTE(review): the paragraph below documents `apply_config_dir` /
// `apply_config_dir_with_options`, not `ApplyOptions`. As a `///` comment it
// was attached to `ApplyOptions`' rustdoc, so it is kept here as a plain
// comment. Its claim that graph/schema changes are never executed looks
// stale: apply runs sidecar-fenced graph creates (`Omnigraph::init`) and
// schema applies (`apply_schema_as`) for changes classified `Applied`.
//
// Config-only `cluster apply` (Stage 3A): execute the query/policy subset of
// the plan against the local cluster catalog. The plan is recomputed under
// the state lock, so freshness is structural; the state CAS inside
// `write_state` is the second fence. Graph/schema changes are never executed
// here — they are deferred to the graph-lifecycle phase and reported loudly.
//
// Payloads are content-addressed and written BEFORE the state CAS because
// state is the publish point: a failure after payload writes leaves inert
// digest-named blobs and no success acknowledgement; re-running apply is the
// repair.
/// Options for `cluster apply`.
`actor` attributes graph-moving operations /// (recorded in sidecars and audit entries, threaded to the engine's /// `apply_schema_as` so Cedar enforcement fires wherever a policy checker is /// installed). #[derive(Debug, Clone, Default)] pub struct ApplyOptions { pub actor: Option, } pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { apply_config_dir_with_options(config_dir, ApplyOptions::default()).await } pub async fn apply_config_dir_with_options( config_dir: impl AsRef, options: ApplyOptions, ) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); let mut observations = backend.observations(); let actor_for_output = options.actor.clone(); let early_return = |config_dir: String, config_digest: Option, observations: StateObservations, changes: Vec, resource_statuses: BTreeMap, diagnostics: Vec| { ApplyOutput { ok: !has_errors(&diagnostics), config_dir, actor: actor_for_output.clone(), desired_revision: DesiredRevision { config_digest, }, state_observations: observations, changes, applied_count: 0, deferred_count: 0, converged: false, state_written: false, resource_statuses, diagnostics, } }; let Some(desired) = outcome.desired else { return early_return( display_path(&outcome.config_dir), None, observations, Vec::new(), BTreeMap::new(), diagnostics, ); }; if has_errors(&diagnostics) { return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, Vec::new(), BTreeMap::new(), diagnostics, ); } // Named guard: the lock must be held until the state outcome is recorded. 
let _lock_guard = if desired.state_lock { match backend.acquire_lock("apply", &mut observations) { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); None } } } else { diagnostics.push(Diagnostic::warning( "state_lock_disabled", "state.lock", "state.lock is false; apply wrote state without acquiring the cluster state lock", )); None }; if has_errors(&diagnostics) { return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, Vec::new(), BTreeMap::new(), diagnostics, ); } let snapshot = match backend.read_state(&mut observations) { Ok(snapshot) => snapshot, Err(diagnostic) => { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, Vec::new(), BTreeMap::new(), diagnostics, ); } }; let expected_cas = snapshot.state_cas; let Some(mut state) = snapshot.state else { diagnostics.push(Diagnostic::error( "state_missing", CLUSTER_STATE_FILE, "apply requires an existing state.json; run `cluster import` to bootstrap state", )); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, Vec::new(), BTreeMap::new(), diagnostics, ); }; // Snapshot the as-read state BEFORE the sweep so sweep mutations count as // changes for the final dirty check and get persisted by the state CAS. 
let before_value = serde_json::to_value(&state).expect("cluster state must serialize deterministically"); let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await; let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); append_policy_binding_changes(&mut changes, Some(&state), &desired); let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); let approved = approved_resources( &approval_artifacts, &changes, &desired.config_digest, &mut diagnostics, ); classify_changes( &mut changes, &desired.dependencies, &sweep.pending_graphs, &approved, ); // Defensive invariant: nothing the approval gate covers may be executable // WITHOUT a matching approval. Gated changes with a valid artifact are the // sanctioned exception (stage 4C). let approvals = compute_approvals(&changes, &approved); let approval_violation = changes.iter().any(|change| { change.disposition == Some(ApplyDisposition::Applied) && approvals .iter() .any(|approval| approval.resource == change.resource && !approval.satisfied) }); if approval_violation { diagnostics.push(Diagnostic::error( "apply_approval_invariant_violation", "changes", "an executable change requires approval; refusing to apply", )); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, changes, state.resource_statuses, diagnostics, ); } // Graph creates execute first (RFC-004 §D5), sequentially, sidecar-fenced: // sidecar written before the init, rewritten with the post-init manifest // version, deleted only after the final state CAS lands. A failure stops // further graph-moving work and demotes that graph's dependents. 
let source_paths: BTreeMap<&str, &str> = desired .resources .iter() .filter_map(|resource| { resource .path .as_deref() .map(|path| (resource.address.as_str(), path)) }) .collect(); let graph_creates_to_run: Vec = changes .iter() .filter(|change| { change.disposition == Some(ApplyDisposition::Applied) && change.operation == PlanOperation::Create && matches!(resource_kind(&change.resource), ResourceKind::Graph(_)) }) .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) .collect(); let mut completed_op_sidecars: Vec = Vec::new(); let mut failed_graphs: BTreeMap = BTreeMap::new(); let mut graph_moving_aborted = false; for graph_id in &graph_creates_to_run { if graph_moving_aborted { // A prior create failed: stop graph-moving work (loud partials). diagnostics.push(Diagnostic::warning( "graph_create_skipped", graph_address(graph_id), "skipped after an earlier graph create failed in this run", )); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); continue; } let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else { continue; }; let graph_uri = display_path( &desired .config_dir .join(CLUSTER_GRAPHS_DIR) .join(format!("{graph_id}.omni")), ); let mut sidecar = RecoverySidecar { schema_version: 1, operation_id: Ulid::new().to_string(), started_at: now_rfc3339(), actor: options.actor.clone(), kind: RecoverySidecarKind::GraphCreate, graph_id: graph_id.clone(), graph_uri: graph_uri.clone(), observed_manifest_version: None, expected_manifest_version: None, desired_schema_digest: desired_graph.schema_digest.clone(), state_cas_base: expected_cas.clone(), approval_id: None, }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); graph_moving_aborted = true; continue; } }; if let Err(diagnostic) = 
failpoints::maybe_fail("cluster_apply.before_graph_create") { // Simulated crash before the init: the sidecar stays for the // sweep (row 1: root absent -> intent removed next run). diagnostics.push(diagnostic); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); graph_moving_aborted = true; continue; } // Re-read + re-verify the schema source under the lock — the same // TOCTOU posture as write_resource_payload. let schema_source = source_paths .get(schema_address(graph_id).as_str()) .ok_or_else(|| { Diagnostic::error( "graph_create_failed", graph_address(graph_id), "no schema source recorded for graph", ) }) .and_then(|path| { fs::read_to_string(Path::new(path)).map_err(|err| { Diagnostic::error( "graph_create_failed", graph_address(graph_id), format!("could not read schema source '{path}': {err}"), ) }) }) .and_then(|source| { if sha256_hex(source.as_bytes()) == desired_graph.schema_digest { Ok(source) } else { Err(Diagnostic::error( "resource_content_changed", schema_address(graph_id), "schema source changed while apply was running; re-run `cluster apply`", )) } }); let schema_source = match schema_source { Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); let _ = fs::remove_file(&sidecar_path); // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); graph_moving_aborted = true; continue; } }; match Omnigraph::init(&graph_uri, &schema_source).await { Ok(_) => {} Err(err) => { diagnostics.push(Diagnostic::error( "graph_create_failed", graph_address(graph_id), format!("could not initialize graph at '{graph_uri}': {err}"), )); // The sidecar stays: the sweep classifies whether the failed // init left a partial root (row 5) or nothing (row 1). failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); graph_moving_aborted = true; continue; } } // Record the post-init pin in the sidecar (best effort — a failure // here leaves expected = null and the sweep classifies by digest). 
if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await { if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await { sidecar.expected_manifest_version = Some(snapshot.version()); if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { diagnostics.push(diagnostic); } } } // Crash point: the graph exists, the cluster state does not record it // yet. A failure here must acknowledge nothing; the next run's sweep // rolls the ledger forward (row 4). if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, changes, state.resource_statuses, diagnostics, ); } completed_op_sidecars.push(sidecar_path); } // Schema applies execute next (RFC-004 §D5): the first cluster operation // that moves an EXISTING graph manifest, sidecar-fenced the same way. let schema_updates_to_run: Vec = changes .iter() .filter(|change| { change.disposition == Some(ApplyDisposition::Applied) && change.operation == PlanOperation::Update && matches!(resource_kind(&change.resource), ResourceKind::Schema(_)) }) .filter_map(|change| change.resource.strip_prefix("schema.").map(str::to_string)) .collect(); for graph_id in &schema_updates_to_run { if graph_moving_aborted { diagnostics.push(Diagnostic::warning( "schema_apply_skipped", schema_address(graph_id), "skipped after an earlier graph-moving operation failed in this run", )); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); continue; } let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else { continue; }; let graph_uri = display_path( &desired .config_dir .join(CLUSTER_GRAPHS_DIR) .join(format!("{graph_id}.omni")), ); // Read-write open: the engine's own recovery sweep runs here, which // is exactly what we want before moving its manifest. 
let db = match Omnigraph::open(&graph_uri).await { Ok(db) => db, Err(err) => { diagnostics.push(Diagnostic::error( "schema_apply_failed", schema_address(graph_id), format!("could not open graph at '{graph_uri}': {err}"), )); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; } }; let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await { Ok(snapshot) => Some(snapshot.version()), Err(_) => None, }; let mut sidecar = RecoverySidecar { schema_version: 1, operation_id: Ulid::new().to_string(), started_at: now_rfc3339(), actor: options.actor.clone(), kind: RecoverySidecarKind::SchemaApply, graph_id: graph_id.clone(), graph_uri: graph_uri.clone(), observed_manifest_version, expected_manifest_version: None, desired_schema_digest: desired_graph.schema_digest.clone(), state_cas_base: expected_cas.clone(), approval_id: None, }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; } }; if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") { // Simulated crash before the engine call: the sidecar stays; the // sweep retires it next run (ledger still consistent with live). diagnostics.push(diagnostic); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; } // Re-read + digest-verify the desired schema source under the lock. 
let schema_source = source_paths .get(schema_address(graph_id).as_str()) .ok_or_else(|| { Diagnostic::error( "schema_apply_failed", schema_address(graph_id), "no schema source recorded for graph", ) }) .and_then(|path| { fs::read_to_string(Path::new(path)).map_err(|err| { Diagnostic::error( "schema_apply_failed", schema_address(graph_id), format!("could not read schema source '{path}': {err}"), ) }) }) .and_then(|source| { if sha256_hex(source.as_bytes()) == desired_graph.schema_digest { Ok(source) } else { Err(Diagnostic::error( "resource_content_changed", schema_address(graph_id), "schema source changed while apply was running; re-run `cluster apply`", )) } }); let schema_source = match schema_source { Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); let _ = fs::remove_file(&sidecar_path); // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; } }; // Soft drops only: allow_data_loss stays false until the approval // artifacts of stage 4C exist (RFC-004 §D4). match db .apply_schema_as( &schema_source, SchemaApplyOptions::default(), options.actor.as_deref(), ) .await { Ok(result) => { sidecar.expected_manifest_version = Some(result.manifest_version); if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { diagnostics.push(diagnostic); } } Err(err) => { diagnostics.push(Diagnostic::error( "schema_apply_failed", schema_address(graph_id), format!("schema apply failed on '{graph_uri}': {err}"), )); // Sidecar stays; the sweep retires it (live digest unchanged // == ledger consistent) or flags real movement. failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; } } // Crash point: the manifest moved, the ledger does not record it yet. // A failure here acknowledges nothing; the sweep rolls forward. 
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, changes, state.resource_statuses, diagnostics, ); } completed_op_sidecars.push(sidecar_path); } if !failed_graphs.is_empty() { demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); } for change in &changes { match change.disposition { Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning( "apply_unsupported_change", change.resource.clone(), "graph/schema changes are not applied in this stage; they are deferred to the graph-lifecycle phase", )), Some(ApplyDisposition::Blocked) => diagnostics.push(Diagnostic::warning( "apply_dependency_blocked", change.resource.clone(), format!( "blocked by an unapplied or missing dependency ({})", change.reason.as_deref().unwrap_or("dependency") ), )), _ => {} } } // Payload phase: content-addressed writes before the state CAS. Any // failure aborts before state moves; blobs already written are inert. // Gate on payload-phase errors only — sweep errors (e.g. a kept row-5 // sidecar) must not abort the run, or their statuses would never persist. 
let errors_before_payloads = count_errors(&diagnostics); for change in &changes { if change.disposition != Some(ApplyDisposition::Applied) || change.operation == PlanOperation::Delete { continue; } let kind = resource_kind(&change.resource); let digest = change .after_digest .as_deref() .expect("create/update always carries an after digest"); let Some(target) = payload_path(&desired.config_dir, &kind, digest) else { continue; }; let Some(source) = source_paths.get(change.resource.as_str()) else { diagnostics.push(Diagnostic::error( "resource_payload_write_error", change.resource.clone(), "no source file recorded for resource", )); continue; }; if let Err(diagnostic) = write_resource_payload(&target, Path::new(source), digest, &change.resource) { diagnostics.push(diagnostic); } } if count_errors(&diagnostics) > errors_before_payloads { return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, changes, state.resource_statuses, diagnostics, ); } // Crash point: payloads are on disk, state has not moved. A failure here // must leave state.json byte-identical and acknowledge nothing; re-running // apply repairs via the skip-if-exists blob reuse. if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, changes, state.resource_statuses, diagnostics, ); } // Approved graph deletes execute LAST (RFC-004 §D5): catalog writes for // surviving resources land first, then the irreversible work. 
let graph_deletes_to_run: Vec = changes .iter() .filter(|change| { change.disposition == Some(ApplyDisposition::Applied) && change.operation == PlanOperation::Delete && matches!(resource_kind(&change.resource), ResourceKind::Graph(_)) }) .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) .collect(); let mut executed_deletes: Vec<(String, Option)> = Vec::new(); // (graph_id, approval_id) let mut consumed_approval_ids: Vec = Vec::new(); for graph_id in &graph_deletes_to_run { if graph_moving_aborted { diagnostics.push(Diagnostic::warning( "graph_delete_skipped", graph_address(graph_id), "skipped after an earlier graph-moving operation failed in this run", )); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); continue; } let graph_addr = graph_address(graph_id); // Re-locate the consumable approval (classification verified one exists). let approval_id = approval_artifacts .iter() .map(|(_, artifact)| artifact) .find(|artifact| { artifact.consumed_at.is_none() && artifact.resource == graph_addr && artifact.bound_config_digest == desired.config_digest }) .map(|artifact| artifact.approval_id.clone()); let graph_uri = display_path( &desired .config_dir .join(CLUSTER_GRAPHS_DIR) .join(format!("{graph_id}.omni")), ); let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await { Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await { Ok(snapshot) => Some(snapshot.version()), Err(_) => None, }, Err(_) => None, // partial/unopenable roots still get deleted }; let sidecar = RecoverySidecar { schema_version: 1, operation_id: Ulid::new().to_string(), started_at: now_rfc3339(), actor: options.actor.clone(), kind: RecoverySidecarKind::GraphDelete, graph_id: graph_id.clone(), graph_uri: graph_uri.clone(), observed_manifest_version, expected_manifest_version: None, // no post-op manifest exists desired_schema_digest: String::new(), state_cas_base: expected_cas.clone(), approval_id: 
approval_id.clone(), }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); graph_moving_aborted = true; continue; } }; if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") { // Simulated crash before removal: row 8 retires the intent and // the still-valid approval lets a later run retry. diagnostics.push(diagnostic); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); graph_moving_aborted = true; continue; } match fs::remove_dir_all(PathBuf::from(&graph_uri)) { Ok(()) => {} Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone Err(err) => { diagnostics.push(Diagnostic::error( "graph_delete_failed", graph_addr.clone(), format!("could not remove graph root '{graph_uri}': {err}"), )); failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); graph_moving_aborted = true; continue; } } // Crash point: the root is gone, the ledger does not record it yet. // The sweep rolls forward (row 7b) and consumes the approval. if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") { diagnostics.push(diagnostic); return early_return( display_path(&desired.config_dir), Some(desired.config_digest), observations, changes, state.resource_statuses, diagnostics, ); } executed_deletes.push((graph_id.clone(), approval_id.clone())); if let Some(approval_id) = approval_id { consumed_approval_ids.push(approval_id); } completed_op_sidecars.push(sidecar_path); } if !failed_graphs.is_empty() { demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); } // State mutation. Apply owns query/policy statuses only; graph/schema // statuses belong to refresh/import observation and must not be clobbered // (the sweep above is the one exception: it owns recovery statuses). 
let mut new_state = state.clone(); for change in &changes { match change.disposition { Some(ApplyDisposition::Applied) => match change.operation { PlanOperation::Create | PlanOperation::Update => { new_state.applied_revision.resources.insert( change.resource.clone(), StateResource { digest: change .after_digest .clone() .expect("create/update always carries an after digest"), // Policies record their applied bindings so the // ledger is serving-sufficient (RFC-005 §D3). applies_to: desired .policy_bindings .get(&change.resource) .cloned(), }, ); set_resource_status_applied(&mut new_state, &change.resource); } PlanOperation::Delete => { new_state.applied_revision.resources.remove(&change.resource); new_state.resource_statuses.remove(&change.resource); } }, Some(ApplyDisposition::Blocked) => { // The sweep owns recovery statuses (Drifted/Error with their // conditions); a generic Blocked must not clobber them. if change.reason.as_deref() != Some("cluster_recovery_pending") { set_resource_status( &mut new_state, &change.resource, ResourceLifecycleStatus::Blocked, change.reason.as_deref().unwrap_or("dependency_not_applied"), "waiting on an unapplied or missing dependency", ); } } _ => {} } } for (graph_id, approval_id) in &executed_deletes { tombstone_graph_subtree( &mut new_state, graph_id, approval_id.as_deref(), options.actor.as_deref(), ); if let Some(approval_id) = approval_id { record_approval_consumed(&mut new_state, approval_id, "apply"); } } recompute_state_graph_digests(&mut new_state, &desired); let mut residual = diff_resources( &state_resource_digests(&new_state), &desired.resource_digests, ); append_policy_binding_changes(&mut residual, Some(&new_state), &desired); let converged = residual.is_empty(); if converged { new_state.applied_revision.config_digest = Some(desired.config_digest.clone()); } let after_value = serde_json::to_value(&new_state).expect("cluster state must serialize deterministically"); let mut state_written = false; let mut 
state_write_failed = false; if after_value != before_value { new_state.state_revision = new_state.state_revision.saturating_add(1); // The failpoint error routes through state_write_failed so the // persisted-statuses revert contract below is exercised; a cfg_callback // on this point can mutate state.json to simulate a concurrent writer, // making write_state's CAS check fail organically. let write_result = failpoints::maybe_fail("cluster_apply.before_state_write") .and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations)); match write_result { Ok(()) => state_written = true, Err(diagnostic) => { diagnostics.push(diagnostic); state_write_failed = true; } } } // Completed (rows 2/4) sweep sidecars are deleted only once their outcome // is durably recorded; on a failed write they stay and re-sweep next run. if !state_write_failed { for sidecar_path in sweep .completed_sidecars .iter() .chain(completed_op_sidecars.iter()) { let _ = fs::remove_file(sidecar_path); } let mut all_consumed = sweep.consumed_approvals.clone(); all_consumed.extend(consumed_approval_ids.iter().cloned()); mark_approvals_consumed(&backend, &all_consumed); } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never // persisted — automation reading `resource_statuses` independently of `ok` // must not see phantom status updates. 
let resource_statuses = if state_write_failed { state.resource_statuses } else { new_state.resource_statuses }; let applied_count = changes .iter() .filter(|change| change.disposition == Some(ApplyDisposition::Applied)) .count(); let deferred_count = changes .iter() .filter(|change| { matches!( change.disposition, Some(ApplyDisposition::Deferred) | Some(ApplyDisposition::Blocked) ) }) .count(); ApplyOutput { ok: !has_errors(&diagnostics), config_dir: display_path(&desired.config_dir), actor: options.actor.clone(), desired_revision: DesiredRevision { config_digest: Some(desired.config_digest), }, state_observations: observations, changes, applied_count, deferred_count, converged, state_written, resource_statuses, diagnostics, } } /// Record a digest-bound human approval for a gated (irreversible) change — /// today: graph deletes. The artifact binds to the exact desired config /// digest and the change's before/after digests, so config or state drift /// invalidates it automatically (a stale approval can never authorize a /// different change). 
///
/// Flow: load and validate the desired config, take the cluster state lock
/// (unless `state.lock` is false), read `state.json`, diff the applied
/// resource digests against the desired ones, and write an approval artifact
/// only when `resource` has a pending change that `compute_approvals` gates.
/// Every failure path returns `ok: false` with the accumulated diagnostics.
pub async fn approve_config_dir(
    config_dir: impl AsRef,
    resource: &str,
    approved_by: &str,
) -> ApproveOutput {
    let outcome = load_desired(config_dir.as_ref());
    let mut diagnostics = outcome.diagnostics;
    let backend = LocalStateBackend::new(&outcome.config_dir);
    let mut observations = backend.observations();
    // Uniform failure shape: only the config dir and diagnostics are reported.
    let fail = |config_dir: String, diagnostics: Vec| ApproveOutput {
        ok: false,
        config_dir,
        approval_id: None,
        resource: None,
        operation: None,
        approved_by: None,
        diagnostics,
    };
    let Some(desired) = outcome.desired else {
        return fail(display_path(&outcome.config_dir), diagnostics);
    };
    if has_errors(&diagnostics) {
        return fail(display_path(&desired.config_dir), diagnostics);
    }
    // Bound to a named `_lock_guard` (not `_`) so an acquired guard stays
    // alive until the function returns, covering the state read and the
    // artifact write below.
    let _lock_guard = if desired.state_lock {
        match backend.acquire_lock("approve", &mut observations) {
            Ok(guard) => Some(guard),
            Err(diagnostic) => {
                diagnostics.push(diagnostic);
                return fail(display_path(&desired.config_dir), diagnostics);
            }
        }
    } else {
        diagnostics.push(Diagnostic::warning(
            "state_lock_disabled",
            "state.lock",
            "state.lock is false; approve ran without acquiring the cluster state lock",
        ));
        None
    };
    let state = match backend.read_state(&mut observations) {
        Ok(snapshot) => match snapshot.state {
            Some(state) => state,
            None => {
                diagnostics.push(Diagnostic::error(
                    "state_missing",
                    CLUSTER_STATE_FILE,
                    "approve requires an existing state.json; run `cluster import` first",
                ));
                return fail(display_path(&desired.config_dir), diagnostics);
            }
        },
        Err(diagnostic) => {
            diagnostics.push(diagnostic);
            return fail(display_path(&desired.config_dir), diagnostics);
        }
    };
    // Resource-digest diff only; unlike apply, policy-binding changes are not
    // appended here.
    let prior_resources = state_resource_digests(&state);
    let changes = diff_resources(&prior_resources, &desired.resource_digests);
    // NOTE(review): the second argument is passed empty — presumably the set
    // of already-approved resources, so every gated change is reported here;
    // confirm against `compute_approvals`.
    let gates = compute_approvals(&changes, &BTreeSet::new());
    // The target must both have a pending change and be gated.
    let Some(change) = changes.iter().find(|change| {
        change.resource == resource && gates.iter().any(|gate| gate.resource == resource)
    }) else {
        diagnostics.push(Diagnostic::error(
            "approval_not_required",
            resource,
            "no pending change for this resource requires approval (check `cluster plan`)",
        ));
        return fail(display_path(&desired.config_dir), diagnostics);
    };
    // Bound to the desired config digest and this change's before/after
    // digests, so a stale approval cannot authorize a different change.
    let artifact = ApprovalArtifact {
        schema_version: 1,
        approval_id: Ulid::new().to_string(),
        resource: change.resource.clone(),
        operation: match change.operation {
            PlanOperation::Create => "create",
            PlanOperation::Update => "update",
            PlanOperation::Delete => "delete",
        }
        .to_string(),
        // The gate is known to exist (checked above); the default is only a
        // fallback.
        reason: gates
            .iter()
            .find(|gate| gate.resource == resource)
            .map(|gate| gate.reason.clone())
            .unwrap_or_default(),
        bound_config_digest: desired.config_digest.clone(),
        bound_before_digest: change.before_digest.clone(),
        bound_after_digest: change.after_digest.clone(),
        approved_by: approved_by.to_string(),
        created_at: now_rfc3339(),
        // New artifacts start unconsumed; the apply path and the recovery
        // sweep mark them consumed via `mark_approvals_consumed`.
        consumed_at: None,
        consumed_by_operation: None,
    };
    if let Err(diagnostic) = backend.write_approval_artifact(&artifact) {
        diagnostics.push(diagnostic);
        return fail(display_path(&desired.config_dir), diagnostics);
    }
    ApproveOutput {
        ok: !has_errors(&diagnostics),
        config_dir: display_path(&desired.config_dir),
        approval_id: Some(artifact.approval_id),
        resource: Some(artifact.resource),
        operation: Some(change.operation.clone()),
        approved_by: Some(artifact.approved_by),
        diagnostics,
    }
}

/// Report the recorded cluster state without writing it.
///
/// Surfaces lock and pending-recovery-sidecar observations, validates the
/// cluster.yaml header, and — only when no error diagnostic has been recorded
/// so far — reads `state.json` and reports catalog payload findings as
/// diagnostics. Does not acquire the cluster state lock.
pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput {
    let parsed = parse_cluster_config(config_dir.as_ref());
    let mut diagnostics = parsed.diagnostics;
    let backend = LocalStateBackend::new(&parsed.config_dir);
    let mut observations = backend.observations();
    backend.observe_lock(&mut observations, &mut diagnostics);
    warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics);
    // Stay empty unless state.json is read successfully below.
    let mut resource_digests = BTreeMap::new();
    let mut resource_statuses = BTreeMap::new();
    let mut state_observation_records = BTreeMap::new();
    if let Some(raw) = parsed.raw.as_ref() {
        // Only the diagnostics matter here; status takes no lock, so the
        // settings are unused.
        let _settings = validate_cluster_header(raw, &mut diagnostics);
        if !has_errors(&diagnostics) {
            match backend.read_state(&mut observations) {
                Ok(snapshot) => {
                    if let Some(state) = snapshot.state {
                        // Read-only point-in-time catalog check: report the
                        // findings as diagnostics; persisting Drifted statuses
                        // is refresh's job. Status never writes state.
                        for (address, finding) in
                            verify_catalog_payloads(&parsed.config_dir, &state)
                        {
                            diagnostics.push(payload_finding_diagnostic(&address, &finding));
                        }
                        resource_digests = state_resource_digests(&state);
                        resource_statuses = state.resource_statuses;
                        state_observation_records = state.observations;
                    } else {
                        diagnostics.push(Diagnostic::warning(
                            "state_missing",
                            CLUSTER_STATE_FILE,
                            "state.json is missing; no applied cluster revision has been recorded",
                        ));
                    }
                }
                Err(diagnostic) => diagnostics.push(diagnostic),
            }
        }
    }
    StatusOutput {
        ok: !has_errors(&diagnostics),
        config_dir: display_path(&parsed.config_dir),
        state_observations: observations,
        resource_digests,
        resource_statuses,
        observations: state_observation_records,
        diagnostics,
    }
}

/// Remove the cluster state lock identified by `lock_id`.
///
/// Attempted only when cluster.yaml parses and its header validates without
/// errors; any id matching is delegated to `LocalStateBackend::force_unlock`.
/// `lock_removed` reports whether that call succeeded.
pub fn force_unlock_config_dir(
    config_dir: impl AsRef,
    lock_id: impl AsRef,
) -> ForceUnlockOutput {
    let parsed = parse_cluster_config(config_dir.as_ref());
    let mut diagnostics = parsed.diagnostics;
    let backend = LocalStateBackend::new(&parsed.config_dir);
    let mut observations = backend.observations();
    let mut lock_removed = false;
    if let Some(raw) = parsed.raw.as_ref() {
        let _settings = validate_cluster_header(raw, &mut diagnostics);
        if !has_errors(&diagnostics) {
            match backend.force_unlock(lock_id.as_ref(), &mut observations) {
                Ok(()) => lock_removed = true,
                Err(diagnostic) => diagnostics.push(diagnostic),
            }
        }
    }
    ForceUnlockOutput {
        ok: !has_errors(&diagnostics),
        config_dir: display_path(&parsed.config_dir),
        state_observations: observations,
        lock_removed,
        diagnostics,
    }
}

/// Re-observe live graphs and catalog payloads and write the result into an
/// existing `state.json`.
pub async fn refresh_config_dir(config_dir: impl AsRef) -> StateSyncOutput {
    sync_config_dir(config_dir.as_ref(), StateSyncOperation::Refresh).await
}

/// Bootstrap `state.json` from the live cluster; refuses when state already
/// exists.
pub async fn import_config_dir(config_dir: impl AsRef) -> StateSyncOutput {
    sync_config_dir(config_dir.as_ref(), StateSyncOperation::Import).await
}

/// Shared body of `refresh_config_dir` and `import_config_dir`.
///
/// Steps, in order: load the desired config, take the state lock (unless
/// `state.lock` is false), read `state.json`, pick the starting state
/// (existing for refresh, `initial_import_state` for import), run the recovery
/// sweep, verify catalog payloads, observe declared graphs, and write the
/// result guarded by the CAS token read earlier. Import aborts before writing
/// if any error diagnostic was recorded; refresh writes regardless and reports
/// the errors alongside.
async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput {
    let outcome = load_desired(config_dir);
    let mut diagnostics = outcome.diagnostics;
    let backend = LocalStateBackend::new(&outcome.config_dir);
    let mut observations = backend.observations();
    let Some(desired) = outcome.desired else {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&outcome.config_dir),
            state_observations: observations,
            resource_digests: BTreeMap::new(),
            resource_statuses: BTreeMap::new(),
            observations: BTreeMap::new(),
            diagnostics,
        };
    };
    if has_errors(&diagnostics) {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&desired.config_dir),
            state_observations: observations,
            resource_digests: desired.resource_digests,
            resource_statuses: BTreeMap::new(),
            observations: BTreeMap::new(),
            diagnostics,
        };
    }
    let operation_label = state_sync_operation_label(operation);
    // A lock failure is only recorded here; the `has_errors` check below
    // returns early (assuming `acquire_lock` reports an error-severity
    // diagnostic). An acquired guard lives until the function returns, so it
    // covers the final `write_state`.
    let _lock_guard = if desired.state_lock {
        match backend.acquire_lock(operation_label, &mut observations) {
            Ok(guard) => Some(guard),
            Err(diagnostic) => {
                diagnostics.push(diagnostic);
                None
            }
        }
    } else {
        diagnostics.push(Diagnostic::warning(
            "state_lock_disabled",
            "state.lock",
            format!(
                "state.lock is false; {operation_label} wrote state without acquiring the cluster state lock"
            ),
        ));
        None
    };
    if has_errors(&diagnostics) {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&desired.config_dir),
            state_observations: observations,
            resource_digests: desired.resource_digests,
            resource_statuses: BTreeMap::new(),
            observations: BTreeMap::new(),
            diagnostics,
        };
    }
    let snapshot = match backend.read_state(&mut observations) {
        Ok(snapshot) => snapshot,
        Err(diagnostic) => {
            diagnostics.push(diagnostic);
            return StateSyncOutput {
                ok: false,
                operation,
                config_dir: display_path(&desired.config_dir),
                state_observations: observations,
                resource_digests: desired.resource_digests,
                resource_statuses: BTreeMap::new(),
                observations: BTreeMap::new(),
                diagnostics,
            };
        }
    };
    // CAS token for the final write: a concurrent writer between this read
    // and `write_state` makes the write fail instead of clobbering.
    let expected_cas = snapshot.state_cas;
    let mut state = match (operation, snapshot.state) {
        (StateSyncOperation::Refresh, Some(state)) => state,
        (StateSyncOperation::Refresh, None) => {
            diagnostics.push(Diagnostic::error(
                "state_missing",
                CLUSTER_STATE_FILE,
                "refresh requires an existing state.json; run `cluster import` to bootstrap state",
            ));
            return StateSyncOutput {
                ok: false,
                operation,
                config_dir: display_path(&desired.config_dir),
                state_observations: observations,
                resource_digests: BTreeMap::new(),
                resource_statuses: BTreeMap::new(),
                observations: BTreeMap::new(),
                diagnostics,
            };
        }
        (StateSyncOperation::Import, Some(state)) => {
            diagnostics.push(Diagnostic::error(
                "state_already_exists",
                CLUSTER_STATE_FILE,
                "import creates initial state only when state.json is missing; use `cluster refresh` for an existing state ledger",
            ));
            // Report the existing ledger unchanged so the caller can see it.
            return StateSyncOutput {
                ok: false,
                operation,
                config_dir: display_path(&desired.config_dir),
                state_observations: observations,
                resource_digests: state_resource_digests(&state),
                resource_statuses: state.resource_statuses,
                observations: state.observations,
                diagnostics,
            };
        }
        (StateSyncOperation::Import, None) => initial_import_state(&desired),
    };
    // Recovery sweep first (RFC-004 §D3): classify any interrupted graph
    // operation before observation/verification so a rolled-forward outcome
    // is what those passes see.
    let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;
    // Catalog payload verification must run BEFORE graph observation: removing
    // a drifted query digest first means the live-graph composite recompute
    // below already excludes it, so the persisted `graph.{id}` composite digest
    // stays consistent and the next plan shows exactly the create + derived
    // update.
    for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) {
        diagnostics.push(payload_finding_diagnostic(&address, &finding));
        match finding {
            // Missing or mismatched blobs: drop the ledger digest so the
            // resource diffs as not applied, and mark it Drifted.
            PayloadFinding::Missing => {
                state.applied_revision.resources.remove(&address);
                set_resource_status(
                    &mut state,
                    &address,
                    ResourceLifecycleStatus::Drifted,
                    "payload_missing",
                    "catalog payload blob is missing; re-run `cluster apply` to republish",
                );
            }
            PayloadFinding::Mismatch { .. } => {
                state.applied_revision.resources.remove(&address);
                set_resource_status(
                    &mut state,
                    &address,
                    ResourceLifecycleStatus::Drifted,
                    "payload_mismatch",
                    "catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish",
                );
            }
            // Transient IO must not trigger a spurious republish: keep the
            // digest, surface the error, let a later clean refresh converge.
            PayloadFinding::ReadError(error) => {
                set_resource_status(
                    &mut state,
                    &address,
                    ResourceLifecycleStatus::Error,
                    "payload_read_error",
                    &error,
                );
            }
        }
    }
    let graph_error_count = observe_declared_graphs(&desired, &mut state).await;
    if graph_error_count > 0 {
        diagnostics.push(Diagnostic::error(
            "graph_observation_error",
            CLUSTER_GRAPHS_DIR,
            format!("{graph_error_count} graph observation(s) failed"),
        ));
    }
    // Import refuses to create state.json when any error was recorded;
    // refresh still writes and reports the errors alongside.
    if operation == StateSyncOperation::Import && has_errors(&diagnostics) {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&desired.config_dir),
            state_observations: observations,
            resource_digests: state_resource_digests(&state),
            resource_statuses: state.resource_statuses,
            observations: state.observations,
            diagnostics,
        };
    }
    // Import writes revision 1 (`initial_import_state` starts at 0); refresh
    // bumps the existing revision.
    if operation == StateSyncOperation::Import {
        state.state_revision = 1;
    } else {
        state.state_revision = state.state_revision.saturating_add(1);
    }
    match backend.write_state(&state, expected_cas.as_deref(), &mut observations) {
        Ok(()) => {
            // Completed sweep sidecars are deleted only after their outcome
            // is durably recorded; on failure they stay and re-sweep.
            // Removal is best-effort (errors are ignored).
            for sidecar_path in &sweep.completed_sidecars {
                let _ = fs::remove_file(sidecar_path);
            }
            mark_approvals_consumed(&backend, &sweep.consumed_approvals);
        }
        Err(diagnostic) => diagnostics.push(diagnostic),
    }
    // NOTE(review): if `write_state` failed above, the digests, statuses and
    // observations returned below are this run's in-memory results and were
    // never persisted. After a failed state write, apply reports the pre-run
    // statuses so automation never sees phantom updates. Confirm whether
    // refresh/import should follow the same contract.
    let resource_digests = state_resource_digests(&state);
    let ok = !has_errors(&diagnostics);
    StateSyncOutput {
        ok,
        operation,
        config_dir: display_path(&desired.config_dir),
        state_observations: observations,
        resource_digests,
        resource_statuses: state.resource_statuses,
        observations: state.observations,
        diagnostics,
    }
}

/// Read and parse `cluster.yaml` from `config_dir`.
///
/// `raw` is `None` when `config_dir` is not a directory, the file cannot be
/// read, the text-level duplicate-key / future-field checks report an error,
/// or YAML deserialization fails; the cause is in `diagnostics`. Header
/// validation is left to `validate_cluster_header`.
fn parse_cluster_config(config_dir: &Path) -> ParsedConfig {
    let config_dir = config_dir.to_path_buf();
    let config_file = config_dir.join(CLUSTER_CONFIG_FILE);
    let mut diagnostics = Vec::new();
    if !config_dir.is_dir() {
        diagnostics.push(Diagnostic::error(
            "config_dir_not_found",
            display_path(&config_dir),
            "`--config` must point at a directory containing cluster.yaml",
        ));
        return ParsedConfig {
            raw: None,
            diagnostics,
            config_dir,
            config_file,
        };
    }
    let text = match fs::read_to_string(&config_file) {
        Ok(text) => text,
        Err(err) => {
            diagnostics.push(Diagnostic::error(
                "cluster_config_read_error",
                CLUSTER_CONFIG_FILE,
                format!("could not read cluster.yaml: {err}"),
            ));
            return ParsedConfig {
                raw: None,
                diagnostics,
                config_dir,
                config_file,
            };
        }
    };
    // Text-level checks run on the raw YAML before deserialization. Any
    // error they report skips deserialization entirely.
    diagnostics.extend(duplicate_key_diagnostics(&text));
    diagnostics.extend(future_field_diagnostics(&text));
    if has_errors(&diagnostics) {
        return ParsedConfig {
            raw: None,
            diagnostics,
            config_dir,
            config_file,
        };
    }
    let raw = match serde_yaml::from_str::(&text) {
        Ok(raw) => Some(raw),
        Err(err) => {
            diagnostics.push(Diagnostic::error(
                "invalid_cluster_yaml",
                CLUSTER_CONFIG_FILE,
                format!("could not parse cluster.yaml: {err}"),
            ));
            None
        }
    };
    ParsedConfig {
        raw,
        diagnostics,
        config_dir,
        config_file,
    }
}

/// Validate the header fields of a parsed cluster.yaml (`version`,
/// `metadata.name`, `state.backend`), pushing one error diagnostic per
/// violation, and return the resolved settings.
///
/// `state.lock` defaults to `true` when omitted. Settings are returned even
/// when errors were pushed; callers must check `diagnostics`.
fn validate_cluster_header(
    raw: &RawClusterConfig,
    diagnostics: &mut Vec,
) -> ClusterSettings {
    if raw.version != 1 {
        diagnostics.push(Diagnostic::error(
            "unsupported_cluster_config_version",
            "version",
            format!(
                "unsupported cluster config version {}; this build supports version 1",
                raw.version
            ),
        ));
    }
    // An omitted name is fine; a present-but-blank one is not.
    if let Some(name) = raw.metadata.name.as_deref() {
        if name.trim().is_empty() {
            diagnostics.push(Diagnostic::error(
                "empty_metadata_name",
                "metadata.name",
                "metadata.name must not be empty when provided",
            ));
        }
    }
    if let Some(backend) = raw.state.backend.as_deref() {
        if backend != "cluster" {
            diagnostics.push(Diagnostic::error(
                "unsupported_state_backend",
                "state.backend",
                "Stage 2C supports only omitted state.backend or `cluster`",
            ));
        }
    }
    ClusterSettings {
        state_lock: raw.state.lock.unwrap_or(true),
    }
}

/// Parse the JSON body of the state lock file, rejecting lock versions other
/// than 1. Both failure kinds are reported against `CLUSTER_LOCK_FILE`.
fn parse_lock_file_for_unlock(text: &str) -> Result {
    let lock = serde_json::from_str::(text).map_err(|err| {
        Diagnostic::error(
            "invalid_state_lock",
            CLUSTER_LOCK_FILE,
            format!("could not parse state lock: {err}"),
        )
    })?;
    if lock.version != 1 {
        return Err(Diagnostic::error(
            "unsupported_state_lock_version",
            CLUSTER_LOCK_FILE,
            format!("unsupported cluster state lock version {}", lock.version),
        ));
    }
    Ok(lock)
}

/// Human-facing message for a state lock that is already held. It includes
/// the exact `force-unlock` command when the lock id was observed.
fn state_lock_held_message(observations: &StateObservations) -> String {
    match observations.lock_id.as_deref() {
        Some(lock_id) => format!(
            "cluster state lock already exists (lock id {lock_id}); run `omnigraph cluster force-unlock {lock_id}` only after confirming no cluster operation is active"
        ),
        None => "cluster state lock already exists; remove it only after confirming no cluster operation is active".to_string(),
    }
}

/// Project the applied revision onto an address -> digest map (bindings and
/// other per-resource fields are dropped).
fn state_resource_digests(state: &ClusterState) -> BTreeMap {
    state
        .applied_revision
        .resources
        .iter()
        .map(|(address, resource)| (address.clone(), resource.digest.clone()))
        .collect()
}

/// Starting ledger for `import`: version 1, revision 0, and no resources,
/// statuses, approval/recovery records or observations. Those are filled in
/// by the later passes of `sync_config_dir`.
fn initial_import_state(desired: &DesiredCluster) -> ClusterState {
    ClusterState {
        version: 1,
        state_revision: 0,
        applied_revision: AppliedRevisionState {
            // NOTE(review): apply records `config_digest` only once the
            // residual diff is empty, but import records the desired digest up
            // front, although it applies no queries/policies. Confirm this is
            // intended.
            config_digest: Some(desired.config_digest.clone()),
            resources: BTreeMap::new(),
        },
        resource_statuses: BTreeMap::new(),
        approval_records: BTreeMap::new(),
        recovery_records: BTreeMap::new(),
        observations: BTreeMap::new(),
    }
}

/// Observe each declared graph root under `graphs/` and fold the result into
/// `state`, returning how many observations failed.
///
/// For each graph:
/// - A missing root removes its graph and schema ledger entries and marks
///   both Drifted (`graph_missing`).
/// - An openable root records the live schema digest and a recomputed graph
///   composite digest. Both are marked Applied when the schema matches the
///   desired digest and Drifted (`schema_mismatch`) otherwise.
/// - An observation error leaves the ledger digests untouched and marks both
///   Error.
///
/// Graphs recorded in state but no longer declared are not visited.
async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize {
    let mut graph_error_count = 0;
    for graph in &desired.graphs {
        let graph_address = graph_address(&graph.id);
        let schema_address = schema_address(&graph.id);
        let graph_path = desired
            .config_dir
            .join(CLUSTER_GRAPHS_DIR)
            .join(format!("{}.omni", graph.id));
        let graph_uri = display_path(&graph_path);
        let observed_at = now_rfc3339();
        // No root on disk: forget both ledger entries so they diff as not
        // applied, and record the absence.
        if !graph_path.exists() {
            state.applied_revision.resources.remove(&graph_address);
            state.applied_revision.resources.remove(&schema_address);
            state.observations.insert(
                graph_address.clone(),
                graph_observation_json(GraphObservationJson {
                    address: &graph_address,
                    graph_uri: &graph_uri,
                    observed_at: &observed_at,
                    exists: false,
                    manifest_version: None,
                    schema_digest: None,
                    desired_schema_digest: &graph.schema_digest,
                    schema_matches_desired: Some(false),
                    error: Some("derived graph root is missing"),
                }),
            );
            set_resource_status(
                state,
                &graph_address,
                ResourceLifecycleStatus::Drifted,
                "graph_missing",
                "derived graph root is missing",
            );
            set_resource_status(
                state,
                &schema_address,
                ResourceLifecycleStatus::Drifted,
                "graph_missing",
                "derived graph root is missing",
            );
            continue;
        }
        match observe_live_graph(&graph_uri).await {
            Ok(observation) => {
                let schema_matches = observation.schema_digest == graph.schema_digest;
                // The ledger records what is live, not what is desired.
                state.applied_revision.resources.insert(
                    schema_address.clone(),
                    StateResource {
                        digest: observation.schema_digest.clone(),
                        applies_to: None,
                    },
                );
                // The composite covers the query digests currently in `state`;
                // any already removed (e.g. by payload verification) are
                // excluded.
                let query_digests = state_query_digests_for_graph(state, &graph.id);
                let graph_digest_value = graph_digest(
                    &graph.id,
                    Some(&observation.schema_digest),
                    Some(&query_digests),
                );
                state.applied_revision.resources.insert(
                    graph_address.clone(),
                    StateResource {
                        digest: graph_digest_value,
                        applies_to: None,
                    },
                );
                state.observations.insert(
                    graph_address.clone(),
                    graph_observation_json(GraphObservationJson {
                        address: &graph_address,
                        graph_uri: &graph_uri,
                        observed_at: &observed_at,
                        exists: true,
                        manifest_version: Some(observation.manifest_version),
                        schema_digest: Some(observation.schema_digest.as_str()),
                        desired_schema_digest: &graph.schema_digest,
                        schema_matches_desired: Some(schema_matches),
                        error: None,
                    }),
                );
                if schema_matches {
                    set_resource_status_applied(state, &graph_address);
                    set_resource_status_applied(state, &schema_address);
                } else {
                    set_resource_status(
                        state,
                        &graph_address,
                        ResourceLifecycleStatus::Drifted,
                        "schema_mismatch",
                        "live schema digest differs from desired schema digest",
                    );
                    set_resource_status(
                        state,
                        &schema_address,
                        ResourceLifecycleStatus::Drifted,
                        "schema_mismatch",
                        "live schema digest differs from desired schema digest",
                    );
                }
            }
            // The root exists but could not be read: keep the ledger digests,
            // record the error, and count it for the caller.
            Err(error) => {
                graph_error_count += 1;
                state.observations.insert(
                    graph_address.clone(),
                    graph_observation_json(GraphObservationJson {
                        address: &graph_address,
                        graph_uri: &graph_uri,
                        observed_at: &observed_at,
                        exists: true,
                        manifest_version: None,
                        schema_digest: None,
                        desired_schema_digest: &graph.schema_digest,
                        schema_matches_desired: None,
                        error: Some(error.as_str()),
                    }),
                );
                set_resource_status(
                    state,
                    &graph_address,
                    ResourceLifecycleStatus::Error,
                    "graph_observation_error",
                    error.as_str(),
                );
                set_resource_status(
                    state,
                    &schema_address,
                    ResourceLifecycleStatus::Error,
                    "graph_observation_error",
                    error.as_str(),
                );
            }
        }
    }
    graph_error_count
}

/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a
/// desired schema against the live graph, computed read-only (no lock).
async fn preview_schema_migration( graph_uri: &str, schema_path: &str, ) -> Result { let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?; let db = Omnigraph::open_read_only(graph_uri) .await .map_err(|err| err.to_string())?; let preview = db .preview_schema_apply_with_options(&source, SchemaApplyOptions::default()) .await .map_err(|err| err.to_string())?; Ok(preview.plan) } struct LiveGraphObservation { manifest_version: u64, schema_digest: String, } async fn observe_live_graph(graph_uri: &str) -> Result { let db = Omnigraph::open_read_only(graph_uri) .await .map_err(|err| err.to_string())?; let snapshot = db .snapshot_of(ReadTarget::branch("main")) .await .map_err(|err| err.to_string())?; let schema_source = db.schema_source(); Ok(LiveGraphObservation { manifest_version: snapshot.version(), schema_digest: sha256_hex(schema_source.as_bytes()), }) } struct GraphObservationJson<'a> { address: &'a str, graph_uri: &'a str, observed_at: &'a str, exists: bool, manifest_version: Option, schema_digest: Option<&'a str>, desired_schema_digest: &'a str, schema_matches_desired: Option, error: Option<&'a str>, } fn graph_observation_json(observation: GraphObservationJson<'_>) -> serde_json::Value { json!({ "kind": "graph", "address": observation.address, "graph_uri": observation.graph_uri, "observed_at": observation.observed_at, "exists": observation.exists, "manifest_version": observation.manifest_version, "schema_digest": observation.schema_digest, "desired_schema_digest": observation.desired_schema_digest, "schema_matches_desired": observation.schema_matches_desired, "error": observation.error, }) } fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap { let prefix = format!("query.{graph_id}."); state .applied_revision .resources .iter() .filter_map(|(address, resource)| { address .strip_prefix(&prefix) .map(|name| (name.to_string(), resource.digest.clone())) }) .collect() } fn set_resource_status_applied(state: &mut 
ClusterState, address: &str) { state.resource_statuses.insert( address.to_string(), ResourceStatusRecord { status: ResourceLifecycleStatus::Applied, conditions: Vec::new(), message: None, }, ); } fn set_resource_status( state: &mut ClusterState, address: &str, status: ResourceLifecycleStatus, condition: &str, message: &str, ) { state.resource_statuses.insert( address.to_string(), ResourceStatusRecord { status, conditions: vec![condition.to_string()], message: Some(message.to_string()), }, ); } fn load_desired(config_dir: &Path) -> LoadOutcome { let parsed = parse_cluster_config(config_dir); let config_dir = parsed.config_dir; let config_file = parsed.config_file; let mut diagnostics = parsed.diagnostics; let Some(raw) = parsed.raw else { return LoadOutcome { desired: None, diagnostics, config_dir, config_file, }; }; let settings = validate_cluster_header(&raw, &mut diagnostics); let mut resources = BTreeMap::new(); let mut dependencies = BTreeSet::new(); let mut graph_query_digests: BTreeMap> = BTreeMap::new(); let mut graph_schema_digests: BTreeMap = BTreeMap::new(); for (graph_id, graph) in &raw.graphs { validate_id( "graph id", &format!("graphs.{graph_id}"), graph_id, &mut diagnostics, ); let graph_address = graph_address(graph_id); let schema_address = schema_address(graph_id); dependencies.insert(Dependency { from: schema_address.clone(), to: graph_address.clone(), }); let schema_path = resolve_config_path(&config_dir, &graph.schema); let schema_source = match fs::read_to_string(&schema_path) { Ok(source) => { let digest = sha256_hex(source.as_bytes()); graph_schema_digests.insert(graph_id.clone(), digest.clone()); resources.insert( schema_address.clone(), ResourceSummary { address: schema_address.clone(), kind: "schema".to_string(), digest, path: Some(display_path(&schema_path)), }, ); Some(source) } Err(err) => { diagnostics.push(Diagnostic::error( "schema_file_missing", format!("graphs.{graph_id}.schema"), format!( "could not read schema file '{}': {err}", 
schema_path.display() ), )); None } }; let catalog = schema_source.and_then(|source| match parse_schema(&source) { Ok(schema) => match build_catalog(&schema) { Ok(catalog) => Some(catalog), Err(err) => { diagnostics.push(Diagnostic::error( "schema_catalog_error", format!("graphs.{graph_id}.schema"), err.to_string(), )); None } }, Err(err) => { diagnostics.push(Diagnostic::error( "schema_parse_error", format!("graphs.{graph_id}.schema"), err.to_string(), )); None } }); let (graph_queries, query_contents) = resolve_query_decls(&config_dir, graph_id, &graph.queries, &mut diagnostics); for (query_name, query) in &graph_queries { validate_id( "query name", &format!("graphs.{graph_id}.queries.{query_name}"), query_name, &mut diagnostics, ); let query_address = query_address(graph_id, query_name); dependencies.insert(Dependency { from: query_address.clone(), to: graph_address.clone(), }); dependencies.insert(Dependency { from: query_address.clone(), to: schema_address.clone(), }); let query_path = resolve_config_path(&config_dir, &query.file); let source = match query_contents.get(&query.file) { Some(cached) => Ok(cached.clone()), None => fs::read_to_string(&query_path), }; match source { Ok(source) => { let digest = sha256_hex(source.as_bytes()); graph_query_digests .entry(graph_id.clone()) .or_default() .insert(query_name.clone(), digest.clone()); resources.insert( query_address.clone(), ResourceSummary { address: query_address, kind: "query".to_string(), digest, path: Some(display_path(&query_path)), }, ); validate_query_source( graph_id, query_name, &source, catalog.as_ref(), &mut diagnostics, ); } Err(err) => diagnostics.push(Diagnostic::error( "query_file_missing", format!("graphs.{graph_id}.queries.{query_name}.file"), format!( "could not read query file '{}': {err}", query_path.display() ), )), } } } for graph_id in raw.graphs.keys() { let digest = graph_digest( graph_id, graph_schema_digests.get(graph_id), graph_query_digests.get(graph_id), ); resources.insert( 
graph_address(graph_id), ResourceSummary { address: graph_address(graph_id), kind: "graph".to_string(), digest, path: None, }, ); } let mut policy_bindings: BTreeMap> = BTreeMap::new(); for (policy_name, policy) in &raw.policies { validate_id( "policy name", &format!("policies.{policy_name}"), policy_name, &mut diagnostics, ); if policy.applies_to.is_empty() { diagnostics.push(Diagnostic::error( "policy_missing_applies_to", format!("policies.{policy_name}.applies_to"), "policy.applies_to must name `cluster` or at least one graph", )); } let policy_address = policy_address(policy_name); let mut normalized_bindings: Vec = Vec::new(); for (idx, target) in policy.applies_to.iter().enumerate() { match normalize_policy_target(target) { PolicyTarget::Cluster => { normalized_bindings.push("cluster".to_string()); } PolicyTarget::Graph(graph_id) => { normalized_bindings.push(graph_address(&graph_id)); if raw.graphs.contains_key(&graph_id) { dependencies.insert(Dependency { from: policy_address.clone(), to: graph_address(&graph_id), }); } else { diagnostics.push(Diagnostic::error( "dangling_graph_reference", format!("policies.{policy_name}.applies_to[{idx}]"), format!( "policy references graph `{graph_id}`, but no graph with that id is declared" ), )); } } PolicyTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error( "wrong_kind_reference", format!("policies.{policy_name}.applies_to[{idx}]"), format!("policy applies_to expects graph refs or `cluster`, got `{kind}`"), )), } } normalized_bindings.sort(); normalized_bindings.dedup(); policy_bindings.insert(policy_address.clone(), normalized_bindings); let policy_path = resolve_config_path(&config_dir, &policy.file); match fs::read(&policy_path) { Ok(bytes) => { resources.insert( policy_address.clone(), ResourceSummary { address: policy_address, kind: "policy".to_string(), digest: sha256_hex(&bytes), path: Some(display_path(&policy_path)), }, ); } Err(err) => diagnostics.push(Diagnostic::error( "policy_file_missing", 
format!("policies.{policy_name}.file"), format!( "could not read policy file '{}': {err}", policy_path.display() ), )), } } let mut resource_digests = BTreeMap::new(); let mut resource_list = Vec::new(); for (address, resource) in resources { resource_digests.insert(address, resource.digest.clone()); resource_list.push(resource); } let dependencies: Vec<_> = dependencies.into_iter().collect(); let graphs = raw .graphs .keys() .map(|graph_id| DesiredGraph { id: graph_id.clone(), schema_digest: graph_schema_digests .get(graph_id) .cloned() .unwrap_or_default(), }) .collect(); let config_digest = desired_config_digest(&raw, &resource_digests); LoadOutcome { desired: Some(DesiredCluster { config_dir: config_dir.clone(), config_digest, state_lock: settings.state_lock, graphs, resource_digests, resources: resource_list, dependencies, policy_bindings, }), diagnostics, config_dir, config_file, } } fn validate_query_source( graph_id: &str, query_name: &str, source: &str, catalog: Option<&omnigraph_compiler::catalog::Catalog>, diagnostics: &mut Vec, ) { let path = format!("graphs.{graph_id}.queries.{query_name}"); match parse_query(source) { Ok(query_file) => { let Some(query_decl) = query_file.queries.iter().find(|q| q.name == query_name) else { diagnostics.push(Diagnostic::error( "query_key_mismatch", path, format!("no `query {query_name}` declaration found in the referenced .gq file"), )); return; }; if let Some(catalog) = catalog { if let Err(err) = typecheck_query_decl(catalog, query_decl) { diagnostics.push(Diagnostic::error( "query_typecheck_error", format!("graphs.{graph_id}.queries.{query_name}"), err.to_string(), )); } } else { diagnostics.push(Diagnostic::warning( "query_typecheck_skipped", format!("graphs.{graph_id}.queries.{query_name}"), "query parsed, but type-check was skipped because the graph schema is invalid", )); } } Err(err) => diagnostics.push(Diagnostic::error( "query_parse_error", path, err.to_string(), )), } } /// Content-addressed catalog path for 
an applied resource payload. Extensions /// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, /// so the catalog layout cannot drift with operator file conventions. fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option { let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR); match kind { ResourceKind::Query { graph, name } => Some( resources_dir .join("query") .join(graph) .join(name) .join(format!("{digest}.gq")), ), ResourceKind::Policy(name) => Some( resources_dir .join("policy") .join(name) .join(format!("{digest}.yaml")), ), _ => None, } } #[derive(Debug, PartialEq, Eq)] enum PayloadFinding { Missing, Mismatch { actual_digest: String }, ReadError(String), } /// Verify every catalog-backed resource digest in state against its /// content-addressed blob under `__cluster/resources/`. Graph, schema, and /// unknown addresses have no payloads and are skipped. Read-only; findings /// are deterministic (BTreeMap order). Payloads are small (queries, policy /// bundles), so a full digest re-hash is cheap. 
fn verify_catalog_payloads( config_dir: &Path, state: &ClusterState, ) -> Vec<(String, PayloadFinding)> { let mut findings = Vec::new(); for (address, resource) in &state.applied_revision.resources { let kind = resource_kind(address); let Some(path) = payload_path(config_dir, &kind, &resource.digest) else { continue; }; match fs::read(&path) { Ok(bytes) => { let actual_digest = sha256_hex(&bytes); if actual_digest != resource.digest { findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest })); } } Err(err) if err.kind() == ErrorKind::NotFound => { findings.push((address.clone(), PayloadFinding::Missing)); } Err(err) => { findings.push(( address.clone(), PayloadFinding::ReadError(format!( "could not read catalog payload '{}': {err}", path.display() )), )); } } } findings } fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic { match finding { PayloadFinding::Missing => Diagnostic::warning( "catalog_payload_missing", address, "catalog payload blob is missing; re-run `cluster apply` to republish", ), PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning( "catalog_payload_mismatch", address, format!( "catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish" ), ), // An unverifiable blob must not report healthy. PayloadFinding::ReadError(error) => { Diagnostic::error("catalog_payload_read_error", address, error.clone()) } } } /// Write one content-addressed payload blob. Idempotent: an existing /// digest-named file is trusted as-is. The digest re-check is the apply-side /// TOCTOU detector — the source file changing between `load_desired` and the /// payload write must fail loudly, never publish mismatched content. 
fn write_resource_payload( target: &Path, source: &Path, expected_digest: &str, resource: &str, ) -> Result<(), Diagnostic> { if target.exists() { return Ok(()); } let bytes = fs::read(source).map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, format!("could not read resource source '{}': {err}", source.display()), ) })?; if sha256_hex(&bytes) != expected_digest { return Err(Diagnostic::error( "resource_content_changed", resource, format!( "resource source '{}' changed while apply was running; re-run `cluster apply`", source.display() ), )); } let parent = target.parent().expect("payload path always has a parent"); fs::create_dir_all(parent).map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, format!("could not create payload directory: {err}"), ) })?; let file_name = target .file_name() .expect("payload path always has a file name") .to_string_lossy(); let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new())); let mut file = OpenOptions::new() .write(true) .create_new(true) .open(&tmp_path) .map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, format!("could not create temporary payload file: {err}"), ) })?; let write_result = file .write_all(&bytes) .and_then(|()| file.sync_all()) .map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, format!("could not write payload file: {err}"), ) }); drop(file); if let Err(diagnostic) = write_result { let _ = fs::remove_file(&tmp_path); return Err(diagnostic); } if let Err(err) = fs::rename(&tmp_path, target) { let _ = fs::remove_file(&tmp_path); return Err(Diagnostic::error( "resource_payload_write_error", resource, format!("could not move payload file into place: {err}"), )); } Ok(()) } /// Recompute the composite `graph.` digests for state-resident graphs from /// state's own schema/query components. 
Without this, an applied query change /// would leave the prior composite digest in state and `graph.` would show /// a phantom update in every later plan — apply could never converge. fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) { for graph in &desired.graphs { let graph_address = graph_address(&graph.id); if !state.applied_revision.resources.contains_key(&graph_address) { continue; } let schema_digest = state .applied_revision .resources .get(&schema_address(&graph.id)) .map(|resource| resource.digest.clone()); let query_digests = state_query_digests_for_graph(state, &graph.id); let digest = graph_digest(&graph.id, schema_digest.as_ref(), Some(&query_digests)); state .applied_revision .resources .insert(graph_address, StateResource { digest, applies_to: None }); } } fn duplicate_key_diagnostics(text: &str) -> Vec { #[derive(Debug)] struct Frame { indent: isize, path: String, keys: BTreeSet, } let mut diagnostics = Vec::new(); let mut stack = vec![Frame { indent: -1, path: String::new(), keys: BTreeSet::new(), }]; for (line_idx, line) in text.lines().enumerate() { let line_without_comment = strip_comment(line); if line_without_comment.trim().is_empty() { continue; } let indent = line_without_comment .chars() .take_while(|ch| *ch == ' ') .count() as isize; let trimmed = line_without_comment.trim_start(); if trimmed.starts_with('-') { continue; } let Some((raw_key, raw_value)) = trimmed.split_once(':') else { continue; }; let key = raw_key.trim(); if key.is_empty() || key.starts_with('{') || key.starts_with('[') { continue; } while stack.last().is_some_and(|frame| indent <= frame.indent) { stack.pop(); } let parent = stack.last_mut().expect("root frame is always present"); let full_path = if parent.path.is_empty() { key.to_string() } else { format!("{}.{}", parent.path, key) }; if !parent.keys.insert(key.to_string()) { diagnostics.push(Diagnostic::error( "duplicate_yaml_key", full_path.clone(), format!("duplicate YAML key 
`{key}` on line {}", line_idx + 1), )); } if raw_value.trim().is_empty() { stack.push(Frame { indent, path: full_path, keys: BTreeSet::new(), }); } } diagnostics } fn future_field_diagnostics(text: &str) -> Vec { let Ok(value) = serde_yaml::from_str::(text) else { return Vec::new(); }; let Some(mapping) = value.as_mapping() else { return Vec::new(); }; let future_fields = [ "apply", "env_file", "providers", "pipelines", "embeddings", "ui", "aliases", "bindings", ]; mapping .keys() .filter_map(|key| key.as_str()) .filter(|key| future_fields.contains(key)) .map(|key| { Diagnostic::error( "future_phase_field", key, format!("`{key}` is reserved for a later cluster-control phase"), ) }) .collect() } fn strip_comment(line: &str) -> String { let mut in_single_quote = false; let mut in_double_quote = false; let mut escaped = false; for (idx, ch) in line.char_indices() { if escaped { escaped = false; continue; } match ch { '\\' if in_double_quote => escaped = true, '\'' if !in_double_quote => in_single_quote = !in_single_quote, '"' if !in_single_quote => in_double_quote = !in_double_quote, '#' if !in_single_quote && !in_double_quote => return line[..idx].to_string(), _ => {} } } line.to_string() } fn validate_id(kind: &str, path: &str, value: &str, diagnostics: &mut Vec) { let mut chars = value.chars(); let valid = chars .next() .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_') && chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-'); if !valid { diagnostics.push(Diagnostic::error( "invalid_resource_id", path, format!("{kind} `{value}` must start with a letter or `_` and contain only ASCII letters, digits, `_`, or `-`"), )); } } enum PolicyTarget { Cluster, Graph(String), WrongKind(String), } fn normalize_policy_target(value: &str) -> PolicyTarget { if value == "cluster" { PolicyTarget::Cluster } else if let Some(graph_id) = value.strip_prefix("graph.") { PolicyTarget::Graph(graph_id.to_string()) } else if value.contains('.') { 
PolicyTarget::WrongKind(value.to_string()) } else { PolicyTarget::Graph(value.to_string()) } } fn graph_address(graph_id: &str) -> String { format!("graph.{graph_id}") } fn schema_address(graph_id: &str) -> String { format!("schema.{graph_id}") } fn query_address(graph_id: &str, query_name: &str) -> String { format!("query.{graph_id}.{query_name}") } fn policy_address(policy_name: &str) -> String { format!("policy.{policy_name}") } fn resolve_config_path(config_dir: &Path, path: &Path) -> PathBuf { if path.is_absolute() { path.to_path_buf() } else { config_dir.join(path) } } fn graph_digest( graph_id: &str, schema_digest: Option<&String>, query_digests: Option<&BTreeMap>, ) -> String { let mut input = format!( "graph\0{graph_id}\0schema\0{}\0", schema_digest.map_or("", String::as_str) ); if let Some(query_digests) = query_digests { for (name, digest) in query_digests { input.push_str("query\0"); input.push_str(name); input.push('\0'); input.push_str(digest); input.push('\0'); } } sha256_hex(input.as_bytes()) } fn desired_config_digest( raw: &RawClusterConfig, resource_digests: &BTreeMap, ) -> String { let mut input = String::from("cluster-config\0"); // Hash parsed semantics, not raw YAML bytes, so comments and formatting do // not create a new desired revision and the digest cannot drift from parse. 
let config_semantics = serde_json::to_string(raw).expect("raw cluster config must serialize deterministically"); input.push_str(&config_semantics); input.push('\0'); for (address, digest) in resource_digests { input.push_str(address); input.push('\0'); input.push_str(digest); input.push('\0'); } sha256_hex(input.as_bytes()) } fn sha256_hex(bytes: &[u8]) -> String { let digest = Sha256::digest(bytes); const HEX: &[u8; 16] = b"0123456789abcdef"; let mut out = String::with_capacity(digest.len() * 2); for byte in digest { out.push(HEX[(byte >> 4) as usize] as char); out.push(HEX[(byte & 0x0f) as usize] as char); } out } fn now_rfc3339() -> String { OffsetDateTime::now_utc() .format(&Rfc3339) .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()) } fn lock_age_seconds(created_at: &str) -> Option { let created_at = OffsetDateTime::parse(created_at, &Rfc3339).ok()?; Some( (OffsetDateTime::now_utc() - created_at) .whole_seconds() .max(0) as u64, ) } fn state_sync_operation_label(operation: StateSyncOperation) -> &'static str { match operation { StateSyncOperation::Refresh => "refresh", StateSyncOperation::Import => "import", } } fn has_errors(diagnostics: &[Diagnostic]) -> bool { diagnostics .iter() .any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) } fn count_errors(diagnostics: &[Diagnostic]) -> usize { diagnostics .iter() .filter(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) .count() } fn display_path(path: &Path) -> String { path.display().to_string() } #[cfg(test)] #[path = "tests.rs"] mod tests;