diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ed00d1..b2b18d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,7 +89,9 @@ jobs: crates/omnigraph/src/storage.rs) run_rustfs_ci=true ;; crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;; crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;; - crates/omnigraph-server/tests/server.rs) run_rustfs_ci=true ;; + crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;; + crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;; + crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;; crates/omnigraph-cli/tests/system_local.rs) run_rustfs_ci=true ;; esac done @@ -351,10 +353,14 @@ jobs: run: cargo test --locked -p omnigraph-engine --test s3_storage -- --nocapture - name: Run RustFS server smoke - # The exact test name (not a loose substring): a filter that matches - # nothing passes vacuously, which silently ran zero tests here for a - # while (the old filter said s3_repo; the test says s3_graph). - run: cargo test --locked -p omnigraph-server --test s3 server_opens_s3_graph_directly_and_serves_snapshot_and_read -- --nocapture + # No name filter: every test in the s3 target is bucket-gated, and a + # filter matching nothing passes vacuously (which silently ran zero + # tests here for a while — the old filter said s3_repo, the test + # said s3_graph). 
+ run: cargo test --locked -p omnigraph-server --test s3 -- --nocapture + + - name: Run RustFS cluster e2e + run: cargo test --locked -p omnigraph-cluster --test s3_cluster -- --nocapture - name: Run RustFS CLI smoke run: cargo test --locked -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow -- --nocapture diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index d97bb5b..1422dad 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -28,7 +28,7 @@ mod store; use store::{ClusterStore, StateLockGuard, StateSnapshot}; pub use types::*; use types::*; -pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; +pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot, read_serving_snapshot_from_storage}; use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source}; use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind}; use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars}; diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index b459641..4abd0bf 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -23,7 +23,10 @@ pub struct ServingQuery { #[derive(Debug, Clone)] pub struct ServingPolicy { pub name: String, - pub blob_path: 
PathBuf, + /// The policy bundle CONTENT, digest-verified against the applied + /// revision at read time. Content, not a path: the catalog may live on + /// object storage, and the server must not re-read mutable state. + pub source: String, pub applies_to: Vec, } @@ -44,7 +47,6 @@ pub async fn read_serving_snapshot( config_dir: impl AsRef, ) -> Result> { let config_dir = config_dir.as_ref().to_path_buf(); - let mut diagnostics: Vec = Vec::new(); // The declared storage: root decides where the ledger/catalog/graphs // live; config parse errors surface through the normal validation path. let parsed = parse_cluster_config(&config_dir); @@ -62,6 +64,25 @@ pub async fn read_serving_snapshot( }, None => ClusterStore::for_config_dir(&config_dir), }; + read_snapshot_with_store(backend).await +} + +/// Read the applied revision directly from a storage root URI — config-free +/// serving: a `--cluster s3://bucket/prefix` server needs no local files at +/// all, only the bucket and credentials. The ledger and catalog ARE the +/// deployment artifact. +pub async fn read_serving_snapshot_from_storage( + storage_root: &str, +) -> Result> { + let backend = + ClusterStore::for_storage_root(storage_root).map_err(|diagnostic| vec![diagnostic])?; + read_snapshot_with_store(backend).await +} + +async fn read_snapshot_with_store( + backend: ClusterStore, +) -> Result> { + let mut diagnostics: Vec = Vec::new(); // A ledger a sweep is about to rewrite must not start serving. 
let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await; @@ -136,13 +157,9 @@ pub async fn read_serving_snapshot( continue; }; match backend.read_verified_payload(&kind, &entry.digest, address).await { - Ok(_) => policies.push(ServingPolicy { + Ok(source) => policies.push(ServingPolicy { name: name.clone(), - blob_path: PathBuf::from( - backend - .payload_display(&kind, &entry.digest) - .expect("policy kind always has a payload path"), - ), + source, applies_to, }), Err(diagnostic) => diagnostics.push(diagnostic), diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index ba7019f..63e7da7 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -2843,7 +2843,9 @@ policies: assert!(snapshot.queries[0].source.contains("query find_person")); assert_eq!(snapshot.policies.len(), 1); assert_eq!(snapshot.policies[0].applies_to, vec!["graph.knowledge"]); - assert!(snapshot.policies[0].blob_path.exists()); + // Content, not a path: the catalog may live on object storage. + // The fixture bundle is `rules: []` — assert the verified text. + assert!(snapshot.policies[0].source.contains("rules:")); } #[tokio::test] diff --git a/crates/omnigraph-cluster/tests/s3_cluster.rs b/crates/omnigraph-cluster/tests/s3_cluster.rs new file mode 100644 index 0000000..3c7cef3 --- /dev/null +++ b/crates/omnigraph-cluster/tests/s3_cluster.rs @@ -0,0 +1,162 @@ +//! Cluster-on-object-storage end-to-end (RFC-006): the full control-plane +//! lifecycle with `storage: s3://…` — import, apply (graph roots + catalog +//! on the bucket), serving snapshots from both the config dir and the bare +//! storage URI, schema evolution, and the approved delete (prefix removal). +//! +//! Gated like every S3 suite: skips unless `OMNIGRAPH_S3_TEST_BUCKET` is +//! set (CI runs it against containerized RustFS; locally use the RustFS +//! binary + `AWS_*` env, see docs/dev/testing.md). +//! +//! 
Runtime flavor is multi_thread on purpose: the state-lock guard's +//! drop-time release uses block_in_place on object stores, which is the +//! production (CLI) runtime shape — and the lock-release regression this +//! suite pins (a spawned delete dying with a short-lived runtime) only +//! reproduces realistically under it. + +use std::env; +use std::fs; + +use omnigraph_cluster::{ + apply_config_dir, import_config_dir, read_serving_snapshot, + read_serving_snapshot_from_storage, status_config_dir, validate_config_dir, +}; +use ulid::Ulid; + +const SCHEMA_V1: &str = "node Person {\n name: String @key\n}\n"; +const SCHEMA_V2: &str = "node Person {\n name: String @key\n title: String?\n}\n"; +const FIND_PERSON_GQ: &str = "query find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n"; +const POLICY_YAML: &str = r#" +version: 1 +actors: + - id: act-admin + roles: [admin] +rules: + - effect: permit + actions: [read, change, schema_apply, branch_create, branch_delete, branch_merge] + roles: [admin] +"#; + +/// Unique per-run storage root under the test bucket, or None to skip. 
+fn s3_storage_root(suite: &str) -> Option { + let bucket = env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?; + Some(format!("s3://{bucket}/cluster-e2e/{suite}-{}", Ulid::new())) +} + +fn write_cluster_fixture(dir: &std::path::Path, storage_root: &str, schema: &str) { + fs::write(dir.join("people.pg"), schema).unwrap(); + fs::create_dir_all(dir.join("queries")).unwrap(); + fs::write(dir.join("queries/people.gq"), FIND_PERSON_GQ).unwrap(); + fs::write(dir.join("intel.policy.yaml"), POLICY_YAML).unwrap(); + fs::write( + dir.join("cluster.yaml"), + format!( + r#"version: 1 +storage: {storage_root} +graphs: + knowledge: + schema: people.pg + queries: queries/ +policies: + intel: + file: intel.policy.yaml + applies_to: [graph.knowledge] +"# + ), + ) + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn s3_cluster_full_lifecycle_import_apply_serve_evolve_delete() { + let Some(root) = s3_storage_root("lifecycle") else { + eprintln!("skipping s3 cluster e2e: OMNIGRAPH_S3_TEST_BUCKET is not set"); + return; + }; + let dir = tempfile::tempdir().unwrap(); + write_cluster_fixture(dir.path(), &root, SCHEMA_V1); + + // validate is config-only and must pass before any bucket I/O. + let validate = validate_config_dir(dir.path()); + assert!(validate.ok, "{:?}", validate.diagnostics); + + let import = import_config_dir(dir.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + + // The lock-release regression (caught live on the first smoke): the + // guard's drop must COMPLETE its bucket delete before the command + // returns — a follow-up command finding `state_lock_held` means the + // release was spawned into a dying runtime. 
+ let status = status_config_dir(dir.path()).await; + assert!(status.ok, "{:?}", status.diagnostics); + assert!( + !status.state_observations.locked, + "import leaked the state lock on the bucket: {:?}", + status.state_observations + ); + + let apply = apply_config_dir(dir.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + + // Nothing stored locally: the config dir holds only declared sources. + assert!(!dir.path().join("__cluster").exists()); + assert!(!dir.path().join("graphs").exists()); + + // Serving snapshot resolves through cluster.yaml's storage: key… + let via_config = read_serving_snapshot(dir.path()).await.unwrap(); + assert_eq!(via_config.graphs.len(), 1); + let graph_root = via_config.graphs[0].root.to_string_lossy().to_string(); + assert!( + graph_root.starts_with("s3://") && graph_root.ends_with("graphs/knowledge.omni"), + "{graph_root}" + ); + assert_eq!(via_config.queries.len(), 1); + assert_eq!(via_config.policies.len(), 1); + assert!( + via_config.policies[0].source.contains("act-admin"), + "policy must carry verified content, not a path" + ); + + // …and config-free, straight from the bucket URI (the deployment + // payoff: a server needs only the URI and credentials). + let via_uri = read_serving_snapshot_from_storage(&root).await.unwrap(); + assert_eq!(via_uri.graphs.len(), 1); + assert_eq!( + via_uri.graphs[0].root.to_string_lossy(), + via_config.graphs[0].root.to_string_lossy() + ); + assert_eq!(via_uri.policies.len(), 1); + + // Schema evolution converges on the bucket. + write_cluster_fixture(dir.path(), &root, SCHEMA_V2); + let evolve = apply_config_dir(dir.path()).await; + assert!(evolve.ok && evolve.converged, "{:?}", evolve.diagnostics); + + // Approved delete: drop the graph from the config; the plan demands an + // approval, the approved apply prefix-deletes the bucket root. 
+ fs::write( + dir.path().join("cluster.yaml"), + format!("version: 1\nstorage: {root}\ngraphs: {{}}\n"), + ) + .unwrap(); + let plan = omnigraph_cluster::plan_config_dir(dir.path()).await; + assert!(plan.ok, "{:?}", plan.diagnostics); + let approval = plan + .approvals_required + .first() + .expect("graph delete requires approval"); + let approve = omnigraph_cluster::approve_config_dir( + dir.path(), + &approval.resource, + "e2e-operator", + ) + .await; + assert!(approve.ok, "{:?}", approve.diagnostics); + let delete = apply_config_dir(dir.path()).await; + assert!(delete.ok && delete.converged, "{:?}", delete.diagnostics); + + let after = read_serving_snapshot_from_storage(&root).await; + assert!( + after.is_err(), + "an empty cluster must refuse to serve, got {after:?}" + ); +} diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 1c70083..3bde2a7 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -193,18 +193,28 @@ pub enum ServerConfigMode { config_path: PathBuf, /// `server.policy.file` (server-level Cedar policy for the /// management endpoints). Wired into `GET /graphs` authorization. - server_policy_file: Option, + server_policy: Option, }, } +/// Where a Cedar policy bundle comes from at startup. File-based for +/// omnigraph.yaml deployments; inline (digest-verified catalog content) +/// for cluster-mode boots, where the catalog may live on object storage +/// and the server must not re-read mutable state after the snapshot. +#[derive(Debug, Clone)] +pub enum PolicySource { + File(PathBuf), + Inline(String), +} + /// One graph's startup-time configuration: id, opened URI, optional -/// per-graph policy file path. Constructed by `load_server_settings` +/// per-graph policy source. Constructed by `load_server_settings` /// in multi mode; consumed by `serve`'s parallel open loop. 
#[derive(Debug, Clone)] pub struct GraphStartupConfig { pub graph_id: String, pub uri: String, - pub policy_file: Option, + pub policy: Option, /// Per-graph stored-query registry, loaded and identity-checked at /// settings-build time; type-checked against the schema when this /// graph's engine opens. @@ -994,9 +1004,9 @@ pub async fn serve(config: ServerConfig) -> Result<()> { ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(), ServerConfigMode::Multi { graphs, - server_policy_file, + server_policy, .. - } => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()), + } => server_policy.is_some() || graphs.iter().any(|g| g.policy.is_some()), }; let runtime_state = classify_server_runtime_state( !tokens.is_empty(), @@ -1045,7 +1055,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> { ServerConfigMode::Multi { graphs, config_path, - server_policy_file, + server_policy, } => { info!( bind = %bind, @@ -1054,7 +1064,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> { config = %config_path.display(), "serving omnigraph" ); - open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await? + open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await? } }; @@ -1065,6 +1075,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> { Ok(()) } +/// Load a graph-scoped policy bundle from either source kind. +fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result { + match source { + PolicySource::File(path) => Ok(PolicyEngine::load_graph(path, graph_id)?), + PolicySource::Inline(text) => Ok(PolicyEngine::load_graph_from_source(text, graph_id)?), + } +} + /// Parallel open of every graph in the startup config, with bounded /// concurrency (`buffer_unordered(4)`). 
Fail-fast — the first open error /// aborts startup; other in-flight opens are dropped (their `Omnigraph` @@ -1076,7 +1094,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> { pub async fn open_multi_graph_state( graphs: Vec, tokens: Vec<(String, String)>, - server_policy_file: Option<&PathBuf>, + server_policy_source: Option<&PolicySource>, config_path: PathBuf, ) -> Result { use futures::{StreamExt, TryStreamExt}; @@ -1089,8 +1107,11 @@ pub async fn open_multi_graph_state( // The placeholder graph_id `"server"` is the sentinel the Cedar // resource-model refactor maps to the singleton // `Omnigraph::Server::"root"` entity at evaluation time. - let server_policy = match server_policy_file { - Some(path) => Some(PolicyEngine::load_server(path)?), + let server_policy = match server_policy_source { + Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?), + Some(PolicySource::Inline(source)) => { + Some(PolicyEngine::load_server_from_source(source)?) + } None => None, }; @@ -1128,9 +1149,9 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result> // owned `Arc`, so no borrow of `db` survives into the match. 
let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?; - let (policy_arc, db) = match &cfg.policy_file { - Some(path) => { - let policy = PolicyEngine::load_graph(path, graph_id.as_str())?; + let (policy_arc, db) = match &cfg.policy { + Some(source) => { + let policy = load_graph_policy(source, graph_id.as_str())?; let policy_arc: Arc = Arc::new(policy); let checker = Arc::clone(&policy_arc) as Arc; (Some(policy_arc), db.with_policy(checker)) diff --git a/crates/omnigraph-server/src/main.rs b/crates/omnigraph-server/src/main.rs index 9000910..a138d12 100644 --- a/crates/omnigraph-server/src/main.rs +++ b/crates/omnigraph-server/src/main.rs @@ -14,10 +14,10 @@ struct Cli { target: Option, #[arg(long)] config: Option, - /// Boot from a cluster directory (the applied revision in - /// __cluster/state.json + content-addressed catalog blobs) instead of - /// omnigraph.yaml. Exclusive: cannot combine with , --target, or - /// --config. + /// Boot from a cluster: either a config directory (storage resolved + /// through cluster.yaml) or a storage-root URI directly + /// (s3://bucket/prefix — config-free serving from the bucket). + /// Exclusive: cannot combine with , --target, or --config. #[arg(long)] cluster: Option, #[arg(long)] diff --git a/crates/omnigraph-server/src/settings.rs b/crates/omnigraph-server/src/settings.rs index 6531c3a..59c437b 100644 --- a/crates/omnigraph-server/src/settings.rs +++ b/crates/omnigraph-server/src/settings.rs @@ -14,7 +14,19 @@ pub(crate) async fn load_cluster_settings( cli_bind: Option, cli_allow_unauthenticated: bool, ) -> Result { - let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).await.map_err(|diagnostics| { + // `--cluster` accepts either a config directory (the ledger location is + // resolved through cluster.yaml's `storage:` key) or a storage-root URI + // directly (`s3://bucket/prefix`) — config-free serving: the ledger and + // catalog on the bucket ARE the deployment artifact. 
+ // Any scheme-qualified argument (s3://, file://) is a storage root; a + // bare path is a config directory. + let cluster_arg = cluster_dir.to_string_lossy(); + let snapshot = if cluster_arg.contains("://") { + omnigraph_cluster::read_serving_snapshot_from_storage(cluster_arg.as_ref()).await + } else { + omnigraph_cluster::read_serving_snapshot(cluster_dir).await + } + .map_err(|diagnostics| { let details = diagnostics .iter() .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message)) @@ -26,19 +38,25 @@ pub(crate) async fn load_cluster_settings( // Bindings -> Cedar slots. The serving pipeline loads one bundle per // graph plus one server-level bundle; stacked bundles per scope are a // later slice — refuse loudly rather than silently merging policy. - let mut server_policy_file: Option = None; - let mut graph_policy_files: BTreeMap = BTreeMap::new(); + let mut server_policy: Option = None; + let mut graph_policies: BTreeMap = BTreeMap::new(); for policy in &snapshot.policies { for binding in &policy.applies_to { if binding == "cluster" { - if server_policy_file.replace(policy.blob_path.clone()).is_some() { + if server_policy + .replace(PolicySource::Inline(policy.source.clone())) + .is_some() + { bail!( "multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)" ); } } else if let Some(graph_id) = binding.strip_prefix("graph.") { - if graph_policy_files - .insert(graph_id.to_string(), policy.blob_path.clone()) + if graph_policies + .insert( + graph_id.to_string(), + PolicySource::Inline(policy.source.clone()), + ) .is_some() { bail!( @@ -80,7 +98,7 @@ pub(crate) async fn load_cluster_settings( graphs.push(GraphStartupConfig { graph_id: graph.graph_id.clone(), uri: graph.root.to_string_lossy().to_string(), - policy_file: graph_policy_files.get(&graph.graph_id).cloned(), + policy: 
graph_policies.get(&graph.graph_id).cloned(), queries: registry, }); } @@ -97,7 +115,7 @@ pub(crate) async fn load_cluster_settings( mode: ServerConfigMode::Multi { graphs, config_path: cluster_dir.clone(), - server_policy_file, + server_policy, }, bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()), allow_unauthenticated: cli_allow_unauthenticated || env_unauth, @@ -226,18 +244,18 @@ pub async fn load_server_settings( graphs.push(GraphStartupConfig { graph_id: name.clone(), uri, - policy_file: config.resolve_target_policy_file(name), + policy: config.resolve_target_policy_file(name).map(PolicySource::File), queries, }); } let config_path = config_path .cloned() .expect("has_explicit_config implies config_path is Some"); - let server_policy_file = config.resolve_server_policy_file(); + let server_policy = config.resolve_server_policy_file().map(PolicySource::File); ServerConfigMode::Multi { graphs, config_path, - server_policy_file, + server_policy, } } else { // Rule 5 → error with migration hint. 
@@ -729,11 +747,11 @@ server: .join("alpha.omni") .to_string_lossy() .into_owned(), - policy_file: None, + policy: None, queries: crate::queries::QueryRegistry::default(), }], config_path: temp.path().join("omnigraph.yaml"), - server_policy_file: Some(policy_path), + server_policy: Some(crate::PolicySource::File(policy_path)), }, bind: "127.0.0.1:0".to_string(), allow_unauthenticated: false, diff --git a/crates/omnigraph-server/tests/boot_settings.rs b/crates/omnigraph-server/tests/boot_settings.rs index 0e75486..3869d27 100644 --- a/crates/omnigraph-server/tests/boot_settings.rs +++ b/crates/omnigraph-server/tests/boot_settings.rs @@ -702,12 +702,14 @@ graphs: let alpha = &graphs[0]; let beta = &graphs[1]; assert_eq!(alpha.graph_id, "alpha"); - assert_eq!( - alpha.policy_file.as_ref().unwrap(), - &temp.path().join("policies/alpha.yaml") - ); + let omnigraph_server::PolicySource::File(alpha_policy) = + alpha.policy.as_ref().unwrap() + else { + panic!("yaml-configured policy must stay file-based"); + }; + assert_eq!(alpha_policy, &temp.path().join("policies/alpha.yaml")); assert_eq!(beta.graph_id, "beta"); - assert!(beta.policy_file.is_none()); + assert!(beta.policy.is_none()); } /// `server.policy.file` resolves alongside the graphs map. @@ -729,13 +731,11 @@ graphs: .unwrap(); let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap(); match settings.mode { - ServerConfigMode::Multi { - server_policy_file, .. - } => { - assert_eq!( - server_policy_file.unwrap(), - temp.path().join("server-policy.yaml") - ); + ServerConfigMode::Multi { server_policy, .. 
} => { + let omnigraph_server::PolicySource::File(path) = server_policy.unwrap() else { + panic!("yaml-configured server policy must stay file-based"); + }; + assert_eq!(path, temp.path().join("server-policy.yaml")); } _ => panic!("expected Multi"), } diff --git a/crates/omnigraph-server/tests/multi_graph.rs b/crates/omnigraph-server/tests/multi_graph.rs index 251f899..5ad847f 100644 --- a/crates/omnigraph-server/tests/multi_graph.rs +++ b/crates/omnigraph-server/tests/multi_graph.rs @@ -401,14 +401,14 @@ async fn cluster_boot_serves_applied_state() { let omnigraph_server::ServerConfigMode::Multi { graphs, config_path, - server_policy_file, + server_policy, } = settings.mode else { panic!("cluster boot must select multi-graph routing"); }; assert_eq!(graphs.len(), 1); assert_eq!(graphs[0].graph_id, "knowledge"); - assert!(server_policy_file.is_none()); + assert!(server_policy.is_none()); let state = omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path) @@ -516,26 +516,26 @@ graphs: let settings = cluster_settings(temp.path()).await.unwrap(); let omnigraph_server::ServerConfigMode::Multi { graphs, - server_policy_file, + server_policy, .. } = settings.mode else { panic!("cluster boot must select multi-graph routing"); }; - let graph_policy = graphs[0].policy_file.as_ref().expect("graph-bound bundle"); - assert!( - graph_policy - .to_string_lossy() - .contains("__cluster/resources/policy/graph_rules/"), - "{graph_policy:?}" - ); - let server_policy = server_policy_file.expect("cluster-bound bundle"); - assert!( - server_policy - .to_string_lossy() - .contains("__cluster/resources/policy/cluster_rules/"), - "{server_policy:?}" - ); + // Cluster boots carry policy CONTENT (digest-verified catalog blobs), + // not paths — the catalog may live on object storage. 
+ let omnigraph_server::PolicySource::Inline(graph_policy) = + graphs[0].policy.as_ref().expect("graph-bound bundle") + else { + panic!("cluster-mode graph policy must be inline content"); + }; + assert!(graph_policy.contains("actors:"), "{graph_policy:?}"); + let omnigraph_server::PolicySource::Inline(server_policy) = + server_policy.expect("cluster-bound bundle") + else { + panic!("cluster-mode server policy must be inline content"); + }; + assert!(server_policy.contains("kind: server"), "{server_policy:?}"); } #[tokio::test] diff --git a/crates/omnigraph-server/tests/s3.rs b/crates/omnigraph-server/tests/s3.rs index b0126a8..2c61125 100644 --- a/crates/omnigraph-server/tests/s3.rs +++ b/crates/omnigraph-server/tests/s3.rs @@ -75,3 +75,105 @@ async fn server_opens_s3_graph_directly_and_serves_snapshot_and_read() { assert_eq!(read_body["row_count"], 1); assert_eq!(read_body["rows"][0]["p.name"], "Alice"); } + +/// Config-free cluster serving (RFC-006): boot `--cluster s3://bucket/prefix` +/// with NO local files at all — the ledger and catalog on the bucket are the +/// whole deployment artifact. The fixture cluster is applied from a temp +/// config dir, which is then dropped before the server boots from the URI. +#[tokio::test(flavor = "multi_thread")] +async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() { + let Some(bucket) = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok() else { + eprintln!("skipping s3 cluster-serving test: OMNIGRAPH_S3_TEST_BUCKET is not set"); + return; + }; + let unique = format!( + "{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let root = format!("s3://{bucket}/cluster-serve/{unique}"); + + // Apply a one-graph cluster onto the bucket, seed it, then DROP the + // config dir — the boot below must need nothing local. 
+ { + let dir = tempfile::tempdir().unwrap(); + fs::write( + dir.path().join("people.pg"), + "node Person {\n name: String @key\n}\n", + ) + .unwrap(); + fs::write( + dir.path().join("people.gq"), + "query find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + dir.path().join("cluster.yaml"), + format!( + "version: 1\nstorage: {root}\ngraphs:\n knowledge:\n schema: people.pg\n queries:\n find_person:\n file: people.gq\n" + ), + ) + .unwrap(); + let import = omnigraph_cluster::import_config_dir(dir.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(dir.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + + let graph_uri = format!("{root}/graphs/knowledge.omni"); + let mut db = Omnigraph::open(&graph_uri).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n", + LoadMode::Overwrite, + ) + .await + .unwrap(); + } + + let settings = omnigraph_server::load_server_settings( + None, + Some(&std::path::PathBuf::from(&root)), + None, + None, + None, + true, + ) + .await + .unwrap(); + let omnigraph_server::ServerConfigMode::Multi { + graphs, + config_path, + server_policy, + } = settings.mode + else { + panic!("cluster boot must select multi-graph routing"); + }; + let state = omnigraph_server::open_multi_graph_state( + graphs, + Vec::new(), + server_policy.as_ref(), + config_path, + ) + .await + .unwrap(); + let app = build_app(state); + + let response = tower::ServiceExt::oneshot( + app, + Request::builder() + .method(Method::POST) + .uri("/graphs/knowledge/queries/find_person") + .header("content-type", "application/json") + .body(Body::from(json!({"params": {"name": "Ada"}}).to_string())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let 
value: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(value["rows"][0]["p.name"], "Ada", "{value}"); +} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 848594a..a817428 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs` (incl. the full-cycle cluster lifecycle with a spawned `--cluster` server — declare→serve→evolve→drift-heal→approved-delete — and applied-policy enforcement over HTTP), `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), 
and the 5B serving-snapshot read API (converged read, refusal rows) | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated); `tests/s3_cluster.rs` (bucket-gated full lifecycle on object storage) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level; incl. 
cluster-mode boot — converged-dir serving, policy binding wiring, boot refusals), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | @@ -64,7 +64,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav CI runs four S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job): - `cargo test -p omnigraph-engine --test s3_storage` -- `cargo test -p omnigraph-server --test server server_opens_s3_graph_directly_and_serves_snapshot_and_read` +- `cargo test -p omnigraph-server --test s3` (single-graph serving + config-free `--cluster s3://` boot) +- `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket) - `cargo test -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow` Locally, set `OMNIGRAPH_S3_TEST_BUCKET` (and the usual `AWS_*` vars including `AWS_ENDPOINT_URL_S3` for non-AWS) before running. Without those, S3 tests skip gracefully. diff --git a/docs/user/cluster.md b/docs/user/cluster.md index 19755fb..0d6dac5 100644 --- a/docs/user/cluster.md +++ b/docs/user/cluster.md @@ -84,6 +84,12 @@ OMNIGRAPH_SERVER_BEARER_TOKENS_JSON='{"act-reader":"s3cret"}' \ omnigraph-server --cluster company-brain --bind 0.0.0.0:8080 ``` +`--cluster` accepts either a **config directory** (the storage root resolves +through `cluster.yaml`'s `storage:` key) or a **storage-root URI directly** +(`--cluster s3://bucket/prefix`) — config-free serving: a serving box needs +only the URI and credentials, no checkout of the config repo. The ledger and +catalog on the bucket are the deployment artifact. + `--cluster` is an **exclusive boot source**: it cannot be combined with a graph URI, `--target`, or `--config`, and `omnigraph.yaml` is never read in this mode. 
Routing is always multi-graph: diff --git a/docs/user/deployment.md b/docs/user/deployment.md index 00f8272..ece7b5d 100644 --- a/docs/user/deployment.md +++ b/docs/user/deployment.md @@ -47,10 +47,31 @@ omnigraph-server s3://my-bucket/graphs/example/releases/2026-04-10-v0.1.0 \ ## Cluster Mode in Containers (AWS, Railway) -A cluster-booted deployment serves a **cluster directory** (config + state -ledger + content-addressed catalog + graph data) from a mounted volume — the -one structural difference from the stateless S3 single-graph shape, which -needs no volume at all. The container contract: +A cluster-booted deployment has **two shapes** since the `storage:` root +(RFC-006): + +- **Bucket, no volume (preferred for cloud)** — the cluster's ledger, + catalog, and graph data live under an object-storage root + (`storage: s3://bucket/prefix` in `cluster.yaml`). The server boots + **config-free** from the bare URI; the container needs no volume at all: + + ```bash + docker run -d \ + -e OMNIGRAPH_CLUSTER=s3://my-bucket/clusters/company-brain \ + -e AWS_ACCESS_KEY_ID=... -e AWS_SECRET_ACCESS_KEY=... \ + -e OMNIGRAPH_SERVER_BEARER_TOKEN=... \ + -p 8080:8080 <image> + ``` + + Day-2 runs from any operator checkout of the config repo: + `omnigraph cluster apply --config ./company-brain` (the `storage:` key + routes every stored byte to the bucket), then restart the service. The + state lock is genuinely cross-machine on object storage, so CI and + operator shells contend safely. + +- **Volume (file-rooted)** — the original shape: the whole cluster + directory on a mounted volume. Still fully supported; the container + contract: ```bash docker run -d \ @@ -102,8 +123,6 @@ above). ### Constraints (current honest list) -- **Cluster directories are local-filesystem** — the volume is mandatory; - S3-hosted cluster dirs are not supported. - **No hot reload** — applied changes serve on the next restart. 
- **Single-writer apply** — run `cluster apply` from one place at a time (the state lock enforces this; CI or one operator shell, not both). diff --git a/docs/user/server.md b/docs/user/server.md index 0922e74..391b7ae 100644 --- a/docs/user/server.md +++ b/docs/user/server.md @@ -16,7 +16,7 @@ Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-gra ### Cluster-booted multi mode (Phase 5) -`omnigraph-server --cluster <dir>` boots from the cluster catalog's **applied +`omnigraph-server --cluster <dir-or-uri>` boots from the cluster catalog's **applied revision** (`state.json` + content-addressed blobs) instead of `omnigraph.yaml` — an exclusive boot source: combining it with `<URI>`, `--target`, or `--config` is a startup error, and `omnigraph.yaml` is never @@ -27,7 +27,7 @@ for what is read and the fail-fast readiness rules. `--bind`, Mode inference: -0. CLI `--cluster <dir>` → **multi, cluster-booted** (exclusive) +0. CLI `--cluster <dir-or-uri>` → **multi, cluster-booted** (exclusive; a scheme-qualified argument reads the ledger straight from the storage root, no local config) 1. CLI positional `<URI>` → single 2. CLI `--target <name>` → single 3. `server.graph` in config → single