mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
Merge pull request #176 from ModernRelay/feat/server-cluster-boot-5b
feat(server,cluster): Phase 5 — omnigraph-server boots from cluster state
This commit is contained in:
commit
af6a1096b0
15 changed files with 822 additions and 36 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -4673,6 +4673,7 @@ dependencies = [
|
|||
"futures",
|
||||
"lance",
|
||||
"lance-index",
|
||||
"omnigraph-cluster",
|
||||
"omnigraph-compiler",
|
||||
"omnigraph-engine",
|
||||
"omnigraph-policy",
|
||||
|
|
|
|||
|
|
@ -856,7 +856,7 @@ fn print_cluster_apply_human(output: &ApplyOutput) {
|
|||
" state: revision {}, converged: {}, written: {}",
|
||||
state.state_revision, output.converged, output.state_written
|
||||
);
|
||||
println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml");
|
||||
println!(" note: cluster-booted servers (--cluster) serve this on their next restart; omnigraph.yaml deployments are unaffected");
|
||||
}
|
||||
print_cluster_diagnostics(&output.diagnostics);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -212,6 +212,12 @@ pub fn spawn_server_with_config(config: &Path) -> TestServer {
|
|||
spawn_server_process(command)
|
||||
}
|
||||
|
||||
pub fn spawn_server_with_cluster(cluster_dir: &Path) -> TestServer {
|
||||
let mut command = server_process();
|
||||
command.arg("--cluster").arg(cluster_dir).arg("--unauthenticated");
|
||||
spawn_server_process(command)
|
||||
}
|
||||
|
||||
pub fn spawn_server_with_config_env(config: &Path, envs: &[(&str, &str)]) -> TestServer {
|
||||
let mut command = server_process();
|
||||
command.arg("--config").arg(config);
|
||||
|
|
|
|||
|
|
@ -1633,3 +1633,84 @@ fn local_cli_actor_flag_overrides_config_actor() {
|
|||
"expected 'denied' when --as overrides config to bruno, got: {stderr}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Phase 5 (RFC-005): "applied means serving" — converge a cluster with the
|
||||
/// CLI, boot the real omnigraph-server binary with --cluster, and serve the
|
||||
/// applied stored query over HTTP with zero omnigraph.yaml involvement.
|
||||
#[test]
|
||||
fn local_cluster_apply_then_server_boots_from_cluster_state() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
std::fs::write(
|
||||
temp.path().join("people.pg"),
|
||||
"\nnode Person {\n name: String @key\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::write(
|
||||
temp.path().join("people.gq"),
|
||||
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::write(
|
||||
temp.path().join("cluster.yaml"),
|
||||
r#"
|
||||
version: 1
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
queries:
|
||||
find_person:
|
||||
file: ./people.gq
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
for command in ["import", "apply"] {
|
||||
let output = cli()
|
||||
.arg("cluster")
|
||||
.arg(command)
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json")
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(output.status.success(), "cluster {command} failed");
|
||||
}
|
||||
// Seed a row through the graph plane so the stored query has data.
|
||||
let data = temp.path().join("seed.jsonl");
|
||||
std::fs::write(&data, "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n").unwrap();
|
||||
let output = cli()
|
||||
.arg("load")
|
||||
.arg("--data")
|
||||
.arg(&data)
|
||||
.arg(temp.path().join("graphs/knowledge.omni"))
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(output.status.success(), "graph load failed");
|
||||
|
||||
let server = spawn_server_with_cluster(temp.path());
|
||||
let client = reqwest::blocking::Client::new();
|
||||
let queries: serde_json::Value = client
|
||||
.get(format!("{}/graphs/knowledge/queries", server.base_url))
|
||||
.send()
|
||||
.unwrap()
|
||||
.json()
|
||||
.unwrap();
|
||||
assert!(
|
||||
queries["queries"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|q| q["name"] == "find_person"),
|
||||
"{queries}"
|
||||
);
|
||||
let response = client
|
||||
.post(format!(
|
||||
"{}/graphs/knowledge/queries/find_person",
|
||||
server.base_url
|
||||
))
|
||||
.json(&serde_json::json!({"params": {"name": "Ada"}}))
|
||||
.send()
|
||||
.unwrap();
|
||||
assert!(response.status().is_success(), "{:?}", response.status());
|
||||
let body: serde_json::Value = response.json().unwrap();
|
||||
assert!(body.to_string().contains("Ada"), "{body}");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1699,6 +1699,191 @@ pub async fn approve_config_dir(
|
|||
}
|
||||
}
|
||||
|
||||
/// One graph in a serving snapshot: its id and on-disk root.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServingGraph {
|
||||
pub graph_id: String,
|
||||
pub root: PathBuf,
|
||||
}
|
||||
|
||||
/// One stored query: its graph binding, registry name, and verified source.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServingQuery {
|
||||
pub graph_id: String,
|
||||
pub name: String,
|
||||
pub source: String,
|
||||
}
|
||||
|
||||
/// One policy bundle: its verified catalog blob path and applied bindings
|
||||
/// (normalized typed refs: `cluster` | `graph.<id>`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServingPolicy {
|
||||
pub name: String,
|
||||
pub blob_path: PathBuf,
|
||||
pub applies_to: Vec<String>,
|
||||
}
|
||||
|
||||
/// Everything a server needs to boot from the cluster catalog (RFC-005 §D2).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServingSnapshot {
|
||||
pub graphs: Vec<ServingGraph>,
|
||||
pub queries: Vec<ServingQuery>,
|
||||
pub policies: Vec<ServingPolicy>,
|
||||
}
|
||||
|
||||
/// Read the applied revision as a serving snapshot — the read-only loader for
|
||||
/// the Phase-5 server boot. All-or-nothing per RFC-005 §D4: every readiness
|
||||
/// failure is collected and the whole snapshot refused; no partial serving.
|
||||
/// Takes no lock: the state file is replaced atomically, so this reads a
|
||||
/// consistent point-in-time ledger.
|
||||
pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnapshot, Vec<Diagnostic>> {
|
||||
let config_dir = config_dir.as_ref().to_path_buf();
|
||||
let backend = LocalStateBackend::new(&config_dir);
|
||||
let mut diagnostics: Vec<Diagnostic> = Vec::new();
|
||||
|
||||
// A ledger a sweep is about to rewrite must not start serving.
|
||||
let sidecars = backend.list_recovery_sidecars(&mut diagnostics);
|
||||
if !sidecars.is_empty() {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"cluster_recovery_pending",
|
||||
CLUSTER_RECOVERIES_DIR,
|
||||
format!(
|
||||
"{} interrupted operation(s) await recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry",
|
||||
sidecars.len()
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let mut observations = backend.observations();
|
||||
let state = match backend.read_state(&mut observations) {
|
||||
Ok(snapshot) => match snapshot.state {
|
||||
Some(state) => Some(state),
|
||||
None => {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"cluster_state_missing",
|
||||
CLUSTER_STATE_FILE,
|
||||
"no cluster state ledger; run `cluster import` and `cluster apply` first",
|
||||
));
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
None
|
||||
}
|
||||
};
|
||||
let Some(state) = state else {
|
||||
return Err(diagnostics);
|
||||
};
|
||||
|
||||
let mut graphs = Vec::new();
|
||||
let mut queries = Vec::new();
|
||||
let mut policies = Vec::new();
|
||||
for (address, entry) in &state.applied_revision.resources {
|
||||
match resource_kind(address) {
|
||||
ResourceKind::Graph(graph_id) => {
|
||||
graphs.push(ServingGraph {
|
||||
root: config_dir
|
||||
.join(CLUSTER_GRAPHS_DIR)
|
||||
.join(format!("{graph_id}.omni")),
|
||||
graph_id,
|
||||
});
|
||||
}
|
||||
ResourceKind::Schema(_) => {}
|
||||
kind @ ResourceKind::Query { .. } => {
|
||||
let ResourceKind::Query { graph, name } = &kind else {
|
||||
unreachable!()
|
||||
};
|
||||
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
|
||||
Ok(source) => queries.push(ServingQuery {
|
||||
graph_id: graph.clone(),
|
||||
name: name.clone(),
|
||||
source,
|
||||
}),
|
||||
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||
}
|
||||
}
|
||||
kind @ ResourceKind::Policy(_) => {
|
||||
let ResourceKind::Policy(name) = &kind else {
|
||||
unreachable!()
|
||||
};
|
||||
let Some(applies_to) = entry.applies_to.clone() else {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"policy_bindings_missing",
|
||||
address.clone(),
|
||||
"no applied applies_to bindings recorded (ledger predates binding metadata); re-run `cluster apply` to backfill",
|
||||
));
|
||||
continue;
|
||||
};
|
||||
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
|
||||
Ok(_) => policies.push(ServingPolicy {
|
||||
name: name.clone(),
|
||||
blob_path: payload_path(&config_dir, &kind, &entry.digest)
|
||||
.expect("policy kind always has a payload path"),
|
||||
applies_to,
|
||||
}),
|
||||
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||
}
|
||||
}
|
||||
ResourceKind::Unknown => {}
|
||||
}
|
||||
}
|
||||
|
||||
if graphs.is_empty() {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"cluster_empty",
|
||||
CLUSTER_STATE_FILE,
|
||||
"the applied revision records no graphs; apply a cluster with at least one graph before serving from it",
|
||||
));
|
||||
}
|
||||
if has_errors(&diagnostics) {
|
||||
return Err(diagnostics);
|
||||
}
|
||||
Ok(ServingSnapshot {
|
||||
graphs,
|
||||
queries,
|
||||
policies,
|
||||
})
|
||||
}
|
||||
|
||||
/// Read a catalog blob and verify it against the recorded digest.
|
||||
fn read_verified_payload(
|
||||
config_dir: &Path,
|
||||
kind: &ResourceKind,
|
||||
digest: &str,
|
||||
address: &str,
|
||||
) -> Result<String, Diagnostic> {
|
||||
let path = payload_path(config_dir, kind, digest)
|
||||
.expect("query/policy kinds always have a payload path");
|
||||
let bytes = fs::read(&path).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"catalog_payload_missing",
|
||||
address,
|
||||
format!(
|
||||
"catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart",
|
||||
display_path(&path)
|
||||
),
|
||||
)
|
||||
})?;
|
||||
if sha256_hex(&bytes) != digest {
|
||||
return Err(Diagnostic::error(
|
||||
"catalog_payload_digest_mismatch",
|
||||
address,
|
||||
format!(
|
||||
"catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart",
|
||||
display_path(&path)
|
||||
),
|
||||
));
|
||||
}
|
||||
String::from_utf8(bytes).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"catalog_payload_invalid",
|
||||
address,
|
||||
format!("catalog blob is not valid UTF-8: {err}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
|
||||
let parsed = parse_cluster_config(config_dir.as_ref());
|
||||
let mut diagnostics = parsed.diagnostics;
|
||||
|
|
@ -7272,6 +7457,109 @@ policies:
|
|||
);
|
||||
}
|
||||
|
||||
// ---- serving snapshot (5B read-only loader) ----
|
||||
|
||||
#[tokio::test]
|
||||
async fn serving_snapshot_reads_converged_cluster() {
|
||||
let dir = fixture();
|
||||
init_derived_graph(dir.path()).await;
|
||||
write_applyable_state(dir.path());
|
||||
let converge = apply_config_dir(dir.path()).await;
|
||||
assert!(converge.converged, "{converge:?}");
|
||||
|
||||
let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve");
|
||||
assert_eq!(snapshot.graphs.len(), 1);
|
||||
assert_eq!(snapshot.graphs[0].graph_id, "knowledge");
|
||||
assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni"));
|
||||
assert_eq!(snapshot.queries.len(), 1);
|
||||
assert_eq!(snapshot.queries[0].name, "find_person");
|
||||
assert!(snapshot.queries[0].source.contains("query find_person"));
|
||||
assert_eq!(snapshot.policies.len(), 1);
|
||||
assert_eq!(snapshot.policies[0].applies_to, vec!["graph.knowledge"]);
|
||||
assert!(snapshot.policies[0].blob_path.exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serving_snapshot_refuses_missing_state() {
|
||||
let dir = fixture();
|
||||
let err = read_serving_snapshot(dir.path()).unwrap_err();
|
||||
assert!(
|
||||
err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"),
|
||||
"{err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn serving_snapshot_refuses_pending_recovery() {
|
||||
let dir = fixture();
|
||||
init_derived_graph(dir.path()).await;
|
||||
write_applyable_state(dir.path());
|
||||
apply_config_dir(dir.path()).await;
|
||||
write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE");
|
||||
|
||||
let err = read_serving_snapshot(dir.path()).unwrap_err();
|
||||
assert!(
|
||||
err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"),
|
||||
"{err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn serving_snapshot_refuses_tampered_blob_and_stripped_bindings() {
|
||||
let dir = fixture();
|
||||
init_derived_graph(dir.path()).await;
|
||||
write_applyable_state(dir.path());
|
||||
apply_config_dir(dir.path()).await;
|
||||
// Tamper with the query blob...
|
||||
let snapshot = read_serving_snapshot(dir.path()).unwrap();
|
||||
let desired = validate_config_dir(dir.path());
|
||||
let query_digest = &desired.resource_digests["query.knowledge.find_person"];
|
||||
let blob = dir
|
||||
.path()
|
||||
.join(CLUSTER_RESOURCES_DIR)
|
||||
.join("query/knowledge/find_person")
|
||||
.join(format!("{query_digest}.gq"));
|
||||
fs::write(&blob, "tampered").unwrap();
|
||||
// ...and strip the policy bindings (pre-5A ledger).
|
||||
let mut state: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
state["applied_revision"]["resources"]["policy.base"]
|
||||
.as_object_mut()
|
||||
.unwrap()
|
||||
.remove("applies_to");
|
||||
fs::write(
|
||||
dir.path().join(CLUSTER_STATE_FILE),
|
||||
serde_json::to_string_pretty(&state).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let err = read_serving_snapshot(dir.path()).unwrap_err();
|
||||
assert!(
|
||||
err.iter()
|
||||
.any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"),
|
||||
"{err:?}"
|
||||
);
|
||||
assert!(
|
||||
err.iter().any(|diagnostic| diagnostic.code == "policy_bindings_missing"),
|
||||
"{err:?}"
|
||||
);
|
||||
let _ = snapshot; // the pre-tamper read succeeded
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serving_snapshot_refuses_empty_cluster() {
|
||||
let dir = fixture();
|
||||
write_state_resources(dir.path(), &[]); // state exists, no graphs
|
||||
|
||||
let err = read_serving_snapshot(dir.path()).unwrap_err();
|
||||
assert!(
|
||||
err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"),
|
||||
"{err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_warns_on_pending_recovery_sidecar() {
|
||||
let dir = fixture();
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"]
|
|||
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" }
|
||||
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
|
||||
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" }
|
||||
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.2" }
|
||||
axum = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
color-eyre = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, Regi
|
|||
|
||||
use crate::queries::{QueryRegistry, check, format_check_breakages};
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
|
@ -40,7 +40,7 @@ use axum::middleware::{self, Next};
|
|||
use axum::response::{IntoResponse, Response};
|
||||
use axum::routing::{delete, get, post};
|
||||
use axum::{Json, Router};
|
||||
use color_eyre::eyre::{Result, WrapErr, bail};
|
||||
use color_eyre::eyre::{Result, WrapErr, bail, eyre};
|
||||
pub use config::{
|
||||
AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
|
||||
ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
|
||||
|
|
@ -888,13 +888,125 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St
|
|||
format!("graph '{label}': stored-query registry failed to load:\n {joined}")
|
||||
}
|
||||
|
||||
/// Build serving settings from a cluster directory's applied revision
|
||||
/// (RFC-005 §D2): graphs at derived roots, stored queries from verified
|
||||
/// catalog blob content, policy bundles from blob paths with their applied
|
||||
/// bindings. Always multi-graph routing. The unauthenticated/env handling
|
||||
/// matches the omnigraph.yaml path.
|
||||
fn load_cluster_settings(
|
||||
cluster_dir: &PathBuf,
|
||||
cli_bind: Option<String>,
|
||||
cli_allow_unauthenticated: bool,
|
||||
) -> Result<ServerConfig> {
|
||||
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| {
|
||||
let details = diagnostics
|
||||
.iter()
|
||||
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n ");
|
||||
eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display())
|
||||
})?;
|
||||
|
||||
// Bindings -> Cedar slots. The serving pipeline loads one bundle per
|
||||
// graph plus one server-level bundle; stacked bundles per scope are a
|
||||
// later slice — refuse loudly rather than silently merging policy.
|
||||
let mut server_policy_file: Option<PathBuf> = None;
|
||||
let mut graph_policy_files: BTreeMap<String, PathBuf> = BTreeMap::new();
|
||||
for policy in &snapshot.policies {
|
||||
for binding in &policy.applies_to {
|
||||
if binding == "cluster" {
|
||||
if server_policy_file.replace(policy.blob_path.clone()).is_some() {
|
||||
bail!(
|
||||
"multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
|
||||
);
|
||||
}
|
||||
} else if let Some(graph_id) = binding.strip_prefix("graph.") {
|
||||
if graph_policy_files
|
||||
.insert(graph_id.to_string(), policy.blob_path.clone())
|
||||
.is_some()
|
||||
{
|
||||
bail!(
|
||||
"multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
bail!("unrecognized policy binding '{binding}' in the applied revision");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut graphs = Vec::new();
|
||||
for graph in &snapshot.graphs {
|
||||
let specs: Vec<queries::RegistrySpec> = snapshot
|
||||
.queries
|
||||
.iter()
|
||||
.filter(|query| query.graph_id == graph.graph_id)
|
||||
.map(|query| queries::RegistrySpec {
|
||||
name: query.name.clone(),
|
||||
source: query.source.clone(),
|
||||
// The §D5 bridge: the cluster registry has no expose flag
|
||||
// (exposure becomes a policy decision in Phase 6) — cluster
|
||||
// mode lists every stored query.
|
||||
expose: true,
|
||||
tool_name: None,
|
||||
})
|
||||
.collect();
|
||||
let registry = QueryRegistry::from_specs(specs).map_err(|errors| {
|
||||
let details = errors
|
||||
.iter()
|
||||
.map(|error| error.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n ");
|
||||
eyre!(
|
||||
"stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart"
|
||||
)
|
||||
})?;
|
||||
graphs.push(GraphStartupConfig {
|
||||
graph_id: graph.graph_id.clone(),
|
||||
uri: graph.root.to_string_lossy().to_string(),
|
||||
policy_file: graph_policy_files.get(&graph.graph_id).cloned(),
|
||||
queries: registry,
|
||||
});
|
||||
}
|
||||
|
||||
let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
|
||||
.ok()
|
||||
.map(|v| {
|
||||
let trimmed = v.trim();
|
||||
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
Ok(ServerConfig {
|
||||
mode: ServerConfigMode::Multi {
|
||||
graphs,
|
||||
config_path: cluster_dir.clone(),
|
||||
server_policy_file,
|
||||
},
|
||||
bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
|
||||
allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn load_server_settings(
|
||||
config_path: Option<&PathBuf>,
|
||||
cli_cluster: Option<&PathBuf>,
|
||||
cli_uri: Option<String>,
|
||||
cli_target: Option<String>,
|
||||
cli_bind: Option<String>,
|
||||
cli_allow_unauthenticated: bool,
|
||||
) -> Result<ServerConfig> {
|
||||
// Rule 0 (RFC-005): --cluster is an exclusive boot source. It is checked
|
||||
// before anything reads omnigraph.yaml — in cluster mode that file is
|
||||
// never opened, not even the implicit current-directory search.
|
||||
if let Some(cluster_dir) = cli_cluster {
|
||||
if cli_uri.is_some() || cli_target.is_some() || config_path.is_some() {
|
||||
bail!(
|
||||
"--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)"
|
||||
);
|
||||
}
|
||||
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated);
|
||||
}
|
||||
let config = load_config(config_path)?;
|
||||
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
|
||||
// Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips
|
||||
|
|
@ -1275,7 +1387,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
|
|||
/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
|
||||
/// trades startup latency for a small amount of concurrent S3 / Lance
|
||||
/// open pressure.
|
||||
async fn open_multi_graph_state(
|
||||
pub async fn open_multi_graph_state(
|
||||
graphs: Vec<GraphStartupConfig>,
|
||||
tokens: Vec<(String, String)>,
|
||||
server_policy_file: Option<&PathBuf>,
|
||||
|
|
@ -3255,7 +3367,7 @@ server:
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
|
||||
let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap();
|
||||
match &settings.mode {
|
||||
ServerConfigMode::Single { uri, graph_id, .. } => {
|
||||
assert_eq!(uri, "/tmp/demo.omni");
|
||||
|
|
@ -3285,6 +3397,7 @@ server:
|
|||
|
||||
let settings = load_server_settings(
|
||||
Some(&config),
|
||||
None,
|
||||
Some("/tmp/override.omni".to_string()),
|
||||
None,
|
||||
Some("0.0.0.0:9999".to_string()),
|
||||
|
|
@ -3321,7 +3434,7 @@ server:
|
|||
.unwrap();
|
||||
|
||||
let settings =
|
||||
load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
|
||||
load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false)
|
||||
.unwrap();
|
||||
match &settings.mode {
|
||||
ServerConfigMode::Single { uri, graph_id, .. } => {
|
||||
|
|
@ -3334,7 +3447,7 @@ server:
|
|||
|
||||
#[test]
|
||||
fn server_settings_require_uri_from_cli_or_config() {
|
||||
let error = load_server_settings(None, None, None, None, false).unwrap_err();
|
||||
let error = load_server_settings(None, None, None, None, None, false).unwrap_err();
|
||||
assert!(
|
||||
error.to_string().contains("no graph to serve"),
|
||||
"expected mode-inference error, got: {error}",
|
||||
|
|
@ -3501,7 +3614,7 @@ server:
|
|||
// Truthy values flip Open mode on, even with CLI flag off.
|
||||
for value in ["1", "true", "yes", "TRUE", "anything"] {
|
||||
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, false)
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
|
||||
.expect("settings load should succeed");
|
||||
assert!(
|
||||
settings.allow_unauthenticated,
|
||||
|
|
@ -3512,7 +3625,7 @@ server:
|
|||
// Falsy values keep refusal behavior, even with CLI flag off.
|
||||
for value in ["0", "false", "FALSE", ""] {
|
||||
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, false)
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
|
||||
.expect("settings load should succeed");
|
||||
assert!(
|
||||
!settings.allow_unauthenticated,
|
||||
|
|
@ -3522,7 +3635,7 @@ server:
|
|||
|
||||
// Unset env var: also false.
|
||||
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, false)
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
|
||||
.expect("settings load should succeed");
|
||||
assert!(
|
||||
!settings.allow_unauthenticated,
|
||||
|
|
@ -3533,7 +3646,7 @@ server:
|
|||
// CLI flag wins even when env is falsy — `serve()` honors the
|
||||
// OR of both inputs.
|
||||
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, true)
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, true)
|
||||
.expect("settings load should succeed");
|
||||
assert!(
|
||||
settings.allow_unauthenticated,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,12 @@ struct Cli {
|
|||
target: Option<String>,
|
||||
#[arg(long)]
|
||||
config: Option<PathBuf>,
|
||||
/// Boot from a cluster directory (the applied revision in
|
||||
/// __cluster/state.json + content-addressed catalog blobs) instead of
|
||||
/// omnigraph.yaml. Exclusive: cannot combine with <URI>, --target, or
|
||||
/// --config.
|
||||
#[arg(long)]
|
||||
cluster: Option<PathBuf>,
|
||||
#[arg(long)]
|
||||
bind: Option<String>,
|
||||
/// Run without bearer tokens and without a policy file (MR-723).
|
||||
|
|
@ -32,6 +38,7 @@ async fn main() -> Result<()> {
|
|||
let cli = Cli::parse();
|
||||
let settings: ServerConfig = load_server_settings(
|
||||
cli.config.as_ref(),
|
||||
cli.cluster.as_ref(),
|
||||
cli.uri,
|
||||
cli.target,
|
||||
cli.bind,
|
||||
|
|
|
|||
|
|
@ -5508,7 +5508,7 @@ graphs:
|
|||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, false).unwrap_err();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("invalid graph id 'policies'"),
|
||||
"expected reserved-name rejection, got: {err}"
|
||||
|
|
@ -5575,6 +5575,7 @@ graphs:
|
|||
#[test]
|
||||
fn mode_inference_cli_uri_is_single() {
|
||||
let settings = load_server_settings(
|
||||
None,
|
||||
None,
|
||||
Some("/tmp/cli.omni".to_string()),
|
||||
None,
|
||||
|
|
@ -5605,7 +5606,7 @@ graphs:
|
|||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, Some("alpha".into()), None, true)
|
||||
load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true)
|
||||
.unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"),
|
||||
|
|
@ -5631,7 +5632,7 @@ server:
|
|||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"),
|
||||
|
|
@ -5654,7 +5655,7 @@ graphs:
|
|||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => {
|
||||
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
|
||||
|
|
@ -5680,7 +5681,7 @@ graphs:
|
|||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"),
|
||||
|
|
@ -5708,7 +5709,7 @@ graphs:
|
|||
"queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n",
|
||||
)
|
||||
.unwrap();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
msg.contains("queries") && msg.contains("not honored"),
|
||||
|
|
@ -5729,7 +5730,7 @@ graphs:
|
|||
)
|
||||
.unwrap();
|
||||
let err =
|
||||
load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true)
|
||||
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
|
||||
.unwrap_err();
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
|
|
@ -5756,7 +5757,7 @@ graphs:
|
|||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true)
|
||||
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
|
||||
.unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single {
|
||||
|
|
@ -5795,7 +5796,7 @@ graphs:
|
|||
),
|
||||
)
|
||||
.unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => {
|
||||
assert_eq!(graphs[0].uri, graph.to_string_lossy());
|
||||
|
|
@ -5807,7 +5808,7 @@ graphs:
|
|||
/// Rule 5: nothing → error with migration hint.
|
||||
#[test]
|
||||
fn mode_inference_no_inputs_errors_with_migration_hint() {
|
||||
let err = load_server_settings(None, None, None, None, true).unwrap_err();
|
||||
let err = load_server_settings(None, None, None, None, None, true).unwrap_err();
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
msg.contains("no graph to serve"),
|
||||
|
|
@ -5822,7 +5823,7 @@ graphs:
|
|||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err();
|
||||
assert!(err.to_string().contains("no graph to serve"));
|
||||
}
|
||||
|
||||
|
|
@ -5843,6 +5844,7 @@ graphs:
|
|||
.unwrap();
|
||||
let settings = load_server_settings(
|
||||
Some(&config_path),
|
||||
None,
|
||||
Some("/tmp/cli-override.omni".to_string()),
|
||||
None,
|
||||
None,
|
||||
|
|
@ -5880,7 +5882,7 @@ graphs:
|
|||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
|
||||
let graphs = match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => graphs,
|
||||
_ => panic!("expected Multi"),
|
||||
|
|
@ -5914,7 +5916,7 @@ graphs:
|
|||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi {
|
||||
server_policy_file, ..
|
||||
|
|
@ -6194,7 +6196,7 @@ graphs:
|
|||
.unwrap();
|
||||
|
||||
let settings: ServerConfig =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
load_server_settings(Some(&config_path), None, None, None, None, true).unwrap();
|
||||
assert!(matches!(settings.mode, ServerConfigMode::Multi { .. }));
|
||||
|
||||
match settings.mode {
|
||||
|
|
@ -6207,3 +6209,233 @@ graphs:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Phase 5: cluster-mode boot (RFC-005) ----
|
||||
|
||||
/// Build and converge a real cluster directory: cluster.yaml + schema +
|
||||
/// stored query (+ optional policies), then `import` + `apply` so the
|
||||
/// catalog and state ledger exist exactly as an operator would have them.
|
||||
async fn converged_cluster_dir(policies_yaml: &str) -> tempfile::TempDir {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
fs::write(
|
||||
temp.path().join("people.pg"),
|
||||
"\nnode Person {\n name: String @key\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("people.gq"),
|
||||
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("cluster.yaml"),
|
||||
format!(
|
||||
r#"
|
||||
version: 1
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
queries:
|
||||
find_person:
|
||||
file: ./people.gq
|
||||
{policies_yaml}"#
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
let import = omnigraph_cluster::import_config_dir(temp.path()).await;
|
||||
assert!(import.ok, "{:?}", import.diagnostics);
|
||||
let apply = omnigraph_cluster::apply_config_dir(temp.path()).await;
|
||||
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
|
||||
temp
|
||||
}
|
||||
|
||||
fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
|
||||
omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cluster_boot_serves_applied_state() {
|
||||
let temp = converged_cluster_dir("").await;
|
||||
let settings = cluster_settings(temp.path()).unwrap();
|
||||
let omnigraph_server::ServerConfigMode::Multi {
|
||||
graphs,
|
||||
config_path,
|
||||
server_policy_file,
|
||||
} = settings.mode
|
||||
else {
|
||||
panic!("cluster boot must select multi-graph routing");
|
||||
};
|
||||
assert_eq!(graphs.len(), 1);
|
||||
assert_eq!(graphs[0].graph_id, "knowledge");
|
||||
assert!(server_policy_file.is_none());
|
||||
|
||||
let state =
|
||||
omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path)
|
||||
.await
|
||||
.unwrap();
|
||||
let app = build_app(state);
|
||||
|
||||
// The management surface keeps its closed-by-default contract: without a
|
||||
// cluster-scoped policy bundle there is no server-level Cedar engine, so
|
||||
// GET /graphs refuses even in cluster mode.
|
||||
let (status, body) = json_response(
|
||||
&app,
|
||||
Request::builder().uri("/graphs").body(Body::empty()).unwrap(),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::FORBIDDEN, "{body}");
|
||||
|
||||
let (status, body) = json_response(
|
||||
&app,
|
||||
Request::builder()
|
||||
.uri("/graphs/knowledge/queries")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK, "{body}");
|
||||
assert!(
|
||||
body["queries"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|q| q["name"] == "find_person"),
|
||||
"{body}"
|
||||
);
|
||||
|
||||
let (status, body) = json_response(
|
||||
&app,
|
||||
Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("/graphs/knowledge/queries/find_person")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(r#"{"params":{"name":"nobody"}}"#))
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(status, StatusCode::OK, "{body}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cluster_boot_wires_policy_bindings_into_cedar_slots() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
drop(temp);
|
||||
let policy_block = r#"policies:
|
||||
graph_rules:
|
||||
file: ./graph.policy.yaml
|
||||
applies_to: [knowledge]
|
||||
cluster_rules:
|
||||
file: ./cluster.policy.yaml
|
||||
applies_to: [cluster]
|
||||
"#;
|
||||
let temp = {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
fs::write(
|
||||
temp.path().join("people.pg"),
|
||||
"\nnode Person {\n name: String @key\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("people.gq"),
|
||||
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("graph.policy.yaml"),
|
||||
permit_all_policy_yaml(&["default"]),
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("cluster.policy.yaml"),
|
||||
permit_all_policy_yaml(&["default"]).replace("protected_branches: [main]\n", "protected_branches: [main]\nkind: server\n"),
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("cluster.yaml"),
|
||||
format!(
|
||||
r#"
|
||||
version: 1
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
queries:
|
||||
find_person:
|
||||
file: ./people.gq
|
||||
{policy_block}"#
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
let import = omnigraph_cluster::import_config_dir(temp.path()).await;
|
||||
assert!(import.ok, "{:?}", import.diagnostics);
|
||||
let apply = omnigraph_cluster::apply_config_dir(temp.path()).await;
|
||||
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
|
||||
temp
|
||||
};
|
||||
|
||||
let settings = cluster_settings(temp.path()).unwrap();
|
||||
let omnigraph_server::ServerConfigMode::Multi {
|
||||
graphs,
|
||||
server_policy_file,
|
||||
..
|
||||
} = settings.mode
|
||||
else {
|
||||
panic!("cluster boot must select multi-graph routing");
|
||||
};
|
||||
let graph_policy = graphs[0].policy_file.as_ref().expect("graph-bound bundle");
|
||||
assert!(
|
||||
graph_policy
|
||||
.to_string_lossy()
|
||||
.contains("__cluster/resources/policy/graph_rules/"),
|
||||
"{graph_policy:?}"
|
||||
);
|
||||
let server_policy = server_policy_file.expect("cluster-bound bundle");
|
||||
assert!(
|
||||
server_policy
|
||||
.to_string_lossy()
|
||||
.contains("__cluster/resources/policy/cluster_rules/"),
|
||||
"{server_policy:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cluster_boot_refusals() {
|
||||
// Mutual exclusion with --config / URI.
|
||||
let temp = converged_cluster_dir("").await;
|
||||
let dir = temp.path().to_path_buf();
|
||||
let err = omnigraph_server::load_server_settings(
|
||||
Some(&dir.join("omnigraph.yaml")),
|
||||
Some(&dir),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.unwrap_err();
|
||||
assert!(err.to_string().contains("exclusive boot source"), "{err}");
|
||||
let err = omnigraph_server::load_server_settings(
|
||||
None,
|
||||
Some(&dir),
|
||||
Some("file:///tmp/x.omni".to_string()),
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.unwrap_err();
|
||||
assert!(err.to_string().contains("exclusive boot source"), "{err}");
|
||||
|
||||
// Tampered catalog blob refuses boot with the remedy.
|
||||
let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person");
|
||||
let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path();
|
||||
fs::write(&blob, "tampered").unwrap();
|
||||
let err = cluster_settings(&dir).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("catalog_payload_digest_mismatch"),
|
||||
"{err}"
|
||||
);
|
||||
assert!(err.to_string().contains("cluster refresh"), "{err}");
|
||||
|
||||
// Missing state refuses with the import/apply remedy.
|
||||
let empty = tempfile::tempdir().unwrap();
|
||||
let err = cluster_settings(empty.path()).unwrap_err();
|
||||
assert!(err.to_string().contains("cluster_state_missing"), "{err}");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
# RFC: Server Boots from Cluster State — Phase 5 of the Cluster Control Plane
|
||||
|
||||
**Status:** Proposed
|
||||
**Status:** Landed (5A policy bindings #175; 5B/5C the `--cluster` boot mode — one PR)
|
||||
**Implementation deviations:** (1) cluster mode reuses `ServerConfigMode::Multi` (a new settings *source*, not a new enum variant; `config_path` carries the cluster dir). (2) Stored queries load via `QueryRegistry::from_specs` from verified blob *content*, not blob paths. (3) More than one policy bundle binding a single scope is a boot error (the serving pipeline holds one bundle per graph + one server-level; stacking is a later slice). (4) `GET /graphs` keeps its closed-by-default contract — without a cluster-bound bundle there is no server-level Cedar engine, so enumeration refuses.
|
||||
**Date:** 2026-06-10
|
||||
**Builds on:** Phase 4 complete ([rfc-004-cluster-graph-schema-apply.md](rfc-004-cluster-graph-schema-apply.md), Landed): `cluster apply` converges graphs, schemas, stored queries, and policies into the cluster catalog. Normative context: [cluster-config-specs.md](cluster-config-specs.md) (the migration model's "window 2"), [cluster-axioms.md](cluster-axioms.md) (axiom 15), [cluster-config-implementation-spec.md](cluster-config-implementation-spec.md) (Phase 5 rollout, Compatibility Stance #7–#9, exit criterion 7).
|
||||
**Target release:** unversioned (phased — see Sequencing).
|
||||
|
|
|
|||
|
|
@ -8,8 +8,8 @@ This file is the always-on map of the test surface. **Consult it before every ta
|
|||
|---|---|---|
|
||||
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
|
||||
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
|
||||
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill) |
|
||||
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
|
||||
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) |
|
||||
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level; incl. cluster-mode boot — converged-dir serving, policy binding wiring, boot refusals), `openapi.rs` (OpenAPI drift / regeneration) |
|
||||
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |
|
||||
|
||||
The engine's `tests/` is the principal coverage surface; most graph-shaped behavior is exercised there.
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ Top-level command families and subcommands. Graph-targeting commands accept eith
|
|||
| `commit list \| show` | inspect commit graph |
|
||||
| `schema plan \| apply \| show (alias: get)` | migrations |
|
||||
| `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` |
|
||||
| `cluster validate \| plan \| apply \| approve \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock <LOCK_ID>` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A |
|
||||
| `cluster validate \| plan \| apply \| approve \| status \| refresh \| import \| force-unlock` | declarative cluster control plane. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`, annotates dispositions, and embeds real schema-migration previews; `apply` converges the cluster — stored-query/policy catalog writes (content-addressed under `__cluster/resources/`), graph creates, schema updates (soft drops only; `--as` records the actor), and graph deletes behind a digest-bound approval from `cluster approve <resource> --as <actor>`; what apply converges is what an `omnigraph-server --cluster <dir>` deployment serves on its next restart (omnigraph.yaml deployments are unaffected); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock <LOCK_ID>` manually removes a held local state lock by exact id |
|
||||
| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) |
|
||||
| `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review |
|
||||
| `cleanup --keep N --older-than 7d --confirm` | destructive version GC |
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# Cluster Config
|
||||
|
||||
**Status:** Stage 4C — Phase 4 complete (graph create, schema apply, gated graph delete).
|
||||
**Status:** Phase 5 — cluster-booted serving (`omnigraph-server --cluster`).
|
||||
|
||||
Cluster config is the future control-plane configuration surface for a whole
|
||||
OmniGraph deployment. In this stage, OmniGraph can validate a local
|
||||
|
|
@ -190,10 +190,12 @@ Deletes remove the resource from state; their old payload blobs stay on disk
|
|||
(garbage collection is a later stage). Re-running a converged apply is a no-op:
|
||||
no state write, no revision change (`state_written: false`).
|
||||
|
||||
**Applied means recorded in the cluster catalog — nothing more.** The server
|
||||
still boots from `omnigraph.yaml`; no query or policy applied here serves
|
||||
traffic until the server-boot stage ships, as an explicit per-deployment mode
|
||||
switch.
|
||||
**Applied means serving — for deployments that opt in.** A server started
|
||||
with `--cluster <dir>` boots from the applied revision (see
|
||||
[Serving from the cluster](#serving-from-the-cluster-the-mode-switch)); it
|
||||
picks up newly applied state on its next restart. Deployments still booting
|
||||
from `omnigraph.yaml` are untouched: for them, applied means recorded in the
|
||||
catalog, nothing more.
|
||||
|
||||
### Graph creation
|
||||
|
||||
|
|
@ -305,6 +307,40 @@ fully converges. The `graph.<id>` composite digest is recomputed from state's
|
|||
own schema/query digests after each apply, so applied query changes converge
|
||||
without graph movement.
|
||||
|
||||
## Serving from the cluster (the mode switch)
|
||||
|
||||
```bash
|
||||
omnigraph-server --cluster ./company-brain --bind 0.0.0.0:8080
|
||||
```
|
||||
|
||||
`--cluster <dir>` is an **exclusive boot source** (axiom 15): it cannot
|
||||
combine with a graph URI, `--target`, or `--config`, and in this mode
|
||||
`omnigraph.yaml` is never read — not for graphs, not for queries, not for
|
||||
policies. The server serves the **applied revision**: graph roots recorded in
|
||||
`state.json`, stored-query and policy content from the content-addressed
|
||||
catalog at the applied digests (re-verified at boot), and policy bundles
|
||||
wired by their applied `applies_to` bindings — `cluster`-bound bundles become
|
||||
the server-level Cedar engine, graph-bound bundles attach per graph.
|
||||
Un-applied config drift never leaks into serving; `cluster plan` is where
|
||||
drift is visible. Routing is always multi-graph (`/graphs/{id}/...`). Bearer
|
||||
tokens and the bind address stay process-level (flags/env) — they are
|
||||
per-replica facts, not cluster facts.
|
||||
|
||||
Boot is fail-fast: missing or unreadable state, pending recovery sidecars,
|
||||
missing/tampered catalog blobs, policy entries without binding metadata
|
||||
(pre-binding ledgers — re-run `cluster apply`), an empty graph set, more than
|
||||
one policy bundle binding a single scope (split or merge bundles; stacked
|
||||
scopes are a later stage), unopenable graph roots, and stored queries that no
|
||||
longer type-check all refuse startup with a remedy. A held state lock is
|
||||
*not* an error — boot reads the atomically-replaced state file without
|
||||
locking.
|
||||
|
||||
Serving is static per process: the server reads the applied revision once at
|
||||
startup, so picking up newly applied state means restarting it. Stored
|
||||
queries are all listed in `GET /queries` in cluster mode (the cluster
|
||||
registry has no expose flag; exposure becomes a policy decision in a later
|
||||
phase).
|
||||
|
||||
## Status
|
||||
|
||||
`cluster status` reads the same local JSON state ledger and prints what the
|
||||
|
|
|
|||
|
|
@ -13,6 +13,14 @@ Omnigraph supports two broad deployment shapes:
|
|||
|
||||
The server binary and container image expose the same HTTP surface.
|
||||
|
||||
The server also has two **boot sources**: `omnigraph.yaml` (graph targets
|
||||
declared in the per-operator config) or a **cluster directory**
|
||||
(`omnigraph-server --cluster <dir>`), which serves the cluster control
|
||||
plane's applied revision — see
|
||||
[cluster-config.md](cluster-config.md#serving-from-the-cluster-the-mode-switch).
|
||||
The two are exclusive per deployment; switching is a restart with a different
|
||||
flag.
|
||||
|
||||
## Binary Deployment
|
||||
|
||||
Build or install:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# HTTP Server (`omnigraph-server`)
|
||||
|
||||
Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-graph (legacy) and multi-graph (MR-668). Mode is inferred from CLI args + config shape.
|
||||
Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-graph (legacy) and multi-graph (MR-668), with **two boot sources** for multi mode: `omnigraph.yaml` or — exclusively — a cluster directory (`--cluster`, RFC-005). Mode is inferred from CLI args + config shape.
|
||||
|
||||
## Modes
|
||||
|
||||
|
|
@ -14,8 +14,20 @@ Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-gra
|
|||
|
||||
`omnigraph-server --config omnigraph.yaml` with a non-empty `graphs:` map and **no** single-mode selector (no `server.graph`, no `<URI>`, no `--target`). The server opens every configured graph in parallel at startup (bounded concurrency = 4, fail-fast on the first open error). Routes are nested under `/graphs/{graph_id}/...`. Bare flat paths return 404 in multi mode.
|
||||
|
||||
Mode inference (four-rule matrix):
|
||||
### Cluster-booted multi mode (Phase 5)
|
||||
|
||||
`omnigraph-server --cluster <dir>` boots from the cluster catalog's **applied
|
||||
revision** (`state.json` + content-addressed blobs) instead of
|
||||
`omnigraph.yaml` — an exclusive boot source: combining it with `<URI>`,
|
||||
`--target`, or `--config` is a startup error, and `omnigraph.yaml` is never
|
||||
read in this mode. Always multi-graph routing. See
|
||||
[cluster-config.md](cluster-config.md#serving-from-the-cluster-the-mode-switch)
|
||||
for what is read and the fail-fast readiness rules. `--bind`,
|
||||
`--unauthenticated`, and the bearer-token env vars work identically.
|
||||
|
||||
Mode inference:
|
||||
|
||||
0. CLI `--cluster <dir>` → **multi, cluster-booted** (exclusive)
|
||||
1. CLI positional `<URI>` → single
|
||||
2. CLI `--target <name>` → single
|
||||
3. `server.graph` in config → single
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue