feat(cluster): port the storage backend to the engine StorageAdapter

LocalStateBackend becomes ClusterStore: every stored byte — state ledger,
lock, recovery sidecars, approval artifacts — now flows through the
engine's StorageAdapter, making file:// and s3:// one code path. Behavior
on the file backend is byte-compatible (layout, CAS semantics, diagnostics,
lock release timing) and the entire pre-existing suite passes unchanged.

Mechanics: the ledger CAS keeps its public sha256 vocabulary while the
physical swap is token-conditioned (ETag If-Match on S3 via PR #186's
primitives; content-token + temp/rename locally — the pre-port semantics);
the lock is a create-only put (genuinely cross-machine on object stores)
with deterministic drop-release locally and best-effort spawned release on
S3; sidecars/approvals address by URI (SweepOutcome and the executors carry
strings); sweep row-1 retirement joins the uniform deferred post-CAS
cleanup. ClusterStore also gains the catalog-payload and graph-root
methods that commit 2 wires in.

Async ripple: status/force-unlock/serving-snapshot and the server's
settings loader chain go async (CLI dispatch and ~20 test hosts follow,
mechanically). tokio joins the cluster crate's runtime deps for the lock
guard's handle.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 14:11:14 +03:00
parent 2f58fc47fa
commit fd002abaa5
12 changed files with 687 additions and 479 deletions

View file

@ -3720,7 +3720,7 @@ async fn main() -> Result<()> {
finish_cluster_approve(&output, json)?;
}
ClusterCommand::Status { config, json } => {
let output = status_config_dir(config);
let output = status_config_dir(config).await;
finish_cluster_status(&output, json)?;
}
ClusterCommand::Refresh { config, json } => {
@ -3736,7 +3736,7 @@ async fn main() -> Result<()> {
config,
json,
} => {
let output = force_unlock_config_dir(config, lock_id);
let output = force_unlock_config_dir(config, lock_id).await;
finish_cluster_force_unlock(&output, json)?;
}
},

View file

@ -23,6 +23,10 @@ serde_yaml = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
# Runtime handle only — best-effort async lock release in
# StateLockGuard::drop on object-store backends (cluster commands always
# run inside the caller's tokio runtime).
tokio = { workspace = true }
ulid = { workspace = true }
[dev-dependencies]

View file

@ -142,7 +142,7 @@ pub(crate) fn compute_approvals(
/// Near-misses — an artifact for the same resource whose bound digests no
/// longer match — warn as `approval_stale` and never authorize anything.
pub(crate) fn approved_resources(
artifacts: &[(PathBuf, ApprovalArtifact)],
artifacts: &[(String, ApprovalArtifact)],
changes: &[PlanChange],
config_digest: &str,
diagnostics: &mut Vec<Diagnostic>,

View file

@ -25,7 +25,7 @@ mod diff;
mod serve;
mod sweep;
mod store;
use store::{LocalStateBackend, StateLockGuard, StateSnapshot};
use store::{ClusterStore, StateLockGuard, StateSnapshot};
pub use types::*;
use types::*;
pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot};
@ -69,7 +69,7 @@ pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let mut observations = backend.observations();
let Some(desired) = outcome.desired else {
@ -107,7 +107,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
}
let _lock_guard = if desired.state_lock {
match backend.acquire_lock("plan", &mut observations) {
match backend.acquire_lock("plan", &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -130,7 +130,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let mut prior_resources = BTreeMap::new();
let mut prior_state: Option<ClusterState> = None;
if !has_errors(&diagnostics) {
match backend.read_state(&mut observations) {
match backend.read_state(&mut observations).await {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
prior_resources = state_resource_digests(&state);
@ -151,7 +151,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
}
// Plan previews dispositions without sweeping; a pending recovery is
// surfaced as the cluster_recovery_pending warning above instead.
let artifacts = backend.list_approval_artifacts(&mut diagnostics);
let artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
let approved = approved_resources(
&artifacts,
&changes,
@ -242,7 +242,7 @@ pub async fn apply_config_dir_with_options(
) -> ApplyOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let mut observations = backend.observations();
let actor_for_output = options.actor.clone();
@ -294,7 +294,7 @@ pub async fn apply_config_dir_with_options(
// Named guard: the lock must be held until the state outcome is recorded.
let _lock_guard = if desired.state_lock {
match backend.acquire_lock("apply", &mut observations) {
match backend.acquire_lock("apply", &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -321,7 +321,7 @@ pub async fn apply_config_dir_with_options(
);
}
let snapshot = match backend.read_state(&mut observations) {
let snapshot = match backend.read_state(&mut observations).await {
Ok(snapshot) => snapshot,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -361,7 +361,7 @@ pub async fn apply_config_dir_with_options(
let prior_resources = state_resource_digests(&state);
let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
append_policy_binding_changes(&mut changes, Some(&state), &desired);
let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics);
let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
let approved = approved_resources(
&approval_artifacts,
&changes,
@ -424,7 +424,7 @@ pub async fn apply_config_dir_with_options(
})
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
.collect();
let mut completed_op_sidecars: Vec<PathBuf> = Vec::new();
let mut completed_op_sidecars: Vec<String> = Vec::new();
let mut failed_graphs: BTreeMap<String, FailedGraphOrigin> = BTreeMap::new();
let mut graph_moving_aborted = false;
for graph_id in &graph_creates_to_run {
@ -462,7 +462,7 @@ pub async fn apply_config_dir_with_options(
state_cas_base: expected_cas.clone(),
approval_id: None,
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -514,7 +514,7 @@ pub async fn apply_config_dir_with_options(
Ok(source) => source,
Err(diagnostic) => {
diagnostics.push(diagnostic);
let _ = fs::remove_file(&sidecar_path); // nothing moved
backend.delete_object(&sidecar_path).await; // nothing moved
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
graph_moving_aborted = true;
continue;
@ -540,7 +540,7 @@ pub async fn apply_config_dir_with_options(
if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await {
if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await {
sidecar.expected_manifest_version = Some(snapshot.version());
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) {
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
diagnostics.push(diagnostic);
}
}
@ -626,7 +626,7 @@ pub async fn apply_config_dir_with_options(
state_cas_base: expected_cas.clone(),
approval_id: None,
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -677,7 +677,7 @@ pub async fn apply_config_dir_with_options(
Ok(source) => source,
Err(diagnostic) => {
diagnostics.push(diagnostic);
let _ = fs::remove_file(&sidecar_path); // nothing moved
backend.delete_object(&sidecar_path).await; // nothing moved
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
@ -695,7 +695,7 @@ pub async fn apply_config_dir_with_options(
{
Ok(result) => {
sidecar.expected_manifest_version = Some(result.manifest_version);
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) {
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
diagnostics.push(diagnostic);
}
}
@ -871,7 +871,7 @@ pub async fn apply_config_dir_with_options(
state_cas_base: expected_cas.clone(),
approval_id: approval_id.clone(),
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1004,8 +1004,14 @@ pub async fn apply_config_dir_with_options(
// persisted-statuses revert contract below is exercised; a cfg_callback
// on this point can mutate state.json to simulate a concurrent writer,
// making write_state's CAS check fail organically.
let write_result = failpoints::maybe_fail("cluster_apply.before_state_write")
.and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations));
let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") {
Ok(()) => {
backend
.write_state(&new_state, expected_cas.as_deref(), &mut observations)
.await
}
Err(diagnostic) => Err(diagnostic),
};
match write_result {
Ok(()) => state_written = true,
Err(diagnostic) => {
@ -1017,16 +1023,16 @@ pub async fn apply_config_dir_with_options(
// Completed (rows 2/4) sweep sidecars are deleted only once their outcome
// is durably recorded; on a failed write they stay and re-sweep next run.
if !state_write_failed {
for sidecar_path in sweep
for sidecar_uri in sweep
.completed_sidecars
.iter()
.chain(completed_op_sidecars.iter())
{
let _ = fs::remove_file(sidecar_path);
backend.delete_object(sidecar_uri).await;
}
let mut all_consumed = sweep.consumed_approvals.clone();
all_consumed.extend(consumed_approval_ids.iter().cloned());
mark_approvals_consumed(&backend, &all_consumed);
mark_approvals_consumed(&backend, &all_consumed).await;
}
// On a failed state write, report the statuses that are actually on disk
// (the pre-apply snapshot), not the in-memory mutations that were never
@ -1082,7 +1088,7 @@ pub async fn approve_config_dir(
) -> ApproveOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let mut observations = backend.observations();
let fail = |config_dir: String, diagnostics: Vec<Diagnostic>| ApproveOutput {
@ -1103,7 +1109,7 @@ pub async fn approve_config_dir(
}
let _lock_guard = if desired.state_lock {
match backend.acquire_lock("approve", &mut observations) {
match backend.acquire_lock("approve", &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1119,7 +1125,7 @@ pub async fn approve_config_dir(
None
};
let state = match backend.read_state(&mut observations) {
let state = match backend.read_state(&mut observations).await {
Ok(snapshot) => match snapshot.state {
Some(state) => state,
None => {
@ -1174,7 +1180,7 @@ pub async fn approve_config_dir(
consumed_at: None,
consumed_by_operation: None,
};
if let Err(diagnostic) = backend.write_approval_artifact(&artifact) {
if let Err(diagnostic) = backend.write_approval_artifact(&artifact).await {
diagnostics.push(diagnostic);
return fail(display_path(&desired.config_dir), diagnostics);
}
@ -1191,12 +1197,12 @@ pub async fn approve_config_dir(
}
pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
let backend = LocalStateBackend::new(&parsed.config_dir);
let backend = ClusterStore::for_config_dir(&parsed.config_dir);
let mut observations = backend.observations();
backend.observe_lock(&mut observations, &mut diagnostics);
backend.observe_lock(&mut observations, &mut diagnostics).await;
warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics);
let mut resource_digests = BTreeMap::new();
@ -1206,7 +1212,7 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
if let Some(raw) = parsed.raw.as_ref() {
let _settings = validate_cluster_header(raw, &mut diagnostics);
if !has_errors(&diagnostics) {
match backend.read_state(&mut observations) {
match backend.read_state(&mut observations).await {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
// Read-only point-in-time catalog check: report the
@ -1244,20 +1250,20 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
}
}
pub fn force_unlock_config_dir(
pub async fn force_unlock_config_dir(
config_dir: impl AsRef<Path>,
lock_id: impl AsRef<str>,
) -> ForceUnlockOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
let backend = LocalStateBackend::new(&parsed.config_dir);
let backend = ClusterStore::for_config_dir(&parsed.config_dir);
let mut observations = backend.observations();
let mut lock_removed = false;
if let Some(raw) = parsed.raw.as_ref() {
let _settings = validate_cluster_header(raw, &mut diagnostics);
if !has_errors(&diagnostics) {
match backend.force_unlock(lock_id.as_ref(), &mut observations) {
match backend.force_unlock(lock_id.as_ref(), &mut observations).await {
Ok(()) => lock_removed = true,
Err(diagnostic) => diagnostics.push(diagnostic),
}
@ -1284,7 +1290,7 @@ pub async fn import_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput
async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput {
let outcome = load_desired(config_dir);
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let mut observations = backend.observations();
let Some(desired) = outcome.desired else {
@ -1315,7 +1321,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
let operation_label = state_sync_operation_label(operation);
let _lock_guard = if desired.state_lock {
match backend.acquire_lock(operation_label, &mut observations) {
match backend.acquire_lock(operation_label, &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1346,7 +1352,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
};
}
let snapshot = match backend.read_state(&mut observations) {
let snapshot = match backend.read_state(&mut observations).await {
Ok(snapshot) => snapshot,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1477,14 +1483,14 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
state.state_revision = state.state_revision.saturating_add(1);
}
match backend.write_state(&state, expected_cas.as_deref(), &mut observations) {
match backend.write_state(&state, expected_cas.as_deref(), &mut observations).await {
Ok(()) => {
// Completed sweep sidecars are deleted only after their outcome
// is durably recorded; on failure they stay and re-sweep.
for sidecar_path in &sweep.completed_sidecars {
let _ = fs::remove_file(sidecar_path);
for sidecar_uri in &sweep.completed_sidecars {
backend.delete_object(sidecar_uri).await;
}
mark_approvals_consumed(&backend, &sweep.consumed_approvals);
mark_approvals_consumed(&backend, &sweep.consumed_approvals).await;
}
Err(diagnostic) => diagnostics.push(diagnostic),
}

View file

@ -40,13 +40,15 @@ pub struct ServingSnapshot {
/// failure is collected and the whole snapshot refused; no partial serving.
/// Takes no lock: the state file is replaced atomically, so this reads a
/// consistent point-in-time ledger.
pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnapshot, Vec<Diagnostic>> {
pub async fn read_serving_snapshot(
config_dir: impl AsRef<Path>,
) -> Result<ServingSnapshot, Vec<Diagnostic>> {
let config_dir = config_dir.as_ref().to_path_buf();
let backend = LocalStateBackend::new(&config_dir);
let backend = ClusterStore::for_config_dir(&config_dir);
let mut diagnostics: Vec<Diagnostic> = Vec::new();
// A ledger a sweep is about to rewrite must not start serving.
let sidecars = backend.list_recovery_sidecars(&mut diagnostics);
let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await;
if !sidecars.is_empty() {
diagnostics.push(Diagnostic::error(
"cluster_recovery_pending",
@ -59,7 +61,7 @@ pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnap
}
let mut observations = backend.observations();
let state = match backend.read_state(&mut observations) {
let state = match backend.read_state(&mut observations).await {
Ok(snapshot) => match snapshot.state {
Some(state) => Some(state),
None => {

View file

@ -1,230 +1,446 @@
//! The cluster's storage backend: state ledger, lock, recovery
//! sidecars, approval artifacts (moved verbatim from lib.rs in the
//! modularization). The object-storage port (RFC-006) lands here as a
//! follow-up — this module is the single home for stored-state I/O.
//! The cluster's storage layer: every stored byte (state ledger, lock,
//! recovery sidecars, approval artifacts, catalog payloads) goes through the
//! engine's `StorageAdapter`, so `file://` and `s3://` are one code path
//! (RFC-006). Declared configuration — `cluster.yaml` and the schema/query/
//! policy sources it references — deliberately does NOT live here: config is
//! read from the operator's working tree (Terraform's config-local /
//! state-remote split).
//!
//! Raw `fs::*` for cluster state outside this module is a deny-list entry.
use super::*;
use std::path::Path;
use std::process;
use std::sync::Arc;
#[derive(Debug)]
pub(crate) struct LocalStateBackend {
state_dir: PathBuf,
state_path: PathBuf,
lock_path: PathBuf,
recoveries_dir: PathBuf,
approvals_dir: PathBuf,
use omnigraph::storage::{StorageAdapter, StorageKind, storage_for_uri, storage_kind_for_uri};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use ulid::Ulid;
use crate::{
ApprovalArtifact, CLUSTER_APPROVALS_DIR, CLUSTER_LOCK_FILE, CLUSTER_RECOVERIES_DIR,
CLUSTER_RESOURCES_DIR, CLUSTER_STATE_FILE, ClusterState, Diagnostic, RecoverySidecar,
ResourceKind, StateLockFile, StateObservations, sha256_hex,
};
#[derive(Debug, Clone)]
pub(crate) struct ClusterStore {
adapter: Arc<dyn StorageAdapter>,
/// Normalized storage-root URI, no trailing slash: `file:///abs/dir`
/// (the default config-dir layout) or `s3://bucket/prefix`.
root: String,
/// What observations/diagnostics display for stored locations: the plain
/// local path for `file://` roots (byte-compatible with the pre-store
/// outputs), the URI otherwise.
display_root: String,
}
#[derive(Debug)]
pub(crate) struct StateSnapshot {
pub(crate) state: Option<ClusterState>,
/// Content identity (`sha256:<hex>`) — the public CAS vocabulary.
pub(crate) state_cas: Option<String>,
}
#[derive(Debug)]
pub(crate) struct StateLockGuard {
path: PathBuf,
adapter: Arc<dyn StorageAdapter>,
uri: String,
kind: StorageKind,
}
impl LocalStateBackend {
pub(crate) fn new(config_dir: &Path) -> Self {
let state_dir = config_dir.join(CLUSTER_STATE_DIR);
Self {
state_path: config_dir.join(CLUSTER_STATE_FILE),
lock_path: config_dir.join(CLUSTER_LOCK_FILE),
recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR),
approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR),
state_dir,
}
}
/// List approval artifacts in ULID (filename) order; unparseable files
/// warn and stay on disk for the operator.
pub(crate) fn list_approval_artifacts(
&self,
diagnostics: &mut Vec<Diagnostic>,
) -> Vec<(PathBuf, ApprovalArtifact)> {
let mut paths = Vec::new();
match fs::read_dir(&self.approvals_dir) {
Ok(entries) => {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().is_some_and(|ext| ext == "json") {
paths.push(path);
}
impl Drop for StateLockGuard {
fn drop(&mut self) {
match self.kind {
// Deterministic release on the file backend (tests assert the
// lock is gone the moment a command returns).
StorageKind::Local => {
let path = self.uri.trim_start_matches("file://");
let _ = std::fs::remove_file(path);
}
// Object stores need an async delete; best-effort spawn. A crash
// here leaves the lock for `force-unlock` — same as a process
// kill, and the same recovery path.
StorageKind::S3 => {
let adapter = Arc::clone(&self.adapter);
let uri = self.uri.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let _ = adapter.delete(&uri).await;
});
}
}
Err(err) if err.kind() == ErrorKind::NotFound => {}
Err(err) => diagnostics.push(Diagnostic::warning(
"approval_read_error",
CLUSTER_APPROVALS_DIR,
format!("could not list approval artifacts: {err}"),
)),
}
paths.sort();
let mut artifacts = Vec::new();
for path in paths {
match fs::read_to_string(&path)
.map_err(|err| err.to_string())
.and_then(|text| {
serde_json::from_str::<ApprovalArtifact>(&text).map_err(|err| err.to_string())
}) {
Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)),
Ok(artifact) => diagnostics.push(Diagnostic::warning(
"unsupported_approval_version",
display_path(&path),
format!(
"unsupported approval artifact version {}; leaving it in place",
artifact.schema_version
),
)),
}
}
impl ClusterStore {
/// The default layout: storage root = the config directory itself
/// (`file://<abs config dir>`), byte-compatible with every pre-existing
/// cluster on disk.
pub(crate) fn for_config_dir(config_dir: &Path) -> Self {
let absolute =
std::path::absolute(config_dir).unwrap_or_else(|_| config_dir.to_path_buf());
let display_root = absolute
.to_string_lossy()
.trim_end_matches('/')
.to_string();
let root = format!("file://{display_root}");
let adapter = storage_for_uri(&root)
.expect("local storage adapter construction is infallible for file:// roots");
Self {
adapter,
root,
display_root,
}
}
/// An explicit `storage:` root. `file://` URIs and plain paths normalize
/// to the local backend; `s3://bucket/prefix` to the S3 backend (env-
/// driven credentials/endpoint — the same contract as graph storage).
pub(crate) fn for_storage_root(root_uri: &str) -> Result<Self, Diagnostic> {
let trimmed = root_uri.trim_end_matches('/');
if storage_kind_for_uri(trimmed) == StorageKind::Local {
let path = trimmed.trim_start_matches("file://");
return Ok(Self::for_config_dir(Path::new(path)));
}
let adapter = storage_for_uri(trimmed).map_err(|err| {
Diagnostic::error(
"storage_root_invalid",
"storage",
format!("could not initialize storage for '{root_uri}': {err}"),
)
})?;
Ok(Self {
adapter,
root: trimmed.to_string(),
display_root: trimmed.to_string(),
})
}
pub(crate) fn kind(&self) -> StorageKind {
storage_kind_for_uri(&self.root)
}
fn uri(&self, relative: &str) -> String {
format!("{}/{}", self.root, relative)
}
fn display(&self, relative: &str) -> String {
format!("{}/{}", self.display_root, relative)
}
/// Derived graph root for `<id>`: `<storage>/graphs/<id>.omni`. A plain
/// local path for `file://` roots (byte-compatible, directly usable by
/// the engine); the S3 URI the engine opens natively otherwise.
pub(crate) fn graph_root(&self, graph_id: &str) -> String {
match self.kind() {
StorageKind::Local => format!("{}/graphs/{graph_id}.omni", self.display_root),
StorageKind::S3 => format!("{}/graphs/{graph_id}.omni", self.root),
}
}
/// `read_text_versioned`, returning None for a missing object (probed
/// via `exists` — the engine error type doesn't discriminate NotFound).
async fn read_versioned_opt(&self, uri: &str) -> Result<Option<(String, String)>, String> {
match self.adapter.exists(uri).await {
Ok(false) => return Ok(None),
Ok(true) => {}
Err(err) => return Err(err.to_string()),
}
self.adapter
.read_text_versioned(uri)
.await
.map(Some)
.map_err(|err| err.to_string())
}
/// JSON object write with the strongest atomicity the backend offers:
/// temp + rename on the filesystem (no torn JSON after a crash; the
/// pre-port behavior), a single atomic PUT on object stores (where
/// copy+delete would be weaker, not stronger).
async fn put_json(&self, relative: &str, payload: &str) -> Result<(), String> {
let target = self.uri(relative);
match self.kind() {
StorageKind::Local => {
let tmp = format!("{target}.tmp.{}", Ulid::new());
self.adapter
.write_text(&tmp, payload)
.await
.map_err(|err| err.to_string())?;
if let Err(err) = self.adapter.rename_text(&tmp, &target).await {
let _ = self.adapter.delete(&tmp).await;
return Err(err.to_string());
}
Ok(())
}
StorageKind::S3 => self
.adapter
.write_text(&target, payload)
.await
.map_err(|err| err.to_string()),
}
}
/// Shared list-and-parse for the sidecar/approval directories: id
/// (filename) order; unparseable objects warn and stay for the operator.
async fn list_json_dir<T: serde::de::DeserializeOwned>(
&self,
dir: &str,
diagnostics: &mut Vec<Diagnostic>,
list_error_code: &'static str,
parse_error_code: &'static str,
version_ok: impl Fn(&T) -> bool,
version_error_code: &'static str,
) -> Vec<(String, T)> {
let dir_uri = self.uri(dir);
let mut uris = match self.adapter.list_dir(&dir_uri).await {
Ok(uris) => uris,
Err(err) => {
diagnostics.push(Diagnostic::warning(
list_error_code,
dir,
format!("could not list '{dir}': {err}"),
));
return Vec::new();
}
};
uris.retain(|uri| uri.ends_with(".json"));
uris.sort();
let mut out = Vec::new();
for uri in uris {
match self.adapter.read_text(&uri).await {
Ok(text) => match serde_json::from_str::<T>(&text) {
Ok(value) if version_ok(&value) => out.push((uri, value)),
Ok(_) => diagnostics.push(Diagnostic::warning(
version_error_code,
uri.clone(),
"unsupported schema version; leaving it in place".to_string(),
)),
Err(err) => diagnostics.push(Diagnostic::warning(
parse_error_code,
uri.clone(),
format!("could not parse ({err}); leaving it in place"),
)),
},
Err(err) => diagnostics.push(Diagnostic::warning(
"invalid_approval_artifact",
display_path(&path),
format!("could not parse approval artifact ({err}); leaving it in place"),
parse_error_code,
uri.clone(),
format!("could not read ({err}); leaving it in place"),
)),
}
}
artifacts
out
}
/// Atomically write (or rewrite, e.g. on consumption) an approval artifact.
pub(crate) fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result<PathBuf, Diagnostic> {
fs::create_dir_all(&self.approvals_dir).map_err(|err| {
Diagnostic::error(
"approval_write_error",
CLUSTER_APPROVALS_DIR,
format!("could not create approvals directory: {err}"),
)
})?;
let target = self
.approvals_dir
.join(format!("{}.json", artifact.approval_id));
/// Best-effort object removal (sidecar retirement after a CAS lands,
/// lock cleanup) — failures are recoverable by the next sweep.
pub(crate) async fn delete_object(&self, uri: &str) {
let _ = self.adapter.delete(uri).await;
}
/// Recursive prefix delete for graph roots (approved deletes). Idempotent;
/// S3 non-atomicity is tolerated by the delete protocol's retry shape.
pub(crate) async fn delete_graph_root(&self, graph_uri: &str) -> Result<(), String> {
self.adapter
.delete_prefix(graph_uri)
.await
.map_err(|err| err.to_string())
}
/// Existence probe for graph roots in sweep classification. A bare local
/// path or any URI works — resolved through the same adapter machinery
/// the engine uses.
pub(crate) async fn graph_root_exists(&self, graph_uri: &str) -> bool {
match storage_kind_for_uri(graph_uri) {
StorageKind::Local => Path::new(graph_uri.trim_start_matches("file://")).exists(),
StorageKind::S3 => match storage_for_uri(graph_uri) {
Ok(adapter) => !adapter
.list_dir(graph_uri)
.await
.map(|entries| entries.is_empty())
.unwrap_or(true),
Err(_) => false,
},
}
}
// ---- approvals ----
pub(crate) async fn list_approval_artifacts(
&self,
diagnostics: &mut Vec<Diagnostic>,
) -> Vec<(String, ApprovalArtifact)> {
self.list_json_dir(
CLUSTER_APPROVALS_DIR,
diagnostics,
"approval_read_error",
"invalid_approval_artifact",
|artifact: &ApprovalArtifact| artifact.schema_version == 1,
"unsupported_approval_version",
)
.await
}
pub(crate) async fn write_approval_artifact(
&self,
artifact: &ApprovalArtifact,
) -> Result<String, Diagnostic> {
let relative = format!("{CLUSTER_APPROVALS_DIR}/{}.json", artifact.approval_id);
let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| {
Diagnostic::error(
"approval_write_error",
display_path(&target),
self.display(&relative),
format!("could not encode approval artifact: {err}"),
)
})?;
payload.push('\n');
let tmp_path = self
.approvals_dir
.join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new()));
fs::write(&tmp_path, payload.as_bytes()).map_err(|err| {
self.put_json(&relative, &payload).await.map_err(|err| {
Diagnostic::error(
"approval_write_error",
display_path(&tmp_path),
self.display(&relative),
format!("could not write approval artifact: {err}"),
)
})?;
if let Err(err) = fs::rename(&tmp_path, &target) {
let _ = fs::remove_file(&tmp_path);
return Err(Diagnostic::error(
"approval_write_error",
display_path(&target),
format!("could not move approval artifact into place: {err}"),
));
}
Ok(target)
Ok(self.uri(&relative))
}
/// List recovery sidecars in ULID (filename) order. Unparseable files are
/// reported as warnings and skipped — they stay on disk for the operator.
pub(crate) fn list_recovery_sidecars(
// ---- recovery sidecars ----
pub(crate) async fn list_recovery_sidecars(
&self,
diagnostics: &mut Vec<Diagnostic>,
) -> Vec<(PathBuf, RecoverySidecar)> {
let mut paths = Vec::new();
match fs::read_dir(&self.recoveries_dir) {
Ok(entries) => {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().is_some_and(|ext| ext == "json") {
paths.push(path);
}
}
}
Err(err) if err.kind() == ErrorKind::NotFound => {}
Err(err) => {
diagnostics.push(Diagnostic::warning(
"recovery_sidecar_read_error",
CLUSTER_RECOVERIES_DIR,
format!("could not list recovery sidecars: {err}"),
));
}
}
paths.sort();
let mut sidecars = Vec::new();
for path in paths {
match fs::read_to_string(&path)
.map_err(|err| err.to_string())
.and_then(|text| {
serde_json::from_str::<RecoverySidecar>(&text).map_err(|err| err.to_string())
}) {
Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)),
Ok(sidecar) => diagnostics.push(Diagnostic::warning(
"unsupported_recovery_sidecar_version",
display_path(&path),
format!(
"unsupported recovery sidecar version {}; leaving it in place",
sidecar.schema_version
),
)),
Err(err) => diagnostics.push(Diagnostic::warning(
"invalid_recovery_sidecar",
display_path(&path),
format!("could not parse recovery sidecar ({err}); leaving it in place"),
)),
}
}
sidecars
) -> Vec<(String, RecoverySidecar)> {
self.list_json_dir(
CLUSTER_RECOVERIES_DIR,
diagnostics,
"recovery_sidecar_read_error",
"invalid_recovery_sidecar",
|sidecar: &RecoverySidecar| sidecar.schema_version == 1,
"unsupported_recovery_sidecar_version",
)
.await
}
/// Atomically write (or rewrite) a recovery sidecar; returns its path.
pub(crate) fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result<PathBuf, Diagnostic> {
fs::create_dir_all(&self.recoveries_dir).map_err(|err| {
Diagnostic::error(
"recovery_sidecar_write_error",
CLUSTER_RECOVERIES_DIR,
format!("could not create recoveries directory: {err}"),
)
})?;
let target = self
.recoveries_dir
.join(format!("{}.json", sidecar.operation_id));
pub(crate) async fn write_recovery_sidecar(
&self,
sidecar: &RecoverySidecar,
) -> Result<String, Diagnostic> {
let relative = format!("{CLUSTER_RECOVERIES_DIR}/{}.json", sidecar.operation_id);
let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| {
Diagnostic::error(
"recovery_sidecar_write_error",
display_path(&target),
self.display(&relative),
format!("could not encode recovery sidecar: {err}"),
)
})?;
payload.push('\n');
let tmp_path = self
.recoveries_dir
.join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new()));
fs::write(&tmp_path, payload.as_bytes()).map_err(|err| {
self.put_json(&relative, &payload).await.map_err(|err| {
Diagnostic::error(
"recovery_sidecar_write_error",
display_path(&tmp_path),
self.display(&relative),
format!("could not write recovery sidecar: {err}"),
)
})?;
if let Err(err) = fs::rename(&tmp_path, &target) {
let _ = fs::remove_file(&tmp_path);
Ok(self.uri(&relative))
}
// ---- catalog payloads ----
/// Content-addressed catalog location for a query/policy payload
/// (extensions fixed per kind, same as the pre-port layout).
pub(crate) fn payload_relative(kind: &ResourceKind, digest: &str) -> Option<String> {
match kind {
ResourceKind::Query { graph, name } => Some(format!(
"{CLUSTER_RESOURCES_DIR}/query/{graph}/{name}/{digest}.gq"
)),
ResourceKind::Policy(name) => Some(format!(
"{CLUSTER_RESOURCES_DIR}/policy/{name}/{digest}.yaml"
)),
_ => None,
}
}
pub(crate) fn payload_display(&self, kind: &ResourceKind, digest: &str) -> Option<String> {
Self::payload_relative(kind, digest).map(|relative| self.display(&relative))
}
pub(crate) async fn payload_exists(&self, kind: &ResourceKind, digest: &str) -> bool {
let Some(relative) = Self::payload_relative(kind, digest) else {
return false;
};
self.adapter
.exists(&self.uri(&relative))
.await
.unwrap_or(false)
}
/// Idempotent content-addressed write: a payload already present at its
/// digest is by definition identical.
pub(crate) async fn write_payload(
&self,
kind: &ResourceKind,
digest: &str,
content: &str,
) -> Result<(), String> {
let Some(relative) = Self::payload_relative(kind, digest) else {
return Err("resource kind has no payload".to_string());
};
if self
.adapter
.exists(&self.uri(&relative))
.await
.map_err(|err| err.to_string())?
{
return Ok(());
}
self.put_json(&relative, content).await
}
/// Read a catalog payload and verify it against its recorded digest.
pub(crate) async fn read_verified_payload(
&self,
kind: &ResourceKind,
digest: &str,
address: &str,
) -> Result<String, Diagnostic> {
let Some(relative) = Self::payload_relative(kind, digest) else {
return Err(Diagnostic::error(
"recovery_sidecar_write_error",
display_path(&target),
format!("could not move recovery sidecar into place: {err}"),
"catalog_payload_missing",
address,
"resource kind has no payload",
));
};
let uri = self.uri(&relative);
let text = self.adapter.read_text(&uri).await.map_err(|err| {
Diagnostic::error(
"catalog_payload_missing",
address,
format!(
"catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart",
self.display(&relative)
),
)
})?;
if sha256_hex(text.as_bytes()) != digest {
return Err(Diagnostic::error(
"catalog_payload_digest_mismatch",
address,
format!(
"catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart",
self.display(&relative)
),
));
}
Ok(target)
Ok(text)
}
// ---- observations ----
pub(crate) fn observations(&self) -> StateObservations {
StateObservations {
state_path: display_path(&self.state_path),
lock_path: display_path(&self.lock_path),
state_path: self.display(CLUSTER_STATE_FILE),
lock_path: self.display(CLUSTER_LOCK_FILE),
state_found: false,
applied_config_digest: None,
state_revision: 0,
@ -241,13 +457,16 @@ impl LocalStateBackend {
}
}
pub(crate) fn read_state(
// ---- state ledger ----
pub(crate) async fn read_state(
&self,
observations: &mut StateObservations,
) -> Result<StateSnapshot, Diagnostic> {
let text = match fs::read_to_string(&self.state_path) {
Ok(text) => text,
Err(err) if err.kind() == ErrorKind::NotFound => {
let state_uri = self.uri(CLUSTER_STATE_FILE);
let (text, _version) = match self.read_versioned_opt(&state_uri).await {
Ok(Some(read)) => read,
Ok(None) => {
return Ok(StateSnapshot {
state: None,
state_cas: None,
@ -295,27 +514,32 @@ impl LocalStateBackend {
})
}
pub(crate) fn write_state(
/// CAS-guarded ledger replace. The public contract stays content-level
/// (`expected_cas` = `sha256:<hex>` from the snapshot the command read);
/// the physical swap is token-conditioned on a fresh read, so a writer
/// that raced us between the fresh read and the put loses with
/// `state_cas_mismatch` — never a silent overwrite. On S3 the token is
/// the object's ETag and the put is conditional (If-Match); locally it
/// is a content token over the same temp+rename flow as before the port.
pub(crate) async fn write_state(
&self,
state: &ClusterState,
expected_cas: Option<&str>,
observations: &mut StateObservations,
) -> Result<(), Diagnostic> {
fs::create_dir_all(&self.state_dir).map_err(|err| {
let state_uri = self.uri(CLUSTER_STATE_FILE);
let current = self.read_versioned_opt(&state_uri).await.map_err(|err| {
Diagnostic::error(
"state_write_error",
CLUSTER_STATE_DIR,
format!("could not create cluster state directory: {err}"),
CLUSTER_STATE_FILE,
format!("could not read state file before write: {err}"),
)
})?;
let current_cas = self.current_state_cas()?;
let current_cas = current
.as_ref()
.map(|(text, _)| format!("sha256:{}", sha256_hex(text.as_bytes())));
if current_cas.as_deref() != expected_cas {
return Err(Diagnostic::error(
"state_cas_mismatch",
CLUSTER_STATE_FILE,
"state.json changed while the command was running; re-run the command against the latest state",
));
return Err(state_cas_mismatch());
}
let mut payload = serde_json::to_string_pretty(state).map_err(|err| {
@ -327,86 +551,51 @@ impl LocalStateBackend {
})?;
payload.push('\n');
let tmp_path = self
.state_dir
.join(format!("state.json.tmp.{}", Ulid::new()));
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
.map_err(|err| {
Diagnostic::error(
"state_write_error",
display_path(&tmp_path),
format!("could not create temporary state file: {err}"),
)
})?;
file.write_all(payload.as_bytes()).map_err(|err| {
Diagnostic::error(
"state_write_error",
display_path(&tmp_path),
format!("could not write temporary state file: {err}"),
)
})?;
file.sync_all().map_err(|err| {
Diagnostic::error(
"state_write_error",
display_path(&tmp_path),
format!("could not sync temporary state file: {err}"),
)
})?;
drop(file);
if let Err(err) = fs::rename(&tmp_path, &self.state_path) {
let _ = fs::remove_file(&tmp_path);
return Err(Diagnostic::error(
"state_write_error",
CLUSTER_STATE_FILE,
format!("could not replace state.json atomically: {err}"),
));
let written = match current {
None => self
.adapter
.write_text_if_absent(&state_uri, &payload)
.await
.map_err(|err| {
Diagnostic::error(
"state_write_error",
CLUSTER_STATE_FILE,
format!("could not create state.json: {err}"),
)
})?,
Some((_, version)) => self
.adapter
.write_text_if_match(&state_uri, &payload, &version)
.await
.map_err(|err| {
Diagnostic::error(
"state_write_error",
CLUSTER_STATE_FILE,
format!("could not replace state.json: {err}"),
)
})?
.is_some(),
};
if !written {
return Err(state_cas_mismatch());
}
let written = fs::read_to_string(&self.state_path).map_err(|err| {
Diagnostic::error(
"state_write_error",
CLUSTER_STATE_FILE,
format!("could not read state.json after write: {err}"),
)
})?;
observations.state_found = true;
observations.applied_config_digest = state.applied_revision.config_digest.clone();
observations.state_revision = state.state_revision;
observations.state_cas = Some(format!("sha256:{}", sha256_hex(written.as_bytes())));
observations.state_cas = Some(format!("sha256:{}", sha256_hex(payload.as_bytes())));
observations.resource_count = state.applied_revision.resources.len();
Ok(())
}
pub(crate) fn current_state_cas(&self) -> Result<Option<String>, Diagnostic> {
match fs::read(&self.state_path) {
Ok(bytes) => Ok(Some(format!("sha256:{}", sha256_hex(&bytes)))),
Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
Err(err) => Err(Diagnostic::error(
"state_read_error",
CLUSTER_STATE_FILE,
format!("could not read state file for CAS check: {err}"),
)),
}
}
// ---- lock ----
pub(crate) fn acquire_lock(
pub(crate) async fn acquire_lock(
&self,
operation: &str,
observations: &mut StateObservations,
) -> Result<StateLockGuard, Diagnostic> {
fs::create_dir_all(&self.state_dir).map_err(|err| {
Diagnostic::error(
"state_lock_error",
CLUSTER_STATE_DIR,
format!("could not create cluster state directory: {err}"),
)
})?;
let lock_uri = self.uri(CLUSTER_LOCK_FILE);
let lock_id = Ulid::new().to_string();
let lock = StateLockFile {
version: 1,
@ -425,31 +614,18 @@ impl LocalStateBackend {
)
})?;
match OpenOptions::new()
.write(true)
.create_new(true)
.open(&self.lock_path)
{
Ok(mut file) => {
if let Err(err) = file.write_all(payload.as_bytes()) {
// No guard exists yet, so clean up the create-new file here
// instead of leaving a stale partial lock for the next run.
drop(file);
let _ = fs::remove_file(&self.lock_path);
return Err(Diagnostic::error(
"state_lock_error",
CLUSTER_LOCK_FILE,
format!("could not write state lock: {err}"),
));
}
match self.adapter.write_text_if_absent(&lock_uri, &payload).await {
Ok(true) => {
observations.lock_acquired = true;
observations.acquired_lock_id = Some(lock_id.clone());
observations.acquired_lock_id = Some(lock_id);
Ok(StateLockGuard {
path: self.lock_path.clone(),
adapter: Arc::clone(&self.adapter),
uri: lock_uri,
kind: self.kind(),
})
}
Err(err) if err.kind() == ErrorKind::AlreadyExists => {
self.observe_lock_metadata_lossy(observations);
Ok(false) => {
self.observe_lock_metadata_lossy(observations).await;
Err(Diagnostic::error(
"state_lock_held",
CLUSTER_LOCK_FILE,
@ -459,23 +635,24 @@ impl LocalStateBackend {
Err(err) => Err(Diagnostic::error(
"state_lock_error",
CLUSTER_LOCK_FILE,
format!("could not acquire state lock: {err}"),
format!("could not write state lock: {err}"),
)),
}
}
pub(crate) fn force_unlock(
pub(crate) async fn force_unlock(
&self,
requested_lock_id: &str,
lock_id: &str,
observations: &mut StateObservations,
) -> Result<(), Diagnostic> {
let text = match fs::read_to_string(&self.lock_path) {
Ok(text) => text,
Err(err) if err.kind() == ErrorKind::NotFound => {
let lock_uri = self.uri(CLUSTER_LOCK_FILE);
let text = match self.read_versioned_opt(&lock_uri).await {
Ok(Some((text, _))) => text,
Ok(None) => {
return Err(Diagnostic::error(
"state_lock_missing",
CLUSTER_LOCK_FILE,
"cluster state lock is not present; nothing was unlocked",
"no cluster state lock is present",
));
}
Err(err) => {
@ -486,42 +663,41 @@ impl LocalStateBackend {
));
}
};
observations.locked = true;
let lock = parse_lock_file_for_unlock(&text)?;
observations.observe_lock_metadata(&lock);
if lock.lock_id != requested_lock_id {
observations.locked = true;
if lock.lock_id != lock_id {
return Err(Diagnostic::error(
"state_lock_id_mismatch",
CLUSTER_LOCK_FILE,
format!(
"cluster state lock id is {}; refusing to unlock with requested id {requested_lock_id}",
"lock id mismatch: held lock is {}, refusing to remove (pass the exact id from `cluster status`)",
lock.lock_id
),
));
}
fs::remove_file(&self.lock_path).map_err(|err| {
self.adapter.delete(&lock_uri).await.map_err(|err| {
Diagnostic::error(
"state_unlock_error",
"state_lock_error",
CLUSTER_LOCK_FILE,
format!("could not remove state lock: {err}"),
)
})
})?;
observations.locked = false;
Ok(())
}
pub(crate) fn observe_lock(
pub(crate) async fn observe_lock(
&self,
observations: &mut StateObservations,
diagnostics: &mut Vec<Diagnostic>,
) {
if self.lock_path.exists() {
observations.locked = true;
match fs::read_to_string(&self.lock_path) {
Ok(text) => match serde_json::from_str::<StateLockFile>(&text) {
Ok(lock) if lock.version == 1 => {
observations.observe_lock_metadata(&lock);
}
let lock_uri = self.uri(CLUSTER_LOCK_FILE);
match self.read_versioned_opt(&lock_uri).await {
Ok(Some((text, _))) => {
observations.locked = true;
match serde_json::from_str::<StateLockFile>(&text) {
Ok(lock) if lock.version == 1 => observations.observe_lock_metadata(&lock),
Ok(lock) => diagnostics.push(Diagnostic::warning(
"unsupported_state_lock_version",
CLUSTER_LOCK_FILE,
@ -532,19 +708,24 @@ impl LocalStateBackend {
CLUSTER_LOCK_FILE,
format!("could not parse state lock: {err}"),
)),
},
Err(err) => diagnostics.push(Diagnostic::warning(
"state_lock_read_error",
CLUSTER_LOCK_FILE,
format!("could not read state lock: {err}"),
)),
}
}
Ok(None) => {}
Err(err) => diagnostics.push(Diagnostic::warning(
"state_lock_read_error",
CLUSTER_LOCK_FILE,
format!("could not read state lock: {err}"),
)),
}
}
pub(crate) fn observe_lock_metadata_lossy(&self, observations: &mut StateObservations) {
pub(crate) async fn observe_lock_metadata_lossy(
&self,
observations: &mut StateObservations,
) {
observations.locked = true;
if let Ok(text) = fs::read_to_string(&self.lock_path) {
let lock_uri = self.uri(CLUSTER_LOCK_FILE);
if let Ok(Some((text, _))) = self.read_versioned_opt(&lock_uri).await {
if let Ok(lock) = serde_json::from_str::<StateLockFile>(&text) {
if lock.version == 1 {
observations.observe_lock_metadata(&lock);
@ -554,10 +735,12 @@ impl LocalStateBackend {
}
}
impl Drop for StateLockGuard {
fn drop(&mut self) {
let _ = fs::remove_file(&self.path);
}
fn state_cas_mismatch() -> Diagnostic {
Diagnostic::error(
"state_cas_mismatch",
CLUSTER_STATE_FILE,
"state.json changed while the command was running; re-run the command against the latest state",
)
}
pub(crate) fn parse_lock_file_for_unlock(text: &str) -> Result<StateLockFile, Diagnostic> {

View file

@ -11,12 +11,12 @@ use super::*;
/// Mutations ride the calling command's CAS-checked state write; completed
/// sidecars are deleted only after that write lands.
pub(crate) async fn sweep_recovery_sidecars(
backend: &LocalStateBackend,
backend: &ClusterStore,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
) -> SweepOutcome {
let mut outcome = SweepOutcome::default();
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) {
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await {
match sidecar.kind {
RecoverySidecarKind::GraphCreate => {
sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
@ -33,7 +33,7 @@ pub(crate) async fn sweep_recovery_sidecars(
}
pub(crate) async fn sweep_graph_create_sidecar(
path: PathBuf,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
@ -44,9 +44,11 @@ pub(crate) async fn sweep_graph_create_sidecar(
let graph_path = PathBuf::from(&sidecar.graph_uri);
// Row 1: nothing moved — the init never landed. The sidecar is pure
// intent; remove it and let the command's own plan re-propose the create.
// intent; retire it (deferred to the command's post-CAS cleanup, like
// every other completed sidecar — a failed CAS simply re-sweeps it) and
// let the command's own plan re-propose the create.
if !graph_path.exists() {
let _ = fs::remove_file(&path);
outcome.completed_sidecars.push(path);
return;
}
@ -153,7 +155,7 @@ pub(crate) async fn sweep_graph_create_sidecar(
}
pub(crate) async fn sweep_schema_apply_sidecar(
path: PathBuf,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
@ -250,7 +252,7 @@ pub(crate) async fn sweep_schema_apply_sidecar(
}
pub(crate) fn sweep_graph_delete_sidecar(
path: PathBuf,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
@ -351,15 +353,15 @@ pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &s
}
/// Mark approval artifact files consumed on disk (post-CAS).
pub(crate) fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) {
pub(crate) async fn mark_approvals_consumed(backend: &ClusterStore, approval_ids: &[String]) {
if approval_ids.is_empty() {
return;
}
let mut sink = Vec::new();
for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) {
for (_, mut artifact) in backend.list_approval_artifacts(&mut sink).await {
if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() {
artifact.consumed_at = Some(now_rfc3339());
let _ = backend.write_approval_artifact(&artifact);
let _ = backend.write_approval_artifact(&artifact).await;
}
}
}

View file

@ -351,8 +351,8 @@ policies:
}));
}
#[test]
fn extended_state_json_status_surfaces_statuses() {
#[tokio::test]
async fn extended_state_json_status_surfaces_statuses() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -380,7 +380,7 @@ policies:
}"#;
fs::write(state_dir.join("state.json"), state).unwrap();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.state_observations.state_found);
assert_eq!(out.state_observations.state_revision, 42);
@ -400,10 +400,10 @@ policies:
);
}
#[test]
fn missing_state_status_succeeds_with_warning() {
#[tokio::test]
async fn missing_state_status_succeeds_with_warning() {
let dir = fixture();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.state_observations.state_found);
assert_eq!(out.state_observations.state_revision, 0);
@ -414,14 +414,14 @@ policies:
);
}
#[test]
fn invalid_state_status_fails() {
#[tokio::test]
async fn invalid_state_status_fails() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
fs::write(state_dir.join("state.json"), "{").unwrap();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.state_observations.state_found);
assert!(
@ -431,12 +431,12 @@ policies:
);
}
#[test]
fn status_surfaces_full_lock_metadata() {
#[tokio::test]
async fn status_surfaces_full_lock_metadata() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "refresh");
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.state_observations.locked);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -452,12 +452,12 @@ policies:
assert!(out.state_observations.lock_age_seconds.is_some());
}
#[test]
fn force_unlock_matching_id_removes_lock() {
#[tokio::test]
async fn force_unlock_matching_id_removes_lock() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.lock_removed);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -468,12 +468,12 @@ policies:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_wrong_id_fails_and_preserves_lock() {
#[tokio::test]
async fn force_unlock_wrong_id_fails_and_preserves_lock() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
let out = force_unlock_config_dir(dir.path(), "other-lock");
let out = force_unlock_config_dir(dir.path(), "other-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -485,11 +485,11 @@ policies:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_missing_lock_fails() {
#[tokio::test]
async fn force_unlock_missing_lock_fails() {
let dir = fixture();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(!out.state_observations.locked);
@ -500,14 +500,14 @@ policies:
);
}
#[test]
fn force_unlock_invalid_lock_json_fails_and_preserves_lock() {
#[tokio::test]
async fn force_unlock_invalid_lock_json_fails_and_preserves_lock() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
fs::write(state_dir.join("lock.json"), "{").unwrap();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(
@ -518,8 +518,8 @@ policies:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() {
#[tokio::test]
async fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -529,7 +529,7 @@ policies:
)
.unwrap();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(
@ -540,8 +540,8 @@ policies:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_external_state_backend_rejected() {
#[tokio::test]
async fn force_unlock_external_state_backend_rejected() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
fs::write(
@ -557,7 +557,7 @@ graphs:
)
.unwrap();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(
@ -582,7 +582,7 @@ graphs:
.any(|diagnostic| diagnostic.code == "state_lock_held")
);
let unlocked = force_unlock_config_dir(dir.path(), "held-lock");
let unlocked = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(unlocked.ok, "{:?}", unlocked.diagnostics);
let out = plan_config_dir(dir.path()).await;
@ -1886,7 +1886,7 @@ graphs:
let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap();
fs::remove_file(&blob).unwrap();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "catalog_payload_missing"
@ -2001,7 +2001,7 @@ graphs:
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
assert_eq!(fs::read_to_string(&blob).unwrap(), original);
let status = status_config_dir(dir.path());
let status = status_config_dir(dir.path()).await;
assert!(
!status
.diagnostics
@ -2012,12 +2012,12 @@ graphs:
);
}
#[test]
fn verification_skips_graph_and_schema_resources() {
#[tokio::test]
async fn verification_skips_graph_and_schema_resources() {
let dir = fixture();
write_applyable_state(dir.path()); // graph + schema digests only, no blobs
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(
!out.diagnostics
.iter()
@ -2770,7 +2770,7 @@ policies:
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve");
let snapshot = read_serving_snapshot(dir.path()).await.expect("converged cluster must serve");
assert_eq!(snapshot.graphs.len(), 1);
assert_eq!(snapshot.graphs[0].graph_id, "knowledge");
assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni"));
@ -2782,10 +2782,10 @@ policies:
assert!(snapshot.policies[0].blob_path.exists());
}
#[test]
fn serving_snapshot_refuses_missing_state() {
#[tokio::test]
async fn serving_snapshot_refuses_missing_state() {
let dir = fixture();
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"),
"{err:?}"
@ -2800,7 +2800,7 @@ policies:
apply_config_dir(dir.path()).await;
write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE");
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"),
"{err:?}"
@ -2814,7 +2814,7 @@ policies:
write_applyable_state(dir.path());
apply_config_dir(dir.path()).await;
// Tamper with the query blob...
let snapshot = read_serving_snapshot(dir.path()).unwrap();
let snapshot = read_serving_snapshot(dir.path()).await.unwrap();
let desired = validate_config_dir(dir.path());
let query_digest = &desired.resource_digests["query.knowledge.find_person"];
let blob = dir
@ -2838,7 +2838,7 @@ policies:
)
.unwrap();
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"),
@ -2851,12 +2851,12 @@ policies:
let _ = snapshot; // the pre-tamper read succeeded
}
#[test]
fn serving_snapshot_refuses_empty_cluster() {
#[tokio::test]
async fn serving_snapshot_refuses_empty_cluster() {
let dir = fixture();
write_state_resources(dir.path(), &[]); // state exists, no graphs
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"),
"{err:?}"
@ -2972,13 +2972,13 @@ policies:
);
}
#[test]
fn status_warns_on_pending_recovery_sidecar() {
#[tokio::test]
async fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();
write_applyable_state(dir.path());
write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01STATUS");
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(
out.diagnostics

View file

@ -503,7 +503,8 @@ pub(crate) struct SweepOutcome {
pub(crate) pending_graphs: BTreeSet<String>,
/// Sidecars whose outcome is recorded (rows 2/4): deleted only after the
/// command's state write lands, so a CAS failure re-sweeps them.
pub(crate) completed_sidecars: Vec<PathBuf>,
/// Store URIs (the storage layer addresses everything by URI).
pub(crate) completed_sidecars: Vec<String>,
/// Approval artifacts consumed by a roll-forward (delete row 7b): their
/// files are rewritten with consumed_at only after the state write lands.
pub(crate) consumed_approvals: Vec<String>,

View file

@ -893,12 +893,12 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St
/// catalog blob content, policy bundles from blob paths with their applied
/// bindings. Always multi-graph routing. The unauthenticated/env handling
/// matches the omnigraph.yaml path.
fn load_cluster_settings(
async fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| {
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).await.map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
@ -988,7 +988,7 @@ fn load_cluster_settings(
})
}
pub fn load_server_settings(
pub async fn load_server_settings(
config_path: Option<&PathBuf>,
cli_cluster: Option<&PathBuf>,
cli_uri: Option<String>,
@ -1005,7 +1005,7 @@ pub fn load_server_settings(
"--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)"
);
}
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated);
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await;
}
let config = load_config(config_path)?;
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
@ -3363,8 +3363,8 @@ mod tests {
);
}
#[test]
fn server_settings_load_from_yaml_config() {
#[tokio::test]
async fn server_settings_load_from_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
@ -3380,7 +3380,7 @@ server:
)
.unwrap();
let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap();
let settings = load_server_settings(Some(&config), None, None, None, None, false).await.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
assert_eq!(uri, "/tmp/demo.omni");
@ -3391,8 +3391,8 @@ server:
assert_eq!(settings.bind, "0.0.0.0:9090");
}
#[test]
fn server_settings_cli_flags_override_yaml_config() {
#[tokio::test]
async fn server_settings_cli_flags_override_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
@ -3416,6 +3416,7 @@ server:
Some("0.0.0.0:9999".to_string()),
false,
)
.await
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
@ -3427,8 +3428,8 @@ server:
assert_eq!(settings.bind, "0.0.0.0:9999");
}
#[test]
fn server_settings_can_resolve_named_target() {
#[tokio::test]
async fn server_settings_can_resolve_named_target() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
@ -3448,6 +3449,7 @@ server:
let settings =
load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false)
.await
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
@ -3458,9 +3460,9 @@ server:
}
}
#[test]
fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None, None, false).unwrap_err();
#[tokio::test]
async fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None, None, false).await.unwrap_err();
assert!(
error.to_string().contains("no graph to serve"),
"expected mode-inference error, got: {error}",
@ -3598,9 +3600,9 @@ server:
);
}
#[test]
#[tokio::test]
#[serial]
fn unauthenticated_env_var_classification() {
async fn unauthenticated_env_var_classification() {
// MR-723 PR A: closes the gap where the env-var read path inside
// `load_server_settings` was structurally implemented but not
// exercised by any test. Three properties to pin, all in one
@ -3627,7 +3629,7 @@ server:
// Truthy values flip Open mode on, even with CLI flag off.
for value in ["1", "true", "yes", "TRUE", "anything"] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,
@ -3638,7 +3640,7 @@ server:
// Falsy values keep refusal behavior, even with CLI flag off.
for value in ["0", "false", "FALSE", ""] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
@ -3648,7 +3650,7 @@ server:
// Unset env var: also false.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
@ -3659,7 +3661,7 @@ server:
// CLI flag wins even when env is falsy — `serve()` honors the
// OR of both inputs.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, true)
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,

View file

@ -43,6 +43,7 @@ async fn main() -> Result<()> {
cli.target,
cli.bind,
cli.unauthenticated,
)?;
)
.await?;
serve(settings).await
}

View file

@ -5567,8 +5567,8 @@ mod multi_graph_startup {
/// `GraphId` validation runs at startup — a reserved name in
/// `omnigraph.yaml` produces a clear error rather than getting
/// rejected per-request.
#[test]
fn load_server_settings_rejects_reserved_graph_id() {
#[tokio::test]
async fn load_server_settings_rejects_reserved_graph_id() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5580,7 +5580,7 @@ graphs:
"#,
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, false).await.unwrap_err();
assert!(
err.to_string().contains("invalid graph id 'policies'"),
"expected reserved-name rejection, got: {err}"
@ -5644,8 +5644,8 @@ graphs:
// ── Four-rule mode inference matrix ───────────────────────────────
/// Rule 1: CLI positional URI → Single.
#[test]
fn mode_inference_cli_uri_is_single() {
#[tokio::test]
async fn mode_inference_cli_uri_is_single() {
let settings = load_server_settings(
None,
None,
@ -5654,6 +5654,7 @@ graphs:
None,
true, // allow unauth so we get past the runtime-state check
)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"),
@ -5662,8 +5663,8 @@ graphs:
}
/// Rule 2: --target picks one graph from `graphs:` map → Single.
#[test]
fn mode_inference_cli_target_is_single() {
#[tokio::test]
async fn mode_inference_cli_target_is_single() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5679,6 +5680,7 @@ graphs:
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"),
@ -5687,8 +5689,8 @@ graphs:
}
/// Rule 3: `server.graph` set → Single (target picked from config).
#[test]
fn mode_inference_server_graph_is_single() {
#[tokio::test]
async fn mode_inference_server_graph_is_single() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5704,7 +5706,7 @@ server:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"),
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"),
@ -5712,8 +5714,8 @@ server:
}
/// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi.
#[test]
fn mode_inference_config_plus_graphs_is_multi() {
#[tokio::test]
async fn mode_inference_config_plus_graphs_is_multi() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5727,7 +5729,7 @@ graphs:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
@ -5738,8 +5740,8 @@ graphs:
}
}
#[test]
fn mode_inference_multi_rejects_top_level_policy_file() {
#[tokio::test]
async fn mode_inference_multi_rejects_top_level_policy_file() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5753,7 +5755,7 @@ graphs:
"#,
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"),
@ -5769,8 +5771,8 @@ graphs:
);
}
#[test]
fn mode_inference_multi_rejects_top_level_queries() {
#[tokio::test]
async fn mode_inference_multi_rejects_top_level_queries() {
// Symmetric to the policy guard: a top-level `queries:` block in
// multi-graph mode is not honored (each graph uses its own), so it
// is a loud error rather than a silent no-op.
@ -5781,7 +5783,7 @@ graphs:
"queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n",
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("queries") && msg.contains("not honored"),
@ -5789,8 +5791,8 @@ graphs:
);
}
#[test]
fn single_mode_named_graph_rejects_top_level_blocks() {
#[tokio::test]
async fn single_mode_named_graph_rejects_top_level_blocks() {
// Serving a graph by name (`--target`/`server.graph`) uses its
// per-graph block; a populated top-level block would be silently
// shadowed, so boot refuses and names the per-graph location.
@ -5803,6 +5805,7 @@ graphs:
.unwrap();
let err =
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
.await
.unwrap_err();
let msg = err.to_string();
assert!(
@ -5811,8 +5814,8 @@ graphs:
);
}
#[test]
fn single_mode_named_graph_uses_per_graph_policy_and_queries() {
#[tokio::test]
async fn single_mode_named_graph_uses_per_graph_policy_and_queries() {
// The identity rule: `--target prod` attaches `graphs.prod`'s own
// policy + queries, not the top-level ones (which are absent here).
let temp = tempfile::tempdir().unwrap();
@ -5830,6 +5833,7 @@ graphs:
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single {
@ -5851,8 +5855,8 @@ graphs:
}
}
#[test]
fn mode_inference_normalizes_multi_graph_uris() {
#[tokio::test]
async fn mode_inference_normalizes_multi_graph_uris() {
let temp = tempfile::tempdir().unwrap();
let graph = temp.path().join("alpha.omni");
let config_path = temp.path().join("omnigraph.yaml");
@ -5868,7 +5872,7 @@ graphs:
),
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
assert_eq!(graphs[0].uri, graph.to_string_lossy());
@ -5878,9 +5882,9 @@ graphs:
}
/// Rule 5: nothing → error with migration hint.
#[test]
fn mode_inference_no_inputs_errors_with_migration_hint() {
let err = load_server_settings(None, None, None, None, None, true).unwrap_err();
#[tokio::test]
async fn mode_inference_no_inputs_errors_with_migration_hint() {
let err = load_server_settings(None, None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("no graph to serve"),
@ -5890,19 +5894,19 @@ graphs:
/// Rule 4 sub-case: `--config` with empty `graphs:` map and no
/// single-mode selector → rule 5 fires (no graph to serve).
#[test]
fn mode_inference_empty_graphs_map_errors() {
#[tokio::test]
async fn mode_inference_empty_graphs_map_errors() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
assert!(err.to_string().contains("no graph to serve"));
}
/// `--config` + `<URI>` together: URI wins → Single (the CLI URI
/// takes precedence over the config's graphs map).
#[test]
fn mode_inference_cli_uri_overrides_graphs_map() {
#[tokio::test]
async fn mode_inference_cli_uri_overrides_graphs_map() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5922,6 +5926,7 @@ graphs:
None,
true,
)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => {
@ -5937,8 +5942,8 @@ graphs:
}
/// Per-graph `policy.file` is resolved relative to the config base_dir.
#[test]
fn per_graph_policy_file_is_resolved_relative_to_base_dir() {
#[tokio::test]
async fn per_graph_policy_file_is_resolved_relative_to_base_dir() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5954,7 +5959,7 @@ graphs:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
let graphs = match settings.mode {
ServerConfigMode::Multi { graphs, .. } => graphs,
_ => panic!("expected Multi"),
@ -5972,8 +5977,8 @@ graphs:
}
/// `server.policy.file` resolves alongside the graphs map.
#[test]
fn server_policy_file_is_resolved_relative_to_base_dir() {
#[tokio::test]
async fn server_policy_file_is_resolved_relative_to_base_dir() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5988,7 +5993,7 @@ graphs:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi {
server_policy_file, ..
@ -6268,7 +6273,7 @@ graphs:
.unwrap();
let settings: ServerConfig =
load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
assert!(matches!(settings.mode, ServerConfigMode::Multi { .. }));
match settings.mode {
@ -6321,14 +6326,14 @@ graphs:
temp
}
fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true)
async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true).await
}
#[tokio::test]
async fn cluster_boot_serves_applied_state() {
let temp = converged_cluster_dir("").await;
let settings = cluster_settings(temp.path()).unwrap();
let settings = cluster_settings(temp.path()).await.unwrap();
let omnigraph_server::ServerConfigMode::Multi {
graphs,
config_path,
@ -6444,7 +6449,7 @@ graphs:
temp
};
let settings = cluster_settings(temp.path()).unwrap();
let settings = cluster_settings(temp.path()).await.unwrap();
let omnigraph_server::ServerConfigMode::Multi {
graphs,
server_policy_file,
@ -6482,6 +6487,7 @@ async fn cluster_boot_refusals() {
None,
true,
)
.await
.unwrap_err();
assert!(err.to_string().contains("exclusive boot source"), "{err}");
let err = omnigraph_server::load_server_settings(
@ -6492,6 +6498,7 @@ async fn cluster_boot_refusals() {
None,
true,
)
.await
.unwrap_err();
assert!(err.to_string().contains("exclusive boot source"), "{err}");
@ -6499,7 +6506,7 @@ async fn cluster_boot_refusals() {
let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person");
let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path();
fs::write(&blob, "tampered").unwrap();
let err = cluster_settings(&dir).unwrap_err();
let err = cluster_settings(&dir).await.unwrap_err();
assert!(
err.to_string().contains("catalog_payload_digest_mismatch"),
"{err}"
@ -6508,6 +6515,6 @@ async fn cluster_boot_refusals() {
// Missing state refuses with the import/apply remedy.
let empty = tempfile::tempdir().unwrap();
let err = cluster_settings(empty.path()).unwrap_err();
let err = cluster_settings(empty.path()).await.unwrap_err();
assert!(err.to_string().contains("cluster_state_missing"), "{err}");
}