Merge pull request #190 from ModernRelay/feat/cluster-storage-root-v2

feat(cluster): storage root — ledger, catalog, and graphs on the StorageAdapter (RFC-006 PR 2/3)
This commit is contained in:
Andrew Altshuler 2026-06-11 14:54:10 +03:00 committed by GitHub
commit 4e526b3e5a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 1002 additions and 650 deletions

View file

@ -3720,7 +3720,7 @@ async fn main() -> Result<()> {
finish_cluster_approve(&output, json)?;
}
ClusterCommand::Status { config, json } => {
let output = status_config_dir(config);
let output = status_config_dir(config).await;
finish_cluster_status(&output, json)?;
}
ClusterCommand::Refresh { config, json } => {
@ -3736,7 +3736,7 @@ async fn main() -> Result<()> {
config,
json,
} => {
let output = force_unlock_config_dir(config, lock_id);
let output = force_unlock_config_dir(config, lock_id).await;
finish_cluster_force_unlock(&output, json)?;
}
},

View file

@ -23,6 +23,10 @@ serde_yaml = { workspace = true }
sha2 = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
# Runtime handle only — best-effort async lock release in
# StateLockGuard::drop on object-store backends (cluster commands always
# run inside the caller's tokio runtime).
tokio = { workspace = true }
ulid = { workspace = true }
[dev-dependencies]

View file

