From 8dc2f1525509c3aae359317d0851e7ddce74f422 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 14:28:04 +0300 Subject: [PATCH] =?UTF-8?q?feat(cluster):=20the=20storage:=20root=20?= =?UTF-8?q?=E2=80=94=20state,=20catalog,=20and=20graph=20roots=20relocatab?= =?UTF-8?q?le?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cluster.yaml gains an optional storage: URI deciding where everything the cluster STORES lives: the state ledger, lock, content-addressed catalog, recovery sidecars, approval artifacts, and the derived graph roots (/graphs/.omni). Absent, it defaults to the config directory itself — the original layout, byte-compatible, so pre-existing clusters and the whole test suite are untouched. Declared configuration always stays in the working tree (Terraform's config-local/state-remote split); credentials are env-only, never in cluster.yaml. Every command resolves its store from the declared root (a bad root is a loud invalid_storage_root). Graph-root derivation, the delete executor (prefix delete via the adapter), the sweep's existence probes, the catalog payload write/verify/read paths, and the serving snapshot all flow through ClusterStore — the last raw-fs holdouts for stored state are gone, and the deny-list gains the rule that keeps it that way. Tests: default-layout byte-compat, a file:// root relocating the entire cluster (ledger+catalog+graphs under the new root, nothing under the config dir, serving snapshot follows), invalid-root validation. 98 in-crate + 9 failpoints + full workspace gate green. The s3:// flavor lands with PR 3's gated RustFS e2e. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/config.rs | 40 +++- crates/omnigraph-cluster/src/lib.rs | 241 +++++++++++++------------ crates/omnigraph-cluster/src/serve.rs | 70 +++---- crates/omnigraph-cluster/src/store.rs | 28 +++ crates/omnigraph-cluster/src/sweep.rs | 14 +- crates/omnigraph-cluster/src/tests.rs | 64 +++++++ crates/omnigraph-cluster/src/types.rs | 11 +- docs/dev/invariants.md | 4 + docs/user/cluster-config.md | 14 ++ docs/user/cluster.md | 2 + 10 files changed, 309 insertions(+), 179 deletions(-) diff --git a/crates/omnigraph-cluster/src/config.rs b/crates/omnigraph-cluster/src/config.rs index ecdc71c..acc954d 100644 --- a/crates/omnigraph-cluster/src/config.rs +++ b/crates/omnigraph-cluster/src/config.rs @@ -239,8 +239,33 @@ pub(crate) fn validate_cluster_header( } } + if let Some(storage) = raw.storage.as_deref() { + let trimmed = storage.trim(); + if trimmed.is_empty() { + diagnostics.push(Diagnostic::error( + "invalid_storage_root", + "storage", + "storage must be a non-empty URI (e.g. s3://bucket/prefix) when provided", + )); + } else if let Some(rest) = trimmed.strip_prefix("s3://") { + if rest.trim_start_matches('/').is_empty() { + diagnostics.push(Diagnostic::error( + "invalid_storage_root", + "storage", + "storage s3:// URI must name a bucket", + )); + } + } + } + ClusterSettings { state_lock: raw.state.lock.unwrap_or(true), + storage_root: raw + .storage + .as_deref() + .map(str::trim) + .filter(|storage| !storage.is_empty()) + .map(|storage| storage.trim_end_matches('/').to_string()), } } @@ -271,19 +296,19 @@ pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState { } -pub(crate) async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { +pub(crate) async fn observe_declared_graphs( + desired: &DesiredCluster, + backend: &ClusterStore, + state: &mut ClusterState, +) -> usize { let mut graph_error_count = 0; for graph in &desired.graphs { let graph_address = graph_address(&graph.id); let schema_address = schema_address(&graph.id); - let graph_path = desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{}.omni", graph.id)); - let graph_uri = display_path(&graph_path); + let graph_uri = backend.graph_root(&graph.id); let observed_at = now_rfc3339(); - if !graph_path.exists() { + if !backend.graph_root_exists(&graph_uri).await { state.applied_revision.resources.remove(&graph_address); state.applied_revision.resources.remove(&schema_address); state.observations.insert( @@ -737,6 +762,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { desired: Some(DesiredCluster { config_dir: config_dir.clone(), config_digest, + storage_root: settings.storage_root.clone(), state_lock: settings.state_lock, graphs, resource_digests, diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index ec1a02a..d97bb5b 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -29,7 +29,6 @@ use store::{ClusterStore, StateLockGuard, StateSnapshot}; pub use types::*; use types::*; pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; -use serve::read_verified_payload; use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source}; use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind}; use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars}; @@ -43,6 +42,18 @@ pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries"; pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals"; +/// The store for a load outcome: the declared `storage:` root when present, +/// the config directory itself otherwise. A bad root is a loud error. +fn store_for( + config_dir: &Path, + storage_root: Option<&str>, +) -> Result { + match storage_root { + Some(root) => ClusterStore::for_storage_root(root), + None => Ok(ClusterStore::for_config_dir(config_dir)), + } +} + pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { let outcome = load_desired(config_dir.as_ref()); let (resource_digests, resources, dependencies) = match outcome.desired { @@ -69,7 +80,17 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -169,12 +190,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else { continue; }; - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(&graph_id); let source_path = desired .resources .iter() @@ -242,7 +258,17 @@ pub async fn apply_config_dir_with_options( ) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let actor_for_output = options.actor.clone(); @@ -442,12 +468,7 @@ pub async fn apply_config_dir_with_options( else { continue; }; - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(graph_id); let mut sidecar = RecoverySidecar { schema_version: 1, operation_id: Ulid::new().to_string(), @@ -587,12 +608,7 @@ pub async fn apply_config_dir_with_options( else { continue; }; - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(graph_id); // Read-write open: the engine's own recovery sweep runs here, which // is exactly what we want before moving its manifest. let db = match Omnigraph::open(&graph_uri).await { @@ -767,9 +783,9 @@ pub async fn apply_config_dir_with_options( .after_digest .as_deref() .expect("create/update always carries an after digest"); - let Some(target) = payload_path(&desired.config_dir, &kind, digest) else { + if ClusterStore::payload_relative(&kind, digest).is_none() { continue; - }; + } let Some(source) = source_paths.get(change.resource.as_str()) else { diagnostics.push(Diagnostic::error( "resource_payload_write_error", @@ -779,7 +795,8 @@ pub async fn apply_config_dir_with_options( continue; }; if let Err(diagnostic) = - write_resource_payload(&target, Path::new(source), digest, &change.resource) + write_resource_payload(&backend, &kind, Path::new(source), digest, &change.resource) + .await { diagnostics.push(diagnostic); } @@ -844,12 +861,7 @@ pub async fn apply_config_dir_with_options( && artifact.bound_config_digest == desired.config_digest }) .map(|artifact| artifact.approval_id.clone()); - let graph_uri = display_path( - &desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), - ); + let graph_uri = backend.graph_root(graph_id); let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await { Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await { Ok(snapshot) => Some(snapshot.version()), @@ -888,9 +900,10 @@ pub async fn apply_config_dir_with_options( graph_moving_aborted = true; continue; } - match fs::remove_dir_all(PathBuf::from(&graph_uri)) { + // Prefix delete through the storage layer: remove_dir_all locally, + // list+delete on object stores (idempotent; already-gone is fine). + match backend.delete_graph_root(&graph_uri).await { Ok(()) => {} - Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone Err(err) => { diagnostics.push(Diagnostic::error( "graph_delete_failed", @@ -1088,7 +1101,17 @@ pub async fn approve_config_dir( ) -> ApproveOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let fail = |config_dir: String, diagnostics: Vec| ApproveOutput { @@ -1200,7 +1223,20 @@ pub async fn approve_config_dir( pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = ClusterStore::for_config_dir(&parsed.config_dir); + let storage_root = parsed.raw.as_ref().and_then(|raw| { + raw.storage + .as_deref() + .map(str::trim) + .filter(|root| !root.is_empty()) + .map(|root| root.trim_end_matches('/').to_string()) + }); + let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&parsed.config_dir) + } + }; let mut observations = backend.observations(); backend.observe_lock(&mut observations, &mut diagnostics).await; warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); @@ -1219,7 +1255,7 @@ pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { // findings as diagnostics; persisting Drifted statuses // is refresh's job. Status never writes state. for (address, finding) in - verify_catalog_payloads(&parsed.config_dir, &state) + verify_catalog_payloads(&backend, &state).await { diagnostics.push(payload_finding_diagnostic(&address, &finding)); } @@ -1256,7 +1292,20 @@ pub async fn force_unlock_config_dir( ) -> ForceUnlockOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; - let backend = ClusterStore::for_config_dir(&parsed.config_dir); + let storage_root = parsed.raw.as_ref().and_then(|raw| { + raw.storage + .as_deref() + .map(str::trim) + .filter(|root| !root.is_empty()) + .map(|root| root.trim_end_matches('/').to_string()) + }); + let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&parsed.config_dir) + } + }; let mut observations = backend.observations(); let mut lock_removed = false; @@ -1290,7 +1339,17 @@ pub async fn import_config_dir(config_dir: impl AsRef) -> StateSyncOutput async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput { let outcome = load_desired(config_dir); let mut diagnostics = outcome.diagnostics; - let backend = ClusterStore::for_config_dir(&outcome.config_dir); + let storage_root = outcome + .desired + .as_ref() + .and_then(|desired| desired.storage_root.clone()); + let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) { + Ok(backend) => backend, + Err(diagnostic) => { + diagnostics.push(diagnostic); + ClusterStore::for_config_dir(&outcome.config_dir) + } + }; let mut observations = backend.observations(); let Some(desired) = outcome.desired else { @@ -1418,7 +1477,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St // a drifted query digest first means the live-graph composite recompute // below already excludes it, so the persisted graph. composite stays // consistent and the next plan shows exactly the create + derived update. - for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) { + for (address, finding) in verify_catalog_payloads(&backend, &state).await { diagnostics.push(payload_finding_diagnostic(&address, &finding)); match finding { PayloadFinding::Missing => { @@ -1455,7 +1514,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St } } - let graph_error_count = observe_declared_graphs(&desired, &mut state).await; + let graph_error_count = observe_declared_graphs(&desired, &backend, &mut state).await; if graph_error_count > 0 { diagnostics.push(Diagnostic::error( "graph_observation_error", @@ -1512,28 +1571,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St -/// Content-addressed catalog path for an applied resource payload. Extensions -/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, -/// so the catalog layout cannot drift with operator file conventions. -fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option { - let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR); - match kind { - ResourceKind::Query { graph, name } => Some( - resources_dir - .join("query") - .join(graph) - .join(name) - .join(format!("{digest}.gq")), - ), - ResourceKind::Policy(name) => Some( - resources_dir - .join("policy") - .join(name) - .join(format!("{digest}.yaml")), - ), - _ => None, - } -} #[derive(Debug, PartialEq, Eq)] enum PayloadFinding { @@ -1547,34 +1584,26 @@ enum PayloadFinding { /// unknown addresses have no payloads and are skipped. Read-only; findings /// are deterministic (BTreeMap order). Payloads are small (queries, policy /// bundles), so a full digest re-hash is cheap. -fn verify_catalog_payloads( - config_dir: &Path, +async fn verify_catalog_payloads( + backend: &ClusterStore, state: &ClusterState, ) -> Vec<(String, PayloadFinding)> { let mut findings = Vec::new(); for (address, resource) in &state.applied_revision.resources { let kind = resource_kind(address); - let Some(path) = payload_path(config_dir, &kind, &resource.digest) else { + if ClusterStore::payload_relative(&kind, &resource.digest).is_none() { continue; - }; - match fs::read(&path) { - Ok(bytes) => { - let actual_digest = sha256_hex(&bytes); + } + match backend.read_payload(&kind, &resource.digest).await { + Ok(Some(text)) => { + let actual_digest = sha256_hex(text.as_bytes()); if actual_digest != resource.digest { findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest })); } } - Err(err) if err.kind() == ErrorKind::NotFound => { - findings.push((address.clone(), PayloadFinding::Missing)); - } + Ok(None) => findings.push((address.clone(), PayloadFinding::Missing)), Err(err) => { - findings.push(( - address.clone(), - PayloadFinding::ReadError(format!( - "could not read catalog payload '{}': {err}", - path.display() - )), - )); + findings.push((address.clone(), PayloadFinding::ReadError(err))); } } } @@ -1606,13 +1635,15 @@ fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagno /// digest-named file is trusted as-is. The digest re-check is the apply-side /// TOCTOU detector — the source file changing between `load_desired` and the /// payload write must fail loudly, never publish mismatched content. -fn write_resource_payload( - target: &Path, +async fn write_resource_payload( + backend: &ClusterStore, + kind: &ResourceKind, source: &Path, expected_digest: &str, resource: &str, ) -> Result<(), Diagnostic> { - if target.exists() { + if backend.payload_exists(kind, expected_digest).await { + // Content-addressed: an existing digest-named object is identical. return Ok(()); } let bytes = fs::read(source).map_err(|err| { @@ -1623,6 +1654,9 @@ fn write_resource_payload( ) })?; if sha256_hex(&bytes) != expected_digest { + // The apply-side TOCTOU detector: the source changing between + // load_desired and this write must fail loudly, never publish + // mismatched content. return Err(Diagnostic::error( "resource_content_changed", resource, @@ -1632,54 +1666,23 @@ fn write_resource_payload( ), )); } - let parent = target.parent().expect("payload path always has a parent"); - fs::create_dir_all(parent).map_err(|err| { + let content = String::from_utf8(bytes).map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, - format!("could not create payload directory: {err}"), + format!("resource source is not valid UTF-8: {err}"), ) })?; - let file_name = target - .file_name() - .expect("payload path always has a file name") - .to_string_lossy(); - let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new())); - let mut file = OpenOptions::new() - .write(true) - .create_new(true) - .open(&tmp_path) + backend + .write_payload(kind, expected_digest, &content) + .await .map_err(|err| { Diagnostic::error( "resource_payload_write_error", resource, - format!("could not create temporary payload file: {err}"), + format!("could not write payload: {err}"), ) - })?; - let write_result = file - .write_all(&bytes) - .and_then(|()| file.sync_all()) - .map_err(|err| { - Diagnostic::error( - "resource_payload_write_error", - resource, - format!("could not write payload file: {err}"), - ) - }); - drop(file); - if let Err(diagnostic) = write_result { - let _ = fs::remove_file(&tmp_path); - return Err(diagnostic); - } - if let Err(err) = fs::rename(&tmp_path, target) { - let _ = fs::remove_file(&tmp_path); - return Err(Diagnostic::error( - "resource_payload_write_error", - resource, - format!("could not move payload file into place: {err}"), - )); - } - Ok(()) + }) } /// Recompute the composite `graph.` digests for state-resident graphs from diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index 8578aee..b459641 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -44,8 +44,24 @@ pub async fn read_serving_snapshot( config_dir: impl AsRef, ) -> Result> { let config_dir = config_dir.as_ref().to_path_buf(); - let backend = ClusterStore::for_config_dir(&config_dir); let mut diagnostics: Vec = Vec::new(); + // The declared storage: root decides where the ledger/catalog/graphs + // live; config parse errors surface through the normal validation path. + let parsed = parse_cluster_config(&config_dir); + let storage_root = parsed.raw.as_ref().and_then(|raw| { + raw.storage + .as_deref() + .map(str::trim) + .filter(|root| !root.is_empty()) + .map(|root| root.trim_end_matches('/').to_string()) + }); + let backend = match storage_root.as_deref() { + Some(root) => match ClusterStore::for_storage_root(root) { + Ok(backend) => backend, + Err(diagnostic) => return Err(vec![diagnostic]), + }, + None => ClusterStore::for_config_dir(&config_dir), + }; // A ledger a sweep is about to rewrite must not start serving. let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await; @@ -89,9 +105,7 @@ pub async fn read_serving_snapshot( match resource_kind(address) { ResourceKind::Graph(graph_id) => { graphs.push(ServingGraph { - root: config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{graph_id}.omni")), + root: PathBuf::from(backend.graph_root(&graph_id)), graph_id, }); } @@ -100,7 +114,7 @@ pub async fn read_serving_snapshot( let ResourceKind::Query { graph, name } = &kind else { unreachable!() }; - match read_verified_payload(&config_dir, &kind, &entry.digest, address) { + match backend.read_verified_payload(&kind, &entry.digest, address).await { Ok(source) => queries.push(ServingQuery { graph_id: graph.clone(), name: name.clone(), @@ -121,11 +135,14 @@ pub async fn read_serving_snapshot( )); continue; }; - match read_verified_payload(&config_dir, &kind, &entry.digest, address) { + match backend.read_verified_payload(&kind, &entry.digest, address).await { Ok(_) => policies.push(ServingPolicy { name: name.clone(), - blob_path: payload_path(&config_dir, &kind, &entry.digest) - .expect("policy kind always has a payload path"), + blob_path: PathBuf::from( + backend + .payload_display(&kind, &entry.digest) + .expect("policy kind always has a payload path"), + ), applies_to, }), Err(diagnostic) => diagnostics.push(diagnostic), @@ -152,40 +169,3 @@ pub async fn read_serving_snapshot( }) } -/// Read a catalog blob and verify it against the recorded digest. -pub(crate) fn read_verified_payload( - config_dir: &Path, - kind: &ResourceKind, - digest: &str, - address: &str, -) -> Result { - let path = payload_path(config_dir, kind, digest) - .expect("query/policy kinds always have a payload path"); - let bytes = fs::read(&path).map_err(|err| { - Diagnostic::error( - "catalog_payload_missing", - address, - format!( - "catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart", - display_path(&path) - ), - ) - })?; - if sha256_hex(&bytes) != digest { - return Err(Diagnostic::error( - "catalog_payload_digest_mismatch", - address, - format!( - "catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart", - display_path(&path) - ), - )); - } - String::from_utf8(bytes).map_err(|err| { - Diagnostic::error( - "catalog_payload_invalid", - address, - format!("catalog blob is not valid UTF-8: {err}"), - ) - }) -} diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index bf328af..f52dd29 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -375,6 +375,34 @@ impl ClusterStore { .unwrap_or(false) } + /// Raw payload read: `Ok(None)` for a missing blob, `Err` for transport + /// failures — callers classify (verify loops need the three-way split). + pub(crate) async fn read_payload( + &self, + kind: &ResourceKind, + digest: &str, + ) -> Result, String> { + let Some(relative) = Self::payload_relative(kind, digest) else { + return Ok(None); + }; + let uri = self.uri(&relative); + match self.adapter.exists(&uri).await { + Ok(false) => return Ok(None), + Ok(true) => {} + Err(err) => return Err(err.to_string()), + } + self.adapter + .read_text(&uri) + .await + .map(Some) + .map_err(|err| { + format!( + "could not read catalog payload '{}': {err}", + self.display(&relative) + ) + }) + } + /// Idempotent content-addressed write: a payload already present at its /// digest is by definition identical. pub(crate) async fn write_payload( diff --git a/crates/omnigraph-cluster/src/sweep.rs b/crates/omnigraph-cluster/src/sweep.rs index 2cfd7d1..7aecb01 100644 --- a/crates/omnigraph-cluster/src/sweep.rs +++ b/crates/omnigraph-cluster/src/sweep.rs @@ -19,13 +19,13 @@ pub(crate) async fn sweep_recovery_sidecars( for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await { match sidecar.kind { RecoverySidecarKind::GraphCreate => { - sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + sweep_graph_create_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await; } RecoverySidecarKind::SchemaApply => { sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } RecoverySidecarKind::GraphDelete => { - sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome); + sweep_graph_delete_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await; } } } @@ -33,6 +33,7 @@ pub(crate) async fn sweep_recovery_sidecars( } pub(crate) async fn sweep_graph_create_sidecar( + backend: &ClusterStore, path: String, sidecar: RecoverySidecar, state: &mut ClusterState, @@ -41,13 +42,12 @@ pub(crate) async fn sweep_graph_create_sidecar( ) { let graph_address = graph_address(&sidecar.graph_id); let schema_addr = schema_address(&sidecar.graph_id); - let graph_path = PathBuf::from(&sidecar.graph_uri); // Row 1: nothing moved — the init never landed. The sidecar is pure // intent; retire it (deferred to the command's post-CAS cleanup, like // every other completed sidecar — a failed CAS simply re-sweeps it) and // let the command's own plan re-propose the create. - if !graph_path.exists() { + if !backend.graph_root_exists(&sidecar.graph_uri).await { outcome.completed_sidecars.push(path); return; } @@ -251,7 +251,8 @@ pub(crate) async fn sweep_schema_apply_sidecar( } } -pub(crate) fn sweep_graph_delete_sidecar( +pub(crate) async fn sweep_graph_delete_sidecar( + backend: &ClusterStore, path: String, sidecar: RecoverySidecar, state: &mut ClusterState, @@ -259,9 +260,8 @@ pub(crate) fn sweep_graph_delete_sidecar( outcome: &mut SweepOutcome, ) { let graph_address = graph_address(&sidecar.graph_id); - let root = PathBuf::from(&sidecar.graph_uri); - if root.exists() { + if backend.graph_root_exists(&sidecar.graph_uri).await { // Row 8: the delete never completed. Prefix removal is idempotent and // works on partial roots, so the repair is simply the re-proposed, // still-approved delete on a later run — retire the stale intent. diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index 3b7984d..ba7019f 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -2762,6 +2762,70 @@ policies: // ---- serving snapshot (5B read-only loader) ---- + // ---- storage: root (RFC-006) ---- + + #[tokio::test] + async fn storage_root_defaults_to_config_dir_layout() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + let out = apply_config_dir(dir.path()).await; + assert!(out.converged, "{out:?}"); + // No storage: key — the original on-disk layout, byte-compatible. + assert!(dir.path().join(CLUSTER_STATE_FILE).exists()); + assert!(dir.path().join(CLUSTER_RESOURCES_DIR).exists()); + assert!(dir.path().join("graphs/knowledge.omni").exists()); + } + + #[tokio::test] + async fn storage_root_file_uri_relocates_the_cluster() { + let dir = fixture(); + let storage = tempfile::tempdir().unwrap(); + let storage_path = storage.path().to_string_lossy().to_string(); + let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap(); + config = config.replace("version: 1\n", &format!("version: 1\nstorage: {storage_path}\n")); + fs::write(dir.path().join("cluster.yaml"), config).unwrap(); + + let import = import_config_dir(dir.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{:?}", out.diagnostics); + + // Everything lives under the declared root; nothing under config dir. + assert!(storage.path().join("__cluster/state.json").exists()); + assert!(storage.path().join("graphs/knowledge.omni").exists()); + assert!(storage.path().join(CLUSTER_RESOURCES_DIR).exists()); + assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); + assert!(!dir.path().join("graphs").exists()); + + // The serving snapshot follows the root. + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); + assert!( + snapshot.graphs[0] + .root + .starts_with(storage.path()), + "{:?}", + snapshot.graphs[0].root + ); + } + + #[test] + fn storage_root_invalid_uri_fails_validation() { + let dir = fixture(); + let mut config = fs::read_to_string(dir.path().join("cluster.yaml")).unwrap(); + config = config.replace("version: 1\n", "version: 1\nstorage: \"s3://\"\n"); + fs::write(dir.path().join("cluster.yaml"), config).unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "invalid_storage_root"), + "{:?}", + out.diagnostics + ); + } + #[tokio::test] async fn serving_snapshot_reads_converged_cluster() { let dir = fixture(); diff --git a/crates/omnigraph-cluster/src/types.rs b/crates/omnigraph-cluster/src/types.rs index ca960a5..e44e2f4 100644 --- a/crates/omnigraph-cluster/src/types.rs +++ b/crates/omnigraph-cluster/src/types.rs @@ -322,6 +322,8 @@ pub struct ApproveOutput { pub(crate) struct DesiredCluster { pub(crate) config_dir: PathBuf, pub(crate) config_digest: String, + /// The declared `storage:` root, if any (None ⇒ the config dir itself). + pub(crate) storage_root: Option, pub(crate) state_lock: bool, pub(crate) graphs: Vec, pub(crate) resource_digests: BTreeMap, @@ -345,9 +347,10 @@ pub(crate) struct ParsedConfig { pub(crate) config_file: PathBuf, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) struct ClusterSettings { pub(crate) state_lock: bool, + pub(crate) storage_root: Option, } #[derive(Debug)] @@ -364,6 +367,12 @@ pub(crate) struct RawClusterConfig { pub(crate) version: u32, #[serde(default)] pub(crate) metadata: Metadata, + /// Storage root URI for everything the cluster stores: the state + /// ledger, catalog, sidecars, approvals, and derived graph roots. + /// Absent ⇒ `file://` (the original layout, byte-compatible). + /// `s3://bucket/prefix` puts the whole cluster on object storage. + #[serde(default)] + pub(crate) storage: Option, #[serde(default)] pub(crate) state: StateConfig, #[serde(default)] diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 7642fd9..655e360 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -206,6 +206,10 @@ case is exceptional. fits. - Discarding retrieval score/rank before fusion or projection decisions. - Auto-creating placeholder nodes for orphan edges. +- Raw filesystem I/O for cluster-stored state (ledger, lock, sidecars, + approvals, catalog) outside the cluster crate's storage module — every + stored byte goes through the engine `StorageAdapter` so `file://` and + `s3://` stay one code path. - Wire-protocol-specific code in compiler or engine crates. - Cloud-only correctness fixes or forks of the OSS engine for correctness. - Mutating immutable substrate state in place, including Lance fragments or diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 24d1833..59c9207 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -101,6 +101,20 @@ updates all of its queries together. Paths are relative to the config directory — the cluster is one explicit folder, so no `./` prefixes are needed. +`storage:` (optional) is the **storage root URI** for everything the cluster +stores — the state ledger, lock, content-addressed catalog, recovery +sidecars, approval artifacts, and the derived graph roots +(`/graphs/.omni`). Absent, it defaults to the config directory +itself (the original layout, byte-compatible with pre-existing clusters). +`s3://bucket/prefix` puts the whole cluster on S3-compatible object storage: +the ledger CAS uses conditional writes (verified against AWS S3 semantics and +RustFS), the lock becomes genuinely cross-machine, and graph roots are +engine-native S3 URIs. Credentials are **never** in `cluster.yaml` — the +standard `AWS_*` environment contract applies, identical to graph storage. +Declared configuration (`cluster.yaml` and the schema/query/policy sources it +references) always stays in the working tree: config is versioned in git, +state lives in the store — the Terraform split. + `metadata.name` is a display label. `state.backend` may be omitted or set to `cluster`; external state backends are reserved for a later stage. `state.lock` defaults to `true`. When enabled, `cluster plan`, `cluster apply`, diff --git a/docs/user/cluster.md b/docs/user/cluster.md index 1731f31..19755fb 100644 --- a/docs/user/cluster.md +++ b/docs/user/cluster.md @@ -40,6 +40,8 @@ company-brain/ ```yaml # cluster.yaml version: 1 +# storage: s3://omnigraph-local/clusters/company-brain # optional: put the +# ledger, catalog, and graph data on object storage (default: this folder) metadata: name: company-brain graphs: