From f5b43164b8b64d1c31b1a52b693ecb616760ef22 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 17:39:26 +0300 Subject: [PATCH 1/4] feat(cluster): pub read-only serving-snapshot API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RFC-005 §D2/§D4: read_serving_snapshot reads the applied revision as everything a server needs to boot — graphs at derived roots, stored-query sources read from the content-addressed catalog and re-hashed against the recorded digests, policy blob paths with their applied applies_to bindings. All-or-nothing: missing state, pending recovery sidecars, missing/tampered blobs, pre-5A entries without bindings, and an empty graph set each refuse the snapshot with a remedy; no partial serving. Lock-free by design — the state file is replaced atomically, so the read is a consistent point-in-time ledger. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/lib.rs | 288 ++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index af2ef81..7703bb8 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -1699,6 +1699,191 @@ pub async fn approve_config_dir( } } +/// One graph in a serving snapshot: its id and on-disk root. +#[derive(Debug, Clone)] +pub struct ServingGraph { + pub graph_id: String, + pub root: PathBuf, +} + +/// One stored query: its graph binding, registry name, and verified source. +#[derive(Debug, Clone)] +pub struct ServingQuery { + pub graph_id: String, + pub name: String, + pub source: String, +} + +/// One policy bundle: its verified catalog blob path and applied bindings +/// (normalized typed refs: `cluster` | `graph.`). +#[derive(Debug, Clone)] +pub struct ServingPolicy { + pub name: String, + pub blob_path: PathBuf, + pub applies_to: Vec, +} + +/// Everything a server needs to boot from the cluster catalog (RFC-005 §D2). 
+#[derive(Debug, Clone)] +pub struct ServingSnapshot { + pub graphs: Vec, + pub queries: Vec, + pub policies: Vec, +} + +/// Read the applied revision as a serving snapshot — the read-only loader for +/// the Phase-5 server boot. All-or-nothing per RFC-005 §D4: every readiness +/// failure is collected and the whole snapshot refused; no partial serving. +/// Takes no lock: the state file is replaced atomically, so this reads a +/// consistent point-in-time ledger. +pub fn read_serving_snapshot(config_dir: impl AsRef) -> Result> { + let config_dir = config_dir.as_ref().to_path_buf(); + let backend = LocalStateBackend::new(&config_dir); + let mut diagnostics: Vec = Vec::new(); + + // A ledger a sweep is about to rewrite must not start serving. + let sidecars = backend.list_recovery_sidecars(&mut diagnostics); + if !sidecars.is_empty() { + diagnostics.push(Diagnostic::error( + "cluster_recovery_pending", + CLUSTER_RECOVERIES_DIR, + format!( + "{} interrupted operation(s) await recovery; run any state-mutating cluster command (e.g. 
`cluster apply`) to sweep, then retry", + sidecars.len() + ), + )); + } + + let mut observations = backend.observations(); + let state = match backend.read_state(&mut observations) { + Ok(snapshot) => match snapshot.state { + Some(state) => Some(state), + None => { + diagnostics.push(Diagnostic::error( + "cluster_state_missing", + CLUSTER_STATE_FILE, + "no cluster state ledger; run `cluster import` and `cluster apply` first", + )); + None + } + }, + Err(diagnostic) => { + diagnostics.push(diagnostic); + None + } + }; + let Some(state) = state else { + return Err(diagnostics); + }; + + let mut graphs = Vec::new(); + let mut queries = Vec::new(); + let mut policies = Vec::new(); + for (address, entry) in &state.applied_revision.resources { + match resource_kind(address) { + ResourceKind::Graph(graph_id) => { + graphs.push(ServingGraph { + root: config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + graph_id, + }); + } + ResourceKind::Schema(_) => {} + kind @ ResourceKind::Query { .. 
} => { + let ResourceKind::Query { graph, name } = &kind else { + unreachable!() + }; + match read_verified_payload(&config_dir, &kind, &entry.digest, address) { + Ok(source) => queries.push(ServingQuery { + graph_id: graph.clone(), + name: name.clone(), + source, + }), + Err(diagnostic) => diagnostics.push(diagnostic), + } + } + kind @ ResourceKind::Policy(_) => { + let ResourceKind::Policy(name) = &kind else { + unreachable!() + }; + let Some(applies_to) = entry.applies_to.clone() else { + diagnostics.push(Diagnostic::error( + "policy_bindings_missing", + address.clone(), + "no applied applies_to bindings recorded (ledger predates binding metadata); re-run `cluster apply` to backfill", + )); + continue; + }; + match read_verified_payload(&config_dir, &kind, &entry.digest, address) { + Ok(_) => policies.push(ServingPolicy { + name: name.clone(), + blob_path: payload_path(&config_dir, &kind, &entry.digest) + .expect("policy kind always has a payload path"), + applies_to, + }), + Err(diagnostic) => diagnostics.push(diagnostic), + } + } + ResourceKind::Unknown => {} + } + } + + if graphs.is_empty() { + diagnostics.push(Diagnostic::error( + "cluster_empty", + CLUSTER_STATE_FILE, + "the applied revision records no graphs; apply a cluster with at least one graph before serving from it", + )); + } + if has_errors(&diagnostics) { + return Err(diagnostics); + } + Ok(ServingSnapshot { + graphs, + queries, + policies, + }) +} + +/// Read a catalog blob and verify it against the recorded digest. 
+fn read_verified_payload( + config_dir: &Path, + kind: &ResourceKind, + digest: &str, + address: &str, +) -> Result { + let path = payload_path(config_dir, kind, digest) + .expect("query/policy kinds always have a payload path"); + let bytes = fs::read(&path).map_err(|err| { + Diagnostic::error( + "catalog_payload_missing", + address, + format!( + "catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart", + display_path(&path) + ), + ) + })?; + if sha256_hex(&bytes) != digest { + return Err(Diagnostic::error( + "catalog_payload_digest_mismatch", + address, + format!( + "catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart", + display_path(&path) + ), + )); + } + String::from_utf8(bytes).map_err(|err| { + Diagnostic::error( + "catalog_payload_invalid", + address, + format!("catalog blob is not valid UTF-8: {err}"), + ) + }) +} + pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; @@ -7272,6 +7457,109 @@ policies: ); } + // ---- serving snapshot (5B read-only loader) ---- + + #[tokio::test] + async fn serving_snapshot_reads_converged_cluster() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + + let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve"); + assert_eq!(snapshot.graphs.len(), 1); + assert_eq!(snapshot.graphs[0].graph_id, "knowledge"); + assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni")); + assert_eq!(snapshot.queries.len(), 1); + assert_eq!(snapshot.queries[0].name, "find_person"); + assert!(snapshot.queries[0].source.contains("query find_person")); + assert_eq!(snapshot.policies.len(), 1); + assert_eq!(snapshot.policies[0].applies_to, 
vec!["graph.knowledge"]); + assert!(snapshot.policies[0].blob_path.exists()); + } + + #[test] + fn serving_snapshot_refuses_missing_state() { + let dir = fixture(); + let err = read_serving_snapshot(dir.path()).unwrap_err(); + assert!( + err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"), + "{err:?}" + ); + } + + #[tokio::test] + async fn serving_snapshot_refuses_pending_recovery() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + apply_config_dir(dir.path()).await; + write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE"); + + let err = read_serving_snapshot(dir.path()).unwrap_err(); + assert!( + err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"), + "{err:?}" + ); + } + + #[tokio::test] + async fn serving_snapshot_refuses_tampered_blob_and_stripped_bindings() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + apply_config_dir(dir.path()).await; + // Tamper with the query blob... + let snapshot = read_serving_snapshot(dir.path()).unwrap(); + let desired = validate_config_dir(dir.path()); + let query_digest = &desired.resource_digests["query.knowledge.find_person"]; + let blob = dir + .path() + .join(CLUSTER_RESOURCES_DIR) + .join("query/knowledge/find_person") + .join(format!("{query_digest}.gq")); + fs::write(&blob, "tampered").unwrap(); + // ...and strip the policy bindings (pre-5A ledger). 
+ let mut state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(), + ) + .unwrap(); + state["applied_revision"]["resources"]["policy.base"] + .as_object_mut() + .unwrap() + .remove("applies_to"); + fs::write( + dir.path().join(CLUSTER_STATE_FILE), + serde_json::to_string_pretty(&state).unwrap(), + ) + .unwrap(); + + let err = read_serving_snapshot(dir.path()).unwrap_err(); + assert!( + err.iter() + .any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"), + "{err:?}" + ); + assert!( + err.iter().any(|diagnostic| diagnostic.code == "policy_bindings_missing"), + "{err:?}" + ); + let _ = snapshot; // the pre-tamper read succeeded + } + + #[test] + fn serving_snapshot_refuses_empty_cluster() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); // state exists, no graphs + + let err = read_serving_snapshot(dir.path()).unwrap_err(); + assert!( + err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"), + "{err:?}" + ); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); From 948a54daa7e27cf370b255a448b44e2be7e6bd83 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 17:48:10 +0300 Subject: [PATCH 2/4] feat(server): boot from cluster state via --cluster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RFC-005 §D1/§D2: omnigraph-server --cluster is rule 0 of the mode inference — an exclusive boot source (hard error when combined with a graph URI, --target, or --config) that never opens omnigraph.yaml, not even the implicit current-directory search. 
The cluster branch reads the applied revision through omnigraph-cluster's serving-snapshot API and feeds the EXISTING multi-graph pipeline: GraphStartupConfig per recorded graph at its derived root, stored queries built via QueryRegistry::from_specs from verified blob content (expose-all — the §D5 bridge until Phase 6 policy-owned exposure), cluster-bound policy bundles as the server-level Cedar engine and graph-bound bundles per graph, straight from the content-addressed blob paths. Multiple bundles binding one scope refuse boot (one-bundle-per-scope is the serving pipeline's shape; stacking is a later slice). Everything downstream — parallel opens, query type-checking, registry, routing, auth, OpenAPI — is reused unchanged; cluster mode is a new source, not a new pipeline. First server->cluster crate dependency: read-only types + one fn; omnigraph-cluster stays HTTP-free. open_multi_graph_state goes pub for integration tests. Co-Authored-By: Claude Fable 5 --- Cargo.lock | 1 + crates/omnigraph-server/Cargo.toml | 1 + crates/omnigraph-server/src/lib.rs | 133 +++++++++++- crates/omnigraph-server/src/main.rs | 7 + crates/omnigraph-server/tests/server.rs | 260 ++++++++++++++++++++++-- 5 files changed, 378 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6a1b8a..675fad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4673,6 +4673,7 @@ dependencies = [ "futures", "lance", "lance-index", + "omnigraph-cluster", "omnigraph-compiler", "omnigraph-engine", "omnigraph-policy", diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index 5f87082..5393221 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -22,6 +22,7 @@ aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"] omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" } omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } omnigraph-policy = { path = "../omnigraph-policy", 
version = "0.6.2" } +omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.2" } axum = { workspace = true } clap = { workspace = true } color-eyre = { workspace = true } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 60ebef3..3b9ff1d 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -14,7 +14,7 @@ pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, Regi use crate::queries::{QueryRegistry, check, format_check_breakages}; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs; use std::io; use std::io::Write; @@ -40,7 +40,7 @@ use axum::middleware::{self, Next}; use axum::response::{IntoResponse, Response}; use axum::routing::{delete, get, post}; use axum::{Json, Router}; -use color_eyre::eyre::{Result, WrapErr, bail}; +use color_eyre::eyre::{Result, WrapErr, bail, eyre}; pub use config::{ AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings, ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig, @@ -888,13 +888,125 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St format!("graph '{label}': stored-query registry failed to load:\n {joined}") } +/// Build serving settings from a cluster directory's applied revision +/// (RFC-005 §D2): graphs at derived roots, stored queries from verified +/// catalog blob content, policy bundles from blob paths with their applied +/// bindings. Always multi-graph routing. The unauthenticated/env handling +/// matches the omnigraph.yaml path. 
+fn load_cluster_settings( + cluster_dir: &PathBuf, + cli_bind: Option, + cli_allow_unauthenticated: bool, +) -> Result { + let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| { + let details = diagnostics + .iter() + .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message)) + .collect::>() + .join("\n "); + eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display()) + })?; + + // Bindings -> Cedar slots. The serving pipeline loads one bundle per + // graph plus one server-level bundle; stacked bundles per scope are a + // later slice — refuse loudly rather than silently merging policy. + let mut server_policy_file: Option = None; + let mut graph_policy_files: BTreeMap = BTreeMap::new(); + for policy in &snapshot.policies { + for binding in &policy.applies_to { + if binding == "cluster" { + if server_policy_file.replace(policy.blob_path.clone()).is_some() { + bail!( + "multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)" + ); + } + } else if let Some(graph_id) = binding.strip_prefix("graph.") { + if graph_policy_files + .insert(graph_id.to_string(), policy.blob_path.clone()) + .is_some() + { + bail!( + "multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)" + ); + } + } else { + bail!("unrecognized policy binding '{binding}' in the applied revision"); + } + } + } + + let mut graphs = Vec::new(); + for graph in &snapshot.graphs { + let specs: Vec = snapshot + .queries + .iter() + .filter(|query| query.graph_id == graph.graph_id) + .map(|query| queries::RegistrySpec { + name: query.name.clone(), + source: query.source.clone(), + // The §D5 bridge: the cluster registry has no expose flag + // (exposure becomes a policy decision in Phase 6) — 
cluster + // mode lists every stored query. + expose: true, + tool_name: None, + }) + .collect(); + let registry = QueryRegistry::from_specs(specs).map_err(|errors| { + let details = errors + .iter() + .map(|error| error.to_string()) + .collect::>() + .join("\n "); + eyre!( + "stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart" + ) + })?; + graphs.push(GraphStartupConfig { + graph_id: graph.graph_id.clone(), + uri: graph.root.to_string_lossy().to_string(), + policy_file: graph_policy_files.get(&graph.graph_id).cloned(), + queries: registry, + }); + } + + let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED") + .ok() + .map(|v| { + let trimmed = v.trim(); + !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false") + }) + .unwrap_or(false); + + Ok(ServerConfig { + mode: ServerConfigMode::Multi { + graphs, + config_path: cluster_dir.clone(), + server_policy_file, + }, + bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()), + allow_unauthenticated: cli_allow_unauthenticated || env_unauth, + }) +} + pub fn load_server_settings( config_path: Option<&PathBuf>, + cli_cluster: Option<&PathBuf>, cli_uri: Option, cli_target: Option, cli_bind: Option, cli_allow_unauthenticated: bool, ) -> Result { + // Rule 0 (RFC-005): --cluster is an exclusive boot source. It is checked + // before anything reads omnigraph.yaml — in cluster mode that file is + // never opened, not even the implicit current-directory search. 
+ if let Some(cluster_dir) = cli_cluster { + if cli_uri.is_some() || cli_target.is_some() || config_path.is_some() { + bail!( + "--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)" + ); + } + return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated); + } let config = load_config(config_path)?; let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string()); // Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips @@ -1275,7 +1387,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> { /// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this /// trades startup latency for a small amount of concurrent S3 / Lance /// open pressure. -async fn open_multi_graph_state( +pub async fn open_multi_graph_state( graphs: Vec, tokens: Vec<(String, String)>, server_policy_file: Option<&PathBuf>, @@ -3255,7 +3367,7 @@ server: ) .unwrap(); - let settings = load_server_settings(Some(&config), None, None, None, false).unwrap(); + let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { assert_eq!(uri, "/tmp/demo.omni"); @@ -3285,6 +3397,7 @@ server: let settings = load_server_settings( Some(&config), + None, Some("/tmp/override.omni".to_string()), None, Some("0.0.0.0:9999".to_string()), @@ -3321,7 +3434,7 @@ server: .unwrap(); let settings = - load_server_settings(Some(&config), None, Some("dev".to_string()), None, false) + load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false) .unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. 
} => { @@ -3334,7 +3447,7 @@ server: #[test] fn server_settings_require_uri_from_cli_or_config() { - let error = load_server_settings(None, None, None, None, false).unwrap_err(); + let error = load_server_settings(None, None, None, None, None, false).unwrap_err(); assert!( error.to_string().contains("no graph to serve"), "expected mode-inference error, got: {error}", @@ -3501,7 +3614,7 @@ server: // Truthy values flip Open mode on, even with CLI flag off. for value in ["1", "true", "yes", "TRUE", "anything"] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false) .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, @@ -3512,7 +3625,7 @@ server: // Falsy values keep refusal behavior, even with CLI flag off. for value in ["0", "false", "FALSE", ""] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false) .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3522,7 +3635,7 @@ server: // Unset env var: also false. let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]); - let settings = load_server_settings(Some(&config_path), None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false) .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3533,7 +3646,7 @@ server: // CLI flag wins even when env is falsy — `serve()` honors the // OR of both inputs. 
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]); - let settings = load_server_settings(Some(&config_path), None, None, None, true) + let settings = load_server_settings(Some(&config_path), None, None, None, None, true) .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, diff --git a/crates/omnigraph-server/src/main.rs b/crates/omnigraph-server/src/main.rs index 4e1c256..c71ea2f 100644 --- a/crates/omnigraph-server/src/main.rs +++ b/crates/omnigraph-server/src/main.rs @@ -14,6 +14,12 @@ struct Cli { target: Option, #[arg(long)] config: Option, + /// Boot from a cluster directory (the applied revision in + /// __cluster/state.json + content-addressed catalog blobs) instead of + /// omnigraph.yaml. Exclusive: cannot combine with , --target, or + /// --config. + #[arg(long)] + cluster: Option, #[arg(long)] bind: Option, /// Run without bearer tokens and without a policy file (MR-723). @@ -32,6 +38,7 @@ async fn main() -> Result<()> { let cli = Cli::parse(); let settings: ServerConfig = load_server_settings( cli.config.as_ref(), + cli.cluster.as_ref(), cli.uri, cli.target, cli.bind, diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 4a49a14..bf99b8d 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -5508,7 +5508,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, false).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err(); assert!( err.to_string().contains("invalid graph id 'policies'"), "expected reserved-name rejection, got: {err}" @@ -5575,6 +5575,7 @@ graphs: #[test] fn mode_inference_cli_uri_is_single() { let settings = load_server_settings( + None, None, Some("/tmp/cli.omni".to_string()), None, @@ -5605,7 +5606,7 @@ graphs: ) .unwrap(); let settings = - load_server_settings(Some(&config_path), None, 
Some("alpha".into()), None, true) + load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true) .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"), @@ -5631,7 +5632,7 @@ server: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"), ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"), @@ -5654,7 +5655,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect(); @@ -5680,7 +5681,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"), @@ -5708,7 +5709,7 @@ graphs: "queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n", ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("queries") && msg.contains("not honored"), @@ -5729,7 +5730,7 @@ graphs: ) .unwrap(); let err = - load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true) + load_server_settings(Some(&config_path), None, None, 
Some("prod".to_string()), None, true) .unwrap_err(); let msg = err.to_string(); assert!( @@ -5756,7 +5757,7 @@ graphs: ) .unwrap(); let settings = - load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true) + load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) .unwrap(); match settings.mode { ServerConfigMode::Single { @@ -5795,7 +5796,7 @@ graphs: ), ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { assert_eq!(graphs[0].uri, graph.to_string_lossy()); @@ -5807,7 +5808,7 @@ graphs: /// Rule 5: nothing → error with migration hint. #[test] fn mode_inference_no_inputs_errors_with_migration_hint() { - let err = load_server_settings(None, None, None, None, true).unwrap_err(); + let err = load_server_settings(None, None, None, None, None, true).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("no graph to serve"), @@ -5822,7 +5823,7 @@ graphs: let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); assert!(err.to_string().contains("no graph to serve")); } @@ -5843,6 +5844,7 @@ graphs: .unwrap(); let settings = load_server_settings( Some(&config_path), + None, Some("/tmp/cli-override.omni".to_string()), None, None, @@ -5880,7 +5882,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); let graphs = match settings.mode { 
ServerConfigMode::Multi { graphs, .. } => graphs, _ => panic!("expected Multi"), @@ -5914,7 +5916,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Multi { server_policy_file, .. @@ -6194,7 +6196,7 @@ graphs: .unwrap(); let settings: ServerConfig = - load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); assert!(matches!(settings.mode, ServerConfigMode::Multi { .. })); match settings.mode { @@ -6207,3 +6209,233 @@ graphs: } } } + +// ---- Phase 5: cluster-mode boot (RFC-005) ---- + +/// Build and converge a real cluster directory: cluster.yaml + schema + +/// stored query (+ optional policies), then `import` + `apply` so the +/// catalog and state ledger exist exactly as an operator would have them. 
+async fn converged_cluster_dir(policies_yaml: &str) -> tempfile::TempDir { + let temp = tempfile::tempdir().unwrap(); + fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + format!( + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +{policies_yaml}"# + ), + ) + .unwrap(); + let import = omnigraph_cluster::import_config_dir(temp.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(temp.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + temp +} + +fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { + omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true) +} + +#[tokio::test] +async fn cluster_boot_serves_applied_state() { + let temp = converged_cluster_dir("").await; + let settings = cluster_settings(temp.path()).unwrap(); + let omnigraph_server::ServerConfigMode::Multi { + graphs, + config_path, + server_policy_file, + } = settings.mode + else { + panic!("cluster boot must select multi-graph routing"); + }; + assert_eq!(graphs.len(), 1); + assert_eq!(graphs[0].graph_id, "knowledge"); + assert!(server_policy_file.is_none()); + + let state = + omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path) + .await + .unwrap(); + let app = build_app(state); + + // The management surface keeps its closed-by-default contract: without a + // cluster-scoped policy bundle there is no server-level Cedar engine, so + // GET /graphs refuses even in cluster mode. 
+ let (status, body) = json_response( + &app, + Request::builder().uri("/graphs").body(Body::empty()).unwrap(), + ) + .await; + assert_eq!(status, StatusCode::FORBIDDEN, "{body}"); + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/graphs/knowledge/queries") + .body(Body::empty()) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + assert!( + body["queries"] + .as_array() + .unwrap() + .iter() + .any(|q| q["name"] == "find_person"), + "{body}" + ); + + let (status, body) = json_response( + &app, + Request::builder() + .method(Method::POST) + .uri("/graphs/knowledge/queries/find_person") + .header("content-type", "application/json") + .body(Body::from(r#"{"params":{"name":"nobody"}}"#)) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); +} + +#[tokio::test] +async fn cluster_boot_wires_policy_bindings_into_cedar_slots() { + // Fixture: one graph-bound bundle (graph_rules) and one cluster-bound + // bundle (cluster_rules); both policy slots are asserted below. + let policy_block = r#"policies: + graph_rules: + file: ./graph.policy.yaml + applies_to: [knowledge] + cluster_rules: + file: ./cluster.policy.yaml + applies_to: [cluster] +"#; + let temp = { + let temp = tempfile::tempdir().unwrap(); + fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("graph.policy.yaml"), + permit_all_policy_yaml(&["default"]), + ) + .unwrap(); + fs::write( + temp.path().join("cluster.policy.yaml"), + permit_all_policy_yaml(&["default"]).replace("protected_branches: [main]\n", "protected_branches: [main]\nkind: server\n"), + ) + .unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + format!( + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +{policy_block}"# + ), + ) + .unwrap();
+ let import = omnigraph_cluster::import_config_dir(temp.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(temp.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + temp + }; + + let settings = cluster_settings(temp.path()).unwrap(); + let omnigraph_server::ServerConfigMode::Multi { + graphs, + server_policy_file, + .. + } = settings.mode + else { + panic!("cluster boot must select multi-graph routing"); + }; + let graph_policy = graphs[0].policy_file.as_ref().expect("graph-bound bundle"); + assert!( + graph_policy + .to_string_lossy() + .contains("__cluster/resources/policy/graph_rules/"), + "{graph_policy:?}" + ); + let server_policy = server_policy_file.expect("cluster-bound bundle"); + assert!( + server_policy + .to_string_lossy() + .contains("__cluster/resources/policy/cluster_rules/"), + "{server_policy:?}" + ); +} + +#[tokio::test] +async fn cluster_boot_refusals() { + // Mutual exclusion with --config / URI. + let temp = converged_cluster_dir("").await; + let dir = temp.path().to_path_buf(); + let err = omnigraph_server::load_server_settings( + Some(&dir.join("omnigraph.yaml")), + Some(&dir), + None, + None, + None, + true, + ) + .unwrap_err(); + assert!(err.to_string().contains("exclusive boot source"), "{err}"); + let err = omnigraph_server::load_server_settings( + None, + Some(&dir), + Some("file:///tmp/x.omni".to_string()), + None, + None, + true, + ) + .unwrap_err(); + assert!(err.to_string().contains("exclusive boot source"), "{err}"); + + // Tampered catalog blob refuses boot with the remedy.
+ let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person"); + let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path(); + fs::write(&blob, "tampered").unwrap(); + let err = cluster_settings(&dir).unwrap_err(); + assert!( + err.to_string().contains("catalog_payload_digest_mismatch"), + "{err}" + ); + assert!(err.to_string().contains("cluster refresh"), "{err}"); + + // Missing state refuses with the import/apply remedy. + let empty = tempfile::tempdir().unwrap(); + let err = cluster_settings(empty.path()).unwrap_err(); + assert!(err.to_string().contains("cluster_state_missing"), "{err}"); +} From f3eb60fa4e8b2f0138501780cc27f6ce5a31f09e Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 17:51:40 +0300 Subject: [PATCH 3/4] test(cli): applied-means-serving system e2e The Phase-5 contract end to end with real binaries: cluster import + apply via the CLI, seed a row through the graph plane, boot omnigraph-server with --cluster (no omnigraph.yaml anywhere), and the applied stored query serves the row over HTTP through the multi-graph routes.
Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/tests/support/mod.rs | 6 ++ crates/omnigraph-cli/tests/system_local.rs | 81 ++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/crates/omnigraph-cli/tests/support/mod.rs b/crates/omnigraph-cli/tests/support/mod.rs index b62d861..5c17182 100644 --- a/crates/omnigraph-cli/tests/support/mod.rs +++ b/crates/omnigraph-cli/tests/support/mod.rs @@ -212,6 +212,12 @@ pub fn spawn_server_with_config(config: &Path) -> TestServer { spawn_server_process(command) } +pub fn spawn_server_with_cluster(cluster_dir: &Path) -> TestServer { + let mut command = server_process(); + command.arg("--cluster").arg(cluster_dir).arg("--unauthenticated"); + spawn_server_process(command) +} + pub fn spawn_server_with_config_env(config: &Path, envs: &[(&str, &str)]) -> TestServer { let mut command = server_process(); command.arg("--config").arg(config); diff --git a/crates/omnigraph-cli/tests/system_local.rs b/crates/omnigraph-cli/tests/system_local.rs index 4fc3e9a..81476b0 100644 --- a/crates/omnigraph-cli/tests/system_local.rs +++ b/crates/omnigraph-cli/tests/system_local.rs @@ -1633,3 +1633,84 @@ fn local_cli_actor_flag_overrides_config_actor() { "expected 'denied' when --as overrides config to bruno, got: {stderr}" ); } + +/// Phase 5 (RFC-005): "applied means serving" — converge a cluster with the +/// CLI, boot the real omnigraph-server binary with --cluster, and serve the +/// applied stored query over HTTP with zero omnigraph.yaml involvement.
+#[test] +fn local_cluster_apply_then_server_boots_from_cluster_state() { + let temp = tempfile::tempdir().unwrap(); + std::fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + std::fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + std::fs::write( + temp.path().join("cluster.yaml"), + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + for command in ["import", "apply"] { + let output = cli() + .arg("cluster") + .arg(command) + .arg("--config") + .arg(temp.path()) + .arg("--json") + .output() + .unwrap(); + assert!(output.status.success(), "cluster {command} failed\nstdout: {}\nstderr: {}", String::from_utf8_lossy(&output.stdout), String::from_utf8_lossy(&output.stderr)); + } + // Seed a row through the graph plane so the stored query has data. + let data = temp.path().join("seed.jsonl"); + std::fs::write(&data, "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n").unwrap(); + let output = cli() + .arg("load") + .arg("--data") + .arg(&data) + .arg(temp.path().join("graphs/knowledge.omni")) + .output() + .unwrap(); + assert!(output.status.success(), "graph load failed: {}", String::from_utf8_lossy(&output.stderr)); + + let server = spawn_server_with_cluster(temp.path()); + let client = reqwest::blocking::Client::new(); + let queries: serde_json::Value = client + .get(format!("{}/graphs/knowledge/queries", server.base_url)) + .send() + .unwrap() + .json() + .unwrap(); + assert!( + queries["queries"] + .as_array() + .unwrap() + .iter() + .any(|q| q["name"] == "find_person"), + "{queries}" + ); + let response = client + .post(format!( + "{}/graphs/knowledge/queries/find_person", + server.base_url + )) + .json(&serde_json::json!({"params": {"name": "Ada"}})) + .send() + .unwrap(); + assert!(response.status().is_success(), "{:?}", response.status()); + let body: serde_json::Value = response.json().unwrap(); + assert!(body.to_string().contains("Ada"), "{body}"); +} From
711865e6f117b0f0c1ff09eb32c60a36dc0e3f1f Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 17:55:15 +0300 Subject: [PATCH 4/4] docs(cluster,server): the Phase 5 mode switch; retire applied-not-serving caveats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The standing caveat ('applied means recorded in the cluster catalog — nothing more; the server still boots from omnigraph.yaml') retires: cluster docs gain the 'Serving from the cluster' section (exclusivity, applied- revision serving, fail-fast readiness, restart-to-pick-up, expose-all bridge), server.md gains mode-inference rule 0 and the cluster-booted multi mode, deployment.md the boot-source choice, and the CLI's apply note plus the cli-reference cluster row (stale back to Stage 3A) now describe the full convergence surface. RFC-005 flips to Landed with four implementation deviations recorded. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 2 +- docs/dev/rfc-005-server-cluster-boot.md | 3 +- docs/dev/testing.md | 4 +-- docs/user/cli-reference.md | 2 +- docs/user/cluster-config.md | 46 ++++++++++++++++++++++--- docs/user/deployment.md | 8 +++++ docs/user/server.md | 16 +++++++-- 7 files changed, 69 insertions(+), 12 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 673adb7..da4f8e8 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -856,7 +856,7 @@ fn print_cluster_apply_human(output: &ApplyOutput) { " state: revision {}, converged: {}, written: {}", state.state_revision, output.converged, output.state_written ); - println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml"); + println!(" note: cluster-booted servers (--cluster) serve this on their next restart; omnigraph.yaml deployments are unaffected"); } print_cluster_diagnostics(&output.diagnostics); } diff --git 
a/docs/dev/rfc-005-server-cluster-boot.md b/docs/dev/rfc-005-server-cluster-boot.md index 81d5129..85df875 100644 --- a/docs/dev/rfc-005-server-cluster-boot.md +++ b/docs/dev/rfc-005-server-cluster-boot.md @@ -1,6 +1,7 @@ # RFC: Server Boots from Cluster State — Phase 5 of the Cluster Control Plane -**Status:** Proposed +**Status:** Landed (5A policy bindings #175; 5B/5C the `--cluster` boot mode — one PR) +**Implementation deviations:** (1) cluster mode reuses `ServerConfigMode::Multi` (a new settings *source*, not a new enum variant; `config_path` carries the cluster dir). (2) Stored queries load via `QueryRegistry::from_specs` from verified blob *content*, not blob paths. (3) More than one policy bundle binding a single scope is a boot error (the serving pipeline holds one bundle per graph + one server-level; stacking is a later slice). (4) `GET /graphs` keeps its closed-by-default contract — without a cluster-bound bundle there is no server-level Cedar engine, so enumeration refuses. **Date:** 2026-06-10 **Builds on:** Phase 4 complete ([rfc-004-cluster-graph-schema-apply.md](rfc-004-cluster-graph-schema-apply.md), Landed): `cluster apply` converges graphs, schemas, stored queries, and policies into the cluster catalog. Normative context: [cluster-config-specs.md](cluster-config-specs.md) (the migration model's "window 2"), [cluster-axioms.md](cluster-axioms.md) (axiom 15), [cluster-config-implementation-spec.md](cluster-config-implementation-spec.md) (Phase 5 rollout, Compatibility Stance #7–#9, exit criterion 7). **Target release:** unversioned (phased — see Sequencing). diff --git a/docs/dev/testing.md b/docs/dev/testing.md index a7a6cb3..eba70c9 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,8 +8,8 @@ This file is the always-on map of the test surface. 
**Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill) | -| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, 
idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) | +| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level; incl. cluster-mode boot — converged-dir serving, policy binding wiring, boot refusals), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | The engine's `tests/` is the principal coverage surface; most graph-shaped behavior is exercised there. diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 9dc8a25..6d864cc 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -19,7 +19,7 @@ Top-level command families and subcommands. Graph-targeting commands accept eith | `commit list \| show` | inspect commit graph | | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | -| `cluster validate \| plan \| apply \| approve \| status \| refresh \| import \| force-unlock` | cluster-control preview.
`validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A | +| `cluster validate \| plan \| apply \| approve \| status \| refresh \| import \| force-unlock` | declarative cluster control plane. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`, annotates dispositions, and embeds real schema-migration previews; `apply` converges the cluster — stored-query/policy catalog writes (content-addressed under `__cluster/resources/`), graph creates, schema updates (soft drops only; `--as` records the actor), and graph deletes behind a digest-bound approval from `cluster approve --as `; what apply converges is what an `omnigraph-server --cluster ` deployment serves on its next restart (omnigraph.yaml deployments are unaffected); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) | | `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. 
`--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 284bfbf..5c51b1f 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,6 +1,6 @@ # Cluster Config -**Status:** Stage 4C — Phase 4 complete (graph create, schema apply, gated graph delete). +**Status:** Phase 5 — cluster-booted serving (`omnigraph-server --cluster`). Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local @@ -190,10 +190,12 @@ Deletes remove the resource from state; their old payload blobs stay on disk (garbage collection is a later stage). Re-running a converged apply is a no-op: no state write, no revision change (`state_written: false`). -**Applied means recorded in the cluster catalog — nothing more.** The server -still boots from `omnigraph.yaml`; no query or policy applied here serves -traffic until the server-boot stage ships, as an explicit per-deployment mode -switch. +**Applied means serving — for deployments that opt in.** A server started +with `--cluster ` boots from the applied revision (see +[Serving from the cluster](#serving-from-the-cluster-the-mode-switch)); it +picks up newly applied state on its next restart. Deployments still booting +from `omnigraph.yaml` are untouched: for them, applied means recorded in the +catalog, nothing more. ### Graph creation @@ -305,6 +307,40 @@ fully converges. The `graph.` composite digest is recomputed from state's own schema/query digests after each apply, so applied query changes converge without graph movement. 
+## Serving from the cluster (the mode switch) + +```bash +omnigraph-server --cluster ./company-brain --bind 0.0.0.0:8080 +``` + +`--cluster ` is an **exclusive boot source** (axiom 15): it cannot +combine with a graph URI, `--target`, or `--config`, and in this mode +`omnigraph.yaml` is never read — not for graphs, not for queries, not for +policies. The server serves the **applied revision**: graph roots recorded in +`state.json`, stored-query and policy content from the content-addressed +catalog at the applied digests (re-verified at boot), and policy bundles +wired by their applied `applies_to` bindings — `cluster`-bound bundles become +the server-level Cedar engine, graph-bound bundles attach per graph. +Un-applied config drift never leaks into serving; `cluster plan` is where +drift is visible. Routing is always multi-graph (`/graphs/{id}/...`). Bearer +tokens and the bind address stay process-level (flags/env) — they are +per-replica facts, not cluster facts. + +Boot is fail-fast: missing or unreadable state, pending recovery sidecars, +missing/tampered catalog blobs, policy entries without binding metadata +(pre-binding ledgers — re-run `cluster apply`), an empty graph set, more than +one policy bundle binding a single scope (split or merge bundles; stacked +scopes are a later stage), unopenable graph roots, and stored queries that no +longer type-check all refuse startup with a remedy. A held state lock is +*not* an error — boot reads the atomically-replaced state file without +locking. + +Serving is static per process: the server reads the applied revision once at +startup, so picking up newly applied state means restarting it. Stored +queries are all listed in `GET /graphs/{id}/queries` in cluster mode (the cluster +registry has no expose flag; exposure becomes a policy decision in a later +phase).
+ ## Status `cluster status` reads the same local JSON state ledger and prints what the diff --git a/docs/user/deployment.md b/docs/user/deployment.md index 9a4466c..328784f 100644 --- a/docs/user/deployment.md +++ b/docs/user/deployment.md @@ -13,6 +13,14 @@ Omnigraph supports two broad deployment shapes: The server binary and container image expose the same HTTP surface. +The server also has two **boot sources**: `omnigraph.yaml` (graph targets +declared in the per-operator config) or a **cluster directory** +(`omnigraph-server --cluster `), which serves the cluster control +plane's applied revision — see +[cluster-config.md](cluster-config.md#serving-from-the-cluster-the-mode-switch). +The two are exclusive per deployment; switching is a restart with a different +flag. + ## Binary Deployment Build or install: diff --git a/docs/user/server.md b/docs/user/server.md index 67b5afe..60988ca 100644 --- a/docs/user/server.md +++ b/docs/user/server.md @@ -1,6 +1,6 @@ # HTTP Server (`omnigraph-server`) -Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-graph (legacy) and multi-graph (MR-668). Mode is inferred from CLI args + config shape. +Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-graph (legacy) and multi-graph (MR-668), with **two boot sources** for multi mode: `omnigraph.yaml` or — exclusively — a cluster directory (`--cluster`, RFC-005). Mode is inferred from CLI args + config shape. ## Modes @@ -14,8 +14,20 @@ Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-gra `omnigraph-server --config omnigraph.yaml` with a non-empty `graphs:` map and **no** single-mode selector (no `server.graph`, no ``, no `--target`). The server opens every configured graph in parallel at startup (bounded concurrency = 4, fail-fast on the first open error). Routes are nested under `/graphs/{graph_id}/...`. Bare flat paths return 404 in multi mode. 
-Mode inference (four-rule matrix): +### Cluster-booted multi mode (Phase 5) +`omnigraph-server --cluster ` boots from the cluster catalog's **applied +revision** (`state.json` + content-addressed blobs) instead of +`omnigraph.yaml` — an exclusive boot source: combining it with ``, +`--target`, or `--config` is a startup error, and `omnigraph.yaml` is never +read in this mode. Always multi-graph routing. See +[cluster-config.md](cluster-config.md#serving-from-the-cluster-the-mode-switch) +for what is read and the fail-fast readiness rules. `--bind`, +`--unauthenticated`, and the bearer-token env vars work identically. + +Mode inference: + +0. CLI `--cluster ` → **multi, cluster-booted** (exclusive) 1. CLI positional `` → single 2. CLI `--target ` → single 3. `server.graph` in config → single