From fd002abaa50380f65fda89567adbf3e9c7043561 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 14:11:14 +0300 Subject: [PATCH 1/3] feat(cluster): port the storage backend to the engine StorageAdapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LocalStateBackend becomes ClusterStore: every stored byte — state ledger, lock, recovery sidecars, approval artifacts — now flows through the engine's StorageAdapter, making file:// and s3:// one code path. Behavior on the file backend is byte-compatible (layout, CAS semantics, diagnostics, lock release timing) and the entire pre-existing suite passes unchanged. Mechanics: the ledger CAS keeps its public sha256 vocabulary while the physical swap is token-conditioned (ETag If-Match on S3 via PR #186's primitives; content-token + temp/rename locally — the pre-port semantics); the lock is a create-only put (genuinely cross-machine on object stores) with deterministic drop-release locally and best-effort spawned release on S3; sidecars/approvals address by URI (SweepOutcome and the executors carry strings); sweep row-1 retirement joins the uniform deferred post-CAS cleanup. ClusterStore also gains the catalog-payload and graph-root methods that commit 2 wires in. Async ripple: status/force-unlock/serving-snapshot and the server's settings loader chain go async (CLI dispatch and ~20 test hosts follow, mechanically). tokio joins the cluster crate's runtime deps for the lock guard's handle. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 4 +- crates/omnigraph-cluster/Cargo.toml | 4 + crates/omnigraph-cluster/src/diff.rs | 2 +- crates/omnigraph-cluster/src/lib.rs | 86 +-- crates/omnigraph-cluster/src/serve.rs | 10 +- crates/omnigraph-cluster/src/store.rs | 791 +++++++++++++++--------- crates/omnigraph-cluster/src/sweep.rs | 22 +- crates/omnigraph-cluster/src/tests.rs | 98 +-- crates/omnigraph-cluster/src/types.rs | 3 +- crates/omnigraph-server/src/lib.rs | 42 +- crates/omnigraph-server/src/main.rs | 3 +- crates/omnigraph-server/tests/server.rs | 101 +-- 12 files changed, 687 insertions(+), 479 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index da3cc44..e9cff0c 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -3720,7 +3720,7 @@ async fn main() -> Result<()> { finish_cluster_approve(&output, json)?; } ClusterCommand::Status { config, json } => { - let output = status_config_dir(config); + let output = status_config_dir(config).await; finish_cluster_status(&output, json)?; } ClusterCommand::Refresh { config, json } => { @@ -3736,7 +3736,7 @@ async fn main() -> Result<()> { config, json, } => { - let output = force_unlock_config_dir(config, lock_id); + let output = force_unlock_config_dir(config, lock_id).await; finish_cluster_force_unlock(&output, json)?; } }, diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index b5f99c9..973de6d 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -23,6 +23,10 @@ serde_yaml = { workspace = true } sha2 = { workspace = true } thiserror = { workspace = true } time = { workspace = true } +# Runtime handle only — best-effort async lock release in +# StateLockGuard::drop on object-store backends (cluster commands always +# run inside the caller's tokio runtime). +tokio = { workspace = true } ulid = { workspace = true } [dev-dependencies] diff --git a/crates/omnigraph-cluster/src/diff.rs b/crates/omnigraph-cluster/src/diff.rs index e75db4d..593b2fa 100644 --- a/crates/omnigraph-cluster/src/diff.rs +++ b/crates/omnigraph-cluster/src/diff.rs @@ -142,7 +142,7 @@ pub(crate) fn compute_approvals( /// Near-misses — an artifact for the same resource whose bound digests no /// longer match — warn as `approval_stale` and never authorize anything. pub(crate) fn approved_resources( - artifacts: &[(PathBuf, ApprovalArtifact)], + artifacts: &[(String, ApprovalArtifact)], changes: &[PlanChange], config_digest: &str, diagnostics: &mut Vec, diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index dc66408..ec1a02a 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -25,7 +25,7 @@ mod diff; mod serve; mod sweep; mod store; -use store::{LocalStateBackend, StateLockGuard, StateSnapshot}; +use store::{ClusterStore, StateLockGuard, StateSnapshot}; pub use types::*; use types::*; pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; @@ -69,7 +69,7 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -107,7 +107,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } let _lock_guard = if desired.state_lock { - match backend.acquire_lock("plan", &mut observations) { + match backend.acquire_lock("plan", &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -130,7 +130,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let mut prior_resources = BTreeMap::new(); let mut prior_state: Option = None; if !has_errors(&diagnostics) { - match backend.read_state(&mut observations) { + match backend.read_state(&mut observations).await { Ok(snapshot) => { if let Some(state) = snapshot.state { prior_resources = state_resource_digests(&state); @@ -151,7 +151,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. - let artifacts = backend.list_approval_artifacts(&mut diagnostics); + let artifacts = backend.list_approval_artifacts(&mut diagnostics).await; let approved = approved_resources( &artifacts, &changes, @@ -242,7 +242,7 @@ pub async fn apply_config_dir_with_options( ) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let actor_for_output = options.actor.clone(); @@ -294,7 +294,7 @@ pub async fn apply_config_dir_with_options( // Named guard: the lock must be held until the state outcome is recorded. let _lock_guard = if desired.state_lock { - match backend.acquire_lock("apply", &mut observations) { + match backend.acquire_lock("apply", &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -321,7 +321,7 @@ pub async fn apply_config_dir_with_options( ); } - let snapshot = match backend.read_state(&mut observations) { + let snapshot = match backend.read_state(&mut observations).await { Ok(snapshot) => snapshot, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -361,7 +361,7 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); append_policy_binding_changes(&mut changes, Some(&state), &desired); - let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); + let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await; let approved = approved_resources( &approval_artifacts, &changes, @@ -424,7 +424,7 @@ pub async fn apply_config_dir_with_options( }) .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) .collect(); - let mut completed_op_sidecars: Vec = Vec::new(); + let mut completed_op_sidecars: Vec = Vec::new(); let mut failed_graphs: BTreeMap = BTreeMap::new(); let mut graph_moving_aborted = false; for graph_id in &graph_creates_to_run { @@ -462,7 +462,7 @@ pub async fn apply_config_dir_with_options( state_cas_base: expected_cas.clone(), approval_id: None, }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -514,7 +514,7 @@ pub async fn apply_config_dir_with_options( Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); - let _ = fs::remove_file(&sidecar_path); // nothing moved + backend.delete_object(&sidecar_path).await; // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); graph_moving_aborted = true; continue; @@ -540,7 +540,7 @@ pub async fn apply_config_dir_with_options( if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await { if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await { sidecar.expected_manifest_version = Some(snapshot.version()); - if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await { diagnostics.push(diagnostic); } } @@ -626,7 +626,7 @@ pub async fn apply_config_dir_with_options( state_cas_base: expected_cas.clone(), approval_id: None, }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -677,7 +677,7 @@ pub async fn apply_config_dir_with_options( Ok(source) => source, Err(diagnostic) => { diagnostics.push(diagnostic); - let _ = fs::remove_file(&sidecar_path); // nothing moved + backend.delete_object(&sidecar_path).await; // nothing moved failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); graph_moving_aborted = true; continue; @@ -695,7 +695,7 @@ pub async fn apply_config_dir_with_options( { Ok(result) => { sidecar.expected_manifest_version = Some(result.manifest_version); - if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await { diagnostics.push(diagnostic); } } @@ -871,7 +871,7 @@ pub async fn apply_config_dir_with_options( state_cas_base: expected_cas.clone(), approval_id: approval_id.clone(), }; - let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1004,8 +1004,14 @@ pub async fn apply_config_dir_with_options( // persisted-statuses revert contract below is exercised; a cfg_callback // on this point can mutate state.json to simulate a concurrent writer, // making write_state's CAS check fail organically. - let write_result = failpoints::maybe_fail("cluster_apply.before_state_write") - .and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations)); + let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") { + Ok(()) => { + backend + .write_state(&new_state, expected_cas.as_deref(), &mut observations) + .await + } + Err(diagnostic) => Err(diagnostic), + }; match write_result { Ok(()) => state_written = true, Err(diagnostic) => { @@ -1017,16 +1023,16 @@ pub async fn apply_config_dir_with_options( // Completed (rows 2/4) sweep sidecars are deleted only once their outcome // is durably recorded; on a failed write they stay and re-sweep next run. if !state_write_failed { - for sidecar_path in sweep + for sidecar_uri in sweep .completed_sidecars .iter() .chain(completed_op_sidecars.iter()) { - let _ = fs::remove_file(sidecar_path); + backend.delete_object(sidecar_uri).await; } let mut all_consumed = sweep.consumed_approvals.clone(); all_consumed.extend(consumed_approval_ids.iter().cloned()); - mark_approvals_consumed(&backend, &all_consumed); + mark_approvals_consumed(&backend, &all_consumed).await; } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never @@ -1082,7 +1088,7 @@ pub async fn approve_config_dir( ) -> ApproveOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let fail = |config_dir: String, diagnostics: Vec| ApproveOutput { @@ -1103,7 +1109,7 @@ pub async fn approve_config_dir( } let _lock_guard = if desired.state_lock { - match backend.acquire_lock("approve", &mut observations) { + match backend.acquire_lock("approve", &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1119,7 +1125,7 @@ pub async fn approve_config_dir( None }; - let state = match backend.read_state(&mut observations) { + let state = match backend.read_state(&mut observations).await { Ok(snapshot) => match snapshot.state { Some(state) => state, None => { @@ -1174,7 +1180,7 @@ pub async fn approve_config_dir( consumed_at: None, consumed_by_operation: None, }; - if let Err(diagnostic) = backend.write_approval_artifact(&artifact) { + if let Err(diagnostic) = backend.write_approval_artifact(&artifact).await { diagnostics.push(diagnostic); return fail(display_path(&desired.config_dir), diagnostics); } @@ -1191,12 +1197,12 @@ pub async fn approve_config_dir( } -pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { +pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = LocalStateBackend::new(&parsed.config_dir); + let backend = ClusterStore::for_config_dir(&parsed.config_dir); let mut observations = backend.observations(); - backend.observe_lock(&mut observations, &mut diagnostics); + backend.observe_lock(&mut observations, &mut diagnostics).await; warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); let mut resource_digests = BTreeMap::new(); @@ -1206,7 +1212,7 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { if let Some(raw) = parsed.raw.as_ref() { let _settings = validate_cluster_header(raw, &mut diagnostics); if !has_errors(&diagnostics) { - match backend.read_state(&mut observations) { + match backend.read_state(&mut observations).await { Ok(snapshot) => { if let Some(state) = snapshot.state { // Read-only point-in-time catalog check: report the @@ -1244,20 +1250,20 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { } } -pub fn force_unlock_config_dir( +pub async fn force_unlock_config_dir( config_dir: impl AsRef, lock_id: impl AsRef, ) -> ForceUnlockOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = LocalStateBackend::new(&parsed.config_dir); + let backend = ClusterStore::for_config_dir(&parsed.config_dir); let mut observations = backend.observations(); let mut lock_removed = false; if let Some(raw) = parsed.raw.as_ref() { let _settings = validate_cluster_header(raw, &mut diagnostics); if !has_errors(&diagnostics) { - match backend.force_unlock(lock_id.as_ref(), &mut observations) { + match backend.force_unlock(lock_id.as_ref(), &mut observations).await { Ok(()) => lock_removed = true, Err(diagnostic) => diagnostics.push(diagnostic), } @@ -1284,7 +1290,7 @@ pub async fn import_config_dir(config_dir: impl AsRef) -> StateSyncOutput async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput { let outcome = load_desired(config_dir); let mut diagnostics = outcome.diagnostics; - let backend = LocalStateBackend::new(&outcome.config_dir); + let backend = ClusterStore::for_config_dir(&outcome.config_dir); let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -1315,7 +1321,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St let operation_label = state_sync_operation_label(operation); let _lock_guard = if desired.state_lock { - match backend.acquire_lock(operation_label, &mut observations) { + match backend.acquire_lock(operation_label, &mut observations).await { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1346,7 +1352,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St }; } - let snapshot = match backend.read_state(&mut observations) { + let snapshot = match backend.read_state(&mut observations).await { Ok(snapshot) => snapshot, Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1477,14 +1483,14 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St state.state_revision = state.state_revision.saturating_add(1); } - match backend.write_state(&state, expected_cas.as_deref(), &mut observations) { + match backend.write_state(&state, expected_cas.as_deref(), &mut observations).await { Ok(()) => { // Completed sweep sidecars are deleted only after their outcome // is durably recorded; on failure they stay and re-sweep. - for sidecar_path in &sweep.completed_sidecars { - let _ = fs::remove_file(sidecar_path); + for sidecar_uri in &sweep.completed_sidecars { + backend.delete_object(sidecar_uri).await; } - mark_approvals_consumed(&backend, &sweep.consumed_approvals); + mark_approvals_consumed(&backend, &sweep.consumed_approvals).await; } Err(diagnostic) => diagnostics.push(diagnostic), } diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index 0152bc4..8578aee 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -40,13 +40,15 @@ pub struct ServingSnapshot { /// failure is collected and the whole snapshot refused; no partial serving. /// Takes no lock: the state file is replaced atomically, so this reads a /// consistent point-in-time ledger. -pub fn read_serving_snapshot(config_dir: impl AsRef) -> Result> { +pub async fn read_serving_snapshot( + config_dir: impl AsRef, +) -> Result> { let config_dir = config_dir.as_ref().to_path_buf(); - let backend = LocalStateBackend::new(&config_dir); + let backend = ClusterStore::for_config_dir(&config_dir); let mut diagnostics: Vec = Vec::new(); // A ledger a sweep is about to rewrite must not start serving. - let sidecars = backend.list_recovery_sidecars(&mut diagnostics); + let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await; if !sidecars.is_empty() { diagnostics.push(Diagnostic::error( "cluster_recovery_pending", @@ -59,7 +61,7 @@ pub fn read_serving_snapshot(config_dir: impl AsRef) -> Result match snapshot.state { Some(state) => Some(state), None => { diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index 8a95661..bf328af 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -1,230 +1,446 @@ -//! The cluster's storage backend: state ledger, lock, recovery -//! sidecars, approval artifacts (moved verbatim from lib.rs in the -//! modularization). The object-storage port (RFC-006) lands here as a -//! follow-up — this module is the single home for stored-state I/O. +//! The cluster's storage layer: every stored byte (state ledger, lock, +//! recovery sidecars, approval artifacts, catalog payloads) goes through the +//! engine's `StorageAdapter`, so `file://` and `s3://` are one code path +//! (RFC-006). Declared configuration — `cluster.yaml` and the schema/query/ +//! policy sources it references — deliberately does NOT live here: config is +//! read from the operator's working tree (Terraform's config-local / +//! state-remote split). +//! +//! Raw `fs::*` for cluster state outside this module is a deny-list entry. -use super::*; +use std::path::Path; +use std::process; +use std::sync::Arc; -#[derive(Debug)] -pub(crate) struct LocalStateBackend { - state_dir: PathBuf, - state_path: PathBuf, - lock_path: PathBuf, - recoveries_dir: PathBuf, - approvals_dir: PathBuf, +use omnigraph::storage::{StorageAdapter, StorageKind, storage_for_uri, storage_kind_for_uri}; +use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; +use ulid::Ulid; + +use crate::{ + ApprovalArtifact, CLUSTER_APPROVALS_DIR, CLUSTER_LOCK_FILE, CLUSTER_RECOVERIES_DIR, + CLUSTER_RESOURCES_DIR, CLUSTER_STATE_FILE, ClusterState, Diagnostic, RecoverySidecar, + ResourceKind, StateLockFile, StateObservations, sha256_hex, +}; + +#[derive(Debug, Clone)] +pub(crate) struct ClusterStore { + adapter: Arc, + /// Normalized storage-root URI, no trailing slash: `file:///abs/dir` + /// (the default config-dir layout) or `s3://bucket/prefix`. + root: String, + /// What observations/diagnostics display for stored locations: the plain + /// local path for `file://` roots (byte-compatible with the pre-store + /// outputs), the URI otherwise. + display_root: String, } #[derive(Debug)] pub(crate) struct StateSnapshot { pub(crate) state: Option, + /// Content identity (`sha256:`) — the public CAS vocabulary. pub(crate) state_cas: Option, } #[derive(Debug)] pub(crate) struct StateLockGuard { - path: PathBuf, + adapter: Arc, + uri: String, + kind: StorageKind, } -impl LocalStateBackend { - pub(crate) fn new(config_dir: &Path) -> Self { - let state_dir = config_dir.join(CLUSTER_STATE_DIR); - Self { - state_path: config_dir.join(CLUSTER_STATE_FILE), - lock_path: config_dir.join(CLUSTER_LOCK_FILE), - recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR), - approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR), - state_dir, - } - } - - /// List approval artifacts in ULID (filename) order; unparseable files - /// warn and stay on disk for the operator. - pub(crate) fn list_approval_artifacts( - &self, - diagnostics: &mut Vec, - ) -> Vec<(PathBuf, ApprovalArtifact)> { - let mut paths = Vec::new(); - match fs::read_dir(&self.approvals_dir) { - Ok(entries) => { - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "json") { - paths.push(path); - } +impl Drop for StateLockGuard { + fn drop(&mut self) { + match self.kind { + // Deterministic release on the file backend (tests assert the + // lock is gone the moment a command returns). + StorageKind::Local => { + let path = self.uri.trim_start_matches("file://"); + let _ = std::fs::remove_file(path); + } + // Object stores need an async delete; best-effort spawn. A crash + // here leaves the lock for `force-unlock` — same as a process + // kill, and the same recovery path. + StorageKind::S3 => { + let adapter = Arc::clone(&self.adapter); + let uri = self.uri.clone(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + let _ = adapter.delete(&uri).await; + }); } } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => diagnostics.push(Diagnostic::warning( - "approval_read_error", - CLUSTER_APPROVALS_DIR, - format!("could not list approval artifacts: {err}"), - )), } - paths.sort(); - let mut artifacts = Vec::new(); - for path in paths { - match fs::read_to_string(&path) - .map_err(|err| err.to_string()) - .and_then(|text| { - serde_json::from_str::(&text).map_err(|err| err.to_string()) - }) { - Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)), - Ok(artifact) => diagnostics.push(Diagnostic::warning( - "unsupported_approval_version", - display_path(&path), - format!( - "unsupported approval artifact version {}; leaving it in place", - artifact.schema_version - ), - )), + } +} + +impl ClusterStore { + /// The default layout: storage root = the config directory itself + /// (`file://`), byte-compatible with every pre-existing + /// cluster on disk. + pub(crate) fn for_config_dir(config_dir: &Path) -> Self { + let absolute = + std::path::absolute(config_dir).unwrap_or_else(|_| config_dir.to_path_buf()); + let display_root = absolute + .to_string_lossy() + .trim_end_matches('/') + .to_string(); + let root = format!("file://{display_root}"); + let adapter = storage_for_uri(&root) + .expect("local storage adapter construction is infallible for file:// roots"); + Self { + adapter, + root, + display_root, + } + } + + /// An explicit `storage:` root. `file://` URIs and plain paths normalize + /// to the local backend; `s3://bucket/prefix` to the S3 backend (env- + /// driven credentials/endpoint — the same contract as graph storage). + pub(crate) fn for_storage_root(root_uri: &str) -> Result { + let trimmed = root_uri.trim_end_matches('/'); + if storage_kind_for_uri(trimmed) == StorageKind::Local { + let path = trimmed.trim_start_matches("file://"); + return Ok(Self::for_config_dir(Path::new(path))); + } + let adapter = storage_for_uri(trimmed).map_err(|err| { + Diagnostic::error( + "storage_root_invalid", + "storage", + format!("could not initialize storage for '{root_uri}': {err}"), + ) + })?; + Ok(Self { + adapter, + root: trimmed.to_string(), + display_root: trimmed.to_string(), + }) + } + + pub(crate) fn kind(&self) -> StorageKind { + storage_kind_for_uri(&self.root) + } + + fn uri(&self, relative: &str) -> String { + format!("{}/{}", self.root, relative) + } + + fn display(&self, relative: &str) -> String { + format!("{}/{}", self.display_root, relative) + } + + /// Derived graph root for ``: `/graphs/.omni`. A plain + /// local path for `file://` roots (byte-compatible, directly usable by + /// the engine); the S3 URI the engine opens natively otherwise. + pub(crate) fn graph_root(&self, graph_id: &str) -> String { + match self.kind() { + StorageKind::Local => format!("{}/graphs/{graph_id}.omni", self.display_root), + StorageKind::S3 => format!("{}/graphs/{graph_id}.omni", self.root), + } + } + + /// `read_text_versioned`, returning None for a missing object (probed + /// via `exists` — the engine error type doesn't discriminate NotFound). + async fn read_versioned_opt(&self, uri: &str) -> Result, String> { + match self.adapter.exists(uri).await { + Ok(false) => return Ok(None), + Ok(true) => {} + Err(err) => return Err(err.to_string()), + } + self.adapter + .read_text_versioned(uri) + .await + .map(Some) + .map_err(|err| err.to_string()) + } + + /// JSON object write with the strongest atomicity the backend offers: + /// temp + rename on the filesystem (no torn JSON after a crash; the + /// pre-port behavior), a single atomic PUT on object stores (where + /// copy+delete would be weaker, not stronger). + async fn put_json(&self, relative: &str, payload: &str) -> Result<(), String> { + let target = self.uri(relative); + match self.kind() { + StorageKind::Local => { + let tmp = format!("{target}.tmp.{}", Ulid::new()); + self.adapter + .write_text(&tmp, payload) + .await + .map_err(|err| err.to_string())?; + if let Err(err) = self.adapter.rename_text(&tmp, &target).await { + let _ = self.adapter.delete(&tmp).await; + return Err(err.to_string()); + } + Ok(()) + } + StorageKind::S3 => self + .adapter + .write_text(&target, payload) + .await + .map_err(|err| err.to_string()), + } + } + + /// Shared list-and-parse for the sidecar/approval directories: id + /// (filename) order; unparseable objects warn and stay for the operator. + async fn list_json_dir( + &self, + dir: &str, + diagnostics: &mut Vec, + list_error_code: &'static str, + parse_error_code: &'static str, + version_ok: impl Fn(&T) -> bool, + version_error_code: &'static str, + ) -> Vec<(String, T)> { + let dir_uri = self.uri(dir); + let mut uris = match self.adapter.list_dir(&dir_uri).await { + Ok(uris) => uris, + Err(err) => { + diagnostics.push(Diagnostic::warning( + list_error_code, + dir, + format!("could not list '{dir}': {err}"), + )); + return Vec::new(); + } + }; + uris.retain(|uri| uri.ends_with(".json")); + uris.sort(); + let mut out = Vec::new(); + for uri in uris { + match self.adapter.read_text(&uri).await { + Ok(text) => match serde_json::from_str::(&text) { + Ok(value) if version_ok(&value) => out.push((uri, value)), + Ok(_) => diagnostics.push(Diagnostic::warning( + version_error_code, + uri.clone(), + "unsupported schema version; leaving it in place".to_string(), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + parse_error_code, + uri.clone(), + format!("could not parse ({err}); leaving it in place"), + )), + }, Err(err) => diagnostics.push(Diagnostic::warning( - "invalid_approval_artifact", - display_path(&path), - format!("could not parse approval artifact ({err}); leaving it in place"), + parse_error_code, + uri.clone(), + format!("could not read ({err}); leaving it in place"), )), } } - artifacts + out } - /// Atomically write (or rewrite, e.g. on consumption) an approval artifact. - pub(crate) fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result { - fs::create_dir_all(&self.approvals_dir).map_err(|err| { - Diagnostic::error( - "approval_write_error", - CLUSTER_APPROVALS_DIR, - format!("could not create approvals directory: {err}"), - ) - })?; - let target = self - .approvals_dir - .join(format!("{}.json", artifact.approval_id)); + /// Best-effort object removal (sidecar retirement after a CAS lands, + /// lock cleanup) — failures are recoverable by the next sweep. + pub(crate) async fn delete_object(&self, uri: &str) { + let _ = self.adapter.delete(uri).await; + } + + /// Recursive prefix delete for graph roots (approved deletes). Idempotent; + /// S3 non-atomicity is tolerated by the delete protocol's retry shape. + pub(crate) async fn delete_graph_root(&self, graph_uri: &str) -> Result<(), String> { + self.adapter + .delete_prefix(graph_uri) + .await + .map_err(|err| err.to_string()) + } + + /// Existence probe for graph roots in sweep classification. A bare local + /// path or any URI works — resolved through the same adapter machinery + /// the engine uses. + pub(crate) async fn graph_root_exists(&self, graph_uri: &str) -> bool { + match storage_kind_for_uri(graph_uri) { + StorageKind::Local => Path::new(graph_uri.trim_start_matches("file://")).exists(), + StorageKind::S3 => match storage_for_uri(graph_uri) { + Ok(adapter) => !adapter + .list_dir(graph_uri) + .await + .map(|entries| entries.is_empty()) + .unwrap_or(true), + Err(_) => false, + }, + } + } + + // ---- approvals ---- + + pub(crate) async fn list_approval_artifacts( + &self, + diagnostics: &mut Vec, + ) -> Vec<(String, ApprovalArtifact)> { + self.list_json_dir( + CLUSTER_APPROVALS_DIR, + diagnostics, + "approval_read_error", + "invalid_approval_artifact", + |artifact: &ApprovalArtifact| artifact.schema_version == 1, + "unsupported_approval_version", + ) + .await + } + + pub(crate) async fn write_approval_artifact( + &self, + artifact: &ApprovalArtifact, + ) -> Result { + let relative = format!("{CLUSTER_APPROVALS_DIR}/{}.json", artifact.approval_id); let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| { Diagnostic::error( "approval_write_error", - display_path(&target), + self.display(&relative), format!("could not encode approval artifact: {err}"), ) })?; payload.push('\n'); - let tmp_path = self - .approvals_dir - .join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new())); - fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + self.put_json(&relative, &payload).await.map_err(|err| { Diagnostic::error( "approval_write_error", - display_path(&tmp_path), + self.display(&relative), format!("could not write approval artifact: {err}"), ) })?; - if let Err(err) = fs::rename(&tmp_path, &target) { - let _ = fs::remove_file(&tmp_path); - return Err(Diagnostic::error( - "approval_write_error", - display_path(&target), - format!("could not move approval artifact into place: {err}"), - )); - } - Ok(target) + Ok(self.uri(&relative)) } - /// List recovery sidecars in ULID (filename) order. Unparseable files are - /// reported as warnings and skipped — they stay on disk for the operator. - pub(crate) fn list_recovery_sidecars( + // ---- recovery sidecars ---- + + pub(crate) async fn list_recovery_sidecars( &self, diagnostics: &mut Vec, - ) -> Vec<(PathBuf, RecoverySidecar)> { - let mut paths = Vec::new(); - match fs::read_dir(&self.recoveries_dir) { - Ok(entries) => { - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().is_some_and(|ext| ext == "json") { - paths.push(path); - } - } - } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => { - diagnostics.push(Diagnostic::warning( - "recovery_sidecar_read_error", - CLUSTER_RECOVERIES_DIR, - format!("could not list recovery sidecars: {err}"), - )); - } - } - paths.sort(); - let mut sidecars = Vec::new(); - for path in paths { - match fs::read_to_string(&path) - .map_err(|err| err.to_string()) - .and_then(|text| { - serde_json::from_str::(&text).map_err(|err| err.to_string()) - }) { - Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)), - Ok(sidecar) => diagnostics.push(Diagnostic::warning( - "unsupported_recovery_sidecar_version", - display_path(&path), - format!( - "unsupported recovery sidecar version {}; leaving it in place", - sidecar.schema_version - ), - )), - Err(err) => diagnostics.push(Diagnostic::warning( - "invalid_recovery_sidecar", - display_path(&path), - format!("could not parse recovery sidecar ({err}); leaving it in place"), - )), - } - } - sidecars + ) -> Vec<(String, RecoverySidecar)> { + self.list_json_dir( + CLUSTER_RECOVERIES_DIR, + diagnostics, + "recovery_sidecar_read_error", + "invalid_recovery_sidecar", + |sidecar: &RecoverySidecar| sidecar.schema_version == 1, + "unsupported_recovery_sidecar_version", + ) + .await } - /// Atomically write (or rewrite) a recovery sidecar; returns its path. - pub(crate) fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result { - fs::create_dir_all(&self.recoveries_dir).map_err(|err| { - Diagnostic::error( - "recovery_sidecar_write_error", - CLUSTER_RECOVERIES_DIR, - format!("could not create recoveries directory: {err}"), - ) - })?; - let target = self - .recoveries_dir - .join(format!("{}.json", sidecar.operation_id)); + pub(crate) async fn write_recovery_sidecar( + &self, + sidecar: &RecoverySidecar, + ) -> Result { + let relative = format!("{CLUSTER_RECOVERIES_DIR}/{}.json", sidecar.operation_id); let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| { Diagnostic::error( "recovery_sidecar_write_error", - display_path(&target), + self.display(&relative), format!("could not encode recovery sidecar: {err}"), ) })?; payload.push('\n'); - let tmp_path = self - .recoveries_dir - .join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new())); - fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + self.put_json(&relative, &payload).await.map_err(|err| { Diagnostic::error( "recovery_sidecar_write_error", - display_path(&tmp_path), + self.display(&relative), format!("could not write recovery sidecar: {err}"), ) })?; - if let Err(err) = fs::rename(&tmp_path, &target) { - let _ = fs::remove_file(&tmp_path); + Ok(self.uri(&relative)) + } + + // ---- catalog payloads ---- + + /// Content-addressed catalog location for a query/policy payload + /// (extensions fixed per kind, same as the pre-port layout). + pub(crate) fn payload_relative(kind: &ResourceKind, digest: &str) -> Option { + match kind { + ResourceKind::Query { graph, name } => Some(format!( + "{CLUSTER_RESOURCES_DIR}/query/{graph}/{name}/{digest}.gq" + )), + ResourceKind::Policy(name) => Some(format!( + "{CLUSTER_RESOURCES_DIR}/policy/{name}/{digest}.yaml" + )), + _ => None, + } + } + + pub(crate) fn payload_display(&self, kind: &ResourceKind, digest: &str) -> Option { + Self::payload_relative(kind, digest).map(|relative| self.display(&relative)) + } + + pub(crate) async fn payload_exists(&self, kind: &ResourceKind, digest: &str) -> bool { + let Some(relative) = Self::payload_relative(kind, digest) else { + return false; + }; + self.adapter + .exists(&self.uri(&relative)) + .await + .unwrap_or(false) + } + + /// Idempotent content-addressed write: a payload already present at its + /// digest is by definition identical. + pub(crate) async fn write_payload( + &self, + kind: &ResourceKind, + digest: &str, + content: &str, + ) -> Result<(), String> { + let Some(relative) = Self::payload_relative(kind, digest) else { + return Err("resource kind has no payload".to_string()); + }; + if self + .adapter + .exists(&self.uri(&relative)) + .await + .map_err(|err| err.to_string())? + { + return Ok(()); + } + self.put_json(&relative, content).await + } + + /// Read a catalog payload and verify it against its recorded digest. + pub(crate) async fn read_verified_payload( + &self, + kind: &ResourceKind, + digest: &str, + address: &str, + ) -> Result { + let Some(relative) = Self::payload_relative(kind, digest) else { return Err(Diagnostic::error( - "recovery_sidecar_write_error", - display_path(&target), - format!("could not move recovery sidecar into place: {err}"), + "catalog_payload_missing", + address, + "resource kind has no payload", + )); + }; + let uri = self.uri(&relative); + let text = self.adapter.read_text(&uri).await.map_err(|err| { + Diagnostic::error( + "catalog_payload_missing", + address, + format!( + "catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart", + self.display(&relative) + ), + ) + })?; + if sha256_hex(text.as_bytes()) != digest { + return Err(Diagnostic::error( + "catalog_payload_digest_mismatch", + address, + format!( + "catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart", + self.display(&relative) + ), )); } - Ok(target) + Ok(text) } + // ---- observations ---- + pub(crate) fn observations(&self) -> StateObservations { StateObservations { - state_path: display_path(&self.state_path), - lock_path: display_path(&self.lock_path), + state_path: self.display(CLUSTER_STATE_FILE), + lock_path: self.display(CLUSTER_LOCK_FILE), state_found: false, applied_config_digest: None, state_revision: 0, @@ -241,13 +457,16 @@ impl LocalStateBackend { } } - pub(crate) fn read_state( + // ---- state ledger ---- + + pub(crate) async fn read_state( &self, observations: &mut StateObservations, ) -> Result { - let text = match fs::read_to_string(&self.state_path) { - Ok(text) => text, - Err(err) if err.kind() == ErrorKind::NotFound => { + let state_uri = self.uri(CLUSTER_STATE_FILE); + let (text, _version) = match self.read_versioned_opt(&state_uri).await { + Ok(Some(read)) => read, + Ok(None) => { return Ok(StateSnapshot { state: None, state_cas: None, @@ -295,27 +514,32 @@ impl LocalStateBackend { }) } - pub(crate) fn write_state( + /// CAS-guarded ledger replace. The public contract stays content-level + /// (`expected_cas` = `sha256:` from the snapshot the command read); + /// the physical swap is token-conditioned on a fresh read, so a writer + /// that raced us between the fresh read and the put loses with + /// `state_cas_mismatch` — never a silent overwrite. On S3 the token is + /// the object's ETag and the put is conditional (If-Match); locally it + /// is a content token over the same temp+rename flow as before the port. + pub(crate) async fn write_state( &self, state: &ClusterState, expected_cas: Option<&str>, observations: &mut StateObservations, ) -> Result<(), Diagnostic> { - fs::create_dir_all(&self.state_dir).map_err(|err| { + let state_uri = self.uri(CLUSTER_STATE_FILE); + let current = self.read_versioned_opt(&state_uri).await.map_err(|err| { Diagnostic::error( "state_write_error", - CLUSTER_STATE_DIR, - format!("could not create cluster state directory: {err}"), + CLUSTER_STATE_FILE, + format!("could not read state file before write: {err}"), ) })?; - - let current_cas = self.current_state_cas()?; + let current_cas = current + .as_ref() + .map(|(text, _)| format!("sha256:{}", sha256_hex(text.as_bytes()))); if current_cas.as_deref() != expected_cas { - return Err(Diagnostic::error( - "state_cas_mismatch", - CLUSTER_STATE_FILE, - "state.json changed while the command was running; re-run the command against the latest state", - )); + return Err(state_cas_mismatch()); } let mut payload = serde_json::to_string_pretty(state).map_err(|err| { @@ -327,86 +551,51 @@ impl LocalStateBackend { })?; payload.push('\n'); - let tmp_path = self - .state_dir - .join(format!("state.json.tmp.{}", Ulid::new())); - let mut file = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tmp_path) - .map_err(|err| { - Diagnostic::error( - "state_write_error", - display_path(&tmp_path), - format!("could not create temporary state file: {err}"), - ) - })?; - file.write_all(payload.as_bytes()).map_err(|err| { - Diagnostic::error( - "state_write_error", - display_path(&tmp_path), - format!("could not write temporary state file: {err}"), - ) - })?; - file.sync_all().map_err(|err| { - Diagnostic::error( - "state_write_error", - display_path(&tmp_path), - format!("could not sync temporary state file: {err}"), - ) - })?; - drop(file); - - if let Err(err) = fs::rename(&tmp_path, &self.state_path) { - let _ = fs::remove_file(&tmp_path); - return Err(Diagnostic::error( - "state_write_error", - CLUSTER_STATE_FILE, - format!("could not replace state.json atomically: {err}"), - )); + let written = match current { + None => self + .adapter + .write_text_if_absent(&state_uri, &payload) + .await + .map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not create state.json: {err}"), + ) + })?, + Some((_, version)) => self + .adapter + .write_text_if_match(&state_uri, &payload, &version) + .await + .map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not replace state.json: {err}"), + ) + })? + .is_some(), + }; + if !written { + return Err(state_cas_mismatch()); } - let written = fs::read_to_string(&self.state_path).map_err(|err| { - Diagnostic::error( - "state_write_error", - CLUSTER_STATE_FILE, - format!("could not read state.json after write: {err}"), - ) - })?; observations.state_found = true; observations.applied_config_digest = state.applied_revision.config_digest.clone(); observations.state_revision = state.state_revision; - observations.state_cas = Some(format!("sha256:{}", sha256_hex(written.as_bytes()))); + observations.state_cas = Some(format!("sha256:{}", sha256_hex(payload.as_bytes()))); observations.resource_count = state.applied_revision.resources.len(); - Ok(()) } - pub(crate) fn current_state_cas(&self) -> Result, Diagnostic> { - match fs::read(&self.state_path) { - Ok(bytes) => Ok(Some(format!("sha256:{}", sha256_hex(&bytes)))), - Err(err) if err.kind() == ErrorKind::NotFound => Ok(None), - Err(err) => Err(Diagnostic::error( - "state_read_error", - CLUSTER_STATE_FILE, - format!("could not read state file for CAS check: {err}"), - )), - } - } + // ---- lock ---- - pub(crate) fn acquire_lock( + pub(crate) async fn acquire_lock( &self, operation: &str, observations: &mut StateObservations, ) -> Result { - fs::create_dir_all(&self.state_dir).map_err(|err| { - Diagnostic::error( - "state_lock_error", - CLUSTER_STATE_DIR, - format!("could not create cluster state directory: {err}"), - ) - })?; - + let lock_uri = self.uri(CLUSTER_LOCK_FILE); let lock_id = Ulid::new().to_string(); let lock = StateLockFile { version: 1, @@ -425,31 +614,18 @@ impl LocalStateBackend { ) })?; - match OpenOptions::new() - .write(true) - .create_new(true) - .open(&self.lock_path) - { - Ok(mut file) => { - if let Err(err) = file.write_all(payload.as_bytes()) { - // No guard exists yet, so clean up the create-new file here - // instead of leaving a stale partial lock for the next run. - drop(file); - let _ = fs::remove_file(&self.lock_path); - return Err(Diagnostic::error( - "state_lock_error", - CLUSTER_LOCK_FILE, - format!("could not write state lock: {err}"), - )); - } + match self.adapter.write_text_if_absent(&lock_uri, &payload).await { + Ok(true) => { observations.lock_acquired = true; - observations.acquired_lock_id = Some(lock_id.clone()); + observations.acquired_lock_id = Some(lock_id); Ok(StateLockGuard { - path: self.lock_path.clone(), + adapter: Arc::clone(&self.adapter), + uri: lock_uri, + kind: self.kind(), }) } - Err(err) if err.kind() == ErrorKind::AlreadyExists => { - self.observe_lock_metadata_lossy(observations); + Ok(false) => { + self.observe_lock_metadata_lossy(observations).await; Err(Diagnostic::error( "state_lock_held", CLUSTER_LOCK_FILE, @@ -459,23 +635,24 @@ impl LocalStateBackend { Err(err) => Err(Diagnostic::error( "state_lock_error", CLUSTER_LOCK_FILE, - format!("could not acquire state lock: {err}"), + format!("could not write state lock: {err}"), )), } } - pub(crate) fn force_unlock( + pub(crate) async fn force_unlock( &self, - requested_lock_id: &str, + lock_id: &str, observations: &mut StateObservations, ) -> Result<(), Diagnostic> { - let text = match fs::read_to_string(&self.lock_path) { - Ok(text) => text, - Err(err) if err.kind() == ErrorKind::NotFound => { + let lock_uri = self.uri(CLUSTER_LOCK_FILE); + let text = match self.read_versioned_opt(&lock_uri).await { + Ok(Some((text, _))) => text, + Ok(None) => { return Err(Diagnostic::error( "state_lock_missing", CLUSTER_LOCK_FILE, - "cluster state lock is not present; nothing was unlocked", + "no cluster state lock is present", )); } Err(err) => { @@ -486,42 +663,41 @@ impl LocalStateBackend { )); } }; - observations.locked = true; let lock = parse_lock_file_for_unlock(&text)?; observations.observe_lock_metadata(&lock); - - if lock.lock_id != requested_lock_id { + observations.locked = true; + if lock.lock_id != lock_id { return Err(Diagnostic::error( "state_lock_id_mismatch", CLUSTER_LOCK_FILE, format!( - "cluster state lock id is {}; refusing to unlock with requested id {requested_lock_id}", + "lock id mismatch: held lock is {}, refusing to remove (pass the exact id from `cluster status`)", lock.lock_id ), )); } - - fs::remove_file(&self.lock_path).map_err(|err| { + self.adapter.delete(&lock_uri).await.map_err(|err| { Diagnostic::error( - "state_unlock_error", + "state_lock_error", CLUSTER_LOCK_FILE, format!("could not remove state lock: {err}"), ) - }) + })?; + observations.locked = false; + Ok(()) } - pub(crate) fn observe_lock( + pub(crate) async fn observe_lock( &self, observations: &mut StateObservations, diagnostics: &mut Vec, ) { - if self.lock_path.exists() { - observations.locked = true; - match fs::read_to_string(&self.lock_path) { - Ok(text) => match serde_json::from_str::(&text) { - Ok(lock) if lock.version == 1 => { - observations.observe_lock_metadata(&lock); - } + let lock_uri = self.uri(CLUSTER_LOCK_FILE); + match self.read_versioned_opt(&lock_uri).await { + Ok(Some((text, _))) => { + observations.locked = true; + match serde_json::from_str::(&text) { + Ok(lock) if lock.version == 1 => observations.observe_lock_metadata(&lock), Ok(lock) => diagnostics.push(Diagnostic::warning( "unsupported_state_lock_version", CLUSTER_LOCK_FILE, @@ -532,19 +708,24 @@ impl LocalStateBackend { CLUSTER_LOCK_FILE, format!("could not parse state lock: {err}"), )), - }, - Err(err) => diagnostics.push(Diagnostic::warning( - "state_lock_read_error", - CLUSTER_LOCK_FILE, - format!("could not read state lock: {err}"), - )), + } } + Ok(None) => {} + Err(err) => diagnostics.push(Diagnostic::warning( + "state_lock_read_error", + CLUSTER_LOCK_FILE, + format!("could not read state lock: {err}"), + )), } } - pub(crate) fn observe_lock_metadata_lossy(&self, observations: &mut StateObservations) { + pub(crate) async fn observe_lock_metadata_lossy( + &self, + observations: &mut StateObservations, + ) { observations.locked = true; - if let Ok(text) = fs::read_to_string(&self.lock_path) { + let lock_uri = self.uri(CLUSTER_LOCK_FILE); + if let Ok(Some((text, _))) = self.read_versioned_opt(&lock_uri).await { if let Ok(lock) = serde_json::from_str::(&text) { if lock.version == 1 { observations.observe_lock_metadata(&lock); @@ -554,10 +735,12 @@ impl LocalStateBackend { } } -impl Drop for StateLockGuard { - fn drop(&mut self) { - let _ = fs::remove_file(&self.path); - } +fn state_cas_mismatch() -> Diagnostic { + Diagnostic::error( + "state_cas_mismatch", + CLUSTER_STATE_FILE, + "state.json changed while the command was running; re-run the command against the latest state", + ) } pub(crate) fn parse_lock_file_for_unlock(text: &str) -> Result { diff --git a/crates/omnigraph-cluster/src/sweep.rs b/crates/omnigraph-cluster/src/sweep.rs index 77ad8c5..2cfd7d1 100644 --- a/crates/omnigraph-cluster/src/sweep.rs +++ b/crates/omnigraph-cluster/src/sweep.rs @@ -11,12 +11,12 @@ use super::*; /// Mutations ride the calling command's CAS-checked state write; completed /// sidecars are deleted only after that write lands. pub(crate) async fn sweep_recovery_sidecars( - backend: &LocalStateBackend, + backend: &ClusterStore, state: &mut ClusterState, diagnostics: &mut Vec, ) -> SweepOutcome { let mut outcome = SweepOutcome::default(); - for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) { + for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await { match sidecar.kind { RecoverySidecarKind::GraphCreate => { sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; @@ -33,7 +33,7 @@ pub(crate) async fn sweep_recovery_sidecars( } pub(crate) async fn sweep_graph_create_sidecar( - path: PathBuf, + path: String, sidecar: RecoverySidecar, state: &mut ClusterState, diagnostics: &mut Vec, @@ -44,9 +44,11 @@ pub(crate) async fn sweep_graph_create_sidecar( let graph_path = PathBuf::from(&sidecar.graph_uri); // Row 1: nothing moved — the init never landed. The sidecar is pure - // intent; remove it and let the command's own plan re-propose the create. + // intent; retire it (deferred to the command's post-CAS cleanup, like + // every other completed sidecar — a failed CAS simply re-sweeps it) and + // let the command's own plan re-propose the create. if !graph_path.exists() { - let _ = fs::remove_file(&path); + outcome.completed_sidecars.push(path); return; } @@ -153,7 +155,7 @@ pub(crate) async fn sweep_graph_create_sidecar( } pub(crate) async fn sweep_schema_apply_sidecar( - path: PathBuf, + path: String, sidecar: RecoverySidecar, state: &mut ClusterState, diagnostics: &mut Vec, @@ -250,7 +252,7 @@ pub(crate) async fn sweep_schema_apply_sidecar( } pub(crate) fn sweep_graph_delete_sidecar( - path: PathBuf, + path: String, sidecar: RecoverySidecar, state: &mut ClusterState, diagnostics: &mut Vec, @@ -351,15 +353,15 @@ pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &s } /// Mark approval artifact files consumed on disk (post-CAS). -pub(crate) fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) { +pub(crate) async fn mark_approvals_consumed(backend: &ClusterStore, approval_ids: &[String]) { if approval_ids.is_empty() { return; } let mut sink = Vec::new(); - for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) { + for (_, mut artifact) in backend.list_approval_artifacts(&mut sink).await { if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() { artifact.consumed_at = Some(now_rfc3339()); - let _ = backend.write_approval_artifact(&artifact); + let _ = backend.write_approval_artifact(&artifact).await; } } } diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index a03c522..3b7984d 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -351,8 +351,8 @@ policies: })); } - #[test] - fn extended_state_json_status_surfaces_statuses() { + #[tokio::test] + async fn extended_state_json_status_surfaces_statuses() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -380,7 +380,7 @@ policies: }"#; fs::write(state_dir.join("state.json"), state).unwrap(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.state_observations.state_found); assert_eq!(out.state_observations.state_revision, 42); @@ -400,10 +400,10 @@ policies: ); } - #[test] - fn missing_state_status_succeeds_with_warning() { + #[tokio::test] + async fn missing_state_status_succeeds_with_warning() { let dir = fixture(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.state_observations.state_found); assert_eq!(out.state_observations.state_revision, 0); @@ -414,14 +414,14 @@ policies: ); } - #[test] - fn invalid_state_status_fails() { + #[tokio::test] + async fn invalid_state_status_fails() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); fs::write(state_dir.join("state.json"), "{").unwrap(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.state_observations.state_found); assert!( @@ -431,12 +431,12 @@ policies: ); } - #[test] - fn status_surfaces_full_lock_metadata() { + #[tokio::test] + async fn status_surfaces_full_lock_metadata() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "refresh"); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.state_observations.locked); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -452,12 +452,12 @@ policies: assert!(out.state_observations.lock_age_seconds.is_some()); } - #[test] - fn force_unlock_matching_id_removes_lock() { + #[tokio::test] + async fn force_unlock_matching_id_removes_lock() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.lock_removed); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -468,12 +468,12 @@ policies: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_wrong_id_fails_and_preserves_lock() { + #[tokio::test] + async fn force_unlock_wrong_id_fails_and_preserves_lock() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); - let out = force_unlock_config_dir(dir.path(), "other-lock"); + let out = force_unlock_config_dir(dir.path(), "other-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -485,11 +485,11 @@ policies: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_missing_lock_fails() { + #[tokio::test] + async fn force_unlock_missing_lock_fails() { let dir = fixture(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!(!out.state_observations.locked); @@ -500,14 +500,14 @@ policies: ); } - #[test] - fn force_unlock_invalid_lock_json_fails_and_preserves_lock() { + #[tokio::test] + async fn force_unlock_invalid_lock_json_fails_and_preserves_lock() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); fs::write(state_dir.join("lock.json"), "{").unwrap(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!( @@ -518,8 +518,8 @@ policies: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() { + #[tokio::test] + async fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -529,7 +529,7 @@ policies: ) .unwrap(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!( @@ -540,8 +540,8 @@ policies: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn force_unlock_external_state_backend_rejected() { + #[tokio::test] + async fn force_unlock_external_state_backend_rejected() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); fs::write( @@ -557,7 +557,7 @@ graphs: ) .unwrap(); - let out = force_unlock_config_dir(dir.path(), "held-lock"); + let out = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(!out.ok); assert!(!out.lock_removed); assert!( @@ -582,7 +582,7 @@ graphs: .any(|diagnostic| diagnostic.code == "state_lock_held") ); - let unlocked = force_unlock_config_dir(dir.path(), "held-lock"); + let unlocked = force_unlock_config_dir(dir.path(), "held-lock").await; assert!(unlocked.ok, "{:?}", unlocked.diagnostics); let out = plan_config_dir(dir.path()).await; @@ -1886,7 +1886,7 @@ graphs: let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); fs::remove_file(&blob).unwrap(); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.diagnostics.iter().any(|diagnostic| { diagnostic.code == "catalog_payload_missing" @@ -2001,7 +2001,7 @@ graphs: assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); assert_eq!(fs::read_to_string(&blob).unwrap(), original); - let status = status_config_dir(dir.path()); + let status = status_config_dir(dir.path()).await; assert!( !status .diagnostics @@ -2012,12 +2012,12 @@ graphs: ); } - #[test] - fn verification_skips_graph_and_schema_resources() { + #[tokio::test] + async fn verification_skips_graph_and_schema_resources() { let dir = fixture(); write_applyable_state(dir.path()); // graph + schema digests only, no blobs - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!( !out.diagnostics .iter() @@ -2770,7 +2770,7 @@ policies: let converge = apply_config_dir(dir.path()).await; assert!(converge.converged, "{converge:?}"); - let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve"); + let snapshot = read_serving_snapshot(dir.path()).await.expect("converged cluster must serve"); assert_eq!(snapshot.graphs.len(), 1); assert_eq!(snapshot.graphs[0].graph_id, "knowledge"); assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni")); @@ -2782,10 +2782,10 @@ policies: assert!(snapshot.policies[0].blob_path.exists()); } - #[test] - fn serving_snapshot_refuses_missing_state() { + #[tokio::test] + async fn serving_snapshot_refuses_missing_state() { let dir = fixture(); - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"), "{err:?}" @@ -2800,7 +2800,7 @@ policies: apply_config_dir(dir.path()).await; write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE"); - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"), "{err:?}" @@ -2814,7 +2814,7 @@ policies: write_applyable_state(dir.path()); apply_config_dir(dir.path()).await; // Tamper with the query blob... - let snapshot = read_serving_snapshot(dir.path()).unwrap(); + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); let desired = validate_config_dir(dir.path()); let query_digest = &desired.resource_digests["query.knowledge.find_person"]; let blob = dir @@ -2838,7 +2838,7 @@ policies: ) .unwrap(); - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter() .any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"), @@ -2851,12 +2851,12 @@ policies: let _ = snapshot; // the pre-tamper read succeeded } - #[test] - fn serving_snapshot_refuses_empty_cluster() { + #[tokio::test] + async fn serving_snapshot_refuses_empty_cluster() { let dir = fixture(); write_state_resources(dir.path(), &[]); // state exists, no graphs - let err = read_serving_snapshot(dir.path()).unwrap_err(); + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); assert!( err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"), "{err:?}" @@ -2972,13 +2972,13 @@ policies: ); } - #[test] - fn status_warns_on_pending_recovery_sidecar() { + #[tokio::test] + async fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); write_applyable_state(dir.path()); write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01STATUS"); - let out = status_config_dir(dir.path()); + let out = status_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!( out.diagnostics diff --git a/crates/omnigraph-cluster/src/types.rs b/crates/omnigraph-cluster/src/types.rs index c366f04..ca960a5 100644 --- a/crates/omnigraph-cluster/src/types.rs +++ b/crates/omnigraph-cluster/src/types.rs @@ -503,7 +503,8 @@ pub(crate) struct SweepOutcome { pub(crate) pending_graphs: BTreeSet, /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the /// command's state write lands, so a CAS failure re-sweeps them. - pub(crate) completed_sidecars: Vec, + /// Store URIs (the storage layer addresses everything by URI). + pub(crate) completed_sidecars: Vec, /// Approval artifacts consumed by a roll-forward (delete row 7b): their /// files are rewritten with consumed_at only after the state write lands. pub(crate) consumed_approvals: Vec, diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 0038674..f7fc6b1 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -893,12 +893,12 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St /// catalog blob content, policy bundles from blob paths with their applied /// bindings. Always multi-graph routing. The unauthenticated/env handling /// matches the omnigraph.yaml path. -fn load_cluster_settings( +async fn load_cluster_settings( cluster_dir: &PathBuf, cli_bind: Option, cli_allow_unauthenticated: bool, ) -> Result { - let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| { + let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).await.map_err(|diagnostics| { let details = diagnostics .iter() .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message)) @@ -988,7 +988,7 @@ fn load_cluster_settings( }) } -pub fn load_server_settings( +pub async fn load_server_settings( config_path: Option<&PathBuf>, cli_cluster: Option<&PathBuf>, cli_uri: Option, @@ -1005,7 +1005,7 @@ pub fn load_server_settings( "--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)" ); } - return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated); + return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await; } let config = load_config(config_path)?; let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string()); @@ -3363,8 +3363,8 @@ mod tests { ); } - #[test] - fn server_settings_load_from_yaml_config() { + #[tokio::test] + async fn server_settings_load_from_yaml_config() { let temp = tempdir().unwrap(); let config = temp.path().join("omnigraph.yaml"); fs::write( @@ -3380,7 +3380,7 @@ server: ) .unwrap(); - let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap(); + let settings = load_server_settings(Some(&config), None, None, None, None, false).await.unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { assert_eq!(uri, "/tmp/demo.omni"); @@ -3391,8 +3391,8 @@ server: assert_eq!(settings.bind, "0.0.0.0:9090"); } - #[test] - fn server_settings_cli_flags_override_yaml_config() { + #[tokio::test] + async fn server_settings_cli_flags_override_yaml_config() { let temp = tempdir().unwrap(); let config = temp.path().join("omnigraph.yaml"); fs::write( @@ -3416,6 +3416,7 @@ server: Some("0.0.0.0:9999".to_string()), false, ) + .await .unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { @@ -3427,8 +3428,8 @@ server: assert_eq!(settings.bind, "0.0.0.0:9999"); } - #[test] - fn server_settings_can_resolve_named_target() { + #[tokio::test] + async fn server_settings_can_resolve_named_target() { let temp = tempdir().unwrap(); let config = temp.path().join("omnigraph.yaml"); fs::write( @@ -3448,6 +3449,7 @@ server: let settings = load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false) + .await .unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { @@ -3458,9 +3460,9 @@ server: } } - #[test] - fn server_settings_require_uri_from_cli_or_config() { - let error = load_server_settings(None, None, None, None, None, false).unwrap_err(); + #[tokio::test] + async fn server_settings_require_uri_from_cli_or_config() { + let error = load_server_settings(None, None, None, None, None, false).await.unwrap_err(); assert!( error.to_string().contains("no graph to serve"), "expected mode-inference error, got: {error}", @@ -3598,9 +3600,9 @@ server: ); } - #[test] + #[tokio::test] #[serial] - fn unauthenticated_env_var_classification() { + async fn unauthenticated_env_var_classification() { // MR-723 PR A: closes the gap where the env-var read path inside // `load_server_settings` was structurally implemented but not // exercised by any test. Three properties to pin, all in one @@ -3627,7 +3629,7 @@ server: // Truthy values flip Open mode on, even with CLI flag off. for value in ["1", "true", "yes", "TRUE", "anything"] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, @@ -3638,7 +3640,7 @@ server: // Falsy values keep refusal behavior, even with CLI flag off. for value in ["0", "false", "FALSE", ""] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3648,7 +3650,7 @@ server: // Unset env var: also false. let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3659,7 +3661,7 @@ server: // CLI flag wins even when env is falsy — `serve()` honors the // OR of both inputs. let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true) + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, diff --git a/crates/omnigraph-server/src/main.rs b/crates/omnigraph-server/src/main.rs index c71ea2f..9000910 100644 --- a/crates/omnigraph-server/src/main.rs +++ b/crates/omnigraph-server/src/main.rs @@ -43,6 +43,7 @@ async fn main() -> Result<()> { cli.target, cli.bind, cli.unauthenticated, - )?; + ) + .await?; serve(settings).await } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 7858587..d11c542 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -5567,8 +5567,8 @@ mod multi_graph_startup { /// `GraphId` validation runs at startup — a reserved name in /// `omnigraph.yaml` produces a clear error rather than getting /// rejected per-request. - #[test] - fn load_server_settings_rejects_reserved_graph_id() { + #[tokio::test] + async fn load_server_settings_rejects_reserved_graph_id() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5580,7 +5580,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, false).await.unwrap_err(); assert!( err.to_string().contains("invalid graph id 'policies'"), "expected reserved-name rejection, got: {err}" @@ -5644,8 +5644,8 @@ graphs: // ── Four-rule mode inference matrix ─────────────────────────────── /// Rule 1: CLI positional URI → Single. - #[test] - fn mode_inference_cli_uri_is_single() { + #[tokio::test] + async fn mode_inference_cli_uri_is_single() { let settings = load_server_settings( None, None, @@ -5654,6 +5654,7 @@ graphs: None, true, // allow unauth so we get past the runtime-state check ) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"), @@ -5662,8 +5663,8 @@ graphs: } /// Rule 2: --target picks one graph from `graphs:` map → Single. - #[test] - fn mode_inference_cli_target_is_single() { + #[tokio::test] + async fn mode_inference_cli_target_is_single() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5679,6 +5680,7 @@ graphs: .unwrap(); let settings = load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"), @@ -5687,8 +5689,8 @@ graphs: } /// Rule 3: `server.graph` set → Single (target picked from config). - #[test] - fn mode_inference_server_graph_is_single() { + #[tokio::test] + async fn mode_inference_server_graph_is_single() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5704,7 +5706,7 @@ server: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"), ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"), @@ -5712,8 +5714,8 @@ server: } /// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi. - #[test] - fn mode_inference_config_plus_graphs_is_multi() { + #[tokio::test] + async fn mode_inference_config_plus_graphs_is_multi() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5727,7 +5729,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect(); @@ -5738,8 +5740,8 @@ graphs: } } - #[test] - fn mode_inference_multi_rejects_top_level_policy_file() { + #[tokio::test] + async fn mode_inference_multi_rejects_top_level_policy_file() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5753,7 +5755,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err(); let msg = err.to_string(); assert!( msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"), @@ -5769,8 +5771,8 @@ graphs: ); } - #[test] - fn mode_inference_multi_rejects_top_level_queries() { + #[tokio::test] + async fn mode_inference_multi_rejects_top_level_queries() { // Symmetric to the policy guard: a top-level `queries:` block in // multi-graph mode is not honored (each graph uses its own), so it // is a loud error rather than a silent no-op. @@ -5781,7 +5783,7 @@ graphs: "queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n", ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err(); let msg = err.to_string(); assert!( msg.contains("queries") && msg.contains("not honored"), @@ -5789,8 +5791,8 @@ graphs: ); } - #[test] - fn single_mode_named_graph_rejects_top_level_blocks() { + #[tokio::test] + async fn single_mode_named_graph_rejects_top_level_blocks() { // Serving a graph by name (`--target`/`server.graph`) uses its // per-graph block; a populated top-level block would be silently // shadowed, so boot refuses and names the per-graph location. @@ -5803,6 +5805,7 @@ graphs: .unwrap(); let err = load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) + .await .unwrap_err(); let msg = err.to_string(); assert!( @@ -5811,8 +5814,8 @@ graphs: ); } - #[test] - fn single_mode_named_graph_uses_per_graph_policy_and_queries() { + #[tokio::test] + async fn single_mode_named_graph_uses_per_graph_policy_and_queries() { // The identity rule: `--target prod` attaches `graphs.prod`'s own // policy + queries, not the top-level ones (which are absent here). let temp = tempfile::tempdir().unwrap(); @@ -5830,6 +5833,7 @@ graphs: .unwrap(); let settings = load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { @@ -5851,8 +5855,8 @@ graphs: } } - #[test] - fn mode_inference_normalizes_multi_graph_uris() { + #[tokio::test] + async fn mode_inference_normalizes_multi_graph_uris() { let temp = tempfile::tempdir().unwrap(); let graph = temp.path().join("alpha.omni"); let config_path = temp.path().join("omnigraph.yaml"); @@ -5868,7 +5872,7 @@ graphs: ), ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { assert_eq!(graphs[0].uri, graph.to_string_lossy()); @@ -5878,9 +5882,9 @@ graphs: } /// Rule 5: nothing → error with migration hint. - #[test] - fn mode_inference_no_inputs_errors_with_migration_hint() { - let err = load_server_settings(None, None, None, None, None, true).unwrap_err(); + #[tokio::test] + async fn mode_inference_no_inputs_errors_with_migration_hint() { + let err = load_server_settings(None, None, None, None, None, true).await.unwrap_err(); let msg = err.to_string(); assert!( msg.contains("no graph to serve"), @@ -5890,19 +5894,19 @@ graphs: /// Rule 4 sub-case: `--config` with empty `graphs:` map and no /// single-mode selector → rule 5 fires (no graph to serve). - #[test] - fn mode_inference_empty_graphs_map_errors() { + #[tokio::test] + async fn mode_inference_empty_graphs_map_errors() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err(); assert!(err.to_string().contains("no graph to serve")); } /// `--config` + `` together: URI wins → Single (the CLI URI /// takes precedence over the config's graphs map). - #[test] - fn mode_inference_cli_uri_overrides_graphs_map() { + #[tokio::test] + async fn mode_inference_cli_uri_overrides_graphs_map() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5922,6 +5926,7 @@ graphs: None, true, ) + .await .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => { @@ -5937,8 +5942,8 @@ graphs: } /// Per-graph `policy.file` is resolved relative to the config base_dir. - #[test] - fn per_graph_policy_file_is_resolved_relative_to_base_dir() { + #[tokio::test] + async fn per_graph_policy_file_is_resolved_relative_to_base_dir() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5954,7 +5959,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); let graphs = match settings.mode { ServerConfigMode::Multi { graphs, .. } => graphs, _ => panic!("expected Multi"), @@ -5972,8 +5977,8 @@ graphs: } /// `server.policy.file` resolves alongside the graphs map. - #[test] - fn server_policy_file_is_resolved_relative_to_base_dir() { + #[tokio::test] + async fn server_policy_file_is_resolved_relative_to_base_dir() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( @@ -5988,7 +5993,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { ServerConfigMode::Multi { server_policy_file, .. @@ -6268,7 +6273,7 @@ graphs: .unwrap(); let settings: ServerConfig = - load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); + load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); assert!(matches!(settings.mode, ServerConfigMode::Multi { .. })); match settings.mode { @@ -6321,14 +6326,14 @@ graphs: temp } -fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { - omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true) +async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { + omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true).await } #[tokio::test] async fn cluster_boot_serves_applied_state() { let temp = converged_cluster_dir("").await; - let settings = cluster_settings(temp.path()).unwrap(); + let settings = cluster_settings(temp.path()).await.unwrap(); let omnigraph_server::ServerConfigMode::Multi { graphs, config_path, @@ -6444,7 +6449,7 @@ graphs: temp }; - let settings = cluster_settings(temp.path()).unwrap(); + let settings = cluster_settings(temp.path()).await.unwrap(); let omnigraph_server::ServerConfigMode::Multi { graphs, server_policy_file, @@ -6482,6 +6487,7 @@ async fn cluster_boot_refusals() { None, true, ) + .await .unwrap_err(); assert!(err.to_string().contains("exclusive boot source"), "{err}"); let err = omnigraph_server::load_server_settings( @@ -6492,6 +6498,7 @@ async fn cluster_boot_refusals() { None, true, ) + .await .unwrap_err(); assert!(err.to_string().contains("exclusive boot source"), "{err}"); @@ -6499,7 +6506,7 @@ async fn cluster_boot_refusals() { let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person"); let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path(); fs::write(&blob, "tampered").unwrap(); - let err = cluster_settings(&dir).unwrap_err(); + let err = cluster_settings(&dir).await.unwrap_err(); assert!( err.to_string().contains("catalog_payload_digest_mismatch"), "{err}" @@ -6508,6 +6515,6 @@ async fn cluster_boot_refusals() { // Missing state refuses with the import/apply remedy. let empty = tempfile::tempdir().unwrap(); - let err = cluster_settings(empty.path()).unwrap_err(); + let err = cluster_settings(empty.path()).await.unwrap_err(); assert!(err.to_string().contains("cluster_state_missing"), "{err}"); } From 8dc2f1525509c3aae359317d0851e7ddce74f422 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 14:28:04 +0300 Subject: [PATCH 2/3] =?UTF-8?q?feat(cluster):=20the=20storage:=20root=20?= =?UTF-8?q?=E2=80=94=20state,=20catalog,=20and=20graph=20roots=20relocatab?= =?UTF-8?q?le?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cluster.yaml gains an optional storage: URI deciding where everything the cluster STORES lives: the state ledger, lock, content-addressed catalog, recovery sidecars, approval artifacts, and the derived graph roots (/graphs/.omni). Absent, it defaults to the config directory itself — the original layout, byte-compatible, so pre-existing clusters and the whole test suite are untouched. Declared configuration always stays in the working tree (Terraform's config-local/state-remote split); credentials are env-only, never in cluster.yaml. Every command resolves its store from the declared root (a bad root is a loud invalid_storage_root). Graph-root derivation, the delete executor (prefix delete via the adapter), the sweep's existence probes, the catalog payload write/verify/read paths, and the serving snapshot all flow through ClusterStore — the last raw-fs holdouts for stored state are gone, and the deny-list gains the rule that keeps it that way. Tests: default-layout byte-compat, a file:// root relocating the entire cluster (ledger+catalog+graphs under the new root, nothing under the config dir, serving snapshot follows), invalid-root validation. 98 in-crate + 9 failpoints + full workspace gate green. The s3:// flavor lands with PR 3's gated RustFS e2e. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/config.rs | 40 +++- crates/omnigraph-cluster/src/lib.rs | 241 +++++++++++++------------ crates/omnigraph-cluster/src/serve.rs | 70 +++---- crates/omnigraph-cluster/src/store.rs | 28 +++ crates/omnigraph-cluster/src/sweep.rs | 14 +- crates/omnigraph-cluster/src/tests.rs | 64 +++++++ crates/omnigraph-cluster/src/types.rs | 11 +- docs/dev/invariants.md | 4 + docs/user/cluster-config.md | 14 ++ docs/user/cluster.md | 2 + 10 files changed, 309 insertions(+), 179 deletions(-) diff --git a/crates/omnigraph-cluster/src/config.rs b/crates/omnigraph-cluster/src/config.rs index ecdc71c..acc954d 100644 --- a/crates/omnigraph-cluster/src/config.rs +++ b/crates/omnigraph-cluster/src/config.rs @@ -239,8 +239,33 @@ pub(crate) fn validate_cluster_header( } } + if let Some(storage) = raw.storage.as_deref() { + let trimmed = storage.trim(); + if trimmed.is_empty() { + diagnostics.push(Diagnostic::error( + "invalid_storage_root", + "storage", + "storage must be a non-empty URI (e.g. s3://bucket/prefix) when provided", + )); + } else if let Some(rest) = trimmed.strip_prefix("s3://") { + if rest.trim_start_matches('/').is_empty() { + diagnostics.push(Diagnostic::error( + "invalid_storage_root", + "storage", + "storage s3:// URI must name a bucket", + )); + } + } + } + ClusterSettings { state_lock: raw.state.lock.unwrap_or(true), + storage_root: raw + .storage + .as_deref() + .map(str::trim) + .filter(|storage| !storage.is_empty()) + .map(|storage| storage.trim_end_matches('/').to_string()), } } @@ -271,19 +296,19 @@ pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState { } -pub(crate) async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { +pub(crate) async fn observe_declared_graphs( + desired: &DesiredCluster, + backend: &ClusterStore, + state: &mut ClusterState, +) -> usize { let mut graph_error_count = 0; for graph in &desired.graphs { let graph_address = graph_address(&graph.id); let schema_address = schema_address(&graph.id); - let graph_path = desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{}.omni", graph.id)); - let graph_uri = display_path(&graph_path); + let graph_uri = backend.graph_root(&graph.id); let observed_at = now_rfc3339(); - if !graph_path.exists() { + if !backend.graph_root_exists(&graph_uri).await { state.applied_revision.resources.remove(&graph_address); state.applied_revision.resources.remove(&schema_address); state.observations.insert( @@ -737,6 +762,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { desired: Some(DesiredCluster { config_dir: config_dir.clone(), config_digest, + storage_root: settings.storage_root.clone(), state_lock: settings.state_lock, graphs, resource_digests, diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index ec1a02a..d97bb5b 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -29,7 +29,6 @@ use store::{ClusterStore, StateLockGuard, StateSnapshot}; pub use types::*; use types::*; pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; -use serve::read_verified_payload; use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source}; use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind}; use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars}; @@ -43,6 +42,18 @@ pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries"; pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals"; +/// The store for a load outcome: the declared `storage:` root when present, +/// the config directory itself otherwise. A bad root is a loud error. +fn store_for( + config_dir: &Path, + storage_root: Option<&str>, +) -> Result { + match storage_root { + Some(root) => ClusterStore::for_storage_root(root), + None => Ok(ClusterStore::for_config_dir(config_dir)), + } +} + pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { let outcome = load_desired(config_dir.as_ref()); let (resource_digests, resources, dependencies) = match outcome.desired { @@ -69,7 +80,17 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -169,12 +190,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else { continue; }; - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(&graph_id); let source_path = desired .resources .iter() @@ -242,7 +258,17 @@ pub async fn apply_config_dir_with_options( ) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let actor_for_output = options.actor.clone(); @@ -442,12 +468,7 @@ pub async fn apply_config_dir_with_options( else { continue; }; - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(graph_id); let mut sidecar = RecoverySidecar { schema_version: 1, operation_id: Ulid::new().to_string(), @@ -587,12 +608,7 @@ pub async fn apply_config_dir_with_options( else { continue; }; - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(graph_id); // Read-write open: the engine's own recovery sweep runs here, which // is exactly what we want before moving its manifest. let db = match Omnigraph::open(&graph_uri).await { @@ -767,9 +783,9 @@ pub async fn apply_config_dir_with_options( .after_digest .as_deref() .expect("create/update always carries an after digest"); - let Some(target) = payload_path(&desired.config_dir, &kind, digest) else { + if ClusterStore::payload_relative(&kind, digest).is_none() { continue; - }; + } let Some(source) = source_paths.get(change.resource.as_str()) else { diagnostics.push(Diagnostic::error( "resource_payload_write_error", @@ -779,7 +795,8 @@ pub async fn apply_config_dir_with_options( continue; }; if let Err(diagnostic) = - write_resource_payload(&target, Path::new(source), digest, &change.resource) + write_resource_payload(&backend, &kind, Path::new(source), digest, &change.resource) + .await { diagnostics.push(diagnostic); } @@ -844,12 +861,7 @@ pub async fn apply_config_dir_with_options( && artifact.bound_config_digest == desired.config_digest }) .map(|artifact| artifact.approval_id.clone()); - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(graph_id); let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await { Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await { Ok(snapshot) => Some(snapshot.version()), @@ -888,9 +900,10 @@ pub async fn apply_config_dir_with_options( graph_moving_aborted = true; continue; } - match fs::remove_dir_all(PathBuf::from(&graph_uri)) { + // Prefix delete through the storage layer: remove_dir_all locally, + // list+delete on object stores (idempotent; already-gone is fine). + match backend.delete_graph_root(&graph_uri).await { Ok(()) => {} - Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone Err(err) => { diagnostics.push(Diagnostic::error( "graph_delete_failed", @@ -1088,7 +1101,17 @@ pub async fn approve_config_dir( ) -> ApproveOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let fail = |config_dir: String, diagnostics: Vec| ApproveOutput { @@ -1200,7 +1223,20 @@ pub async fn approve_config_dir( pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = ClusterStore::for_config_dir(&parsed.config_dir); + let storage_root = parsed.raw.as_ref().and_then(|raw| { + raw.storage + .as_deref() + .map(str::trim) + .filter(|root| !root.is_empty()) + .map(|root| root.trim_end_matches('/').to_string()) + }); + let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&parsed.config_dir) + } + }; let mut observations = backend.observations(); backend.observe_lock(&mut observations, &mut diagnostics).await; warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); @@ -1219,7 +1255,7 @@ pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { // findings as diagnostics; persisting Drifted statuses // is refresh's job. Status never writes state. for (address, finding) in - verify_catalog_payloads(&parsed.config_dir, &state) + verify_catalog_payloads(&backend, &state).await { diagnostics.push(payload_finding_diagnostic(&address, &finding)); } @@ -1256,7 +1292,20 @@ pub async fn force_unlock_config_dir( ) -> ForceUnlockOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = ClusterStore::for_config_dir(&parsed.config_dir); + let storage_root = parsed.raw.as_ref().and_then(|raw| { + raw.storage + .as_deref() + .map(str::trim) + .filter(|root| !root.is_empty()) + .map(|root| root.trim_end_matches('/').to_string()) + }); + let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&parsed.config_dir) + } + }; let mut observations = backend.observations(); let mut lock_removed = false; @@ -1290,7 +1339,17 @@ pub async fn import_config_dir(config_dir: impl AsRef) -> StateSyncOutput async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput { let outcome = load_desired(config_dir); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -1418,7 +1477,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St // a drifted query digest first means the live-graph composite recompute // below already excludes it, so the persisted graph. composite stays // consistent and the next plan shows exactly the create + derived update. - for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) { + for (address, finding) in verify_catalog_payloads(&backend, &state).await { diagnostics.push(payload_finding_diagnostic(&address, &finding)); match finding { PayloadFinding::Missing => { @@ -1455,7 +1514,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St } } - let graph_error_count = observe_declared_graphs(&desired, &mut state).await; + let graph_error_count = observe_declared_graphs(&desired, &backend, &mut state).await; if graph_error_count > 0 { diagnostics.push(Diagnostic::error( "graph_observation_error", @@ -1512,28 +1571,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St -/// Content-addressed catalog path for an applied resource payload. Extensions -/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, -/// so the catalog layout cannot drift with operator file conventions. -fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option { - let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR); - match kind { - ResourceKind::Query { graph, name } => Some( - resources_dir - .join("query") - .join(graph) - .join(name) - .join(format!("{digest}.gq")), - ), - ResourceKind::Policy(name) => Some( - resources_dir - .join("policy") - .join(name) - .join(format!("{digest}.yaml")), - ), - _ => None, - } -} #[derive(Debug, PartialEq, Eq)] enum PayloadFinding { @@ -1547,34 +1584,26 @@ enum PayloadFinding { /// unknown addresses have no payloads and are skipped. Read-only; findings /// are deterministic (BTreeMap order). Payloads are small (queries, policy /// bundles), so a full digest re-hash is cheap. -fn verify_catalog_payloads( - config_dir: &Path, +async fn verify_catalog_payloads( + backend: &ClusterStore, state: &ClusterState, ) -> Vec<(String, PayloadFinding)> { let mut findings = Vec::new(); for (address, resource) in &state.applied_revision.resources { let kind = resource_kind(address); - let Some(path) = payload_path(config_dir, &kind, &resource.digest) else { + if ClusterStore::payload_relative(&kind, &resource.digest).is_none() { continue; - }; - match fs::read(&path) { - Ok(bytes) => { - let actual_digest = sha256_hex(&bytes); + } + match backend.read_payload(&kind, &resource.digest).await { + Ok(Some(text)) => { + let actual_digest = sha256_hex(text.as_bytes()); if actual_digest != resource.digest { findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest })); } } - Err(err) if err.kind() == ErrorKind::NotFound => { - findings.push((address.clone(), PayloadFinding::Missing)); - } + Ok(None) => findings.push((address.clone(), PayloadFinding::Missing)), Err(err) => { - findings.push(( - address.clone(), - PayloadFinding::ReadError(format!( - "could not read catalog payload '{}': {err}", - path.display() - )), - )); + findings.push((address.clone(), PayloadFinding::ReadError(err))); } } } @@ -1606,13 +1635,15 @@ fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagno /// digest-named file is trusted as-is. The digest re-check is the apply-side /// TOCTOU detector — the source file changing between `load_desired` and the /// payload write must fail loudly, never publish mismatched content. -fn write_resource_payload( - target: &Path, +async fn write_resource_payload( + backend: &ClusterStore, + kind: &ResourceKind, source: &Path, expected_digest: &str, resource: &str, ) -> Result<(), Diagnostic> { - if target.exists() { + if backend.payload_exists(kind, expected_digest).await { + // Content-addressed: an existing digest-named object is identical. return Ok(()); } let bytes = fs::read(source).map_err(|err| { @@ -1623,6 +1654,9 @@ fn write_resource_payload( ) })?; if sha256_hex(&bytes) != expected_digest { + // The apply-side TOCTOU detector: the source changing between + // load_desired and this write must fail loudly, never publish + // mismatched content. return Err(Diagnostic::error( "resource_content_changed", resource, @@ -1632,54 +1666,23 @@ fn write_resource_payload( ), )); } - let parent = target.parent().expect("payload path always has a parent"); - fs::create_dir_all(parent).map_err(|err| { + let content = String::from_utf8(bytes).map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, - format!("could not create payload directory: {err}"), + format!("resource source is not valid UTF-8: {err}"), ) })?; - let file_name = target - .file_name() - .expect("payload path always has a file name") - .to_string_lossy(); - let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new())); - let mut file = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tmp_path) + backend + .write_payload(kind, expected_digest, &content) + .await .map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, - format!("could not create temporary payload file: {err}"), + format!("could not write payload: {err}"), ) - })?; - let write_result = file - .write_all(&bytes) - .and_then(|()| file.sync_all()) - .map_err(|err| { - Diagnostic::error( - "resource_payload_write_error", - resource, - format!("could not write payload file: {err}"), - ) - }); - drop(file); - if let Err(diagnostic) = write_result { - let _ = fs::remove_file(&tmp_path); - return Err(diagnostic); - } - if let Err(err) = fs::rename(&tmp_path, target) { - let _ = fs::remove_file(&tmp_path); - return Err(Diagnostic::error( - "resource_payload_write_error", - resource, - format!("could not move payload file into place: {err}"), - )); - } - Ok(()) + }) } /// Recompute the composite `graph.` digests for state-resident graphs from diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index 8578aee..b459641 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -44,8 +44,24 @@ pub async fn read_serving_snapshot( config_dir: impl AsRef, ) -> Result> { let config_dir = config_dir.as_ref().to_path_buf(); - let backend = ClusterStore::for_config_dir(&config_dir); let mut diagnostics: Vec = Vec::new(); + // The declared storage: root decides where the ledger/catalog/graphs + // live; config parse errors surface through the normal validation path. + let parsed = parse_cluster_config(&config_dir); + let storage_root = parsed.raw.as_ref().and_then(|raw| { + raw.storage + .as_deref() + .map(str::trim) + .filter(|root| !root.is_empty()) + .map(|root| root.trim_end_matches('/').to_string()) + }); + let backend = match storage_root.as_deref() { + Some(root) => match ClusterStore::for_storage_root(root) { + Ok(backend) => backend, + Err(diagnostic) => return Err(vec![diagnostic]), + }, + None => ClusterStore::for_config_dir(&config_dir), + }; // A ledger a sweep is about to rewrite must not start serving. let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await; @@ -89,9 +105,7 @@ pub async fn read_serving_snapshot( match resource_kind(address) { ResourceKind::Graph(graph_id) => { graphs.push(ServingGraph { - root: config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), + root: PathBuf::from(backend.graph_root(&graph_id)), graph_id, }); } @@ -100,7 +114,7 @@ pub async fn read_serving_snapshot( let ResourceKind::Query { graph, name } = &kind else { unreachable!() }; - match read_verified_payload(&config_dir, &kind, &entry.digest, address) { + match backend.read_verified_payload(&kind, &entry.digest, address).await { Ok(source) => queries.push(ServingQuery { graph_id: graph.clone(), name: name.clone(), @@ -121,11 +135,14 @@ pub async fn read_serving_snapshot( )); continue; }; - match read_verified_payload(&config_dir, &kind, &entry.digest, address) { + match backend.read_verified_payload(&kind, &entry.digest, address).await { Ok(_) => policies.push(ServingPolicy { name: name.clone(), - blob_path: payload_path(&config_dir, &kind, &entry.digest) - .expect("policy kind always has a payload path"), + blob_path: PathBuf::from( + backend + .payload_display(&kind, &entry.digest) + .expect("policy kind always has a payload path"), + ), applies_to, }), Err(diagnostic) => diagnostics.push(diagnostic), @@ -152,40 +169,3 @@ pub async fn read_serving_snapshot( }) } -/// Read a catalog blob and verify it against the recorded digest. -pub(crate) fn read_verified_payload( - config_dir: &Path, - kind: &ResourceKind, - digest: &str, - address: &str, -) -> Result { - let path = payload_path(config_dir, kind, digest) - .expect("query/policy kinds always have a payload path"); - let bytes = fs::read(&path).map_err(|err| { - Diagnostic::error( - "catalog_payload_missing", - address, - format!( - "catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart", - display_path(&path) - ), - ) - })?; - if sha256_hex(&bytes) != digest { - return Err(Diagnostic::error( - "catalog_payload_digest_mismatch", - address, - format!( - "catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart", - display_path(&path) - ), - )); - } - String::from_utf8(bytes).map_err(|err| { - Diagnostic::error( - "catalog_payload_invalid", - address, - format!("catalog blob is not valid UTF-8: {err}"), - ) - }) -} diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index bf328af..f52dd29 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -375,6 +375,34 @@ impl ClusterStore { .unwrap_or(false) } + /// Raw payload read: `Ok(None)` for a missing blob, `Err` for transport + /// failures — callers classify (verify loops need the three-way split). + pub(crate) async fn read_payload( + &self, + kind: &ResourceKind, + digest: &str, + ) -> Result, String> { + let Some(relative) = Self::payload_relative(kind, digest) else { + return Ok(None); + }; + let uri = self.uri(&relative); + match self.adapter.exists(&uri).await { + Ok(false) => return Ok(None), + Ok(true) => {} + Err(err) => return Err(err.to_string()), + } + self.adapter + .read_text(&uri) + .await + .map(Some) + .map_err(|err| { + format!( + "could not read catalog payload '{}': {err}", + self.display(&relative) + ) + }) + } + /// Idempotent content-addressed write: a payload already present at its /// digest is by definition identical. pub(crate) async fn write_payload( diff --git a/crates/omnigraph-cluster/src/sweep.rs b/crates/omnigraph-cluster/src/sweep.rs index 2cfd7d1..7aecb01 100644 --- a/crates/omnigraph-cluster/src/sweep.rs +++ b/crates/omnigraph-cluster/src/sweep.rs @@ -19,13 +19,13 @@ pub(crate) async fn sweep_recovery_sidecars( for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await { match sidecar.kind { RecoverySidecarKind::GraphCreate => { - sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + sweep_graph_create_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await; } RecoverySidecarKind::SchemaApply => { sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } RecoverySidecarKind::GraphDelete => { - sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome); + sweep_graph_delete_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await; } } } @@ -33,6 +33,7 @@ pub(crate) async fn sweep_recovery_sidecars( } pub(crate) async fn sweep_graph_create_sidecar( + backend: &ClusterStore, path: String, sidecar: RecoverySidecar, state: &mut ClusterState, @@ -41,13 +42,12 @@ pub(crate) async fn sweep_graph_create_sidecar( ) { let graph_address = graph_address(&sidecar.graph_id); let schema_addr = schema_address(&sidecar.graph_id); - let graph_path = PathBuf::from(&sidecar.graph_uri); // Row 1: nothing moved — the init never landed. The sidecar is pure // intent; retire it (deferred to the command's post-CAS cleanup, like // every other completed sidecar — a failed CAS simply re-sweeps it) and // let the command's own plan re-propose the create. - if !graph_path.exists() { + if !backend.graph_root_exists(&sidecar.graph_uri).await { outcome.completed_sidecars.push(path); return; } @@ -251,7 +251,8 @@ pub(crate) async fn sweep_schema_apply_sidecar( } } -pub(crate) fn sweep_graph_delete_sidecar( +pub(crate) async fn sweep_graph_delete_sidecar( + backend: &ClusterStore, path: String, sidecar: RecoverySidecar, state: &mut ClusterState, @@ -259,9 +260,8 @@ pub(crate) fn sweep_graph_delete_sidecar( outcome: &mut SweepOutcome, ) { let graph_address = graph_address(&sidecar.graph_id); - let root = PathBuf::from(&sidecar.graph_uri); - if root.exists() { + if backend.graph_root_exists(&sidecar.graph_uri).await { // Row 8: the delete never completed. Prefix removal is idempotent and // works on partial roots, so the repair is simply the re-proposed, // still-approved delete on a later run — retire the stale intent. diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index 3b7984d..ba7019f 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -2762,6 +2762,70 @@ policies: // ---- serving snapshot (5B read-only loader) ---- + // ---- storage: root (RFC-006) ---- + + #[tokio::test] + async fn storage_root_defaults_to_config_dir_layout() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + let out = apply_config_dir(dir.path()).await; + assert!(out.converged, "{out:?}"); + // No storage: key — the original on-disk layout, byte-compatible. + assert!(dir.path().join(CLUSTER_STATE_FILE).exists()); + assert!(dir.path().join(CLUSTER_RESOURCES_DIR).exists()); + assert!(dir.path().join("graphs/knowledge.omni").exists()); + } + + #[tokio::test] + async fn storage_root_file_uri_relocates_the_cluster() { + let dir = fixture(); + let storage = tempfile::tempdir().unwrap(); + let storage_path = storage.path().to_string_lossy().to_string(); + let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap(); + config = config.replace("version: 1\n", &format!("version: 1\nstorage: {storage_path}\n")); + fs::write(dir.path().join("cluster.yaml"), config).unwrap(); + + let import = import_config_dir(dir.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{:?}", out.diagnostics); + + // Everything lives under the declared root; nothing under config dir. + assert!(storage.path().join("__cluster/state.json").exists()); + assert!(storage.path().join("graphs/knowledge.omni").exists()); + assert!(storage.path().join(CLUSTER_RESOURCES_DIR).exists()); + assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); + assert!(!dir.path().join("graphs").exists()); + + // The serving snapshot follows the root. + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); + assert!( + snapshot.graphs[0] + .root + .starts_with(storage.path()), + "{:?}", + snapshot.graphs[0].root + ); + } + + #[test] + fn storage_root_invalid_uri_fails_validation() { + let dir = fixture(); + let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap(); + config = config.replace("version: 1\n", "version: 1\nstorage: \"s3://\"\n"); + fs::write(dir.path().join("cluster.yaml"), config).unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "invalid_storage_root"), + "{:?}", + out.diagnostics + ); + } + #[tokio::test] async fn serving_snapshot_reads_converged_cluster() { let dir = fixture(); diff --git a/crates/omnigraph-cluster/src/types.rs b/crates/omnigraph-cluster/src/types.rs index ca960a5..e44e2f4 100644 --- a/crates/omnigraph-cluster/src/types.rs +++ b/crates/omnigraph-cluster/src/types.rs @@ -322,6 +322,8 @@ pub struct ApproveOutput { pub(crate) struct DesiredCluster { pub(crate) config_dir: PathBuf, pub(crate) config_digest: String, + /// The declared `storage:` root, if any (None ⇒ the config dir itself). + pub(crate) storage_root: Option, pub(crate) state_lock: bool, pub(crate) graphs: Vec, pub(crate) resource_digests: BTreeMap, @@ -345,9 +347,10 @@ pub(crate) struct ParsedConfig { pub(crate) config_file: PathBuf, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) struct ClusterSettings { pub(crate) state_lock: bool, + pub(crate) storage_root: Option, } #[derive(Debug)] @@ -364,6 +367,12 @@ pub(crate) struct RawClusterConfig { pub(crate) version: u32, #[serde(default)] pub(crate) metadata: Metadata, + /// Storage root URI for everything the cluster stores: the state + /// ledger, catalog, sidecars, approvals, and derived graph roots. + /// Absent ⇒ `file://` (the original layout, byte-compatible). + /// `s3://bucket/prefix` puts the whole cluster on object storage. + #[serde(default)] + pub(crate) storage: Option, #[serde(default)] pub(crate) state: StateConfig, #[serde(default)] diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 7642fd9..655e360 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -206,6 +206,10 @@ case is exceptional. fits. - Discarding retrieval score/rank before fusion or projection decisions. - Auto-creating placeholder nodes for orphan edges. +- Raw filesystem I/O for cluster-stored state (ledger, lock, sidecars, + approvals, catalog) outside the cluster crate's storage module — every + stored byte goes through the engine `StorageAdapter` so `file://` and + `s3://` stay one code path. - Wire-protocol-specific code in compiler or engine crates. - Cloud-only correctness fixes or forks of the OSS engine for correctness. - Mutating immutable substrate state in place, including Lance fragments or diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 24d1833..59c9207 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -101,6 +101,20 @@ updates all of its queries together. Paths are relative to the config directory — the cluster is one explicit folder, so no `./` prefixes are needed. +`storage:` (optional) is the **storage root URI** for everything the cluster +stores — the state ledger, lock, content-addressed catalog, recovery +sidecars, approval artifacts, and the derived graph roots +(`/graphs/.omni`). Absent, it defaults to the config directory +itself (the original layout, byte-compatible with pre-existing clusters). +`s3://bucket/prefix` puts the whole cluster on S3-compatible object storage: +the ledger CAS uses conditional writes (verified against AWS S3 semantics and +RustFS), the lock becomes genuinely cross-machine, and graph roots are +engine-native S3 URIs. Credentials are **never** in `cluster.yaml` — the +standard `AWS_*` environment contract applies, identical to graph storage. +Declared configuration (`cluster.yaml` and the schema/query/policy sources it +references) always stays in the working tree: config is versioned in git, +state lives in the store — the Terraform split. + `metadata.name` is a display label. `state.backend` may be omitted or set to `cluster`; external state backends are reserved for a later stage. `state.lock` defaults to `true`. When enabled, `cluster plan`, `cluster apply`, diff --git a/docs/user/cluster.md b/docs/user/cluster.md index 1731f31..19755fb 100644 --- a/docs/user/cluster.md +++ b/docs/user/cluster.md @@ -40,6 +40,8 @@ company-brain/ ```yaml # cluster.yaml version: 1 +# storage: s3://omnigraph-local/clusters/company-brain # optional: put the +# ledger, catalog, and graph data on object storage (default: this folder) metadata: name: company-brain graphs: From f6ae3e4fa30656d745fe711e2cb56943b497a4ae Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 14:33:26 +0300 Subject: [PATCH 3/3] fix(cluster): lock release must complete before a CLI process exits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Caught by the first live s3 smoke: StateLockGuard's spawned async delete dies with the runtime when a short-lived CLI process exits right after the command — import's lock survived into the next command as state_lock_held. On the multi-thread runtime (the CLI, and the gated s3 tests) block_in_place waits for the delete to complete; current-thread runtimes keep the spawn fallback with force-unlock as the documented recovery, same as a crash. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/store.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index f52dd29..4d33d2c 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -58,16 +58,30 @@ impl Drop for StateLockGuard { let path = self.uri.trim_start_matches("file://"); let _ = std::fs::remove_file(path); } - // Object stores need an async delete; best-effort spawn. A crash - // here leaves the lock for `force-unlock` — same as a process - // kill, and the same recovery path. + // Object stores need an async delete, and it must COMPLETE + // before a short-lived CLI process exits — a spawned task dies + // with the runtime and leaks the lock (caught by the s3 smoke + // test: import's lock survived into the next command). On the + // multi-thread runtime (the CLI and the gated s3 tests), + // block_in_place waits for the delete; on a current-thread + // runtime that's not allowed, so fall back to a spawn — + // best-effort, with `force-unlock` as the documented recovery, + // same as a crash. StorageKind::S3 => { let adapter = Arc::clone(&self.adapter); let uri = self.uri.clone(); if let Ok(handle) = tokio::runtime::Handle::try_current() { - handle.spawn(async move { - let _ = adapter.delete(&uri).await; - }); + if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread { + tokio::task::block_in_place(move || { + handle.block_on(async move { + let _ = adapter.delete(&uri).await; + }); + }); + } else { + handle.spawn(async move { + let _ = adapter.delete(&uri).await; + }); + } } } }