@ -239,8 +239,33 @@ pub(crate) fn validate_cluster_header(
}
}
if let Some(storage) = raw.storage.as_deref() {
let trimmed = storage.trim();
if trimmed.is_empty() {
diagnostics.push(Diagnostic::error(
"invalid_storage_root",
"storage",
"storage must be a non-empty URI (e.g. s3://bucket/prefix) when provided",
));
} else if let Some(rest) = trimmed.strip_prefix("s3://") {
if rest.trim_start_matches('/').is_empty() {
diagnostics.push(Diagnostic::error(
"invalid_storage_root",
"storage",
"storage s3:// URI must name a bucket",
));
}
}
}
ClusterSettings {
state_lock: raw.state.lock.unwrap_or(true),
storage_root: raw
.storage
.as_deref()
.map(str::trim)
.filter(|storage| !storage.is_empty())
.map(|storage| storage.trim_end_matches('/').to_string()),
}
}
@ -271,19 +296,19 @@ pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState {
}
pub(crate) async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize {
pub(crate) async fn observe_declared_graphs(
desired: &DesiredCluster,
backend: &ClusterStore,
state: &mut ClusterState,
) -> usize {
let mut graph_error_count = 0;
for graph in &desired.graphs {
let graph_address = graph_address(&graph.id);
let schema_address = schema_address(&graph.id);
let graph_path = desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{}.omni", graph.id));
let graph_uri = display_path(&graph_path);
let graph_uri = backend.graph_root(&graph.id);
let observed_at = now_rfc3339();
if !graph_path.exists() {
if !backend.graph_root_exists(&graph_uri).await {
state.applied_revision.resources.remove(&graph_address);
state.applied_revision.resources.remove(&schema_address);
state.observations.insert(
@ -737,6 +762,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
desired: Some(DesiredCluster {
config_dir: config_dir.clone(),
config_digest,
storage_root: settings.storage_root.clone(),
state_lock: settings.state_lock,
graphs,
resource_digests,

View file

@ -142,7 +142,7 @@ pub(crate) fn compute_approvals(
/// Near-misses — an artifact for the same resource whose bound digests no
/// longer match — warn as `approval_stale` and never authorize anything.
pub(crate) fn approved_resources(
artifacts: &[(PathBuf, ApprovalArtifact)],
artifacts: &[(String, ApprovalArtifact)],
changes: &[PlanChange],
config_digest: &str,
diagnostics: &mut Vec<Diagnostic>,

View file

@ -25,11 +25,10 @@ mod diff;
mod serve;
mod sweep;
mod store;
use store::{LocalStateBackend, StateLockGuard, StateSnapshot};
use store::{ClusterStore, StateLockGuard, StateSnapshot};
pub use types::*;
use types::*;
pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot};
use serve::read_verified_payload;
use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source};
use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind};
use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars};
@ -43,6 +42,18 @@ pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";
/// The store for a load outcome: the declared `storage:` root when present,
/// the config directory itself otherwise. A bad root is a loud error.
fn store_for(
config_dir: &Path,
storage_root: Option<&str>,
) -> Result<ClusterStore, Diagnostic> {
match storage_root {
Some(root) => ClusterStore::for_storage_root(root),
None => Ok(ClusterStore::for_config_dir(config_dir)),
}
}
pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
let outcome = load_desired(config_dir.as_ref());
let (resource_digests, resources, dependencies) = match outcome.desired {
@ -69,7 +80,17 @@ pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let Some(desired) = outcome.desired else {
@ -107,7 +128,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
}
let _lock_guard = if desired.state_lock {
match backend.acquire_lock("plan", &mut observations) {
match backend.acquire_lock("plan", &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -130,7 +151,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let mut prior_resources = BTreeMap::new();
let mut prior_state: Option<ClusterState> = None;
if !has_errors(&diagnostics) {
match backend.read_state(&mut observations) {
match backend.read_state(&mut observations).await {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
prior_resources = state_resource_digests(&state);
@ -151,7 +172,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
}
// Plan previews dispositions without sweeping; a pending recovery is
// surfaced as the cluster_recovery_pending warning above instead.
let artifacts = backend.list_approval_artifacts(&mut diagnostics);
let artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
let approved = approved_resources(
&artifacts,
&changes,
@ -169,12 +190,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(&graph_id);
let source_path = desired
.resources
.iter()
@ -242,7 +258,17 @@ pub async fn apply_config_dir_with_options(
) -> ApplyOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let actor_for_output = options.actor.clone();
@ -294,7 +320,7 @@ pub async fn apply_config_dir_with_options(
// Named guard: the lock must be held until the state outcome is recorded.
let _lock_guard = if desired.state_lock {
match backend.acquire_lock("apply", &mut observations) {
match backend.acquire_lock("apply", &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -321,7 +347,7 @@ pub async fn apply_config_dir_with_options(
);
}
let snapshot = match backend.read_state(&mut observations) {
let snapshot = match backend.read_state(&mut observations).await {
Ok(snapshot) => snapshot,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -361,7 +387,7 @@ pub async fn apply_config_dir_with_options(
let prior_resources = state_resource_digests(&state);
let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
append_policy_binding_changes(&mut changes, Some(&state), &desired);
let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics);
let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
let approved = approved_resources(
&approval_artifacts,
&changes,
@ -424,7 +450,7 @@ pub async fn apply_config_dir_with_options(
})
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
.collect();
let mut completed_op_sidecars: Vec<PathBuf> = Vec::new();
let mut completed_op_sidecars: Vec<String> = Vec::new();
let mut failed_graphs: BTreeMap<String, FailedGraphOrigin> = BTreeMap::new();
let mut graph_moving_aborted = false;
for graph_id in &graph_creates_to_run {
@ -442,12 +468,7 @@ pub async fn apply_config_dir_with_options(
else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(graph_id);
let mut sidecar = RecoverySidecar {
schema_version: 1,
operation_id: Ulid::new().to_string(),
@ -462,7 +483,7 @@ pub async fn apply_config_dir_with_options(
state_cas_base: expected_cas.clone(),
approval_id: None,
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -514,7 +535,7 @@ pub async fn apply_config_dir_with_options(
Ok(source) => source,
Err(diagnostic) => {
diagnostics.push(diagnostic);
let _ = fs::remove_file(&sidecar_path); // nothing moved
backend.delete_object(&sidecar_path).await; // nothing moved
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
graph_moving_aborted = true;
continue;
@ -540,7 +561,7 @@ pub async fn apply_config_dir_with_options(
if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await {
if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await {
sidecar.expected_manifest_version = Some(snapshot.version());
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) {
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
diagnostics.push(diagnostic);
}
}
@ -587,12 +608,7 @@ pub async fn apply_config_dir_with_options(
else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(graph_id);
// Read-write open: the engine's own recovery sweep runs here, which
// is exactly what we want before moving its manifest.
let db = match Omnigraph::open(&graph_uri).await {
@ -626,7 +642,7 @@ pub async fn apply_config_dir_with_options(
state_cas_base: expected_cas.clone(),
approval_id: None,
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -677,7 +693,7 @@ pub async fn apply_config_dir_with_options(
Ok(source) => source,
Err(diagnostic) => {
diagnostics.push(diagnostic);
let _ = fs::remove_file(&sidecar_path); // nothing moved
backend.delete_object(&sidecar_path).await; // nothing moved
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
@ -695,7 +711,7 @@ pub async fn apply_config_dir_with_options(
{
Ok(result) => {
sidecar.expected_manifest_version = Some(result.manifest_version);
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) {
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
diagnostics.push(diagnostic);
}
}
@ -767,9 +783,9 @@ pub async fn apply_config_dir_with_options(
.after_digest
.as_deref()
.expect("create/update always carries an after digest");
let Some(target) = payload_path(&desired.config_dir, &kind, digest) else {
if ClusterStore::payload_relative(&kind, digest).is_none() {
continue;
};
}
let Some(source) = source_paths.get(change.resource.as_str()) else {
diagnostics.push(Diagnostic::error(
"resource_payload_write_error",
@ -779,7 +795,8 @@ pub async fn apply_config_dir_with_options(
continue;
};
if let Err(diagnostic) =
write_resource_payload(&target, Path::new(source), digest, &change.resource)
write_resource_payload(&backend, &kind, Path::new(source), digest, &change.resource)
.await
{
diagnostics.push(diagnostic);
}
@ -844,12 +861,7 @@ pub async fn apply_config_dir_with_options(
&& artifact.bound_config_digest == desired.config_digest
})
.map(|artifact| artifact.approval_id.clone());
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(graph_id);
let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await {
Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await {
Ok(snapshot) => Some(snapshot.version()),
@ -871,7 +883,7 @@ pub async fn apply_config_dir_with_options(
state_cas_base: expected_cas.clone(),
approval_id: approval_id.clone(),
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -888,9 +900,10 @@ pub async fn apply_config_dir_with_options(
graph_moving_aborted = true;
continue;
}
match fs::remove_dir_all(PathBuf::from(&graph_uri)) {
// Prefix delete through the storage layer: remove_dir_all locally,
// list+delete on object stores (idempotent; already-gone is fine).
match backend.delete_graph_root(&graph_uri).await {
Ok(()) => {}
Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone
Err(err) => {
diagnostics.push(Diagnostic::error(
"graph_delete_failed",
@ -1004,8 +1017,14 @@ pub async fn apply_config_dir_with_options(
// persisted-statuses revert contract below is exercised; a cfg_callback
// on this point can mutate state.json to simulate a concurrent writer,
// making write_state's CAS check fail organically.
let write_result = failpoints::maybe_fail("cluster_apply.before_state_write")
.and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations));
let write_result = match failpoints::maybe_fail("cluster_apply.before_state_write") {
Ok(()) => {
backend
.write_state(&new_state, expected_cas.as_deref(), &mut observations)
.await
}
Err(diagnostic) => Err(diagnostic),
};
match write_result {
Ok(()) => state_written = true,
Err(diagnostic) => {
@ -1017,16 +1036,16 @@ pub async fn apply_config_dir_with_options(
// Completed (rows 2/4) sweep sidecars are deleted only once their outcome
// is durably recorded; on a failed write they stay and re-sweep next run.
if !state_write_failed {
for sidecar_path in sweep
for sidecar_uri in sweep
.completed_sidecars
.iter()
.chain(completed_op_sidecars.iter())
{
let _ = fs::remove_file(sidecar_path);
backend.delete_object(sidecar_uri).await;
}
let mut all_consumed = sweep.consumed_approvals.clone();
all_consumed.extend(consumed_approval_ids.iter().cloned());
mark_approvals_consumed(&backend, &all_consumed);
mark_approvals_consumed(&backend, &all_consumed).await;
}
// On a failed state write, report the statuses that are actually on disk
// (the pre-apply snapshot), not the in-memory mutations that were never
@ -1082,7 +1101,17 @@ pub async fn approve_config_dir(
) -> ApproveOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let fail = |config_dir: String, diagnostics: Vec<Diagnostic>| ApproveOutput {
@ -1103,7 +1132,7 @@ pub async fn approve_config_dir(
}
let _lock_guard = if desired.state_lock {
match backend.acquire_lock("approve", &mut observations) {
match backend.acquire_lock("approve", &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1119,7 +1148,7 @@ pub async fn approve_config_dir(
None
};
let state = match backend.read_state(&mut observations) {
let state = match backend.read_state(&mut observations).await {
Ok(snapshot) => match snapshot.state {
Some(state) => state,
None => {
@ -1174,7 +1203,7 @@ pub async fn approve_config_dir(
consumed_at: None,
consumed_by_operation: None,
};
if let Err(diagnostic) = backend.write_approval_artifact(&artifact) {
if let Err(diagnostic) = backend.write_approval_artifact(&artifact).await {
diagnostics.push(diagnostic);
return fail(display_path(&desired.config_dir), diagnostics);
}
@ -1191,12 +1220,25 @@ pub async fn approve_config_dir(
}
pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
let backend = LocalStateBackend::new(&parsed.config_dir);
let storage_root = parsed.raw.as_ref().and_then(|raw| {
raw.storage
.as_deref()
.map(str::trim)
.filter(|root| !root.is_empty())
.map(|root| root.trim_end_matches('/').to_string())
});
let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&parsed.config_dir)
}
};
let mut observations = backend.observations();
backend.observe_lock(&mut observations, &mut diagnostics);
backend.observe_lock(&mut observations, &mut diagnostics).await;
warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics);
let mut resource_digests = BTreeMap::new();
@ -1206,14 +1248,14 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
if let Some(raw) = parsed.raw.as_ref() {
let _settings = validate_cluster_header(raw, &mut diagnostics);
if !has_errors(&diagnostics) {
match backend.read_state(&mut observations) {
match backend.read_state(&mut observations).await {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
// Read-only point-in-time catalog check: report the
// findings as diagnostics; persisting Drifted statuses
// is refresh's job. Status never writes state.
for (address, finding) in
verify_catalog_payloads(&parsed.config_dir, &state)
verify_catalog_payloads(&backend, &state).await
{
diagnostics.push(payload_finding_diagnostic(&address, &finding));
}
@ -1244,20 +1286,33 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
}
}
pub fn force_unlock_config_dir(
pub async fn force_unlock_config_dir(
config_dir: impl AsRef<Path>,
lock_id: impl AsRef<str>,
) -> ForceUnlockOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
let backend = LocalStateBackend::new(&parsed.config_dir);
let storage_root = parsed.raw.as_ref().and_then(|raw| {
raw.storage
.as_deref()
.map(str::trim)
.filter(|root| !root.is_empty())
.map(|root| root.trim_end_matches('/').to_string())
});
let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&parsed.config_dir)
}
};
let mut observations = backend.observations();
let mut lock_removed = false;
if let Some(raw) = parsed.raw.as_ref() {
let _settings = validate_cluster_header(raw, &mut diagnostics);
if !has_errors(&diagnostics) {
match backend.force_unlock(lock_id.as_ref(), &mut observations) {
match backend.force_unlock(lock_id.as_ref(), &mut observations).await {
Ok(()) => lock_removed = true,
Err(diagnostic) => diagnostics.push(diagnostic),
}
@ -1284,7 +1339,17 @@ pub async fn import_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput
async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput {
let outcome = load_desired(config_dir);
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let Some(desired) = outcome.desired else {
@ -1315,7 +1380,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
let operation_label = state_sync_operation_label(operation);
let _lock_guard = if desired.state_lock {
match backend.acquire_lock(operation_label, &mut observations) {
match backend.acquire_lock(operation_label, &mut observations).await {
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1346,7 +1411,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
};
}
let snapshot = match backend.read_state(&mut observations) {
let snapshot = match backend.read_state(&mut observations).await {
Ok(snapshot) => snapshot,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1412,7 +1477,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
// a drifted query digest first means the live-graph composite recompute
// below already excludes it, so the persisted graph.<id> composite stays
// consistent and the next plan shows exactly the create + derived update.
for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) {
for (address, finding) in verify_catalog_payloads(&backend, &state).await {
diagnostics.push(payload_finding_diagnostic(&address, &finding));
match finding {
PayloadFinding::Missing => {
@ -1449,7 +1514,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
}
}
let graph_error_count = observe_declared_graphs(&desired, &mut state).await;
let graph_error_count = observe_declared_graphs(&desired, &backend, &mut state).await;
if graph_error_count > 0 {
diagnostics.push(Diagnostic::error(
"graph_observation_error",
@ -1477,14 +1542,14 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
state.state_revision = state.state_revision.saturating_add(1);
}
match backend.write_state(&state, expected_cas.as_deref(), &mut observations) {
match backend.write_state(&state, expected_cas.as_deref(), &mut observations).await {
Ok(()) => {
// Completed sweep sidecars are deleted only after their outcome
// is durably recorded; on failure they stay and re-sweep.
for sidecar_path in &sweep.completed_sidecars {
let _ = fs::remove_file(sidecar_path);
for sidecar_uri in &sweep.completed_sidecars {
backend.delete_object(sidecar_uri).await;
}
mark_approvals_consumed(&backend, &sweep.consumed_approvals);
mark_approvals_consumed(&backend, &sweep.consumed_approvals).await;
}
Err(diagnostic) => diagnostics.push(diagnostic),
}
@ -1506,28 +1571,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
/// Content-addressed catalog path for an applied resource payload. Extensions
/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name,
/// so the catalog layout cannot drift with operator file conventions.
fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option<PathBuf> {
let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR);
match kind {
ResourceKind::Query { graph, name } => Some(
resources_dir
.join("query")
.join(graph)
.join(name)
.join(format!("{digest}.gq")),
),
ResourceKind::Policy(name) => Some(
resources_dir
.join("policy")
.join(name)
.join(format!("{digest}.yaml")),
),
_ => None,
}
}
#[derive(Debug, PartialEq, Eq)]
enum PayloadFinding {
@ -1541,34 +1584,26 @@ enum PayloadFinding {
/// unknown addresses have no payloads and are skipped. Read-only; findings
/// are deterministic (BTreeMap order). Payloads are small (queries, policy
/// bundles), so a full digest re-hash is cheap.
fn verify_catalog_payloads(
config_dir: &Path,
async fn verify_catalog_payloads(
backend: &ClusterStore,
state: &ClusterState,
) -> Vec<(String, PayloadFinding)> {
let mut findings = Vec::new();
for (address, resource) in &state.applied_revision.resources {
let kind = resource_kind(address);
let Some(path) = payload_path(config_dir, &kind, &resource.digest) else {
if ClusterStore::payload_relative(&kind, &resource.digest).is_none() {
continue;
};
match fs::read(&path) {
Ok(bytes) => {
let actual_digest = sha256_hex(&bytes);
}
match backend.read_payload(&kind, &resource.digest).await {
Ok(Some(text)) => {
let actual_digest = sha256_hex(text.as_bytes());
if actual_digest != resource.digest {
findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest }));
}
}
Err(err) if err.kind() == ErrorKind::NotFound => {
findings.push((address.clone(), PayloadFinding::Missing));
}
Ok(None) => findings.push((address.clone(), PayloadFinding::Missing)),
Err(err) => {
findings.push((
address.clone(),
PayloadFinding::ReadError(format!(
"could not read catalog payload '{}': {err}",
path.display()
)),
));
findings.push((address.clone(), PayloadFinding::ReadError(err)));
}
}
}
@ -1600,13 +1635,15 @@ fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagno
/// digest-named file is trusted as-is. The digest re-check is the apply-side
/// TOCTOU detector — the source file changing between `load_desired` and the
/// payload write must fail loudly, never publish mismatched content.
fn write_resource_payload(
target: &Path,
async fn write_resource_payload(
backend: &ClusterStore,
kind: &ResourceKind,
source: &Path,
expected_digest: &str,
resource: &str,
) -> Result<(), Diagnostic> {
if target.exists() {
if backend.payload_exists(kind, expected_digest).await {
// Content-addressed: an existing digest-named object is identical.
return Ok(());
}
let bytes = fs::read(source).map_err(|err| {
@ -1617,6 +1654,9 @@ fn write_resource_payload(
)
})?;
if sha256_hex(&bytes) != expected_digest {
// The apply-side TOCTOU detector: the source changing between
// load_desired and this write must fail loudly, never publish
// mismatched content.
return Err(Diagnostic::error(
"resource_content_changed",
resource,
@ -1626,54 +1666,23 @@ fn write_resource_payload(
),
));
}
let parent = target.parent().expect("payload path always has a parent");
fs::create_dir_all(parent).map_err(|err| {
let content = String::from_utf8(bytes).map_err(|err| {
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not create payload directory: {err}"),
format!("resource source is not valid UTF-8: {err}"),
)
})?;
let file_name = target
.file_name()
.expect("payload path always has a file name")
.to_string_lossy();
let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new()));
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
backend
.write_payload(kind, expected_digest, &content)
.await
.map_err(|err| {
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not create temporary payload file: {err}"),
format!("could not write payload: {err}"),
)
})?;
let write_result = file
.write_all(&bytes)
.and_then(|()| file.sync_all())
.map_err(|err| {
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not write payload file: {err}"),
)
});
drop(file);
if let Err(diagnostic) = write_result {
let _ = fs::remove_file(&tmp_path);
return Err(diagnostic);
}
if let Err(err) = fs::rename(&tmp_path, target) {
let _ = fs::remove_file(&tmp_path);
return Err(Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not move payload file into place: {err}"),
));
}
Ok(())
})
}
/// Recompute the composite `graph.<id>` digests for state-resident graphs from

View file

@ -40,13 +40,31 @@ pub struct ServingSnapshot {
/// failure is collected and the whole snapshot refused; no partial serving.
/// Takes no lock: the state file is replaced atomically, so this reads a
/// consistent point-in-time ledger.
pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnapshot, Vec<Diagnostic>> {
pub async fn read_serving_snapshot(
config_dir: impl AsRef<Path>,
) -> Result<ServingSnapshot, Vec<Diagnostic>> {
let config_dir = config_dir.as_ref().to_path_buf();
let backend = LocalStateBackend::new(&config_dir);
let mut diagnostics: Vec<Diagnostic> = Vec::new();
// The declared storage: root decides where the ledger/catalog/graphs
// live; config parse errors surface through the normal validation path.
let parsed = parse_cluster_config(&config_dir);
let storage_root = parsed.raw.as_ref().and_then(|raw| {
raw.storage
.as_deref()
.map(str::trim)
.filter(|root| !root.is_empty())
.map(|root| root.trim_end_matches('/').to_string())
});
let backend = match storage_root.as_deref() {
Some(root) => match ClusterStore::for_storage_root(root) {
Ok(backend) => backend,
Err(diagnostic) => return Err(vec![diagnostic]),
},
None => ClusterStore::for_config_dir(&config_dir),
};
// A ledger a sweep is about to rewrite must not start serving.
let sidecars = backend.list_recovery_sidecars(&mut diagnostics);
let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await;
if !sidecars.is_empty() {
diagnostics.push(Diagnostic::error(
"cluster_recovery_pending",
@ -59,7 +77,7 @@ pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnap
}
let mut observations = backend.observations();
let state = match backend.read_state(&mut observations) {
let state = match backend.read_state(&mut observations).await {
Ok(snapshot) => match snapshot.state {
Some(state) => Some(state),
None => {
@ -87,9 +105,7 @@ pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnap
match resource_kind(address) {
ResourceKind::Graph(graph_id) => {
graphs.push(ServingGraph {
root: config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
root: PathBuf::from(backend.graph_root(&graph_id)),
graph_id,
});
}
@ -98,7 +114,7 @@ pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnap
let ResourceKind::Query { graph, name } = &kind else {
unreachable!()
};
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
match backend.read_verified_payload(&kind, &entry.digest, address).await {
Ok(source) => queries.push(ServingQuery {
graph_id: graph.clone(),
name: name.clone(),
@ -119,11 +135,14 @@ pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnap
));
continue;
};
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
match backend.read_verified_payload(&kind, &entry.digest, address).await {
Ok(_) => policies.push(ServingPolicy {
name: name.clone(),
blob_path: payload_path(&config_dir, &kind, &entry.digest)
.expect("policy kind always has a payload path"),
blob_path: PathBuf::from(
backend
.payload_display(&kind, &entry.digest)
.expect("policy kind always has a payload path"),
),
applies_to,
}),
Err(diagnostic) => diagnostics.push(diagnostic),
@ -150,40 +169,3 @@ pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnap
})
}
/// Read a catalog blob and verify it against the recorded digest.
pub(crate) fn read_verified_payload(
config_dir: &Path,
kind: &ResourceKind,
digest: &str,
address: &str,
) -> Result<String, Diagnostic> {
let path = payload_path(config_dir, kind, digest)
.expect("query/policy kinds always have a payload path");
let bytes = fs::read(&path).map_err(|err| {
Diagnostic::error(
"catalog_payload_missing",
address,
format!(
"catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart",
display_path(&path)
),
)
})?;
if sha256_hex(&bytes) != digest {
return Err(Diagnostic::error(
"catalog_payload_digest_mismatch",
address,
format!(
"catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart",
display_path(&path)
),
));
}
String::from_utf8(bytes).map_err(|err| {
Diagnostic::error(
"catalog_payload_invalid",
address,
format!("catalog blob is not valid UTF-8: {err}"),
)
})
}

File diff suppressed because it is too large Load diff

View file

@ -11,21 +11,21 @@ use super::*;
/// Mutations ride the calling command's CAS-checked state write; completed
/// sidecars are deleted only after that write lands.
pub(crate) async fn sweep_recovery_sidecars(
backend: &LocalStateBackend,
backend: &ClusterStore,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
) -> SweepOutcome {
let mut outcome = SweepOutcome::default();
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) {
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await {
match sidecar.kind {
RecoverySidecarKind::GraphCreate => {
sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
sweep_graph_create_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await;
}
RecoverySidecarKind::SchemaApply => {
sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
}
RecoverySidecarKind::GraphDelete => {
sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome);
sweep_graph_delete_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await;
}
}
}
@ -33,7 +33,8 @@ pub(crate) async fn sweep_recovery_sidecars(
}
pub(crate) async fn sweep_graph_create_sidecar(
path: PathBuf,
backend: &ClusterStore,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
@ -41,12 +42,13 @@ pub(crate) async fn sweep_graph_create_sidecar(
) {
let graph_address = graph_address(&sidecar.graph_id);
let schema_addr = schema_address(&sidecar.graph_id);
let graph_path = PathBuf::from(&sidecar.graph_uri);
// Row 1: nothing moved — the init never landed. The sidecar is pure
// intent; remove it and let the command's own plan re-propose the create.
if !graph_path.exists() {
let _ = fs::remove_file(&path);
// intent; retire it (deferred to the command's post-CAS cleanup, like
// every other completed sidecar — a failed CAS simply re-sweeps it) and
// let the command's own plan re-propose the create.
if !backend.graph_root_exists(&sidecar.graph_uri).await {
outcome.completed_sidecars.push(path);
return;
}
@ -153,7 +155,7 @@ pub(crate) async fn sweep_graph_create_sidecar(
}
pub(crate) async fn sweep_schema_apply_sidecar(
path: PathBuf,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
@ -249,17 +251,17 @@ pub(crate) async fn sweep_schema_apply_sidecar(
}
}
pub(crate) fn sweep_graph_delete_sidecar(
path: PathBuf,
pub(crate) async fn sweep_graph_delete_sidecar(
backend: &ClusterStore,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
diagnostics: &mut Vec<Diagnostic>,
outcome: &mut SweepOutcome,
) {
let graph_address = graph_address(&sidecar.graph_id);
let root = PathBuf::from(&sidecar.graph_uri);
if root.exists() {
if backend.graph_root_exists(&sidecar.graph_uri).await {
// Row 8: the delete never completed. Prefix removal is idempotent and
// works on partial roots, so the repair is simply the re-proposed,
// still-approved delete on a later run — retire the stale intent.
@ -351,15 +353,15 @@ pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &s
}
/// Mark approval artifact files consumed on disk (post-CAS).
pub(crate) fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) {
pub(crate) async fn mark_approvals_consumed(backend: &ClusterStore, approval_ids: &[String]) {
if approval_ids.is_empty() {
return;
}
let mut sink = Vec::new();
for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) {
for (_, mut artifact) in backend.list_approval_artifacts(&mut sink).await {
if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() {
artifact.consumed_at = Some(now_rfc3339());
let _ = backend.write_approval_artifact(&artifact);
let _ = backend.write_approval_artifact(&artifact).await;
}
}
}

View file

@ -351,8 +351,8 @@ policies:
}));
}
#[test]
fn extended_state_json_status_surfaces_statuses() {
#[tokio::test]
async fn extended_state_json_status_surfaces_statuses() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -380,7 +380,7 @@ policies:
}"#;
fs::write(state_dir.join("state.json"), state).unwrap();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.state_observations.state_found);
assert_eq!(out.state_observations.state_revision, 42);
@ -400,10 +400,10 @@ policies:
);
}
#[test]
fn missing_state_status_succeeds_with_warning() {
#[tokio::test]
async fn missing_state_status_succeeds_with_warning() {
let dir = fixture();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.state_observations.state_found);
assert_eq!(out.state_observations.state_revision, 0);
@ -414,14 +414,14 @@ policies:
);
}
#[test]
fn invalid_state_status_fails() {
#[tokio::test]
async fn invalid_state_status_fails() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
fs::write(state_dir.join("state.json"), "{").unwrap();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.state_observations.state_found);
assert!(
@ -431,12 +431,12 @@ policies:
);
}
#[test]
fn status_surfaces_full_lock_metadata() {
#[tokio::test]
async fn status_surfaces_full_lock_metadata() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "refresh");
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.state_observations.locked);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -452,12 +452,12 @@ policies:
assert!(out.state_observations.lock_age_seconds.is_some());
}
#[test]
fn force_unlock_matching_id_removes_lock() {
#[tokio::test]
async fn force_unlock_matching_id_removes_lock() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.lock_removed);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -468,12 +468,12 @@ policies:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_wrong_id_fails_and_preserves_lock() {
#[tokio::test]
async fn force_unlock_wrong_id_fails_and_preserves_lock() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
let out = force_unlock_config_dir(dir.path(), "other-lock");
let out = force_unlock_config_dir(dir.path(), "other-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -485,11 +485,11 @@ policies:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_missing_lock_fails() {
#[tokio::test]
async fn force_unlock_missing_lock_fails() {
let dir = fixture();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(!out.state_observations.locked);
@ -500,14 +500,14 @@ policies:
);
}
#[test]
fn force_unlock_invalid_lock_json_fails_and_preserves_lock() {
#[tokio::test]
async fn force_unlock_invalid_lock_json_fails_and_preserves_lock() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
fs::write(state_dir.join("lock.json"), "{").unwrap();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(
@ -518,8 +518,8 @@ policies:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() {
#[tokio::test]
async fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -529,7 +529,7 @@ policies:
)
.unwrap();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(
@ -540,8 +540,8 @@ policies:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn force_unlock_external_state_backend_rejected() {
#[tokio::test]
async fn force_unlock_external_state_backend_rejected() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
fs::write(
@ -557,7 +557,7 @@ graphs:
)
.unwrap();
let out = force_unlock_config_dir(dir.path(), "held-lock");
let out = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(!out.ok);
assert!(!out.lock_removed);
assert!(
@ -582,7 +582,7 @@ graphs:
.any(|diagnostic| diagnostic.code == "state_lock_held")
);
let unlocked = force_unlock_config_dir(dir.path(), "held-lock");
let unlocked = force_unlock_config_dir(dir.path(), "held-lock").await;
assert!(unlocked.ok, "{:?}", unlocked.diagnostics);
let out = plan_config_dir(dir.path()).await;
@ -1886,7 +1886,7 @@ graphs:
let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap();
fs::remove_file(&blob).unwrap();
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "catalog_payload_missing"
@ -2001,7 +2001,7 @@ graphs:
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
assert_eq!(fs::read_to_string(&blob).unwrap(), original);
let status = status_config_dir(dir.path());
let status = status_config_dir(dir.path()).await;
assert!(
!status
.diagnostics
@ -2012,12 +2012,12 @@ graphs:
);
}
#[test]
fn verification_skips_graph_and_schema_resources() {
#[tokio::test]
async fn verification_skips_graph_and_schema_resources() {
let dir = fixture();
write_applyable_state(dir.path()); // graph + schema digests only, no blobs
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(
!out.diagnostics
.iter()
@ -2762,6 +2762,70 @@ policies:
// ---- serving snapshot (5B read-only loader) ----
// ---- storage: root (RFC-006) ----
#[tokio::test]
async fn storage_root_defaults_to_config_dir_layout() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.converged, "{out:?}");
// No storage: key — the original on-disk layout, byte-compatible.
assert!(dir.path().join(CLUSTER_STATE_FILE).exists());
assert!(dir.path().join(CLUSTER_RESOURCES_DIR).exists());
assert!(dir.path().join("graphs/knowledge.omni").exists());
}
#[tokio::test]
async fn storage_root_file_uri_relocates_the_cluster() {
let dir = fixture();
let storage = tempfile::tempdir().unwrap();
let storage_path = storage.path().to_string_lossy().to_string();
let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap();
config = config.replace("version: 1\n", &format!("version: 1\nstorage: {storage_path}\n"));
fs::write(dir.path().join("cluster.yaml"), config).unwrap();
let import = import_config_dir(dir.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);
let out = apply_config_dir(dir.path()).await;
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
// Everything lives under the declared root; nothing under config dir.
assert!(storage.path().join("__cluster/state.json").exists());
assert!(storage.path().join("graphs/knowledge.omni").exists());
assert!(storage.path().join(CLUSTER_RESOURCES_DIR).exists());
assert!(!dir.path().join(CLUSTER_STATE_FILE).exists());
assert!(!dir.path().join("graphs").exists());
// The serving snapshot follows the root.
let snapshot = read_serving_snapshot(dir.path()).await.unwrap();
assert!(
snapshot.graphs[0]
.root
.starts_with(storage.path()),
"{:?}",
snapshot.graphs[0].root
);
}
#[test]
fn storage_root_invalid_uri_fails_validation() {
let dir = fixture();
let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap();
config = config.replace("version: 1\n", "version: 1\nstorage: \"s3://\"\n");
fs::write(dir.path().join("cluster.yaml"), config).unwrap();
let out = validate_config_dir(dir.path());
assert!(!out.ok);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "invalid_storage_root"),
"{:?}",
out.diagnostics
);
}
#[tokio::test]
async fn serving_snapshot_reads_converged_cluster() {
let dir = fixture();
@ -2770,7 +2834,7 @@ policies:
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve");
let snapshot = read_serving_snapshot(dir.path()).await.expect("converged cluster must serve");
assert_eq!(snapshot.graphs.len(), 1);
assert_eq!(snapshot.graphs[0].graph_id, "knowledge");
assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni"));
@ -2782,10 +2846,10 @@ policies:
assert!(snapshot.policies[0].blob_path.exists());
}
#[test]
fn serving_snapshot_refuses_missing_state() {
#[tokio::test]
async fn serving_snapshot_refuses_missing_state() {
let dir = fixture();
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"),
"{err:?}"
@ -2800,7 +2864,7 @@ policies:
apply_config_dir(dir.path()).await;
write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE");
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"),
"{err:?}"
@ -2814,7 +2878,7 @@ policies:
write_applyable_state(dir.path());
apply_config_dir(dir.path()).await;
// Tamper with the query blob...
let snapshot = read_serving_snapshot(dir.path()).unwrap();
let snapshot = read_serving_snapshot(dir.path()).await.unwrap();
let desired = validate_config_dir(dir.path());
let query_digest = &desired.resource_digests["query.knowledge.find_person"];
let blob = dir
@ -2838,7 +2902,7 @@ policies:
)
.unwrap();
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"),
@ -2851,12 +2915,12 @@ policies:
let _ = snapshot; // the pre-tamper read succeeded
}
#[test]
fn serving_snapshot_refuses_empty_cluster() {
#[tokio::test]
async fn serving_snapshot_refuses_empty_cluster() {
let dir = fixture();
write_state_resources(dir.path(), &[]); // state exists, no graphs
let err = read_serving_snapshot(dir.path()).unwrap_err();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"),
"{err:?}"
@ -2972,13 +3036,13 @@ policies:
);
}
#[test]
fn status_warns_on_pending_recovery_sidecar() {
#[tokio::test]
async fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();
write_applyable_state(dir.path());
write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01STATUS");
let out = status_config_dir(dir.path());
let out = status_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(
out.diagnostics

View file

@ -322,6 +322,8 @@ pub struct ApproveOutput {
pub(crate) struct DesiredCluster {
pub(crate) config_dir: PathBuf,
pub(crate) config_digest: String,
/// The declared `storage:` root, if any (None ⇒ the config dir itself).
pub(crate) storage_root: Option<String>,
pub(crate) state_lock: bool,
pub(crate) graphs: Vec<DesiredGraph>,
pub(crate) resource_digests: BTreeMap<String, String>,
@ -345,9 +347,10 @@ pub(crate) struct ParsedConfig {
pub(crate) config_file: PathBuf,
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub(crate) struct ClusterSettings {
pub(crate) state_lock: bool,
pub(crate) storage_root: Option<String>,
}
#[derive(Debug)]
@ -364,6 +367,12 @@ pub(crate) struct RawClusterConfig {
pub(crate) version: u32,
#[serde(default)]
pub(crate) metadata: Metadata,
/// Storage root URI for everything the cluster stores: the state
/// ledger, catalog, sidecars, approvals, and derived graph roots.
/// Absent ⇒ `file://<config-dir>` (the original layout, byte-compatible).
/// `s3://bucket/prefix` puts the whole cluster on object storage.
#[serde(default)]
pub(crate) storage: Option<String>,
#[serde(default)]
pub(crate) state: StateConfig,
#[serde(default)]
@ -503,7 +512,8 @@ pub(crate) struct SweepOutcome {
pub(crate) pending_graphs: BTreeSet<String>,
/// Sidecars whose outcome is recorded (rows 2/4): deleted only after the
/// command's state write lands, so a CAS failure re-sweeps them.
pub(crate) completed_sidecars: Vec<PathBuf>,
/// Store URIs (the storage layer addresses everything by URI).
pub(crate) completed_sidecars: Vec<String>,
/// Approval artifacts consumed by a roll-forward (delete row 7b): their
/// files are rewritten with consumed_at only after the state write lands.
pub(crate) consumed_approvals: Vec<String>,

View file

@ -893,12 +893,12 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St
/// catalog blob content, policy bundles from blob paths with their applied
/// bindings. Always multi-graph routing. The unauthenticated/env handling
/// matches the omnigraph.yaml path.
fn load_cluster_settings(
async fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| {
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).await.map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
@ -988,7 +988,7 @@ fn load_cluster_settings(
})
}
pub fn load_server_settings(
pub async fn load_server_settings(
config_path: Option<&PathBuf>,
cli_cluster: Option<&PathBuf>,
cli_uri: Option<String>,
@ -1005,7 +1005,7 @@ pub fn load_server_settings(
"--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)"
);
}
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated);
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await;
}
let config = load_config(config_path)?;
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
@ -3363,8 +3363,8 @@ mod tests {
);
}
#[test]
fn server_settings_load_from_yaml_config() {
#[tokio::test]
async fn server_settings_load_from_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
@ -3380,7 +3380,7 @@ server:
)
.unwrap();
let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap();
let settings = load_server_settings(Some(&config), None, None, None, None, false).await.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
assert_eq!(uri, "/tmp/demo.omni");
@ -3391,8 +3391,8 @@ server:
assert_eq!(settings.bind, "0.0.0.0:9090");
}
#[test]
fn server_settings_cli_flags_override_yaml_config() {
#[tokio::test]
async fn server_settings_cli_flags_override_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
@ -3416,6 +3416,7 @@ server:
Some("0.0.0.0:9999".to_string()),
false,
)
.await
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
@ -3427,8 +3428,8 @@ server:
assert_eq!(settings.bind, "0.0.0.0:9999");
}
#[test]
fn server_settings_can_resolve_named_target() {
#[tokio::test]
async fn server_settings_can_resolve_named_target() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
@ -3448,6 +3449,7 @@ server:
let settings =
load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false)
.await
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
@ -3458,9 +3460,9 @@ server:
}
}
#[test]
fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None, None, false).unwrap_err();
#[tokio::test]
async fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None, None, false).await.unwrap_err();
assert!(
error.to_string().contains("no graph to serve"),
"expected mode-inference error, got: {error}",
@ -3598,9 +3600,9 @@ server:
);
}
#[test]
#[tokio::test]
#[serial]
fn unauthenticated_env_var_classification() {
async fn unauthenticated_env_var_classification() {
// MR-723 PR A: closes the gap where the env-var read path inside
// `load_server_settings` was structurally implemented but not
// exercised by any test. Three properties to pin, all in one
@ -3627,7 +3629,7 @@ server:
// Truthy values flip Open mode on, even with CLI flag off.
for value in ["1", "true", "yes", "TRUE", "anything"] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,
@ -3638,7 +3640,7 @@ server:
// Falsy values keep refusal behavior, even with CLI flag off.
for value in ["0", "false", "FALSE", ""] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
@ -3648,7 +3650,7 @@ server:
// Unset env var: also false.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
@ -3659,7 +3661,7 @@ server:
// CLI flag wins even when env is falsy — `serve()` honors the
// OR of both inputs.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, true)
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,

View file

@ -43,6 +43,7 @@ async fn main() -> Result<()> {
cli.target,
cli.bind,
cli.unauthenticated,
)?;
)
.await?;
serve(settings).await
}

View file

@ -5567,8 +5567,8 @@ mod multi_graph_startup {
/// `GraphId` validation runs at startup — a reserved name in
/// `omnigraph.yaml` produces a clear error rather than getting
/// rejected per-request.
#[test]
fn load_server_settings_rejects_reserved_graph_id() {
#[tokio::test]
async fn load_server_settings_rejects_reserved_graph_id() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5580,7 +5580,7 @@ graphs:
"#,
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, false).await.unwrap_err();
assert!(
err.to_string().contains("invalid graph id 'policies'"),
"expected reserved-name rejection, got: {err}"
@ -5644,8 +5644,8 @@ graphs:
// ── Four-rule mode inference matrix ───────────────────────────────
/// Rule 1: CLI positional URI → Single.
#[test]
fn mode_inference_cli_uri_is_single() {
#[tokio::test]
async fn mode_inference_cli_uri_is_single() {
let settings = load_server_settings(
None,
None,
@ -5654,6 +5654,7 @@ graphs:
None,
true, // allow unauth so we get past the runtime-state check
)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"),
@ -5662,8 +5663,8 @@ graphs:
}
/// Rule 2: --target picks one graph from `graphs:` map → Single.
#[test]
fn mode_inference_cli_target_is_single() {
#[tokio::test]
async fn mode_inference_cli_target_is_single() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5679,6 +5680,7 @@ graphs:
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"),
@ -5687,8 +5689,8 @@ graphs:
}
/// Rule 3: `server.graph` set → Single (target picked from config).
#[test]
fn mode_inference_server_graph_is_single() {
#[tokio::test]
async fn mode_inference_server_graph_is_single() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5704,7 +5706,7 @@ server:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"),
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"),
@ -5712,8 +5714,8 @@ server:
}
/// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi.
#[test]
fn mode_inference_config_plus_graphs_is_multi() {
#[tokio::test]
async fn mode_inference_config_plus_graphs_is_multi() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5727,7 +5729,7 @@ graphs:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
@ -5738,8 +5740,8 @@ graphs:
}
}
#[test]
fn mode_inference_multi_rejects_top_level_policy_file() {
#[tokio::test]
async fn mode_inference_multi_rejects_top_level_policy_file() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5753,7 +5755,7 @@ graphs:
"#,
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"),
@ -5769,8 +5771,8 @@ graphs:
);
}
#[test]
fn mode_inference_multi_rejects_top_level_queries() {
#[tokio::test]
async fn mode_inference_multi_rejects_top_level_queries() {
// Symmetric to the policy guard: a top-level `queries:` block in
// multi-graph mode is not honored (each graph uses its own), so it
// is a loud error rather than a silent no-op.
@ -5781,7 +5783,7 @@ graphs:
"queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n",
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("queries") && msg.contains("not honored"),
@ -5789,8 +5791,8 @@ graphs:
);
}
#[test]
fn single_mode_named_graph_rejects_top_level_blocks() {
#[tokio::test]
async fn single_mode_named_graph_rejects_top_level_blocks() {
// Serving a graph by name (`--target`/`server.graph`) uses its
// per-graph block; a populated top-level block would be silently
// shadowed, so boot refuses and names the per-graph location.
@ -5803,6 +5805,7 @@ graphs:
.unwrap();
let err =
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
.await
.unwrap_err();
let msg = err.to_string();
assert!(
@ -5811,8 +5814,8 @@ graphs:
);
}
#[test]
fn single_mode_named_graph_uses_per_graph_policy_and_queries() {
#[tokio::test]
async fn single_mode_named_graph_uses_per_graph_policy_and_queries() {
// The identity rule: `--target prod` attaches `graphs.prod`'s own
// policy + queries, not the top-level ones (which are absent here).
let temp = tempfile::tempdir().unwrap();
@ -5830,6 +5833,7 @@ graphs:
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single {
@ -5851,8 +5855,8 @@ graphs:
}
}
#[test]
fn mode_inference_normalizes_multi_graph_uris() {
#[tokio::test]
async fn mode_inference_normalizes_multi_graph_uris() {
let temp = tempfile::tempdir().unwrap();
let graph = temp.path().join("alpha.omni");
let config_path = temp.path().join("omnigraph.yaml");
@ -5868,7 +5872,7 @@ graphs:
),
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
assert_eq!(graphs[0].uri, graph.to_string_lossy());
@ -5878,9 +5882,9 @@ graphs:
}
/// Rule 5: nothing → error with migration hint.
#[test]
fn mode_inference_no_inputs_errors_with_migration_hint() {
let err = load_server_settings(None, None, None, None, None, true).unwrap_err();
#[tokio::test]
async fn mode_inference_no_inputs_errors_with_migration_hint() {
let err = load_server_settings(None, None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("no graph to serve"),
@ -5890,19 +5894,19 @@ graphs:
/// Rule 4 sub-case: `--config` with empty `graphs:` map and no
/// single-mode selector → rule 5 fires (no graph to serve).
#[test]
fn mode_inference_empty_graphs_map_errors() {
#[tokio::test]
async fn mode_inference_empty_graphs_map_errors() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
assert!(err.to_string().contains("no graph to serve"));
}
/// `--config` + `<URI>` together: URI wins → Single (the CLI URI
/// takes precedence over the config's graphs map).
#[test]
fn mode_inference_cli_uri_overrides_graphs_map() {
#[tokio::test]
async fn mode_inference_cli_uri_overrides_graphs_map() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5922,6 +5926,7 @@ graphs:
None,
true,
)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => {
@ -5937,8 +5942,8 @@ graphs:
}
/// Per-graph `policy.file` is resolved relative to the config base_dir.
#[test]
fn per_graph_policy_file_is_resolved_relative_to_base_dir() {
#[tokio::test]
async fn per_graph_policy_file_is_resolved_relative_to_base_dir() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5954,7 +5959,7 @@ graphs:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
let graphs = match settings.mode {
ServerConfigMode::Multi { graphs, .. } => graphs,
_ => panic!("expected Multi"),
@ -5972,8 +5977,8 @@ graphs:
}
/// `server.policy.file` resolves alongside the graphs map.
#[test]
fn server_policy_file_is_resolved_relative_to_base_dir() {
#[tokio::test]
async fn server_policy_file_is_resolved_relative_to_base_dir() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
@ -5988,7 +5993,7 @@ graphs:
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi {
server_policy_file, ..
@ -6268,7 +6273,7 @@ graphs:
.unwrap();
let settings: ServerConfig =
load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
assert!(matches!(settings.mode, ServerConfigMode::Multi { .. }));
match settings.mode {
@ -6321,14 +6326,14 @@ graphs:
temp
}
fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true)
async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true).await
}
#[tokio::test]
async fn cluster_boot_serves_applied_state() {
let temp = converged_cluster_dir("").await;
let settings = cluster_settings(temp.path()).unwrap();
let settings = cluster_settings(temp.path()).await.unwrap();
let omnigraph_server::ServerConfigMode::Multi {
graphs,
config_path,
@ -6444,7 +6449,7 @@ graphs:
temp
};
let settings = cluster_settings(temp.path()).unwrap();
let settings = cluster_settings(temp.path()).await.unwrap();
let omnigraph_server::ServerConfigMode::Multi {
graphs,
server_policy_file,
@ -6482,6 +6487,7 @@ async fn cluster_boot_refusals() {
None,
true,
)
.await
.unwrap_err();
assert!(err.to_string().contains("exclusive boot source"), "{err}");
let err = omnigraph_server::load_server_settings(
@ -6492,6 +6498,7 @@ async fn cluster_boot_refusals() {
None,
true,
)
.await
.unwrap_err();
assert!(err.to_string().contains("exclusive boot source"), "{err}");
@ -6499,7 +6506,7 @@ async fn cluster_boot_refusals() {
let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person");
let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path();
fs::write(&blob, "tampered").unwrap();
let err = cluster_settings(&dir).unwrap_err();
let err = cluster_settings(&dir).await.unwrap_err();
assert!(
err.to_string().contains("catalog_payload_digest_mismatch"),
"{err}"
@ -6508,6 +6515,6 @@ async fn cluster_boot_refusals() {
// Missing state refuses with the import/apply remedy.
let empty = tempfile::tempdir().unwrap();
let err = cluster_settings(empty.path()).unwrap_err();
let err = cluster_settings(empty.path()).await.unwrap_err();
assert!(err.to_string().contains("cluster_state_missing"), "{err}");
}

View file

@ -206,6 +206,10 @@ case is exceptional.
fits.
- Discarding retrieval score/rank before fusion or projection decisions.
- Auto-creating placeholder nodes for orphan edges.
- Raw filesystem I/O for cluster-stored state (ledger, lock, sidecars,
approvals, catalog) outside the cluster crate's storage module — every
stored byte goes through the engine `StorageAdapter` so `file://` and
`s3://` stay one code path.
- Wire-protocol-specific code in compiler or engine crates.
- Cloud-only correctness fixes or forks of the OSS engine for correctness.
- Mutating immutable substrate state in place, including Lance fragments or

View file

@ -101,6 +101,20 @@ updates all of its queries together. Paths are relative to the config
directory — the cluster is one explicit folder, so no `./` prefixes are
needed.
`storage:` (optional) is the **storage root URI** for everything the cluster
stores — the state ledger, lock, content-addressed catalog, recovery
sidecars, approval artifacts, and the derived graph roots
(`<storage>/graphs/<id>.omni`). Absent, it defaults to the config directory
itself (the original layout, byte-compatible with pre-existing clusters).
`s3://bucket/prefix` puts the whole cluster on S3-compatible object storage:
the ledger CAS uses conditional writes (verified against AWS S3 semantics and
RustFS), the lock becomes genuinely cross-machine, and graph roots are
engine-native S3 URIs. Credentials are **never** in `cluster.yaml` — the
standard `AWS_*` environment contract applies, identical to graph storage.
Declared configuration (`cluster.yaml` and the schema/query/policy sources it
references) always stays in the working tree: config is versioned in git,
state lives in the store — the Terraform split.
`metadata.name` is a display label. `state.backend` may be omitted or set to
`cluster`; external state backends are reserved for a later stage. `state.lock`
defaults to `true`. When enabled, `cluster plan`, `cluster apply`,

View file

@ -40,6 +40,8 @@ company-brain/
```yaml
# cluster.yaml
version: 1
# storage: s3://omnigraph-local/clusters/company-brain # optional: put the
# ledger, catalog, and graph data on object storage (default: this folder)
metadata:
name: company-brain
graphs: