diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index da3cc44..e9cff0c 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -3720,7 +3720,7 @@ async fn main() -> Result<()> { finish_cluster_approve(&output, json)?; } ClusterCommand::Status { config, json } => { - let output = status_config_dir(config); + let output = status_config_dir(config).await; finish_cluster_status(&output, json)?; } ClusterCommand::Refresh { config, json } => { @@ -3736,7 +3736,7 @@ async fn main() -> Result<()> { config, json, } => { - let output = force_unlock_config_dir(config, lock_id); + let output = force_unlock_config_dir(config, lock_id).await; finish_cluster_force_unlock(&output, json)?; } }, diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index b5f99c9..973de6d 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -23,6 +23,10 @@ serde_yaml = { workspace = true } sha2 = { workspace = true } thiserror = { workspace = true } time = { workspace = true } +# Runtime handle only — best-effort async lock release in +# StateLockGuard::drop on object-store backends (cluster commands always +# run inside the caller's tokio runtime). +tokio = { workspace = true } ulid = { workspace = true } [dev-dependencies] diff --git a/crates/omnigraph-cluster/src/diff.rs b/crates/omnigraph-cluster/src/diff.rs index e75db4d..593b2fa 100644 --- a/crates/omnigraph-cluster/src/diff.rs +++ b/crates/omnigraph-cluster/src/diff.rs @@ -142,7 +142,7 @@ pub(crate) fn compute_approvals( /// Near-misses — an artifact for the same resource whose bound digests no /// longer match — warn as `approval_stale` and never authorize anything. 
pub(crate) fn approved_resources( - artifacts: &[(PathBuf, ApprovalArtifact)], + artifacts: &[(String, ApprovalArtifact)], changes: &[PlanChange], config_digest: &str, diagnostics: &mut Vec, diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index dc66408..ec1a02a 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -25,7 +25,7 @@ mod diff; mod serve; mod sweep; mod store; -use store::{LocalStateBackend, StateLockGuard, StateSnapshot}; +use store::{ClusterStore, StateLockGuard, StateSnapshot}; pub use types::*; use types::*; pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; @@ -69,7 +69,7 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -107,7 +107,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } let _lock_guard = if desired.state_lock { - match backend.acquire_lock("plan", &mut observations) { + match backend.acquire_lock("plan", &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -130,7 +130,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let mut prior_resources = BTreeMap::new(); let mut prior_state: Option = None; if !has_errors(&diagnostics) { - match backend.read_state(&mut observations) { + match backend.read_state(&mut observations).await { Ok(snapshot) => { if let Some(state) = snapshot.state { prior_resources = state_resource_digests(&state); @@ -151,7 +151,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> 
PlanOutput { } // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. - let artifacts = backend.list_approval_artifacts(&mut diagnostics); + let artifacts = backend.list_approval_artifacts(&mut diagnostics).await; let approved = approved_resources( &artifacts, &changes, @@ -242,7 +242,7 @@ pub async fn apply_config_dir_with_options( ) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let actor_for_output = options.actor.clone(); @@ -294,7 +294,7 @@ pub async fn apply_config_dir_with_options( // Named guard: the lock must be held until the state outcome is recorded. let _lock_guard = if desired.state_lock { - match backend.acquire_lock("apply", &mut observations) { + match backend.acquire_lock("apply", &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -321,7 +321,7 @@ pub async fn apply_config_dir_with_options( ); } - let snapshot = match backend.read_state(&mut observations) { + let snapshot = match backend.read_state(&mut observations).await { Ok(snapshot) => snapshot, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -361,7 +361,7 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); append_policy_binding_changes(&mut changes, Some(&state), &desired); - let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); + let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await; let approved = approved_resources( &approval_artifacts, &changes, @@ -424,7 +424,7 @@ pub async fn apply_config_dir_with_options( }) .filter_map(|change| 
change.resource.strip_prefix("graph.").map(str::to_string)) .collect(); - let mut completed_op_sidecars: Vec = Vec::new(); + let mut completed_op_sidecars: Vec = Vec::new(); let mut failed_graphs: BTreeMap = BTreeMap::new(); let mut graph_moving_aborted = false; for graph_id in &graph_creates_to_run { @@ -462,7 +462,7 @@ pub async fn apply_config_dir_with_options( state_cas_base: expected_cas.clone(), approval_id: None, }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -514,7 +514,7 @@ pub async fn apply_config_dir_with_options( Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); - let _ = fs::remove_file(&sidecar_path); // nothing moved + backend.delete_object(&sidecar_path).await; // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); graph_moving_aborted = true; continue; @@ -540,7 +540,7 @@ pub async fn apply_config_dir_with_options( if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await { if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await { sidecar.expected_manifest_version = Some(snapshot.version()); - if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await { diagnostics.push(diagnostic); } } @@ -626,7 +626,7 @@ pub async fn apply_config_dir_with_options( state_cas_base: expected_cas.clone(), approval_id: None, }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -677,7 +677,7 @@ pub async fn apply_config_dir_with_options( Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); - let _ = fs::remove_file(&sidecar_path); // nothing moved + 
backend.delete_object(&sidecar_path).await; // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; @@ -695,7 +695,7 @@ pub async fn apply_config_dir_with_options( { Ok(result) => { sidecar.expected_manifest_version = Some(result.manifest_version); - if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await { diagnostics.push(diagnostic); } } @@ -871,7 +871,7 @@ pub async fn apply_config_dir_with_options( state_cas_base: expected_cas.clone(), approval_id: approval_id.clone(), }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1004,8 +1004,14 @@ pub async fn apply_config_dir_with_options( // persisted-statuses revert contract below is exercised; a cfg_callback // on this point can mutate state.json to simulate a concurrent writer, // making write_state's CAS check fail organically. - let write_result = failpoints::maybe_fail("cluster_apply.before_state_write") - .and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations)); + let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") { + Ok(()) => { + backend + .write_state(&new_state, expected_cas.as_deref(), &mut observations) + .await + } + Err(diagnostic) => Err(diagnostic), + }; match write_result { Ok(()) => state_written = true, Err(diagnostic) => { @@ -1017,16 +1023,16 @@ pub async fn apply_config_dir_with_options( // Completed (rows 2/4) sweep sidecars are deleted only once their outcome // is durably recorded; on a failed write they stay and re-sweep next run. 
if !state_write_failed { - for sidecar_path in sweep + for sidecar_uri in sweep .completed_sidecars .iter() .chain(completed_op_sidecars.iter()) { - let _ = fs::remove_file(sidecar_path); + backend.delete_object(sidecar_uri).await; } let mut all_consumed = sweep.consumed_approvals.clone(); all_consumed.extend(consumed_approval_ids.iter().cloned()); - mark_approvals_consumed(&backend, &all_consumed); + mark_approvals_consumed(&backend, &all_consumed).await; } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never @@ -1082,7 +1088,7 @@ pub async fn approve_config_dir( ) -> ApproveOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let fail = |config_dir: String, diagnostics: Vec| ApproveOutput { @@ -1103,7 +1109,7 @@ pub async fn approve_config_dir( } let _lock_guard = if desired.state_lock { - match backend.acquire_lock("approve", &mut observations) { + match backend.acquire_lock("approve", &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1119,7 +1125,7 @@ pub async fn approve_config_dir( None }; - let state = match backend.read_state(&mut observations) { + let state = match backend.read_state(&mut observations).await { Ok(snapshot) => match snapshot.state { Some(state) => state, None => { @@ -1174,7 +1180,7 @@ pub async fn approve_config_dir( consumed_at: None, consumed_by_operation: None, }; - if let Err(diagnostic) = backend.write_approval_artifact(&artifact) { + if let Err(diagnostic) = backend.write_approval_artifact(&artifact).await { diagnostics.push(diagnostic); return fail(display_path(&desired.config_dir), diagnostics); } @@ -1191,12 +1197,12 @@ pub async fn approve_config_dir( } -pub fn 
status_config_dir(config_dir: impl AsRef) -> StatusOutput { +pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = LocalStateBackend::new(&parsed.config_dir); + let backend = ClusterStore::for_config_dir(&parsed.config_dir); let mut observations = backend.observations(); - backend.observe_lock(&mut observations, &mut diagnostics); + backend.observe_lock(&mut observations, &mut diagnostics).await; warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); let mut resource_digests = BTreeMap::new(); @@ -1206,7 +1212,7 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { if let Some(raw) = parsed.raw.as_ref() { let _settings = validate_cluster_header(raw, &mut diagnostics); if !has_errors(&diagnostics) { - match backend.read_state(&mut observations) { + match backend.read_state(&mut observations).await { Ok(snapshot) => { if let Some(state) = snapshot.state { // Read-only point-in-time catalog check: report the @@ -1244,20 +1250,20 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { } } -pub fn force_unlock_config_dir( +pub async fn force_unlock_config_dir( config_dir: impl AsRef, lock_id: impl AsRef, ) -> ForceUnlockOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = LocalStateBackend::new(&parsed.config_dir); + let backend = ClusterStore::for_config_dir(&parsed.config_dir); let mut observations = backend.observations(); let mut lock_removed = false; if let Some(raw) = parsed.raw.as_ref() { let _settings = validate_cluster_header(raw, &mut diagnostics); if !has_errors(&diagnostics) { - match backend.force_unlock(lock_id.as_ref(), &mut observations) { + match backend.force_unlock(lock_id.as_ref(), &mut observations).await { Ok(()) => lock_removed = true, Err(diagnostic) => diagnostics.push(diagnostic), } @@ -1284,7 +1290,7 
@@ pub async fn import_config_dir(config_dir: impl AsRef) -> StateSyncOutput async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput { let outcome = load_desired(config_dir); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -1315,7 +1321,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St let operation_label = state_sync_operation_label(operation); let _lock_guard = if desired.state_lock { - match backend.acquire_lock(operation_label, &mut observations) { + match backend.acquire_lock(operation_label, &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1346,7 +1352,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St }; } - let snapshot = match backend.read_state(&mut observations) { + let snapshot = match backend.read_state(&mut observations).await { Ok(snapshot) => snapshot, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1477,14 +1483,14 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St state.state_revision = state.state_revision.saturating_add(1); } - match backend.write_state(&state, expected_cas.as_deref(), &mut observations) { + match backend.write_state(&state, expected_cas.as_deref(), &mut observations).await { Ok(()) => { // Completed sweep sidecars are deleted only after their outcome // is durably recorded; on failure they stay and re-sweep. 
- for sidecar_path in &sweep.completed_sidecars { - let _ = fs::remove_file(sidecar_path); + for sidecar_uri in &sweep.completed_sidecars { + backend.delete_object(sidecar_uri).await; } - mark_approvals_consumed(&backend, &sweep.consumed_approvals); + mark_approvals_consumed(&backend, &sweep.consumed_approvals).await; } Err(diagnostic) => diagnostics.push(diagnostic), } diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index 0152bc4..8578aee 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -40,13 +40,15 @@ pub struct ServingSnapshot { /// failure is collected and the whole snapshot refused; no partial serving. /// Takes no lock: the state file is replaced atomically, so this reads a /// consistent point-in-time ledger. -pub fn read_serving_snapshot(config_dir: impl AsRef) -> Result> { +pub async fn read_serving_snapshot( + config_dir: impl AsRef, +) -> Result> { let config_dir = config_dir.as_ref().to_path_buf(); - let backend = LocalStateBackend::new(&config_dir); + let backend = ClusterStore::for_config_dir(&config_dir); let mut diagnostics: Vec = Vec::new(); // A ledger a sweep is about to rewrite must not start serving. - let sidecars = backend.list_recovery_sidecars(&mut diagnostics); + let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await; if !sidecars.is_empty() { diagnostics.push(Diagnostic::error( "cluster_recovery_pending", @@ -59,7 +61,7 @@ pub fn read_serving_snapshot(config_dir: impl AsRef) -> Result match snapshot.state { Some(state) => Some(state), None => { diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index 8a95661..bf328af 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -1,230 +1,446 @@ -//! The cluster's storage backend: state ledger, lock, recovery -//! sidecars, approval artifacts (moved verbatim from lib.rs in the -//! modularization). 
The object-storage port (RFC-006) lands here as a -//! follow-up — this module is the single home for stored-state I/O. +//! The cluster's storage layer: every stored byte (state ledger, lock, +//! recovery sidecars, approval artifacts, catalog payloads) goes through the +//! engine's `StorageAdapter`, so `file://` and `s3://` are one code path +//! (RFC-006). Declared configuration — `cluster.yaml` and the schema/query/ +//! policy sources it references — deliberately does NOT live here: config is +//! read from the operator's working tree (Terraform's config-local / +//! state-remote split). +//! +//! Raw `fs::*` for cluster state outside this module is a deny-list entry. -use super::*; +use std::path::Path; +use std::process; +use std::sync::Arc; -#[derive(Debug)] -pub(crate) struct LocalStateBackend { - state_dir: PathBuf, - state_path: PathBuf, - lock_path: PathBuf, - recoveries_dir: PathBuf, - approvals_dir: PathBuf, +use omnigraph::storage::{StorageAdapter, StorageKind, storage_for_uri, storage_kind_for_uri}; +use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; +use ulid::Ulid; + +use crate::{ + ApprovalArtifact, CLUSTER_APPROVALS_DIR, CLUSTER_LOCK_FILE, CLUSTER_RECOVERIES_DIR, + CLUSTER_RESOURCES_DIR, CLUSTER_STATE_FILE, ClusterState, Diagnostic, RecoverySidecar, + ResourceKind, StateLockFile, StateObservations, sha256_hex, +}; + +#[derive(Debug, Clone)] +pub(crate) struct ClusterStore { + adapter: Arc, + /// Normalized storage-root URI, no trailing slash: `file:///abs/dir` + /// (the default config-dir layout) or `s3://bucket/prefix`. + root: String, + /// What observations/diagnostics display for stored locations: the plain + /// local path for `file://` roots (byte-compatible with the pre-store + /// outputs), the URI otherwise. + display_root: String, } #[derive(Debug)] pub(crate) struct StateSnapshot { pub(crate) state: Option, + /// Content identity (`sha256:`) — the public CAS vocabulary. 
pub(crate) state_cas: Option, } #[derive(Debug)] pub(crate) struct StateLockGuard { - path: PathBuf, + adapter: Arc, + uri: String, + kind: StorageKind, } -impl LocalStateBackend { - pub(crate) fn new(config_dir: &Path) -> Self { - let state_dir = config_dir.join(CLUSTER_STATE_DIR); - Self { - state_path: config_dir.join(CLUSTER_STATE_FILE), - lock_path: config_dir.join(CLUSTER_LOCK_FILE), - recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR), - approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR), - state_dir, - } - } - - /// List approval artifacts in ULID (filename) order; unparseable files - /// warn and stay on disk for the operator. - pub(crate) fn list_approval_artifacts( - &self, - diagnostics: &mut Vec, - ) -> Vec<(PathBuf, ApprovalArtifact)> { - let mut paths = Vec::new(); - match fs::read_dir(&self.approvals_dir) { - Ok(entries) => { - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "json") { - paths.push(path); - } +impl Drop for StateLockGuard { + fn drop(&mut self) { + match self.kind { + // Deterministic release on the file backend (tests assert the + // lock is gone the moment a command returns). + StorageKind::Local => { + let path = self.uri.trim_start_matches("file://"); + let _ = std::fs::remove_file(path); + } + // Object stores need an async delete; best-effort spawn. A crash + // here leaves the lock for `force-unlock` — same as a process + // kill, and the same recovery path. 
+ StorageKind::S3 => { + let adapter = Arc::clone(&self.adapter); + let uri = self.uri.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + let _ = adapter.delete(&uri).await; + }); } } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => diagnostics.push(Diagnostic::warning( - "approval_read_error", - CLUSTER_APPROVALS_DIR, - format!("could not list approval artifacts: {err}"), - )), } - paths.sort(); - let mut artifacts = Vec::new(); - for path in paths { - match fs::read_to_string(&path) - .map_err(|err| err.to_string()) - .and_then(|text| { - serde_json::from_str::(&text).map_err(|err| err.to_string()) - }) { - Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)), - Ok(artifact) => diagnostics.push(Diagnostic::warning( - "unsupported_approval_version", - display_path(&path), - format!( - "unsupported approval artifact version {}; leaving it in place", - artifact.schema_version - ), - )), + } +} + +impl ClusterStore { + /// The default layout: storage root = the config directory itself + /// (`file://`), byte-compatible with every pre-existing + /// cluster on disk. + pub(crate) fn for_config_dir(config_dir: &Path) -> Self { + let absolute = + std::path::absolute(config_dir).unwrap_or_else(|_| config_dir.to_path_buf()); + let display_root = absolute + .to_string_lossy() + .trim_end_matches('/') + .to_string(); + let root = format!("file://{display_root}"); + let adapter = storage_for_uri(&root) + .expect("local storage adapter construction is infallible for file:// roots"); + Self { + adapter, + root, + display_root, + } + } + + /// An explicit `storage:` root. `file://` URIs and plain paths normalize + /// to the local backend; `s3://bucket/prefix` to the S3 backend (env- + /// driven credentials/endpoint — the same contract as graph storage). 
+ pub(crate) fn for_storage_root(root_uri: &str) -> Result { + let trimmed = root_uri.trim_end_matches('/'); + if storage_kind_for_uri(trimmed) == StorageKind::Local { + let path = trimmed.trim_start_matches("file://"); + return Ok(Self::for_config_dir(Path::new(path))); + } + let adapter = storage_for_uri(trimmed).map_err(|err| { + Diagnostic::error( + "storage_root_invalid", + "storage", + format!("could not initialize storage for '{root_uri}': {err}"), + ) + })?; + Ok(Self { + adapter, + root: trimmed.to_string(), + display_root: trimmed.to_string(), + }) + } + + pub(crate) fn kind(&self) -> StorageKind { + storage_kind_for_uri(&self.root) + } + + fn uri(&self, relative: &str) -> String { + format!("{}/{}", self.root, relative) + } + + fn display(&self, relative: &str) -> String { + format!("{}/{}", self.display_root, relative) + } + + /// Derived graph root for ``: `/graphs/.omni`. A plain + /// local path for `file://` roots (byte-compatible, directly usable by + /// the engine); the S3 URI the engine opens natively otherwise. + pub(crate) fn graph_root(&self, graph_id: &str) -> String { + match self.kind() { + StorageKind::Local => format!("{}/graphs/{graph_id}.omni", self.display_root), + StorageKind::S3 => format!("{}/graphs/{graph_id}.omni", self.root), + } + } + + /// `read_text_versioned`, returning None for a missing object (probed + /// via `exists` — the engine error type doesn't discriminate NotFound). 
+ async fn read_versioned_opt(&self, uri: &str) -> Result, String> { + match self.adapter.exists(uri).await { + Ok(false) => return Ok(None), + Ok(true) => {} + Err(err) => return Err(err.to_string()), + } + self.adapter + .read_text_versioned(uri) + .await + .map(Some) + .map_err(|err| err.to_string()) + } + + /// JSON object write with the strongest atomicity the backend offers: + /// temp + rename on the filesystem (no torn JSON after a crash; the + /// pre-port behavior), a single atomic PUT on object stores (where + /// copy+delete would be weaker, not stronger). + async fn put_json(&self, relative: &str, payload: &str) -> Result<(), String> { + let target = self.uri(relative); + match self.kind() { + StorageKind::Local => { + let tmp = format!("{target}.tmp.{}", Ulid::new()); + self.adapter + .write_text(&tmp, payload) + .await + .map_err(|err| err.to_string())?; + if let Err(err) = self.adapter.rename_text(&tmp, &target).await { + let _ = self.adapter.delete(&tmp).await; + return Err(err.to_string()); + } + Ok(()) + } + StorageKind::S3 => self + .adapter + .write_text(&target, payload) + .await + .map_err(|err| err.to_string()), + } + } + + /// Shared list-and-parse for the sidecar/approval directories: id + /// (filename) order; unparseable objects warn and stay for the operator. 
+ async fn list_json_dir( + &self, + dir: &str, + diagnostics: &mut Vec, + list_error_code: &'static str, + parse_error_code: &'static str, + version_ok: impl Fn(&T) -> bool, + version_error_code: &'static str, + ) -> Vec<(String, T)> { + let dir_uri = self.uri(dir); + let mut uris = match self.adapter.list_dir(&dir_uri).await { + Ok(uris) => uris, + Err(err) => { + diagnostics.push(Diagnostic::warning( + list_error_code, + dir, + format!("could not list '{dir}': {err}"), + )); + return Vec::new(); + } + }; + uris.retain(|uri| uri.ends_with(".json")); + uris.sort(); + let mut out = Vec::new(); + for uri in uris { + match self.adapter.read_text(&uri).await { + Ok(text) => match serde_json::from_str::(&text) { + Ok(value) if version_ok(&value) => out.push((uri, value)), + Ok(_) => diagnostics.push(Diagnostic::warning( + version_error_code, + uri.clone(), + "unsupported schema version; leaving it in place".to_string(), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + parse_error_code, + uri.clone(), + format!("could not parse ({err}); leaving it in place"), + )), + }, Err(err) => diagnostics.push(Diagnostic::warning( - "invalid_approval_artifact", - display_path(&path), - format!("could not parse approval artifact ({err}); leaving it in place"), + parse_error_code, + uri.clone(), + format!("could not read ({err}); leaving it in place"), )), } } - artifacts + out } - /// Atomically write (or rewrite, e.g. on consumption) an approval artifact. - pub(crate) fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result { - fs::create_dir_all(&self.approvals_dir).map_err(|err| { - Diagnostic::error( - "approval_write_error", - CLUSTER_APPROVALS_DIR, - format!("could not create approvals directory: {err}"), - ) - })?; - let target = self - .approvals_dir - .join(format!("{}.json", artifact.approval_id)); + /// Best-effort object removal (sidecar retirement after a CAS lands, + /// lock cleanup) — failures are recoverable by the next sweep. 
+ pub(crate) async fn delete_object(&self, uri: &str) { + let _ = self.adapter.delete(uri).await; + } + + /// Recursive prefix delete for graph roots (approved deletes). Idempotent; + /// S3 non-atomicity is tolerated by the delete protocol's retry shape. + pub(crate) async fn delete_graph_root(&self, graph_uri: &str) -> Result<(), String> { + self.adapter + .delete_prefix(graph_uri) + .await + .map_err(|err| err.to_string()) + } + + /// Existence probe for graph roots in sweep classification. A bare local + /// path or any URI works — resolved through the same adapter machinery + /// the engine uses. + pub(crate) async fn graph_root_exists(&self, graph_uri: &str) -> bool { + match storage_kind_for_uri(graph_uri) { + StorageKind::Local => Path::new(graph_uri.trim_start_matches("file://")).exists(), + StorageKind::S3 => match storage_for_uri(graph_uri) { + Ok(adapter) => !adapter + .list_dir(graph_uri) + .await + .map(|entries| entries.is_empty()) + .unwrap_or(true), + Err(_) => false, + }, + } + } + + // ---- approvals ---- + + pub(crate) async fn list_approval_artifacts( + &self, + diagnostics: &mut Vec, + ) -> Vec<(String, ApprovalArtifact)> { + self.list_json_dir( + CLUSTER_APPROVALS_DIR, + diagnostics, + "approval_read_error", + "invalid_approval_artifact", + |artifact: &ApprovalArtifact| artifact.schema_version == 1, + "unsupported_approval_version", + ) + .await + } + + pub(crate) async fn write_approval_artifact( + &self, + artifact: &ApprovalArtifact, + ) -> Result { + let relative = format!("{CLUSTER_APPROVALS_DIR}/{}.json", artifact.approval_id); let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| { Diagnostic::error( "approval_write_error", - display_path(&target), + self.display(&relative), format!("could not encode approval artifact: {err}"), ) })?; payload.push('\n'); - let tmp_path = self - .approvals_dir - .join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new())); - fs::write(&tmp_path, payload.as_bytes()).map_err(|err| 
{ + self.put_json(&relative, &payload).await.map_err(|err| { Diagnostic::error( "approval_write_error", - display_path(&tmp_path), + self.display(&relative), format!("could not write approval artifact: {err}"), ) })?; - if let Err(err) = fs::rename(&tmp_path, &target) { - let _ = fs::remove_file(&tmp_path); - return Err(Diagnostic::error( - "approval_write_error", - display_path(&target), - format!("could not move approval artifact into place: {err}"), - )); - } - Ok(target) + Ok(self.uri(&relative)) } - /// List recovery sidecars in ULID (filename) order. Unparseable files are - /// reported as warnings and skipped — they stay on disk for the operator. - pub(crate) fn list_recovery_sidecars( + // ---- recovery sidecars ---- + + pub(crate) async fn list_recovery_sidecars( &self, diagnostics: &mut Vec, - ) -> Vec<(PathBuf, RecoverySidecar)> { - let mut paths = Vec::new(); - match fs::read_dir(&self.recoveries_dir) { - Ok(entries) => { - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "json") { - paths.push(path); - } - } - } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - diagnostics.push(Diagnostic::warning( - "recovery_sidecar_read_error", - CLUSTER_RECOVERIES_DIR, - format!("could not list recovery sidecars: {err}"), - )); - } - } - paths.sort(); - let mut sidecars = Vec::new(); - for path in paths { - match fs::read_to_string(&path) - .map_err(|err| err.to_string()) - .and_then(|text| { - serde_json::from_str::(&text).map_err(|err| err.to_string()) - }) { - Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)), - Ok(sidecar) => diagnostics.push(Diagnostic::warning( - "unsupported_recovery_sidecar_version", - display_path(&path), - format!( - "unsupported recovery sidecar version {}; leaving it in place", - sidecar.schema_version - ), - )), - Err(err) => diagnostics.push(Diagnostic::warning( - "invalid_recovery_sidecar", - display_path(&path), - 
format!("could not parse recovery sidecar ({err}); leaving it in place"), - )), - } - } - sidecars + ) -> Vec<(String, RecoverySidecar)> { + self.list_json_dir( + CLUSTER_RECOVERIES_DIR, + diagnostics, + "recovery_sidecar_read_error", + "invalid_recovery_sidecar", + |sidecar: &RecoverySidecar| sidecar.schema_version == 1, + "unsupported_recovery_sidecar_version", + ) + .await } - /// Atomically write (or rewrite) a recovery sidecar; returns its path. - pub(crate) fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result { - fs::create_dir_all(&self.recoveries_dir).map_err(|err| { - Diagnostic::error( - "recovery_sidecar_write_error", - CLUSTER_RECOVERIES_DIR, - format!("could not create recoveries directory: {err}"), - ) - })?; - let target = self - .recoveries_dir - .join(format!("{}.json", sidecar.operation_id)); + pub(crate) async fn write_recovery_sidecar( + &self, + sidecar: &RecoverySidecar, + ) -> Result { + let relative = format!("{CLUSTER_RECOVERIES_DIR}/{}.json", sidecar.operation_id); let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| { Diagnostic::error( "recovery_sidecar_write_error", - display_path(&target), + self.display(&relative), format!("could not encode recovery sidecar: {err}"), ) })?; payload.push('\n'); - let tmp_path = self - .recoveries_dir - .join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new())); - fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + self.put_json(&relative, &payload).await.map_err(|err| { Diagnostic::error( "recovery_sidecar_write_error", - display_path(&tmp_path), + self.display(&relative), format!("could not write recovery sidecar: {err}"), ) })?; - if let Err(err) = fs::rename(&tmp_path, &target) { - let _ = fs::remove_file(&tmp_path); + Ok(self.uri(&relative)) + } + + // ---- catalog payloads ---- + + /// Content-addressed catalog location for a query/policy payload + /// (extensions fixed per kind, same as the pre-port layout). 
+ pub(crate) fn payload_relative(kind: &ResourceKind, digest: &str) -> Option { + match kind { + ResourceKind::Query { graph, name } => Some(format!( + "{CLUSTER_RESOURCES_DIR}/query/{graph}/{name}/{digest}.gq" + )), + ResourceKind::Policy(name) => Some(format!( + "{CLUSTER_RESOURCES_DIR}/policy/{name}/{digest}.yaml" + )), + _ => None, + } + } + + pub(crate) fn payload_display(&self, kind: &ResourceKind, digest: &str) -> Option { + Self::payload_relative(kind, digest).map(|relative| self.display(&relative)) + } + + pub(crate) async fn payload_exists(&self, kind: &ResourceKind, digest: &str) -> bool { + let Some(relative) = Self::payload_relative(kind, digest) else { + return false; + }; + self.adapter + .exists(&self.uri(&relative)) + .await + .unwrap_or(false) + } + + /// Idempotent content-addressed write: a payload already present at its + /// digest is by definition identical. + pub(crate) async fn write_payload( + &self, + kind: &ResourceKind, + digest: &str, + content: &str, + ) -> Result<(), String> { + let Some(relative) = Self::payload_relative(kind, digest) else { + return Err("resource kind has no payload".to_string()); + }; + if self + .adapter + .exists(&self.uri(&relative)) + .await + .map_err(|err| err.to_string())? + { + return Ok(()); + } + self.put_json(&relative, content).await + } + + /// Read a catalog payload and verify it against its recorded digest. 
+ pub(crate) async fn read_verified_payload( + &self, + kind: &ResourceKind, + digest: &str, + address: &str, + ) -> Result { + let Some(relative) = Self::payload_relative(kind, digest) else { return Err(Diagnostic::error( - "recovery_sidecar_write_error", - display_path(&target), - format!("could not move recovery sidecar into place: {err}"), + "catalog_payload_missing", + address, + "resource kind has no payload", + )); + }; + let uri = self.uri(&relative); + let text = self.adapter.read_text(&uri).await.map_err(|err| { + Diagnostic::error( + "catalog_payload_missing", + address, + format!( + "catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart", + self.display(&relative) + ), + ) + })?; + if sha256_hex(text.as_bytes()) != digest { + return Err(Diagnostic::error( + "catalog_payload_digest_mismatch", + address, + format!( + "catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart", + self.display(&relative) + ), )); } - Ok(target) + Ok(text) } + // ---- observations ---- + pub(crate) fn observations(&self) -> StateObservations { StateObservations { - state_path: display_path(&self.state_path), - lock_path: display_path(&self.lock_path), + state_path: self.display(CLUSTER_STATE_FILE), + lock_path: self.display(CLUSTER_LOCK_FILE), state_found: false, applied_config_digest: None, state_revision: 0, @@ -241,13 +457,16 @@ impl LocalStateBackend { } } - pub(crate) fn read_state( + // ---- state ledger ---- + + pub(crate) async fn read_state( &self, observations: &mut StateObservations, ) -> Result { - let text = match fs::read_to_string(&self.state_path) { - Ok(text) => text, - Err(err) if err.kind() == ErrorKind::NotFound => { + let state_uri = self.uri(CLUSTER_STATE_FILE); + let (text, _version) = match self.read_versioned_opt(&state_uri).await { + Ok(Some(read)) => read, + Ok(None) => { return Ok(StateSnapshot { state: None, state_cas: None, @@ -295,27 +514,32 @@ impl 
LocalStateBackend { }) } - pub(crate) fn write_state( + /// CAS-guarded ledger replace. The public contract stays content-level + /// (`expected_cas` = `sha256:` from the snapshot the command read); + /// the physical swap is token-conditioned on a fresh read, so a writer + /// that raced us between the fresh read and the put loses with + /// `state_cas_mismatch` — never a silent overwrite. On S3 the token is + /// the object's ETag and the put is conditional (If-Match); locally it + /// is a content token over the same temp+rename flow as before the port. + pub(crate) async fn write_state( &self, state: &ClusterState, expected_cas: Option<&str>, observations: &mut StateObservations, ) -> Result<(), Diagnostic> { - fs::create_dir_all(&self.state_dir).map_err(|err| { + let state_uri = self.uri(CLUSTER_STATE_FILE); + let current = self.read_versioned_opt(&state_uri).await.map_err(|err| { Diagnostic::error( "state_write_error", - CLUSTER_STATE_DIR, - format!("could not create cluster state directory: {err}"), + CLUSTER_STATE_FILE, + format!("could not read state file before write: {err}"), ) })?; - - let current_cas = self.current_state_cas()?; + let current_cas = current + .as_ref() + .map(|(text, _)| format!("sha256:{}", sha256_hex(text.as_bytes()))); if current_cas.as_deref() != expected_cas { - return Err(Diagnostic::error( - "state_cas_mismatch", - CLUSTER_STATE_FILE, - "state.json changed while the command was running; re-run the command against the latest state", - )); + return Err(state_cas_mismatch()); } let mut payload = serde_json::to_string_pretty(state).map_err(|err| { @@ -327,86 +551,51 @@ impl LocalStateBackend { })?; payload.push('\n'); - let tmp_path = self - .state_dir - .join(format!("state.json.tmp.{}", Ulid::new())); - let mut file = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tmp_path) - .map_err(|err| { - Diagnostic::error( - "state_write_error", - display_path(&tmp_path), - format!("could not create temporary state file: 
{err}"), - ) - })?; - file.write_all(payload.as_bytes()).map_err(|err| { - Diagnostic::error( - "state_write_error", - display_path(&tmp_path), - format!("could not write temporary state file: {err}"), - ) - })?; - file.sync_all().map_err(|err| { - Diagnostic::error( - "state_write_error", - display_path(&tmp_path), - format!("could not sync temporary state file: {err}"), - ) - })?; - drop(file); - - if let Err(err) = fs::rename(&tmp_path, &self.state_path) { - let _ = fs::remove_file(&tmp_path); - return Err(Diagnostic::error( - "state_write_error", - CLUSTER_STATE_FILE, - format!("could not replace state.json atomically: {err}"), - )); + let written = match current { + None => self + .adapter + .write_text_if_absent(&state_uri, &payload) + .await + .map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not create state.json: {err}"), + ) + })?, + Some((_, version)) => self + .adapter + .write_text_if_match(&state_uri, &payload, &version) + .await + .map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not replace state.json: {err}"), + ) + })? 
+ .is_some(), + }; + if !written { + return Err(state_cas_mismatch()); } - let written = fs::read_to_string(&self.state_path).map_err(|err| { - Diagnostic::error( - "state_write_error", - CLUSTER_STATE_FILE, - format!("could not read state.json after write: {err}"), - ) - })?; observations.state_found = true; observations.applied_config_digest = state.applied_revision.config_digest.clone(); observations.state_revision = state.state_revision; - observations.state_cas = Some(format!("sha256:{}", sha256_hex(written.as_bytes()))); + observations.state_cas = Some(format!("sha256:{}", sha256_hex(payload.as_bytes()))); observations.resource_count = state.applied_revision.resources.len(); - Ok(()) } - pub(crate) fn current_state_cas(&self) -> Result, Diagnostic> { - match fs::read(&self.state_path) { - Ok(bytes) => Ok(Some(format!("sha256:{}", sha256_hex(&bytes)))), - Err(err) if err.kind() == ErrorKind::NotFound => Ok(None), - Err(err) => Err(Diagnostic::error( - "state_read_error", - CLUSTER_STATE_FILE, - format!("could not read state file for CAS check: {err}"), - )), - } - } + // ---- lock ---- - pub(crate) fn acquire_lock( + pub(crate) async fn acquire_lock( &self, operation: &str, observations: &mut StateObservations, ) -> Result { - fs::create_dir_all(&self.state_dir).map_err(|err| { - Diagnostic::error( - "state_lock_error", - CLUSTER_STATE_DIR, - format!("could not create cluster state directory: {err}"), - ) - })?; - + let lock_uri = self.uri(CLUSTER_LOCK_FILE); let lock_id = Ulid::new().to_string(); let lock = StateLockFile { version: 1, @@ -425,31 +614,18 @@ impl LocalStateBackend { ) })?; - match OpenOptions::new() - .write(true) - .create_new(true) - .open(&self.lock_path) - { - Ok(mut file) => { - if let Err(err) = file.write_all(payload.as_bytes()) { - // No guard exists yet, so clean up the create-new file here - // instead of leaving a stale partial lock for the next run. 
- drop(file); - let _ = fs::remove_file(&self.lock_path); - return Err(Diagnostic::error( - "state_lock_error", - CLUSTER_LOCK_FILE, - format!("could not write state lock: {err}"), - )); - } + match self.adapter.write_text_if_absent(&lock_uri, &payload).await { + Ok(true) => { observations.lock_acquired = true; - observations.acquired_lock_id = Some(lock_id.clone()); + observations.acquired_lock_id = Some(lock_id); Ok(StateLockGuard { - path: self.lock_path.clone(), + adapter: Arc::clone(&self.adapter), + uri: lock_uri, + kind: self.kind(), }) } - Err(err) if err.kind() == ErrorKind::AlreadyExists => { - self.observe_lock_metadata_lossy(observations); + Ok(false) => { + self.observe_lock_metadata_lossy(observations).await; Err(Diagnostic::error( "state_lock_held", CLUSTER_LOCK_FILE, @@ -459,23 +635,24 @@ impl LocalStateBackend { Err(err) => Err(Diagnostic::error( "state_lock_error", CLUSTER_LOCK_FILE, - format!("could not acquire state lock: {err}"), + format!("could not write state lock: {err}"), )), } } - pub(crate) fn force_unlock( + pub(crate) async fn force_unlock( &self, - requested_lock_id: &str, + lock_id: &str, observations: &mut StateObservations, ) -> Result<(), Diagnostic> { - let text = match fs::read_to_string(&self.lock_path) { - Ok(text) => text, - Err(err) if err.kind() == ErrorKind::NotFound => { + let lock_uri = self.uri(CLUSTER_LOCK_FILE); + let text = match self.read_versioned_opt(&lock_uri).await { + Ok(Some((text, _))) => text, + Ok(None) => { return Err(Diagnostic::error( "state_lock_missing", CLUSTER_LOCK_FILE, - "cluster state lock is not present; nothing was unlocked", + "no cluster state lock is present", )); } Err(err) => { @@ -486,42 +663,41 @@ impl LocalStateBackend { )); } }; - observations.locked = true; let lock = parse_lock_file_for_unlock(&text)?; observations.observe_lock_metadata(&lock); - - if lock.lock_id != requested_lock_id { + observations.locked = true; + if lock.lock_id != lock_id { return Err(Diagnostic::error( 
"state_lock_id_mismatch", CLUSTER_LOCK_FILE, format!( - "cluster state lock id is {}; refusing to unlock with requested id {requested_lock_id}", + "lock id mismatch: held lock is {}, refusing to remove (pass the exact id from `cluster status`)", lock.lock_id ), )); } - - fs::remove_file(&self.lock_path).map_err(|err| { + self.adapter.delete(&lock_uri).await.map_err(|err| { Diagnostic::error( - "state_unlock_error", + "state_lock_error", CLUSTER_LOCK_FILE, format!("could not remove state lock: {err}"), ) - }) + })?; + observations.locked = false; + Ok(()) } - pub(crate) fn observe_lock( + pub(crate) async fn observe_lock( &self, observations: &mut StateObservations, diagnostics: &mut Vec, ) { - if self.lock_path.exists() { - observations.locked = true; - match fs::read_to_string(&self.lock_path) { - Ok(text) => match serde_json::from_str::(&text) { - Ok(lock) if lock.version == 1 => { - observations.observe_lock_metadata(&lock); - } + let lock_uri = self.uri(CLUSTER_LOCK_FILE); + match self.read_versioned_opt(&lock_uri).await { + Ok(Some((text, _))) => { + observations.locked = true; + match serde_json::from_str::(&text) { + Ok(lock) if lock.version == 1 => observations.observe_lock_metadata(&lock), Ok(lock) => diagnostics.push(Diagnostic::warning( "unsupported_state_lock_version", CLUSTER_LOCK_FILE, @@ -532,19 +708,24 @@ impl LocalStateBackend { CLUSTER_LOCK_FILE, format!("could not parse state lock: {err}"), )), - }, - Err(err) => diagnostics.push(Diagnostic::warning( - "state_lock_read_error", - CLUSTER_LOCK_FILE, - format!("could not read state lock: {err}"), - )), + } } + Ok(None) => {} + Err(err) => diagnostics.push(Diagnostic::warning( + "state_lock_read_error", + CLUSTER_LOCK_FILE, + format!("could not read state lock: {err}"), + )), } } - pub(crate) fn observe_lock_metadata_lossy(&self, observations: &mut StateObservations) { + pub(crate) async fn observe_lock_metadata_lossy( + &self, + observations: &mut StateObservations, + ) { observations.locked = true; 
- if let Ok(text) = fs::read_to_string(&self.lock_path) { + let lock_uri = self.uri(CLUSTER_LOCK_FILE); + if let Ok(Some((text, _))) = self.read_versioned_opt(&lock_uri).await { if let Ok(lock) = serde_json::from_str::(&text) { if lock.version == 1 { observations.observe_lock_metadata(&lock); @@ -554,10 +735,12 @@ impl LocalStateBackend { } } -impl Drop for StateLockGuard { - fn drop(&mut self) { - let _ = fs::remove_file(&self.path); - } +fn state_cas_mismatch() -> Diagnostic { + Diagnostic::error( + "state_cas_mismatch", + CLUSTER_STATE_FILE, + "state.json changed while the command was running; re-run the command against the latest state", + ) } pub(crate) fn parse_lock_file_for_unlock(text: &str) -> Result { diff --git a/crates/omnigraph-cluster/src/sweep.rs b/crates/omnigraph-cluster/src/sweep.rs index 77ad8c5..2cfd7d1 100644 --- a/crates/omnigraph-cluster/src/sweep.rs +++ b/crates/omnigraph-cluster/src/sweep.rs @@ -11,12 +11,12 @@ use super::*; /// Mutations ride the calling command's CAS-checked state write; completed /// sidecars are deleted only after that write lands. pub(crate) async fn sweep_recovery_sidecars( - backend: &LocalStateBackend, + backend: &ClusterStore, state: &mut ClusterState, diagnostics: &mut Vec, ) -> SweepOutcome { let mut outcome = SweepOutcome::default(); - for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) { + for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await { match sidecar.kind { RecoverySidecarKind::GraphCreate => { sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; @@ -33,7 +33,7 @@ pub(crate) async fn sweep_recovery_sidecars( } pub(crate) async fn sweep_graph_create_sidecar( - path: PathBuf, + path: String, sidecar: RecoverySidecar, state: &mut ClusterState, diagnostics: &mut Vec, @@ -44,9 +44,11 @@ pub(crate) async fn sweep_graph_create_sidecar( let graph_path = PathBuf::from(&sidecar.graph_uri); // Row 1: nothing moved — the init never landed. 
The sidecar is pure - // intent; remove it and let the command's own plan re-propose the create. + // intent; retire it (deferred to the command's post-CAS cleanup, like + // every other completed sidecar — a failed CAS simply re-sweeps it) and + // let the command's own plan re-propose the create. if !graph_path.exists() { - let _ = fs::remove_file(&path); + outcome.completed_sidecars.push(path); return; } @@ -153,7 +155,7 @@ pub(crate) async fn sweep_graph_create_sidecar( } pub(crate) async fn sweep_schema_apply_sidecar( - path: PathBuf, + path: String, sidecar: RecoverySidecar, state: &mut ClusterState, diagnostics: &mut Vec, @@ -250,7 +252,7 @@ pub(crate) async fn sweep_schema_apply_sidecar( } pub(crate) fn sweep_graph_delete_sidecar( - path: PathBuf, + path: String, sidecar: RecoverySidecar, state: &mut ClusterState, diagnostics: &mut Vec, @@ -351,15 +353,15 @@ pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &s } /// Mark approval artifact files consumed on disk (post-CAS). 
-pub(crate) fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) { +pub(crate) async fn mark_approvals_consumed(backend: &ClusterStore, approval_ids: &[String]) { if approval_ids.is_empty() { return; } let mut sink = Vec::new(); - for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) { + for (_, mut artifact) in backend.list_approval_artifacts(&mut sink).await { if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() { artifact.consumed_at = Some(now_rfc3339()); - let _ = backend.write_approval_artifact(&artifact); + let _ = backend.write_approval_artifact(&artifact).await; } } } diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index a03c522..3b7984d 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -351,8 +351,8 @@ policies: })); } - #[test] - fn extended_state_json_status_surfaces_statuses() { + #[tokio::test] + async fn extended_state_json_status_surfaces_statuses() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -380,7 +380,7 @@ policies: }"#; fs::write(state_dir.join("state.json"), state).unwrap(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.state_observations.state_found); assert_eq!(out.state_observations.state_revision, 42); @@ -400,10 +400,10 @@ policies: ); } - #[test] - fn missing_state_status_succeeds_with_warning() { + #[tokio::test] + async fn missing_state_status_succeeds_with_warning() { let dir = fixture(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.state_observations.state_found); assert_eq!(out.state_observations.state_revision, 0); @@ -414,14 +414,14 @@ policies: ); } - #[test] - fn invalid_state_status_fails() { + 
#[tokio::test] + async fn invalid_state_status_fails() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); fs::write(state_dir.join("state.json"), "{").unwrap(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.state_observations.state_found); assert!( @@ -431,12 +431,12 @@ policies: ); } - #[test] - fn status_surfaces_full_lock_metadata() { + #[tokio::test] + async fn status_surfaces_full_lock_metadata() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "refresh"); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.state_observations.locked); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -452,12 +452,12 @@ policies: assert!(out.state_observations.lock_age_seconds.is_some()); } - #[test] - fn force_unlock_matching_id_removes_lock() { + #[tokio::test] + async fn force_unlock_matching_id_removes_lock() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.lock_removed); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -468,12 +468,12 @@ policies: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_wrong_id_fails_and_preserves_lock() { + #[tokio::test] + async fn force_unlock_wrong_id_fails_and_preserves_lock() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); - let out = force_unlock_config_dir(dir.path(), "other-lock"); + let out = force_unlock_config_dir(dir.path(), "other-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ 
-485,11 +485,11 @@ policies: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_missing_lock_fails() { + #[tokio::test] + async fn force_unlock_missing_lock_fails() { let dir = fixture(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!(!out.state_observations.locked); @@ -500,14 +500,14 @@ policies: ); } - #[test] - fn force_unlock_invalid_lock_json_fails_and_preserves_lock() { + #[tokio::test] + async fn force_unlock_invalid_lock_json_fails_and_preserves_lock() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); fs::write(state_dir.join("lock.json"), "{").unwrap(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!( @@ -518,8 +518,8 @@ policies: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() { + #[tokio::test] + async fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -529,7 +529,7 @@ policies: ) .unwrap(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!( @@ -540,8 +540,8 @@ policies: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_external_state_backend_rejected() { + #[tokio::test] + async fn force_unlock_external_state_backend_rejected() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); fs::write( @@ -557,7 +557,7 @@ graphs: ) .unwrap(); - let out = force_unlock_config_dir(dir.path(), 
"held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!( @@ -582,7 +582,7 @@ graphs: .any(|diagnostic| diagnostic.code == "state_lock_held") ); - let unlocked = force_unlock_config_dir(dir.path(), "held-lock"); + let unlocked = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(unlocked.ok, "{:?}", unlocked.diagnostics); let out = plan_config_dir(dir.path()).await; @@ -1886,7 +1886,7 @@ graphs: let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); fs::remove_file(&blob).unwrap(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.diagnostics.iter().any(|diagnostic| { diagnostic.code == "catalog_payload_missing" @@ -2001,7 +2001,7 @@ graphs: assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); assert_eq!(fs::read_to_string(&blob).unwrap(), original); - let status = status_config_dir(dir.path()); + let status = status_config_dir(dir.path()).await; assert!( !status .diagnostics @@ -2012,12 +2012,12 @@ graphs: ); } - #[test] - fn verification_skips_graph_and_schema_resources() { + #[tokio::test] + async fn verification_skips_graph_and_schema_resources() { let dir = fixture(); write_applyable_state(dir.path()); // graph + schema digests only, no blobs - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!( !out.diagnostics .iter() @@ -2770,7 +2770,7 @@ policies: let converge = apply_config_dir(dir.path()).await; assert!(converge.converged, "{converge:?}"); - let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve"); + let snapshot = read_serving_snapshot(dir.path()).await.expect("converged cluster must serve"); assert_eq!(snapshot.graphs.len(), 1); assert_eq!(snapshot.graphs[0].graph_id, "knowledge"); assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni")); 
@@ -2782,10 +2782,10 @@ policies: assert!(snapshot.policies[0].blob_path.exists()); } - #[test] - fn serving_snapshot_refuses_missing_state() { + #[tokio::test] + async fn serving_snapshot_refuses_missing_state() { let dir = fixture(); - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"), "{err:?}" @@ -2800,7 +2800,7 @@ policies: apply_config_dir(dir.path()).await; write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE"); - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"), "{err:?}" @@ -2814,7 +2814,7 @@ policies: write_applyable_state(dir.path()); apply_config_dir(dir.path()).await; // Tamper with the query blob... - let snapshot = read_serving_snapshot(dir.path()).unwrap(); + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); let desired = validate_config_dir(dir.path()); let query_digest = &desired.resource_digests["query.knowledge.find_person"]; let blob = dir @@ -2838,7 +2838,7 @@ policies: ) .unwrap(); - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter() .any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"), @@ -2851,12 +2851,12 @@ policies: let _ = snapshot; // the pre-tamper read succeeded } - #[test] - fn serving_snapshot_refuses_empty_cluster() { + #[tokio::test] + async fn serving_snapshot_refuses_empty_cluster() { let dir = fixture(); write_state_resources(dir.path(), &[]); // state exists, no graphs - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"), "{err:?}" 
@@ -2972,13 +2972,13 @@ policies: ); } - #[test] - fn status_warns_on_pending_recovery_sidecar() { + #[tokio::test] + async fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); write_applyable_state(dir.path()); write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01STATUS"); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!( out.diagnostics diff --git a/crates/omnigraph-cluster/src/types.rs b/crates/omnigraph-cluster/src/types.rs index c366f04..ca960a5 100644 --- a/crates/omnigraph-cluster/src/types.rs +++ b/crates/omnigraph-cluster/src/types.rs @@ -503,7 +503,8 @@ pub(crate) struct SweepOutcome { pub(crate) pending_graphs: BTreeSet, /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the /// command's state write lands, so a CAS failure re-sweeps them. - pub(crate) completed_sidecars: Vec, + /// Store URIs (the storage layer addresses everything by URI). + pub(crate) completed_sidecars: Vec, /// Approval artifacts consumed by a roll-forward (delete row 7b): their /// files are rewritten with consumed_at only after the state write lands. pub(crate) consumed_approvals: Vec, diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 0038674..f7fc6b1 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -893,12 +893,12 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St /// catalog blob content, policy bundles from blob paths with their applied /// bindings. Always multi-graph routing. The unauthenticated/env handling /// matches the omnigraph.yaml path. 
-fn load_cluster_settings( +async fn load_cluster_settings( cluster_dir: &PathBuf, cli_bind: Option, cli_allow_unauthenticated: bool, ) -> Result { - let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| { + let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).await.map_err(|diagnostics| { let details = diagnostics .iter() .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message)) @@ -988,7 +988,7 @@ fn load_cluster_settings( }) } -pub fn load_server_settings( +pub async fn load_server_settings( config_path: Option<&PathBuf>, cli_cluster: Option<&PathBuf>, cli_uri: Option, @@ -1005,7 +1005,7 @@ pub fn load_server_settings( "--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)" ); } - return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated); + return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await; } let config = load_config(config_path)?; let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string()); @@ -3363,8 +3363,8 @@ mod tests { ); } - #[test] - fn server_settings_load_from_yaml_config() { + #[tokio::test] + async fn server_settings_load_from_yaml_config() { let temp = tempdir().unwrap(); let config = temp.path().join("omnigraph.yaml"); fs::write( @@ -3380,7 +3380,7 @@ server: ) .unwrap(); - let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap(); + let settings = load_server_settings(Some(&config), None, None, None, None, false).await.unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. 
} => { assert_eq!(uri, "/tmp/demo.omni"); @@ -3391,8 +3391,8 @@ server: assert_eq!(settings.bind, "0.0.0.0:9090"); } - #[test] - fn server_settings_cli_flags_override_yaml_config() { + #[tokio::test] + async fn server_settings_cli_flags_override_yaml_config() { let temp = tempdir().unwrap(); let config = temp.path().join("omnigraph.yaml"); fs::write( @@ -3416,6 +3416,7 @@ server: Some("0.0.0.0:9999".to_string()), false, ) + .await .unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { @@ -3427,8 +3428,8 @@ server: assert_eq!(settings.bind, "0.0.0.0:9999"); } - #[test] - fn server_settings_can_resolve_named_target() { + #[tokio::test] + async fn server_settings_can_resolve_named_target() { let temp = tempdir().unwrap(); let config = temp.path().join("omnigraph.yaml"); fs::write( @@ -3448,6 +3449,7 @@ server: let settings = load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false) + .await .unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { @@ -3458,9 +3460,9 @@ server: } } - #[test] - fn server_settings_require_uri_from_cli_or_config() { - let error = load_server_settings(None, None, None, None, None, false).unwrap_err(); + #[tokio::test] + async fn server_settings_require_uri_from_cli_or_config() { + let error = load_server_settings(None, None, None, None, None, false).await.unwrap_err(); assert!( error.to_string().contains("no graph to serve"), "expected mode-inference error, got: {error}", @@ -3598,9 +3600,9 @@ server: ); } - #[test] + #[tokio::test] #[serial] - fn unauthenticated_env_var_classification() { + async fn unauthenticated_env_var_classification() { // MR-723 PR A: closes the gap where the env-var read path inside // `load_server_settings` was structurally implemented but not // exercised by any test. Three properties to pin, all in one @@ -3627,7 +3629,7 @@ server: // Truthy values flip Open mode on, even with CLI flag off. 
for value in ["1", "true", "yes", "TRUE", "anything"] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, @@ -3638,7 +3640,7 @@ server: // Falsy values keep refusal behavior, even with CLI flag off. for value in ["0", "false", "FALSE", ""] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3648,7 +3650,7 @@ server: // Unset env var: also false. let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3659,7 +3661,7 @@ server: // CLI flag wins even when env is falsy — `serve()` honors the // OR of both inputs. 
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true) + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, diff --git a/crates/omnigraph-server/src/main.rs b/crates/omnigraph-server/src/main.rs index c71ea2f..9000910 100644 --- a/crates/omnigraph-server/src/main.rs +++ b/crates/omnigraph-server/src/main.rs @@ -43,6 +43,7 @@ async fn main() -> Result<()> { cli.target, cli.bind, cli.unauthenticated, - )?; + ) + .await?; serve(settings).await } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 7858587..d11c542 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -5567,8 +5567,8 @@ mod multi_graph_startup { /// `GraphId` validation runs at startup — a reserved name in /// `omnigraph.yaml` produces a clear error rather than getting /// rejected per-request. - #[test] - fn load_server_settings_rejects_reserved_graph_id() { + #[tokio::test] + async fn load_server_settings_rejects_reserved_graph_id() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5580,7 +5580,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, false).await.unwrap_err(); assert!( err.to_string().contains("invalid graph id 'policies'"), "expected reserved-name rejection, got: {err}" @@ -5644,8 +5644,8 @@ graphs: // ── Four-rule mode inference matrix ─────────────────────────────── /// Rule 1: CLI positional URI → Single. 
- #[test] - fn mode_inference_cli_uri_is_single() { + #[tokio::test] + async fn mode_inference_cli_uri_is_single() { let settings = load_server_settings( None, None, @@ -5654,6 +5654,7 @@ graphs: None, true, // allow unauth so we get past the runtime-state check ) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"), @@ -5662,8 +5663,8 @@ graphs: } /// Rule 2: --target picks one graph from `graphs:` map → Single. - #[test] - fn mode_inference_cli_target_is_single() { + #[tokio::test] + async fn mode_inference_cli_target_is_single() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5679,6 +5680,7 @@ graphs: .unwrap(); let settings = load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"), @@ -5687,8 +5689,8 @@ graphs: } /// Rule 3: `server.graph` set → Single (target picked from config). - #[test] - fn mode_inference_server_graph_is_single() { + #[tokio::test] + async fn mode_inference_server_graph_is_single() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5704,7 +5706,7 @@ server: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"), ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"), @@ -5712,8 +5714,8 @@ server: } /// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi. 
- #[test] - fn mode_inference_config_plus_graphs_is_multi() { + #[tokio::test] + async fn mode_inference_config_plus_graphs_is_multi() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5727,7 +5729,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect(); @@ -5738,8 +5740,8 @@ graphs: } } - #[test] - fn mode_inference_multi_rejects_top_level_policy_file() { + #[tokio::test] + async fn mode_inference_multi_rejects_top_level_policy_file() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5753,7 +5755,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err(); let msg = err.to_string(); assert!( msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"), @@ -5769,8 +5771,8 @@ graphs: ); } - #[test] - fn mode_inference_multi_rejects_top_level_queries() { + #[tokio::test] + async fn mode_inference_multi_rejects_top_level_queries() { // Symmetric to the policy guard: a top-level `queries:` block in // multi-graph mode is not honored (each graph uses its own), so it // is a loud error rather than a silent no-op. 
@@ -5781,7 +5783,7 @@ graphs: "queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n", ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err(); let msg = err.to_string(); assert!( msg.contains("queries") && msg.contains("not honored"), @@ -5789,8 +5791,8 @@ graphs: ); } - #[test] - fn single_mode_named_graph_rejects_top_level_blocks() { + #[tokio::test] + async fn single_mode_named_graph_rejects_top_level_blocks() { // Serving a graph by name (`--target`/`server.graph`) uses its // per-graph block; a populated top-level block would be silently // shadowed, so boot refuses and names the per-graph location. @@ -5803,6 +5805,7 @@ graphs: .unwrap(); let err = load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) + .await .unwrap_err(); let msg = err.to_string(); assert!( @@ -5811,8 +5814,8 @@ graphs: ); } - #[test] - fn single_mode_named_graph_uses_per_graph_policy_and_queries() { + #[tokio::test] + async fn single_mode_named_graph_uses_per_graph_policy_and_queries() { // The identity rule: `--target prod` attaches `graphs.prod`'s own // policy + queries, not the top-level ones (which are absent here). 
let temp = tempfile::tempdir().unwrap(); @@ -5830,6 +5833,7 @@ graphs: .unwrap(); let settings = load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { @@ -5851,8 +5855,8 @@ graphs: } } - #[test] - fn mode_inference_normalizes_multi_graph_uris() { + #[tokio::test] + async fn mode_inference_normalizes_multi_graph_uris() { let temp = tempfile::tempdir().unwrap(); let graph = temp.path().join("alpha.omni"); let config_path = temp.path().join("omnigraph.yaml"); @@ -5868,7 +5872,7 @@ graphs: ), ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { assert_eq!(graphs[0].uri, graph.to_string_lossy()); @@ -5878,9 +5882,9 @@ graphs: } /// Rule 5: nothing → error with migration hint. - #[test] - fn mode_inference_no_inputs_errors_with_migration_hint() { - let err = load_server_settings(None, None, None, None, None, true).unwrap_err(); + #[tokio::test] + async fn mode_inference_no_inputs_errors_with_migration_hint() { + let err = load_server_settings(None, None, None, None, None, true).await.unwrap_err(); let msg = err.to_string(); assert!( msg.contains("no graph to serve"), @@ -5890,19 +5894,19 @@ graphs: /// Rule 4 sub-case: `--config` with empty `graphs:` map and no /// single-mode selector → rule 5 fires (no graph to serve). 
- #[test] - fn mode_inference_empty_graphs_map_errors() { + #[tokio::test] + async fn mode_inference_empty_graphs_map_errors() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err(); assert!(err.to_string().contains("no graph to serve")); } /// `--config` + `<URI>` together: URI wins → Single (the CLI URI /// takes precedence over the config's graphs map). - #[test] - fn mode_inference_cli_uri_overrides_graphs_map() { + #[tokio::test] + async fn mode_inference_cli_uri_overrides_graphs_map() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5922,6 +5926,7 @@ graphs: None, true, ) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => { @@ -5937,8 +5942,8 @@ graphs: } /// Per-graph `policy.file` is resolved relative to the config base_dir. - #[test] - fn per_graph_policy_file_is_resolved_relative_to_base_dir() { + #[tokio::test] + async fn per_graph_policy_file_is_resolved_relative_to_base_dir() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5954,7 +5959,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); let graphs = match settings.mode { ServerConfigMode::Multi { graphs, .. } => graphs, _ => panic!("expected Multi"), @@ -5972,8 +5977,8 @@ graphs: } /// `server.policy.file` resolves alongside the graphs map. 
- #[test] - fn server_policy_file_is_resolved_relative_to_base_dir() { + #[tokio::test] + async fn server_policy_file_is_resolved_relative_to_base_dir() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5988,7 +5993,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Multi { server_policy_file, .. @@ -6268,7 +6273,7 @@ graphs: .unwrap(); let settings: ServerConfig = - load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); assert!(matches!(settings.mode, ServerConfigMode::Multi { .. })); match settings.mode { @@ -6321,14 +6326,14 @@ graphs: temp } -fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { - omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true) +async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { + omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true).await } #[tokio::test] async fn cluster_boot_serves_applied_state() { let temp = converged_cluster_dir("").await; - let settings = cluster_settings(temp.path()).unwrap(); + let settings = cluster_settings(temp.path()).await.unwrap(); let omnigraph_server::ServerConfigMode::Multi { graphs, config_path, @@ -6444,7 +6449,7 @@ graphs: temp }; - let settings = cluster_settings(temp.path()).unwrap(); + let settings = cluster_settings(temp.path()).await.unwrap(); let omnigraph_server::ServerConfigMode::Multi { graphs, server_policy_file, @@ -6482,6 +6487,7 @@ async fn cluster_boot_refusals() { None, true, ) + .await .unwrap_err(); assert!(err.to_string().contains("exclusive boot source"), "{err}"); let err = 
omnigraph_server::load_server_settings( @@ -6492,6 +6498,7 @@ async fn cluster_boot_refusals() { None, true, ) + .await .unwrap_err(); assert!(err.to_string().contains("exclusive boot source"), "{err}"); @@ -6499,7 +6506,7 @@ async fn cluster_boot_refusals() { let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person"); let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path(); fs::write(&blob, "tampered").unwrap(); - let err = cluster_settings(&dir).unwrap_err(); + let err = cluster_settings(&dir).await.unwrap_err(); assert!( err.to_string().contains("catalog_payload_digest_mismatch"), "{err}" @@ -6508,6 +6515,6 @@ async fn cluster_boot_refusals() { // Missing state refuses with the import/apply remedy. let empty = tempfile::tempdir().unwrap(); - let err = cluster_settings(empty.path()).unwrap_err(); + let err = cluster_settings(empty.path()).await.unwrap_err(); assert!(err.to_string().contains("cluster_state_missing"), "{err}"); }