feat(cluster): the storage: root — state, catalog, and graph roots relocatable

cluster.yaml gains an optional storage: URI deciding where everything the
cluster STORES lives: the state ledger, lock, content-addressed catalog,
recovery sidecars, approval artifacts, and the derived graph roots
(<storage>/graphs/<id>.omni). Absent, it defaults to the config directory
itself — the original layout, byte-compatible, so pre-existing clusters and
the whole test suite are untouched. Declared configuration always stays in
the working tree (Terraform's config-local/state-remote split); credentials
are env-only, never in cluster.yaml.

Every command resolves its store from the declared root (a bad root is a
loud invalid_storage_root). Graph-root derivation, the delete executor
(prefix delete via the adapter), the sweep's existence probes, the catalog
payload write/verify/read paths, and the serving snapshot all flow through
ClusterStore — the last raw-fs holdouts for stored state are gone, and the
deny-list gains the rule that keeps it that way.

Tests: default-layout byte-compat, a file:// root relocating the entire
cluster (ledger+catalog+graphs under the new root, nothing under the config
dir, serving snapshot follows), invalid-root validation. 98 in-crate + 9
failpoints + full workspace gate green. The s3:// flavor lands with PR 3's
gated RustFS e2e.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 14:28:04 +03:00
parent fd002abaa5
commit 8dc2f15255
10 changed files with 309 additions and 179 deletions

View file

@ -239,8 +239,33 @@ pub(crate) fn validate_cluster_header(
}
}
if let Some(storage) = raw.storage.as_deref() {
let trimmed = storage.trim();
if trimmed.is_empty() {
diagnostics.push(Diagnostic::error(
"invalid_storage_root",
"storage",
"storage must be a non-empty URI (e.g. s3://bucket/prefix) when provided",
));
} else if let Some(rest) = trimmed.strip_prefix("s3://") {
if rest.trim_start_matches('/').is_empty() {
diagnostics.push(Diagnostic::error(
"invalid_storage_root",
"storage",
"storage s3:// URI must name a bucket",
));
}
}
}
ClusterSettings {
state_lock: raw.state.lock.unwrap_or(true),
storage_root: raw
.storage
.as_deref()
.map(str::trim)
.filter(|storage| !storage.is_empty())
.map(|storage| storage.trim_end_matches('/').to_string()),
}
}
@ -271,19 +296,19 @@ pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState {
}
pub(crate) async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize {
pub(crate) async fn observe_declared_graphs(
desired: &DesiredCluster,
backend: &ClusterStore,
state: &mut ClusterState,
) -> usize {
let mut graph_error_count = 0;
for graph in &desired.graphs {
let graph_address = graph_address(&graph.id);
let schema_address = schema_address(&graph.id);
let graph_path = desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{}.omni", graph.id));
let graph_uri = display_path(&graph_path);
let graph_uri = backend.graph_root(&graph.id);
let observed_at = now_rfc3339();
if !graph_path.exists() {
if !backend.graph_root_exists(&graph_uri).await {
state.applied_revision.resources.remove(&graph_address);
state.applied_revision.resources.remove(&schema_address);
state.observations.insert(
@ -737,6 +762,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
desired: Some(DesiredCluster {
config_dir: config_dir.clone(),
config_digest,
storage_root: settings.storage_root.clone(),
state_lock: settings.state_lock,
graphs,
resource_digests,

View file

@ -29,7 +29,6 @@ use store::{ClusterStore, StateLockGuard, StateSnapshot};
pub use types::*;
use types::*;
pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot};
use serve::read_verified_payload;
use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source};
use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind};
use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars};
@ -43,6 +42,18 @@ pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";
/// The store for a load outcome: the declared `storage:` root when present,
/// the config directory itself otherwise. A bad root is a loud error.
fn store_for(
config_dir: &Path,
storage_root: Option<&str>,
) -> Result<ClusterStore, Diagnostic> {
match storage_root {
Some(root) => ClusterStore::for_storage_root(root),
None => Ok(ClusterStore::for_config_dir(config_dir)),
}
}
pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
let outcome = load_desired(config_dir.as_ref());
let (resource_digests, resources, dependencies) = match outcome.desired {
@ -69,7 +80,17 @@ pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let Some(desired) = outcome.desired else {
@ -169,12 +190,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(&graph_id);
let source_path = desired
.resources
.iter()
@ -242,7 +258,17 @@ pub async fn apply_config_dir_with_options(
) -> ApplyOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let actor_for_output = options.actor.clone();
@ -442,12 +468,7 @@ pub async fn apply_config_dir_with_options(
else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(graph_id);
let mut sidecar = RecoverySidecar {
schema_version: 1,
operation_id: Ulid::new().to_string(),
@ -587,12 +608,7 @@ pub async fn apply_config_dir_with_options(
else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(graph_id);
// Read-write open: the engine's own recovery sweep runs here, which
// is exactly what we want before moving its manifest.
let db = match Omnigraph::open(&graph_uri).await {
@ -767,9 +783,9 @@ pub async fn apply_config_dir_with_options(
.after_digest
.as_deref()
.expect("create/update always carries an after digest");
let Some(target) = payload_path(&desired.config_dir, &kind, digest) else {
if ClusterStore::payload_relative(&kind, digest).is_none() {
continue;
};
}
let Some(source) = source_paths.get(change.resource.as_str()) else {
diagnostics.push(Diagnostic::error(
"resource_payload_write_error",
@ -779,7 +795,8 @@ pub async fn apply_config_dir_with_options(
continue;
};
if let Err(diagnostic) =
write_resource_payload(&target, Path::new(source), digest, &change.resource)
write_resource_payload(&backend, &kind, Path::new(source), digest, &change.resource)
.await
{
diagnostics.push(diagnostic);
}
@ -844,12 +861,7 @@ pub async fn apply_config_dir_with_options(
&& artifact.bound_config_digest == desired.config_digest
})
.map(|artifact| artifact.approval_id.clone());
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let graph_uri = backend.graph_root(graph_id);
let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await {
Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await {
Ok(snapshot) => Some(snapshot.version()),
@ -888,9 +900,10 @@ pub async fn apply_config_dir_with_options(
graph_moving_aborted = true;
continue;
}
match fs::remove_dir_all(PathBuf::from(&graph_uri)) {
// Prefix delete through the storage layer: remove_dir_all locally,
// list+delete on object stores (idempotent; already-gone is fine).
match backend.delete_graph_root(&graph_uri).await {
Ok(()) => {}
Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone
Err(err) => {
diagnostics.push(Diagnostic::error(
"graph_delete_failed",
@ -1088,7 +1101,17 @@ pub async fn approve_config_dir(
) -> ApproveOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let fail = |config_dir: String, diagnostics: Vec<Diagnostic>| ApproveOutput {
@ -1200,7 +1223,20 @@ pub async fn approve_config_dir(
pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
let backend = ClusterStore::for_config_dir(&parsed.config_dir);
let storage_root = parsed.raw.as_ref().and_then(|raw| {
raw.storage
.as_deref()
.map(str::trim)
.filter(|root| !root.is_empty())
.map(|root| root.trim_end_matches('/').to_string())
});
let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&parsed.config_dir)
}
};
let mut observations = backend.observations();
backend.observe_lock(&mut observations, &mut diagnostics).await;
warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics);
@ -1219,7 +1255,7 @@ pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
// findings as diagnostics; persisting Drifted statuses
// is refresh's job. Status never writes state.
for (address, finding) in
verify_catalog_payloads(&parsed.config_dir, &state)
verify_catalog_payloads(&backend, &state).await
{
diagnostics.push(payload_finding_diagnostic(&address, &finding));
}
@ -1256,7 +1292,20 @@ pub async fn force_unlock_config_dir(
) -> ForceUnlockOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
let backend = ClusterStore::for_config_dir(&parsed.config_dir);
let storage_root = parsed.raw.as_ref().and_then(|raw| {
raw.storage
.as_deref()
.map(str::trim)
.filter(|root| !root.is_empty())
.map(|root| root.trim_end_matches('/').to_string())
});
let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&parsed.config_dir)
}
};
let mut observations = backend.observations();
let mut lock_removed = false;
@ -1290,7 +1339,17 @@ pub async fn import_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput
async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput {
let outcome = load_desired(config_dir);
let mut diagnostics = outcome.diagnostics;
let backend = ClusterStore::for_config_dir(&outcome.config_dir);
let storage_root = outcome
.desired
.as_ref()
.and_then(|desired| desired.storage_root.clone());
let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
Ok(backend) => backend,
Err(diagnostic) => {
diagnostics.push(diagnostic);
ClusterStore::for_config_dir(&outcome.config_dir)
}
};
let mut observations = backend.observations();
let Some(desired) = outcome.desired else {
@ -1418,7 +1477,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
// a drifted query digest first means the live-graph composite recompute
// below already excludes it, so the persisted graph.<id> composite stays
// consistent and the next plan shows exactly the create + derived update.
for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) {
for (address, finding) in verify_catalog_payloads(&backend, &state).await {
diagnostics.push(payload_finding_diagnostic(&address, &finding));
match finding {
PayloadFinding::Missing => {
@ -1455,7 +1514,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
}
}
let graph_error_count = observe_declared_graphs(&desired, &mut state).await;
let graph_error_count = observe_declared_graphs(&desired, &backend, &mut state).await;
if graph_error_count > 0 {
diagnostics.push(Diagnostic::error(
"graph_observation_error",
@ -1512,28 +1571,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
/// Content-addressed catalog path for an applied resource payload. Extensions
/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name,
/// so the catalog layout cannot drift with operator file conventions.
fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option<PathBuf> {
let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR);
match kind {
ResourceKind::Query { graph, name } => Some(
resources_dir
.join("query")
.join(graph)
.join(name)
.join(format!("{digest}.gq")),
),
ResourceKind::Policy(name) => Some(
resources_dir
.join("policy")
.join(name)
.join(format!("{digest}.yaml")),
),
_ => None,
}
}
#[derive(Debug, PartialEq, Eq)]
enum PayloadFinding {
@ -1547,34 +1584,26 @@ enum PayloadFinding {
/// unknown addresses have no payloads and are skipped. Read-only; findings
/// are deterministic (BTreeMap order). Payloads are small (queries, policy
/// bundles), so a full digest re-hash is cheap.
fn verify_catalog_payloads(
config_dir: &Path,
async fn verify_catalog_payloads(
backend: &ClusterStore,
state: &ClusterState,
) -> Vec<(String, PayloadFinding)> {
let mut findings = Vec::new();
for (address, resource) in &state.applied_revision.resources {
let kind = resource_kind(address);
let Some(path) = payload_path(config_dir, &kind, &resource.digest) else {
if ClusterStore::payload_relative(&kind, &resource.digest).is_none() {
continue;
};
match fs::read(&path) {
Ok(bytes) => {
let actual_digest = sha256_hex(&bytes);
}
match backend.read_payload(&kind, &resource.digest).await {
Ok(Some(text)) => {
let actual_digest = sha256_hex(text.as_bytes());
if actual_digest != resource.digest {
findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest }));
}
}
Err(err) if err.kind() == ErrorKind::NotFound => {
findings.push((address.clone(), PayloadFinding::Missing));
}
Ok(None) => findings.push((address.clone(), PayloadFinding::Missing)),
Err(err) => {
findings.push((
address.clone(),
PayloadFinding::ReadError(format!(
"could not read catalog payload '{}': {err}",
path.display()
)),
));
findings.push((address.clone(), PayloadFinding::ReadError(err)));
}
}
}
@ -1606,13 +1635,15 @@ fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagno
/// digest-named file is trusted as-is. The digest re-check is the apply-side
/// TOCTOU detector — the source file changing between `load_desired` and the
/// payload write must fail loudly, never publish mismatched content.
fn write_resource_payload(
target: &Path,
async fn write_resource_payload(
backend: &ClusterStore,
kind: &ResourceKind,
source: &Path,
expected_digest: &str,
resource: &str,
) -> Result<(), Diagnostic> {
if target.exists() {
if backend.payload_exists(kind, expected_digest).await {
// Content-addressed: an existing digest-named object is identical.
return Ok(());
}
let bytes = fs::read(source).map_err(|err| {
@ -1623,6 +1654,9 @@ fn write_resource_payload(
)
})?;
if sha256_hex(&bytes) != expected_digest {
// The apply-side TOCTOU detector: the source changing between
// load_desired and this write must fail loudly, never publish
// mismatched content.
return Err(Diagnostic::error(
"resource_content_changed",
resource,
@ -1632,54 +1666,23 @@ fn write_resource_payload(
),
));
}
let parent = target.parent().expect("payload path always has a parent");
fs::create_dir_all(parent).map_err(|err| {
let content = String::from_utf8(bytes).map_err(|err| {
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not create payload directory: {err}"),
format!("resource source is not valid UTF-8: {err}"),
)
})?;
let file_name = target
.file_name()
.expect("payload path always has a file name")
.to_string_lossy();
let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new()));
let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
backend
.write_payload(kind, expected_digest, &content)
.await
.map_err(|err| {
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not create temporary payload file: {err}"),
format!("could not write payload: {err}"),
)
})?;
let write_result = file
.write_all(&bytes)
.and_then(|()| file.sync_all())
.map_err(|err| {
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not write payload file: {err}"),
)
});
drop(file);
if let Err(diagnostic) = write_result {
let _ = fs::remove_file(&tmp_path);
return Err(diagnostic);
}
if let Err(err) = fs::rename(&tmp_path, target) {
let _ = fs::remove_file(&tmp_path);
return Err(Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not move payload file into place: {err}"),
));
}
Ok(())
})
}
/// Recompute the composite `graph.<id>` digests for state-resident graphs from

View file

@ -44,8 +44,24 @@ pub async fn read_serving_snapshot(
config_dir: impl AsRef<Path>,
) -> Result<ServingSnapshot, Vec<Diagnostic>> {
let config_dir = config_dir.as_ref().to_path_buf();
let backend = ClusterStore::for_config_dir(&config_dir);
let mut diagnostics: Vec<Diagnostic> = Vec::new();
// The declared storage: root decides where the ledger/catalog/graphs
// live; config parse errors surface through the normal validation path.
let parsed = parse_cluster_config(&config_dir);
let storage_root = parsed.raw.as_ref().and_then(|raw| {
raw.storage
.as_deref()
.map(str::trim)
.filter(|root| !root.is_empty())
.map(|root| root.trim_end_matches('/').to_string())
});
let backend = match storage_root.as_deref() {
Some(root) => match ClusterStore::for_storage_root(root) {
Ok(backend) => backend,
Err(diagnostic) => return Err(vec![diagnostic]),
},
None => ClusterStore::for_config_dir(&config_dir),
};
// A ledger a sweep is about to rewrite must not start serving.
let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await;
@ -89,9 +105,7 @@ pub async fn read_serving_snapshot(
match resource_kind(address) {
ResourceKind::Graph(graph_id) => {
graphs.push(ServingGraph {
root: config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
root: PathBuf::from(backend.graph_root(&graph_id)),
graph_id,
});
}
@ -100,7 +114,7 @@ pub async fn read_serving_snapshot(
let ResourceKind::Query { graph, name } = &kind else {
unreachable!()
};
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
match backend.read_verified_payload(&kind, &entry.digest, address).await {
Ok(source) => queries.push(ServingQuery {
graph_id: graph.clone(),
name: name.clone(),
@ -121,11 +135,14 @@ pub async fn read_serving_snapshot(
));
continue;
};
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
match backend.read_verified_payload(&kind, &entry.digest, address).await {
Ok(_) => policies.push(ServingPolicy {
name: name.clone(),
blob_path: payload_path(&config_dir, &kind, &entry.digest)
.expect("policy kind always has a payload path"),
blob_path: PathBuf::from(
backend
.payload_display(&kind, &entry.digest)
.expect("policy kind always has a payload path"),
),
applies_to,
}),
Err(diagnostic) => diagnostics.push(diagnostic),
@ -152,40 +169,3 @@ pub async fn read_serving_snapshot(
})
}
/// Read a catalog blob and verify it against the recorded digest.
pub(crate) fn read_verified_payload(
config_dir: &Path,
kind: &ResourceKind,
digest: &str,
address: &str,
) -> Result<String, Diagnostic> {
let path = payload_path(config_dir, kind, digest)
.expect("query/policy kinds always have a payload path");
let bytes = fs::read(&path).map_err(|err| {
Diagnostic::error(
"catalog_payload_missing",
address,
format!(
"catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart",
display_path(&path)
),
)
})?;
if sha256_hex(&bytes) != digest {
return Err(Diagnostic::error(
"catalog_payload_digest_mismatch",
address,
format!(
"catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart",
display_path(&path)
),
));
}
String::from_utf8(bytes).map_err(|err| {
Diagnostic::error(
"catalog_payload_invalid",
address,
format!("catalog blob is not valid UTF-8: {err}"),
)
})
}

View file

@ -375,6 +375,34 @@ impl ClusterStore {
.unwrap_or(false)
}
/// Raw payload read: `Ok(None)` for a missing blob, `Err` for transport
/// failures — callers classify (verify loops need the three-way split).
pub(crate) async fn read_payload(
&self,
kind: &ResourceKind,
digest: &str,
) -> Result<Option<String>, String> {
let Some(relative) = Self::payload_relative(kind, digest) else {
return Ok(None);
};
let uri = self.uri(&relative);
match self.adapter.exists(&uri).await {
Ok(false) => return Ok(None),
Ok(true) => {}
Err(err) => return Err(err.to_string()),
}
self.adapter
.read_text(&uri)
.await
.map(Some)
.map_err(|err| {
format!(
"could not read catalog payload '{}': {err}",
self.display(&relative)
)
})
}
/// Idempotent content-addressed write: a payload already present at its
/// digest is by definition identical.
pub(crate) async fn write_payload(

View file

@ -19,13 +19,13 @@ pub(crate) async fn sweep_recovery_sidecars(
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await {
match sidecar.kind {
RecoverySidecarKind::GraphCreate => {
sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
sweep_graph_create_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await;
}
RecoverySidecarKind::SchemaApply => {
sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
}
RecoverySidecarKind::GraphDelete => {
sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome);
sweep_graph_delete_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await;
}
}
}
@ -33,6 +33,7 @@ pub(crate) async fn sweep_recovery_sidecars(
}
pub(crate) async fn sweep_graph_create_sidecar(
backend: &ClusterStore,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
@ -41,13 +42,12 @@ pub(crate) async fn sweep_graph_create_sidecar(
) {
let graph_address = graph_address(&sidecar.graph_id);
let schema_addr = schema_address(&sidecar.graph_id);
let graph_path = PathBuf::from(&sidecar.graph_uri);
// Row 1: nothing moved — the init never landed. The sidecar is pure
// intent; retire it (deferred to the command's post-CAS cleanup, like
// every other completed sidecar — a failed CAS simply re-sweeps it) and
// let the command's own plan re-propose the create.
if !graph_path.exists() {
if !backend.graph_root_exists(&sidecar.graph_uri).await {
outcome.completed_sidecars.push(path);
return;
}
@ -251,7 +251,8 @@ pub(crate) async fn sweep_schema_apply_sidecar(
}
}
pub(crate) fn sweep_graph_delete_sidecar(
pub(crate) async fn sweep_graph_delete_sidecar(
backend: &ClusterStore,
path: String,
sidecar: RecoverySidecar,
state: &mut ClusterState,
@ -259,9 +260,8 @@ pub(crate) fn sweep_graph_delete_sidecar(
outcome: &mut SweepOutcome,
) {
let graph_address = graph_address(&sidecar.graph_id);
let root = PathBuf::from(&sidecar.graph_uri);
if root.exists() {
if backend.graph_root_exists(&sidecar.graph_uri).await {
// Row 8: the delete never completed. Prefix removal is idempotent and
// works on partial roots, so the repair is simply the re-proposed,
// still-approved delete on a later run — retire the stale intent.

View file

@ -2762,6 +2762,70 @@ policies:
// ---- serving snapshot (5B read-only loader) ----
// ---- storage: root (RFC-006) ----
#[tokio::test]
async fn storage_root_defaults_to_config_dir_layout() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.converged, "{out:?}");
// No storage: key — the original on-disk layout, byte-compatible.
assert!(dir.path().join(CLUSTER_STATE_FILE).exists());
assert!(dir.path().join(CLUSTER_RESOURCES_DIR).exists());
assert!(dir.path().join("graphs/knowledge.omni").exists());
}
#[tokio::test]
async fn storage_root_file_uri_relocates_the_cluster() {
let dir = fixture();
let storage = tempfile::tempdir().unwrap();
let storage_path = storage.path().to_string_lossy().to_string();
let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap();
config = config.replace("version: 1\n", &format!("version: 1\nstorage: {storage_path}\n"));
fs::write(dir.path().join("cluster.yaml"), config).unwrap();
let import = import_config_dir(dir.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);
let out = apply_config_dir(dir.path()).await;
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
// Everything lives under the declared root; nothing under config dir.
assert!(storage.path().join("__cluster/state.json").exists());
assert!(storage.path().join("graphs/knowledge.omni").exists());
assert!(storage.path().join(CLUSTER_RESOURCES_DIR).exists());
assert!(!dir.path().join(CLUSTER_STATE_FILE).exists());
assert!(!dir.path().join("graphs").exists());
// The serving snapshot follows the root.
let snapshot = read_serving_snapshot(dir.path()).await.unwrap();
assert!(
snapshot.graphs[0]
.root
.starts_with(storage.path()),
"{:?}",
snapshot.graphs[0].root
);
}
#[test]
fn storage_root_invalid_uri_fails_validation() {
let dir = fixture();
let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap();
config = config.replace("version: 1\n", "version: 1\nstorage: \"s3://\"\n");
fs::write(dir.path().join("cluster.yaml"), config).unwrap();
let out = validate_config_dir(dir.path());
assert!(!out.ok);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "invalid_storage_root"),
"{:?}",
out.diagnostics
);
}
#[tokio::test]
async fn serving_snapshot_reads_converged_cluster() {
let dir = fixture();

View file

@ -322,6 +322,8 @@ pub struct ApproveOutput {
pub(crate) struct DesiredCluster {
pub(crate) config_dir: PathBuf,
pub(crate) config_digest: String,
/// The declared `storage:` root, if any (None ⇒ the config dir itself).
pub(crate) storage_root: Option<String>,
pub(crate) state_lock: bool,
pub(crate) graphs: Vec<DesiredGraph>,
pub(crate) resource_digests: BTreeMap<String, String>,
@ -345,9 +347,10 @@ pub(crate) struct ParsedConfig {
pub(crate) config_file: PathBuf,
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub(crate) struct ClusterSettings {
pub(crate) state_lock: bool,
pub(crate) storage_root: Option<String>,
}
#[derive(Debug)]
@ -364,6 +367,12 @@ pub(crate) struct RawClusterConfig {
pub(crate) version: u32,
#[serde(default)]
pub(crate) metadata: Metadata,
/// Storage root URI for everything the cluster stores: the state
/// ledger, catalog, sidecars, approvals, and derived graph roots.
/// Absent ⇒ `file://<config-dir>` (the original layout, byte-compatible).
/// `s3://bucket/prefix` puts the whole cluster on object storage.
#[serde(default)]
pub(crate) storage: Option<String>,
#[serde(default)]
pub(crate) state: StateConfig,
#[serde(default)]

View file

@ -206,6 +206,10 @@ case is exceptional.
fits.
- Discarding retrieval score/rank before fusion or projection decisions.
- Auto-creating placeholder nodes for orphan edges.
- Raw filesystem I/O for cluster-stored state (ledger, lock, sidecars,
approvals, catalog) outside the cluster crate's storage module — every
stored byte goes through the engine `StorageAdapter` so `file://` and
`s3://` stay one code path.
- Wire-protocol-specific code in compiler or engine crates.
- Cloud-only correctness fixes or forks of the OSS engine for correctness.
- Mutating immutable substrate state in place, including Lance fragments or

View file

@ -101,6 +101,20 @@ updates all of its queries together. Paths are relative to the config
directory — the cluster is one explicit folder, so no `./` prefixes are
needed.
`storage:` (optional) is the **storage root URI** for everything the cluster
stores — the state ledger, lock, content-addressed catalog, recovery
sidecars, approval artifacts, and the derived graph roots
(`<storage>/graphs/<id>.omni`). Absent, it defaults to the config directory
itself (the original layout, byte-compatible with pre-existing clusters).
`s3://bucket/prefix` puts the whole cluster on S3-compatible object storage:
the ledger CAS uses conditional writes (verified against AWS S3 semantics and
RustFS), the lock becomes genuinely cross-machine, and graph roots are
engine-native S3 URIs. Credentials are **never** in `cluster.yaml` — the
standard `AWS_*` environment contract applies, identical to graph storage.
Declared configuration (`cluster.yaml` and the schema/query/policy sources it
references) always stays in the working tree: config is versioned in git,
state lives in the store — the Terraform split.
`metadata.name` is a display label. `state.backend` may be omitted or set to
`cluster`; external state backends are reserved for a later stage. `state.lock`
defaults to `true`. When enabled, `cluster plan`, `cluster apply`,

View file

@ -40,6 +40,8 @@ company-brain/
```yaml
# cluster.yaml
version: 1
# storage: s3://omnigraph-local/clusters/company-brain # optional: put the
# ledger, catalog, and graph data on object storage (default: this folder)
metadata:
name: company-brain
graphs: