mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Add cluster JSON state ledger status
This commit is contained in:
parent
043b02e617
commit
a7956ea5a9
8 changed files with 925 additions and 76 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -4575,6 +4575,8 @@ dependencies = [
|
|||
"sha2",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"time",
|
||||
"ulid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -11,7 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
|
|||
use omnigraph::loader::LoadMode;
|
||||
use omnigraph::storage::normalize_root_uri;
|
||||
use omnigraph_cluster::{
|
||||
DiagnosticSeverity, PlanOutput, ValidateOutput, plan_config_dir, validate_config_dir,
|
||||
DiagnosticSeverity, PlanOutput, StatusOutput, ValidateOutput, plan_config_dir,
|
||||
status_config_dir, validate_config_dir,
|
||||
};
|
||||
use omnigraph_compiler::query::parser::parse_query;
|
||||
use omnigraph_compiler::schema::parser::parse_schema;
|
||||
|
|
@ -340,6 +341,15 @@ enum ClusterCommand {
|
|||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Read the local JSON state ledger without scanning live graph resources.
|
||||
Status {
|
||||
/// Cluster config directory containing cluster.yaml.
|
||||
#[arg(long, default_value = ".")]
|
||||
config: PathBuf,
|
||||
/// Emit JSON instead of human text.
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// Operations on the graph registry of a multi-graph server (MR-668).
|
||||
|
|
@ -745,6 +755,34 @@ fn print_cluster_plan_human(output: &PlanOutput) {
|
|||
print_cluster_diagnostics(&output.diagnostics);
|
||||
}
|
||||
|
||||
fn print_cluster_status_human(output: &StatusOutput) {
|
||||
if output.ok {
|
||||
let state = &output.state_observations;
|
||||
if state.state_found {
|
||||
println!(
|
||||
"cluster state: revision {}, {} resource(s)",
|
||||
state.state_revision, state.resource_count
|
||||
);
|
||||
if let Some(digest) = state.applied_config_digest.as_deref() {
|
||||
println!(" applied config: {digest}");
|
||||
}
|
||||
if state.locked {
|
||||
match state.lock_id.as_deref() {
|
||||
Some(lock_id) => println!(" lock: held ({lock_id})"),
|
||||
None => println!(" lock: held"),
|
||||
}
|
||||
} else {
|
||||
println!(" lock: not held");
|
||||
}
|
||||
} else {
|
||||
println!("cluster state missing");
|
||||
}
|
||||
} else {
|
||||
println!("cluster status failed");
|
||||
}
|
||||
print_cluster_diagnostics(&output.diagnostics);
|
||||
}
|
||||
|
||||
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
|
||||
for diagnostic in diagnostics {
|
||||
let label = match diagnostic.severity {
|
||||
|
|
@ -784,6 +822,19 @@ fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
|
||||
if json {
|
||||
print_json(output)?;
|
||||
} else {
|
||||
print_cluster_status_human(output);
|
||||
}
|
||||
if !output.ok {
|
||||
io::stdout().flush()?;
|
||||
std::process::exit(1);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns `true` when `uri` starts with an `http://` or `https://` scheme.
///
/// The scheme is matched ASCII case-insensitively (URI schemes are
/// case-insensitive), so `HTTPS://host` is recognised as remote as well.
/// Anything else, including the empty string, is not remote.
fn is_remote_uri(uri: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 2] = ["http://", "https://"];

    REMOTE_SCHEMES.iter().any(|scheme| {
        // `get` yields `None` when `uri` is too short or the cut would split a
        // multi-byte character; neither can be a scheme match.
        matches!(
            uri.get(..scheme.len()),
            Some(prefix) if prefix.eq_ignore_ascii_case(scheme)
        )
    })
}
|
||||
|
|
@ -3217,6 +3268,10 @@ async fn main() -> Result<()> {
|
|||
let output = plan_config_dir(config);
|
||||
finish_cluster_plan(&output, json)?;
|
||||
}
|
||||
ClusterCommand::Status { config, json } => {
|
||||
let output = status_config_dir(config);
|
||||
finish_cluster_status(&output, json)?;
|
||||
}
|
||||
},
|
||||
Command::Graphs { command } => match command {
|
||||
GraphsCommand::List {
|
||||
|
|
|
|||
|
|
@ -214,6 +214,168 @@ fn cluster_plan_json_reads_inferred_local_state() {
|
|||
);
|
||||
}
|
||||
|
||||
/// `cluster status --json` on a config directory without a state ledger must
/// succeed, report `state_found: false`, and carry a `state_missing` diagnostic.
#[test]
fn cluster_status_json_reports_missing_state() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let report = parse_stdout_json(&output_success(
        cli()
            .arg("cluster")
            .arg("status")
            .arg("--config")
            .arg(workspace.path())
            .arg("--json"),
    ));

    assert_eq!(report["ok"], true);
    assert_eq!(report["state_observations"]["state_found"], false);

    let diagnostics = report["diagnostics"].as_array().unwrap();
    let has_state_missing = diagnostics
        .iter()
        .any(|entry| entry["code"] == "state_missing");
    assert!(
        has_state_missing,
        "missing state should be a warning diagnostic: {report}"
    );
}
|
||||
|
||||
/// `cluster status --json` must surface revision, CAS token, resource digests
/// and resource statuses from a state ledger that uses the extended fields.
#[test]
fn cluster_status_json_reports_extended_state() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    let ledger = r#"
{
  "version": 1,
  "state_revision": 5,
  "applied_revision": {
    "config_digest": "applied",
    "resources": {
      "graph.knowledge": { "digest": "graph-digest" }
    }
  },
  "resource_statuses": {
    "graph.knowledge": { "status": "applied", "conditions": ["healthy"] }
  },
  "approval_records": {},
  "recovery_records": {},
  "observations": {}
}
"#;
    fs::write(ledger_dir.join("state.json"), ledger).unwrap();

    let report = parse_stdout_json(&output_success(
        cli()
            .arg("cluster")
            .arg("status")
            .arg("--config")
            .arg(workspace.path())
            .arg("--json"),
    ));

    assert_eq!(report["ok"], true);

    let observed = &report["state_observations"];
    assert_eq!(observed["state_revision"], 5);
    let cas = observed["state_cas"].as_str().unwrap();
    assert!(cas.starts_with("sha256:"));

    assert_eq!(report["resource_digests"]["graph.knowledge"], "graph-digest");
    assert_eq!(
        report["resource_statuses"]["graph.knowledge"]["status"],
        "applied"
    );
}
|
||||
|
||||
/// `cluster plan --json` must report the ledger revision and CAS token, show
/// that the lock was held during the run, and leave no lock file behind.
#[test]
fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    let ledger = r#"
{
  "version": 1,
  "state_revision": 9,
  "applied_revision": {
    "config_digest": "old",
    "resources": {
      "graph.knowledge": { "digest": "old-graph" }
    }
  }
}
"#;
    fs::write(ledger_dir.join("state.json"), ledger).unwrap();

    let report = parse_stdout_json(&output_success(
        cli()
            .arg("cluster")
            .arg("plan")
            .arg("--config")
            .arg(workspace.path())
            .arg("--json"),
    ));

    assert_eq!(report["ok"], true);

    let observed = &report["state_observations"];
    assert_eq!(observed["state_revision"], 9);
    let cas = observed["state_cas"].as_str().unwrap();
    assert!(cas.starts_with("sha256:"));
    assert_eq!(observed["locked"], true);
    assert!(observed["lock_id"].is_string());

    // The lock is only held for the duration of the plan.
    assert!(!ledger_dir.join("lock.json").exists());
}
|
||||
|
||||
/// `cluster plan --json` must fail with a `state_lock_held` diagnostic when a
/// lock file already exists in the state directory.
#[test]
fn cluster_plan_locked_state_exits_nonzero() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    let ledger_dir = workspace.path().join("__cluster");
    fs::create_dir_all(&ledger_dir).unwrap();
    let held_lock = r#"
{
  "version": 1,
  "lock_id": "held-lock",
  "operation": "plan",
  "created_at": "2026-06-08T00:00:00Z",
  "pid": 123
}
"#;
    fs::write(ledger_dir.join("lock.json"), held_lock).unwrap();

    let failed_run = output_failure(
        cli()
            .arg("cluster")
            .arg("plan")
            .arg("--config")
            .arg(workspace.path())
            .arg("--json"),
    );
    let report = parse_stdout_json(&failed_run);

    assert_eq!(report["ok"], false);
    assert_eq!(report["state_observations"]["locked"], true);

    let diagnostics = report["diagnostics"].as_array().unwrap();
    let has_lock_held = diagnostics
        .iter()
        .any(|entry| entry["code"] == "state_lock_held");
    assert!(
        has_lock_held,
        "locked state should produce a useful diagnostic: {report}"
    );
}
|
||||
|
||||
#[test]
|
||||
fn cluster_validate_invalid_config_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@ serde_json = { workspace = true }
|
|||
serde_yaml = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
time = { workspace = true }
|
||||
ulid = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fs;
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process;
|
||||
|
||||
use omnigraph_compiler::build_catalog;
|
||||
use omnigraph_compiler::query::parser::parse_query;
|
||||
|
|
@ -8,11 +10,16 @@ use omnigraph_compiler::query::typecheck::typecheck_query_decl;
|
|||
use omnigraph_compiler::schema::parser::parse_schema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use ulid::Ulid;
|
||||
|
||||
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
|
||||
pub const CLUSTER_STATE_DIR: &str = "__cluster";
|
||||
pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json";
|
||||
pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum DiagnosticSeverity {
|
||||
Error,
|
||||
|
|
@ -86,10 +93,39 @@ pub struct DesiredRevision {
|
|||
/// What a command observed about the local state ledger and its lock file.
///
/// Starts out as "nothing found" (`LocalStateBackend::observations`) and is
/// filled in as the state file is read and the lock is acquired or observed.
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct StateObservations {
|
||||
/// Display form of the state file path.
pub state_path: String,
|
||||
/// Display form of the lock file path.
pub lock_path: String,
|
||||
/// `true` once the state file could be read (even if it then fails to parse).
pub state_found: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
/// Config digest recorded in the applied revision; omitted from JSON when absent.
pub applied_config_digest: Option<String>,
|
||||
/// `state_revision` from the state file; stays 0 when no state was read.
pub state_revision: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
/// `sha256:<hex>` digest of the raw state file text; omitted from JSON when absent.
pub state_cas: Option<String>,
|
||||
/// Number of resources in the applied revision.
pub resource_count: usize,
|
||||
/// `true` when this command acquired the lock or saw an existing lock file.
pub locked: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
/// Id of the acquired or observed lock, when it could be determined.
pub lock_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Lifecycle status stored per resource in the state ledger.
///
/// Serialized and deserialized as snake_case strings (for example `"applied"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ResourceLifecycleStatus {
|
||||
Pending,
|
||||
Planned,
|
||||
Applying,
|
||||
Applied,
|
||||
Drifted,
|
||||
Blocked,
|
||||
Error,
|
||||
}
|
||||
|
||||
/// Status entry for one resource as stored under `resource_statuses` in the
/// state ledger. Unknown fields are rejected on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ResourceStatusRecord {
|
||||
/// Lifecycle status of the resource.
pub status: ResourceLifecycleStatus,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
/// Free-form condition strings; defaults to empty and is omitted from output when empty.
pub conditions: Vec<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
/// Optional message; omitted from output when absent.
pub message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||
|
|
@ -136,15 +172,39 @@ pub struct PlanOutput {
|
|||
pub diagnostics: Vec<Diagnostic>,
|
||||
}
|
||||
|
||||
/// Result of `status_config_dir`: a read-only report on the local state ledger.
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct StatusOutput {
|
||||
/// `true` when no error-severity diagnostic was produced.
pub ok: bool,
|
||||
/// Display form of the config directory that was inspected.
pub config_dir: String,
|
||||
/// What was observed about the state file and lock file.
pub state_observations: StateObservations,
|
||||
/// Resource address -> digest from the applied revision; empty when no state was read.
pub resource_digests: BTreeMap<String, String>,
|
||||
/// Resource address -> status record from the state file; empty when no state was read.
pub resource_statuses: BTreeMap<String, ResourceStatusRecord>,
|
||||
/// Warnings and errors collected while producing the report.
pub diagnostics: Vec<Diagnostic>,
|
||||
}
|
||||
|
||||
/// Desired cluster configuration produced by `load_desired` on success.
#[derive(Debug, Clone)]
|
||||
struct DesiredCluster {
|
||||
config_dir: PathBuf,
|
||||
config_digest: String,
|
||||
/// Whether `plan` acquires the state lock before reading state
/// (taken from the cluster settings' `state_lock`).
state_lock: bool,
|
||||
resource_digests: BTreeMap<String, String>,
|
||||
resources: Vec<ResourceSummary>,
|
||||
dependencies: Vec<Dependency>,
|
||||
}
|
||||
|
||||
/// Outcome of `parse_cluster_config`: the raw config plus everything learned
/// while trying to read and parse it.
#[derive(Debug)]
|
||||
struct ParsedConfig {
|
||||
/// Parsed config; `None` when reading or parsing failed (see `diagnostics`).
raw: Option<RawClusterConfig>,
|
||||
/// Diagnostics collected while reading and parsing the config.
diagnostics: Vec<Diagnostic>,
|
||||
/// Config directory the parse was run against.
config_dir: PathBuf,
|
||||
/// Path of the cluster config file inside `config_dir`.
config_file: PathBuf,
|
||||
}
|
||||
|
||||
/// Settings extracted from the cluster config header by `validate_cluster_header`.
#[derive(Debug, Clone, Copy)]
|
||||
struct ClusterSettings {
|
||||
/// Value of `state.lock` from the config, defaulting to `true` when omitted.
state_lock: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LoadOutcome {
|
||||
desired: Option<DesiredCluster>,
|
||||
|
|
@ -201,11 +261,22 @@ struct PolicyConfig {
|
|||
applies_to: Vec<String>,
|
||||
}
|
||||
|
||||
/// On-disk shape of the state ledger file (`__cluster/state.json`).
///
/// Unknown fields are rejected. Every field except `version` and
/// `applied_revision` defaults when absent, so older minimal state files
/// still parse.
// NOTE(review): `dead_code` is presumably allowed because some fields are only
// deserialized for validation and never read in this view — confirm.
#[allow(dead_code)]
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct ClusterState {
|
||||
/// Schema version; readers in this file accept only version 1.
version: u32,
|
||||
#[serde(default)]
|
||||
/// Ledger revision counter; defaults to 0 when absent.
state_revision: u64,
|
||||
/// The applied revision: config digest and per-resource digests.
applied_revision: AppliedRevisionState,
|
||||
#[serde(default)]
|
||||
/// Per-resource status records, keyed by resource address.
resource_statuses: BTreeMap<String, ResourceStatusRecord>,
|
||||
#[serde(default)]
|
||||
/// Kept as untyped JSON values.
approval_records: BTreeMap<String, serde_json::Value>,
|
||||
#[serde(default)]
|
||||
/// Kept as untyped JSON values.
recovery_records: BTreeMap<String, serde_json::Value>,
|
||||
#[serde(default)]
|
||||
/// Kept as untyped JSON values.
observations: BTreeMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
|
@ -223,6 +294,33 @@ struct StateResource {
|
|||
digest: String,
|
||||
}
|
||||
|
||||
/// On-disk shape of the state lock file (`__cluster/lock.json`).
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct StateLockFile {
|
||||
/// Lock schema version; readers in this file accept only version 1.
version: u32,
|
||||
/// Identifier of the lock (a ULID string when written by `acquire_lock`).
lock_id: String,
|
||||
/// Name of the operation that took the lock, e.g. `"plan"`.
operation: String,
|
||||
/// Creation time; written as an RFC 3339 UTC timestamp by `acquire_lock`.
created_at: String,
|
||||
/// Process id of the lock holder.
pid: u32,
|
||||
}
|
||||
|
||||
/// Paths of the local JSON state ledger below a config directory.
#[derive(Debug)]
|
||||
struct LocalStateBackend {
|
||||
/// Directory holding the state and lock files.
state_dir: PathBuf,
|
||||
/// Path of the state file.
state_path: PathBuf,
|
||||
/// Path of the lock file.
lock_path: PathBuf,
|
||||
}
|
||||
|
||||
/// Result of reading the state file once.
#[derive(Debug)]
|
||||
struct StateSnapshot {
|
||||
/// Parsed state, or `None` when the state file does not exist.
state: Option<ClusterState>,
|
||||
}
|
||||
|
||||
/// Guard for a held state lock; its `Drop` impl removes the lock file.
#[derive(Debug)]
|
||||
struct StateLockGuard {
|
||||
/// Path of the lock file to remove on drop.
path: PathBuf,
|
||||
}
|
||||
|
||||
pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
|
||||
let outcome = load_desired(config_dir.as_ref());
|
||||
let (resource_digests, resources, dependencies) = match outcome.desired {
|
||||
|
|
@ -249,13 +347,8 @@ pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
|
|||
pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
||||
let outcome = load_desired(config_dir.as_ref());
|
||||
let mut diagnostics = outcome.diagnostics;
|
||||
let state_path = outcome.config_dir.join(CLUSTER_STATE_FILE);
|
||||
let mut observations = StateObservations {
|
||||
state_path: display_path(&state_path),
|
||||
state_found: false,
|
||||
applied_config_digest: None,
|
||||
resource_count: 0,
|
||||
};
|
||||
let backend = LocalStateBackend::new(&outcome.config_dir);
|
||||
let mut observations = backend.observations();
|
||||
|
||||
let Some(desired) = outcome.desired else {
|
||||
return PlanOutput {
|
||||
|
|
@ -274,40 +367,49 @@ pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
|||
};
|
||||
};
|
||||
|
||||
let mut prior_resources = BTreeMap::new();
|
||||
if state_path.exists() {
|
||||
observations.state_found = true;
|
||||
match fs::read_to_string(&state_path) {
|
||||
Ok(text) => match serde_json::from_str::<ClusterState>(&text) {
|
||||
Ok(state) if state.version == 1 => {
|
||||
observations.applied_config_digest = state.applied_revision.config_digest;
|
||||
observations.resource_count = state.applied_revision.resources.len();
|
||||
prior_resources = state
|
||||
.applied_revision
|
||||
.resources
|
||||
.into_iter()
|
||||
.map(|(address, resource)| (address, resource.digest))
|
||||
.collect();
|
||||
}
|
||||
Ok(state) => diagnostics.push(Diagnostic::error(
|
||||
"unsupported_state_version",
|
||||
"state.version",
|
||||
format!(
|
||||
"unsupported cluster state version {}; this build supports version 1",
|
||||
state.version
|
||||
),
|
||||
)),
|
||||
Err(err) => diagnostics.push(Diagnostic::error(
|
||||
"invalid_state_json",
|
||||
CLUSTER_STATE_FILE,
|
||||
format!("could not parse state JSON: {err}"),
|
||||
)),
|
||||
if has_errors(&diagnostics) {
|
||||
return PlanOutput {
|
||||
ok: false,
|
||||
config_dir: display_path(&desired.config_dir),
|
||||
desired_revision: DesiredRevision {
|
||||
config_digest: Some(desired.config_digest),
|
||||
},
|
||||
Err(err) => diagnostics.push(Diagnostic::error(
|
||||
"state_read_error",
|
||||
CLUSTER_STATE_FILE,
|
||||
format!("could not read state file: {err}"),
|
||||
)),
|
||||
resource_digests: desired.resource_digests,
|
||||
dependencies: desired.dependencies,
|
||||
state_observations: observations,
|
||||
changes: Vec::new(),
|
||||
blast_radius: Vec::new(),
|
||||
approvals_required: Vec::new(),
|
||||
diagnostics,
|
||||
};
|
||||
}
|
||||
|
||||
let _lock_guard = if desired.state_lock {
|
||||
match backend.acquire_lock("plan", &mut observations) {
|
||||
Ok(guard) => Some(guard),
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"state_lock_disabled",
|
||||
"state.lock",
|
||||
"state.lock is false; plan read state without acquiring the cluster state lock",
|
||||
));
|
||||
None
|
||||
};
|
||||
|
||||
let mut prior_resources = BTreeMap::new();
|
||||
if !has_errors(&diagnostics) {
|
||||
match backend.read_state(&mut observations) {
|
||||
Ok(snapshot) => {
|
||||
if let Some(state) = snapshot.state {
|
||||
prior_resources = state_resource_digests(&state);
|
||||
}
|
||||
}
|
||||
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -336,7 +438,48 @@ pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
|||
}
|
||||
}
|
||||
|
||||
fn load_desired(config_dir: &Path) -> LoadOutcome {
|
||||
pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
|
||||
let parsed = parse_cluster_config(config_dir.as_ref());
|
||||
let mut diagnostics = parsed.diagnostics;
|
||||
let backend = LocalStateBackend::new(&parsed.config_dir);
|
||||
let mut observations = backend.observations();
|
||||
backend.observe_lock(&mut observations, &mut diagnostics);
|
||||
|
||||
let mut resource_digests = BTreeMap::new();
|
||||
let mut resource_statuses = BTreeMap::new();
|
||||
|
||||
if let Some(raw) = parsed.raw.as_ref() {
|
||||
let _settings = validate_cluster_header(raw, &mut diagnostics);
|
||||
if !has_errors(&diagnostics) {
|
||||
match backend.read_state(&mut observations) {
|
||||
Ok(snapshot) => {
|
||||
if let Some(state) = snapshot.state {
|
||||
resource_digests = state_resource_digests(&state);
|
||||
resource_statuses = state.resource_statuses;
|
||||
} else {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"state_missing",
|
||||
CLUSTER_STATE_FILE,
|
||||
"state.json is missing; no applied cluster revision has been recorded",
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StatusOutput {
|
||||
ok: !has_errors(&diagnostics),
|
||||
config_dir: display_path(&parsed.config_dir),
|
||||
state_observations: observations,
|
||||
resource_digests,
|
||||
resource_statuses,
|
||||
diagnostics,
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_cluster_config(config_dir: &Path) -> ParsedConfig {
|
||||
let config_dir = config_dir.to_path_buf();
|
||||
let config_file = config_dir.join(CLUSTER_CONFIG_FILE);
|
||||
let mut diagnostics = Vec::new();
|
||||
|
|
@ -347,8 +490,8 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
|
|||
display_path(&config_dir),
|
||||
"`--config` must point at a directory containing cluster.yaml",
|
||||
));
|
||||
return LoadOutcome {
|
||||
desired: None,
|
||||
return ParsedConfig {
|
||||
raw: None,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
|
|
@ -363,8 +506,8 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
|
|||
CLUSTER_CONFIG_FILE,
|
||||
format!("could not read cluster.yaml: {err}"),
|
||||
));
|
||||
return LoadOutcome {
|
||||
desired: None,
|
||||
return ParsedConfig {
|
||||
raw: None,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
|
|
@ -375,8 +518,8 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
|
|||
diagnostics.extend(duplicate_key_diagnostics(&text));
|
||||
diagnostics.extend(future_field_diagnostics(&text));
|
||||
if has_errors(&diagnostics) {
|
||||
return LoadOutcome {
|
||||
desired: None,
|
||||
return ParsedConfig {
|
||||
raw: None,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
|
|
@ -384,22 +527,29 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
|
|||
}
|
||||
|
||||
let raw = match serde_yaml::from_str::<RawClusterConfig>(&text) {
|
||||
Ok(raw) => raw,
|
||||
Ok(raw) => Some(raw),
|
||||
Err(err) => {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"invalid_cluster_yaml",
|
||||
CLUSTER_CONFIG_FILE,
|
||||
format!("could not parse cluster.yaml: {err}"),
|
||||
));
|
||||
return LoadOutcome {
|
||||
desired: None,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
};
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
ParsedConfig {
|
||||
raw,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_cluster_header(
|
||||
raw: &RawClusterConfig,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
) -> ClusterSettings {
|
||||
if raw.version != 1 {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"unsupported_cluster_config_version",
|
||||
|
|
@ -424,11 +574,242 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
|
|||
diagnostics.push(Diagnostic::error(
|
||||
"unsupported_state_backend",
|
||||
"state.backend",
|
||||
"Stage 1 supports only omitted state.backend or `cluster`",
|
||||
"Stage 2A supports only omitted state.backend or `cluster`",
|
||||
));
|
||||
}
|
||||
}
|
||||
let _lock_parsed_for_forward_compat = raw.state.lock;
|
||||
|
||||
ClusterSettings {
|
||||
state_lock: raw.state.lock.unwrap_or(true),
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalStateBackend {
|
||||
fn new(config_dir: &Path) -> Self {
|
||||
let state_dir = config_dir.join(CLUSTER_STATE_DIR);
|
||||
Self {
|
||||
state_path: config_dir.join(CLUSTER_STATE_FILE),
|
||||
lock_path: config_dir.join(CLUSTER_LOCK_FILE),
|
||||
state_dir,
|
||||
}
|
||||
}
|
||||
|
||||
fn observations(&self) -> StateObservations {
|
||||
StateObservations {
|
||||
state_path: display_path(&self.state_path),
|
||||
lock_path: display_path(&self.lock_path),
|
||||
state_found: false,
|
||||
applied_config_digest: None,
|
||||
state_revision: 0,
|
||||
state_cas: None,
|
||||
resource_count: 0,
|
||||
locked: false,
|
||||
lock_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn read_state(
|
||||
&self,
|
||||
observations: &mut StateObservations,
|
||||
) -> Result<StateSnapshot, Diagnostic> {
|
||||
let text = match fs::read_to_string(&self.state_path) {
|
||||
Ok(text) => text,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {
|
||||
return Ok(StateSnapshot { state: None });
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(Diagnostic::error(
|
||||
"state_read_error",
|
||||
CLUSTER_STATE_FILE,
|
||||
format!("could not read state file: {err}"),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
observations.state_found = true;
|
||||
observations.state_cas = Some(format!("sha256:{}", sha256_hex(text.as_bytes())));
|
||||
|
||||
let state = serde_json::from_str::<ClusterState>(&text).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"invalid_state_json",
|
||||
CLUSTER_STATE_FILE,
|
||||
format!("could not parse state JSON: {err}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
if state.version != 1 {
|
||||
return Err(Diagnostic::error(
|
||||
"unsupported_state_version",
|
||||
"state.version",
|
||||
format!(
|
||||
"unsupported cluster state version {}; this build supports version 1",
|
||||
state.version
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
observations.applied_config_digest = state.applied_revision.config_digest.clone();
|
||||
observations.state_revision = state.state_revision;
|
||||
observations.resource_count = state.applied_revision.resources.len();
|
||||
|
||||
Ok(StateSnapshot { state: Some(state) })
|
||||
}
|
||||
|
||||
fn acquire_lock(
|
||||
&self,
|
||||
operation: &str,
|
||||
observations: &mut StateObservations,
|
||||
) -> Result<StateLockGuard, Diagnostic> {
|
||||
fs::create_dir_all(&self.state_dir).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"state_lock_error",
|
||||
CLUSTER_STATE_DIR,
|
||||
format!("could not create cluster state directory: {err}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
let lock_id = Ulid::new().to_string();
|
||||
let lock = StateLockFile {
|
||||
version: 1,
|
||||
lock_id: lock_id.clone(),
|
||||
operation: operation.to_string(),
|
||||
created_at: OffsetDateTime::now_utc()
|
||||
.format(&Rfc3339)
|
||||
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()),
|
||||
pid: process::id(),
|
||||
};
|
||||
let payload = serde_json::to_string_pretty(&lock).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"state_lock_error",
|
||||
CLUSTER_LOCK_FILE,
|
||||
format!("could not encode state lock: {err}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
match OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&self.lock_path)
|
||||
{
|
||||
Ok(mut file) => {
|
||||
file.write_all(payload.as_bytes()).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"state_lock_error",
|
||||
CLUSTER_LOCK_FILE,
|
||||
format!("could not write state lock: {err}"),
|
||||
)
|
||||
})?;
|
||||
observations.locked = true;
|
||||
observations.lock_id = Some(lock_id.clone());
|
||||
Ok(StateLockGuard {
|
||||
path: self.lock_path.clone(),
|
||||
})
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::AlreadyExists => {
|
||||
self.observe_lock_id(observations);
|
||||
Err(Diagnostic::error(
|
||||
"state_lock_held",
|
||||
CLUSTER_LOCK_FILE,
|
||||
"cluster state lock already exists; remove it only after confirming no cluster operation is active",
|
||||
))
|
||||
}
|
||||
Err(err) => Err(Diagnostic::error(
|
||||
"state_lock_error",
|
||||
CLUSTER_LOCK_FILE,
|
||||
format!("could not acquire state lock: {err}"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn observe_lock(
|
||||
&self,
|
||||
observations: &mut StateObservations,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
) {
|
||||
if self.lock_path.exists() {
|
||||
observations.locked = true;
|
||||
match fs::read_to_string(&self.lock_path) {
|
||||
Ok(text) => match serde_json::from_str::<StateLockFile>(&text) {
|
||||
Ok(lock) if lock.version == 1 => {
|
||||
observations.lock_id = Some(lock.lock_id);
|
||||
}
|
||||
Ok(lock) => diagnostics.push(Diagnostic::warning(
|
||||
"unsupported_state_lock_version",
|
||||
CLUSTER_LOCK_FILE,
|
||||
format!("unsupported cluster state lock version {}", lock.version),
|
||||
)),
|
||||
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||
"invalid_state_lock",
|
||||
CLUSTER_LOCK_FILE,
|
||||
format!("could not parse state lock: {err}"),
|
||||
)),
|
||||
},
|
||||
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||
"state_lock_read_error",
|
||||
CLUSTER_LOCK_FILE,
|
||||
format!("could not read state lock: {err}"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn observe_lock_id(&self, observations: &mut StateObservations) {
|
||||
observations.locked = true;
|
||||
if let Ok(text) = fs::read_to_string(&self.lock_path) {
|
||||
if let Ok(lock) = serde_json::from_str::<StateLockFile>(&text) {
|
||||
if lock.version == 1 {
|
||||
observations.lock_id = Some(lock.lock_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for StateLockGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_file(&self.path);
|
||||
}
|
||||
}
|
||||
|
||||
fn state_resource_digests(state: &ClusterState) -> BTreeMap<String, String> {
|
||||
state
|
||||
.applied_revision
|
||||
.resources
|
||||
.iter()
|
||||
.map(|(address, resource)| (address.clone(), resource.digest.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn load_desired(config_dir: &Path) -> LoadOutcome {
|
||||
let parsed = parse_cluster_config(config_dir);
|
||||
let config_dir = parsed.config_dir;
|
||||
let config_file = parsed.config_file;
|
||||
let mut diagnostics = parsed.diagnostics;
|
||||
let Some(raw) = parsed.raw else {
|
||||
return LoadOutcome {
|
||||
desired: None,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
};
|
||||
};
|
||||
let settings = validate_cluster_header(&raw, &mut diagnostics);
|
||||
let config_text = match fs::read_to_string(&config_file) {
|
||||
Ok(text) => text,
|
||||
Err(err) => {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"cluster_config_read_error",
|
||||
CLUSTER_CONFIG_FILE,
|
||||
format!("could not re-read cluster.yaml: {err}"),
|
||||
));
|
||||
return LoadOutcome {
|
||||
desired: None,
|
||||
diagnostics,
|
||||
config_dir,
|
||||
config_file,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
let mut resources = BTreeMap::new();
|
||||
let mut dependencies = BTreeSet::new();
|
||||
|
|
@ -645,12 +1026,13 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
|
|||
resource_list.push(resource);
|
||||
}
|
||||
let dependencies: Vec<_> = dependencies.into_iter().collect();
|
||||
let config_digest = desired_config_digest(&text, &resource_digests);
|
||||
let config_digest = desired_config_digest(&config_text, &resource_digests);
|
||||
|
||||
LoadOutcome {
|
||||
desired: Some(DesiredCluster {
|
||||
config_dir: config_dir.clone(),
|
||||
config_digest,
|
||||
state_lock: settings.state_lock,
|
||||
resource_digests,
|
||||
resources: resource_list,
|
||||
dependencies,
|
||||
|
|
@ -1217,6 +1599,7 @@ graphs:
|
|||
.all(|c| c.operation == PlanOperation::Create)
|
||||
);
|
||||
assert!(out.changes.iter().any(|c| c.resource == "graph.knowledge"));
|
||||
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -1260,6 +1643,202 @@ graphs:
|
|||
);
|
||||
}
|
||||
|
||||
/// A state file without the newer optional fields must still plan: the
/// revision defaults to 0, a CAS token is reported, and the changed digest
/// shows up as an update.
#[test]
fn old_minimal_state_json_still_plans_with_default_revision() {
    let dir = fixture();
    let ledger_dir = dir.path().join(CLUSTER_STATE_DIR);
    fs::create_dir_all(&ledger_dir).unwrap();
    let minimal_state = r#"{
  "version": 1,
  "applied_revision": {
    "config_digest": "old",
    "resources": {
      "graph.knowledge": { "digest": "old-graph" }
    }
  }
}"#;
    fs::write(ledger_dir.join("state.json"), minimal_state).unwrap();

    let plan = plan_config_dir(dir.path());

    assert!(plan.ok, "{:?}", plan.diagnostics);
    assert_eq!(plan.state_observations.state_revision, 0);
    assert!(plan.state_observations.state_cas.is_some());

    let graph_is_updated = plan.changes.iter().any(|change| {
        change.resource == "graph.knowledge" && change.operation == PlanOperation::Update
    });
    assert!(graph_is_updated);
}
|
||||
|
||||
/// Status must surface revision, CAS token, digests and resource statuses
/// from a state file that uses every extended field.
#[test]
fn extended_state_json_status_surfaces_statuses() {
    let dir = fixture();
    let ledger_dir = dir.path().join(CLUSTER_STATE_DIR);
    fs::create_dir_all(&ledger_dir).unwrap();
    let state = r#"{
  "version": 1,
  "state_revision": 42,
  "applied_revision": {
    "config_digest": "applied-config",
    "resources": {
      "graph.knowledge": { "digest": "graph-digest" }
    }
  },
  "resource_statuses": {
    "graph.knowledge": {
      "status": "applied",
      "conditions": ["healthy"],
      "message": "ready"
    }
  },
  "approval_records": {},
  "recovery_records": {},
  "observations": {
    "graph.knowledge": { "manifest_version": 12 }
  }
}"#;
    fs::write(ledger_dir.join("state.json"), state).unwrap();

    let status = status_config_dir(dir.path());
    assert!(status.ok, "{:?}", status.diagnostics);

    let observed = &status.state_observations;
    assert!(observed.state_found);
    assert_eq!(observed.state_revision, 42);

    // The CAS token is the digest of the exact bytes written above.
    let expected_cas = format!("sha256:{}", sha256_hex(state.as_bytes()));
    assert_eq!(observed.state_cas.as_deref(), Some(expected_cas.as_str()));

    let graph_digest = status
        .resource_digests
        .get("graph.knowledge")
        .map(String::as_str);
    assert_eq!(graph_digest, Some("graph-digest"));
    assert_eq!(
        status.resource_statuses["graph.knowledge"].status,
        ResourceLifecycleStatus::Applied
    );
}
|
||||
|
||||
// Verifies that `status_config_dir` succeeds when no state file is present:
// this test writes no `state.json` (assumes `fixture()` does not create one —
// TODO confirm), and expects `ok`, `state_found == false`, revision 0, and a
// `state_missing` diagnostic.
#[test]
|
||||
fn missing_state_status_succeeds_with_warning() {
|
||||
let dir = fixture();
|
||||
let out = status_config_dir(dir.path());
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(!out.state_observations.state_found);
|
||||
assert_eq!(out.state_observations.state_revision, 0);
|
||||
// The missing ledger must be reported via a diagnostic rather than silently ignored.
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "state_missing")
|
||||
);
|
||||
}
|
||||
|
||||
// Verifies that `status_config_dir` fails when `state.json` exists but is not
// valid JSON: the output is not `ok`, the file is still reported as found, and
// an `invalid_state_json` diagnostic is emitted.
#[test]
|
||||
fn invalid_state_status_fails() {
|
||||
let dir = fixture();
|
||||
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
// A lone "{" is deliberately truncated JSON.
fs::write(state_dir.join("state.json"), "{").unwrap();
|
||||
|
||||
let out = status_config_dir(dir.path());
|
||||
assert!(!out.ok);
|
||||
// The file exists on disk, so it counts as found even though it cannot be parsed.
assert!(out.state_observations.state_found);
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "invalid_state_json")
|
||||
);
|
||||
}
|
||||
|
||||
// Verifies that `plan_config_dir` reports the state revision, the state CAS and
// lock information for the read, and that no lock file remains at
// `CLUSTER_LOCK_FILE` once the call has returned.
#[test]
|
||||
fn plan_reports_state_cas_revision_and_removes_lock() {
|
||||
let dir = fixture();
|
||||
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
// Kept in a local so the exact bytes written can be hashed for the CAS assertion below.
let state = r#"{
|
||||
"version": 1,
|
||||
"state_revision": 7,
|
||||
"applied_revision": {
|
||||
"config_digest": "old",
|
||||
"resources": {
|
||||
"graph.knowledge": { "digest": "old-graph" }
|
||||
}
|
||||
}
|
||||
}"#;
|
||||
fs::write(state_dir.join("state.json"), state).unwrap();
|
||||
|
||||
let out = plan_config_dir(dir.path());
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert_eq!(out.state_observations.state_revision, 7);
|
||||
// The CAS is expected to be "sha256:" followed by the hex SHA-256 of the file's bytes.
assert_eq!(
|
||||
out.state_observations.state_cas.as_deref(),
|
||||
Some(format!("sha256:{}", sha256_hex(state.as_bytes())).as_str())
|
||||
);
|
||||
// `locked` and `lock_id` are expected to be populated in the output even though
// the lock file itself is gone by the time the call returns (checked below).
assert!(out.state_observations.locked);
|
||||
assert!(out.state_observations.lock_id.is_some());
|
||||
assert!(
|
||||
!dir.path().join(CLUSTER_LOCK_FILE).exists(),
|
||||
"plan must release lock before returning"
|
||||
);
|
||||
}
|
||||
|
||||
// Verifies that `plan_config_dir` fails when a `lock.json` is already present in
// the state directory: the output is not `ok`, the existing lock's id is
// reported, and a `state_lock_held` diagnostic is emitted.
#[test]
|
||||
fn existing_lock_makes_plan_fail() {
|
||||
let dir = fixture();
|
||||
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
// Pre-existing lock record with a known id ("held-lock") that the assertions look for.
fs::write(
|
||||
state_dir.join("lock.json"),
|
||||
r#"{
|
||||
"version": 1,
|
||||
"lock_id": "held-lock",
|
||||
"operation": "plan",
|
||||
"created_at": "2026-06-08T00:00:00Z",
|
||||
"pid": 123
|
||||
}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let out = plan_config_dir(dir.path());
|
||||
assert!(!out.ok);
|
||||
assert!(out.state_observations.locked);
|
||||
// The reported id must be the one from the pre-existing lock file.
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "state_lock_held")
|
||||
);
|
||||
}
|
||||
|
||||
// Verifies that a cluster config with `state.lock: false` lets `plan_config_dir`
// succeed without reporting a lock: the output is `ok`, `locked` is false, a
// `state_lock_disabled` diagnostic is emitted, and no file exists at
// `CLUSTER_LOCK_FILE` afterwards.
#[test]
|
||||
fn state_lock_false_bypasses_lock_with_warning() {
|
||||
let dir = fixture();
|
||||
// Overwrite the fixture's cluster config with one that disables state locking.
fs::write(
|
||||
dir.path().join(CLUSTER_CONFIG_FILE),
|
||||
r#"
|
||||
version: 1
|
||||
state:
|
||||
backend: cluster
|
||||
lock: false
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let out = plan_config_dir(dir.path());
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(!out.state_observations.locked);
|
||||
// Disabling the lock must be surfaced to the caller via a diagnostic.
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "state_lock_disabled")
|
||||
);
|
||||
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn external_state_backend_rejected() {
|
||||
let dir = fixture();
|
||||
|
|
@ -1272,4 +1851,21 @@ graphs:
|
|||
assert!(!out.ok);
|
||||
assert_eq!(out.diagnostics[0].code, "unsupported_state_backend");
|
||||
}
|
||||
|
||||
// Verifies that `plan_config_dir` rejects a cluster config whose `state.backend`
// is an external location (`s3://bucket/state`): the output is not `ok` and an
// `unsupported_state_backend` diagnostic is emitted.
#[test]
|
||||
fn external_state_backend_plan_rejected() {
|
||||
let dir = fixture();
|
||||
// Overwrite the fixture's cluster config with an external backend and no graphs.
fs::write(
|
||||
dir.path().join(CLUSTER_CONFIG_FILE),
|
||||
"version: 1\nstate:\n backend: s3://bucket/state\ngraphs: {}\n",
|
||||
)
|
||||
.unwrap();
|
||||
let out = plan_config_dir(dir.path());
|
||||
assert!(!out.ok);
|
||||
// Uses `any` rather than indexing, so the check does not depend on diagnostic order.
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "unsupported_state_backend")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|
|||
|---|---|---|
|
||||
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
|
||||
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
|
||||
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, read-only validate/plan |
|
||||
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling, read-only validate/plan/status |
|
||||
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
|
||||
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` sc
|
|||
| `schema plan \| apply \| show (alias: get)` | migrations |
|
||||
| `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` |
|
||||
| `queries validate \| list` | operate on the server-side stored-query registry (the `queries:` block). `validate` type-checks every stored query against the live schema offline (opens the selected graph; exits non-zero on any breakage), catching schema drift without restarting the server; `list` prints the selected registry's query names, MCP exposure, and typed params. For per-graph registries, pass `--target <graph>` or set `cli.graph`; with no graph selection, `list` shows only top-level `queries:`. Distinct from `lint`, which validates a single `.gq` file |
|
||||
| `cluster validate \| plan` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`. No apply, lock, graph open, server change, or state write occurs in Stage 1 |
|
||||
| `cluster validate \| plan \| status` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` while briefly holding `__cluster/lock.json`; `status` reads the state ledger. No apply, graph open, live drift scan, server change, or `state.json` mutation occurs in Stage 2A |
|
||||
| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns; `--json` reports a `skipped` field) |
|
||||
| `cleanup --keep N --older-than 7d --confirm` | destructive version GC |
|
||||
| `embed` | offline JSONL embedding pipeline |
|
||||
|
|
@ -79,13 +79,16 @@ policy:
|
|||
```bash
|
||||
omnigraph cluster validate --config ./company-brain
|
||||
omnigraph cluster plan --config ./company-brain --json
|
||||
omnigraph cluster status --config ./company-brain --json
|
||||
```
|
||||
|
||||
`--config` is a directory containing `cluster.yaml`; it defaults to `.`.
|
||||
Stage 1 accepts graphs, schemas, stored queries, and policy bundle file
|
||||
Stage 2A accepts graphs, schemas, stored queries, and policy bundle file
|
||||
references. `cluster plan` reads local JSON state from
|
||||
`<config-dir>/__cluster/state.json`; a missing file means empty state. External
|
||||
state backends, apply, locks, pipelines, UI specs, embeddings, aliases, and
|
||||
`<config-dir>/__cluster/state.json`; a missing file means empty state. Plan
|
||||
acquires `__cluster/lock.json` by default and releases it before returning.
|
||||
`cluster status` reads state only and reports any existing lock. External state
|
||||
backends, apply, refresh/import, pipelines, UI specs, embeddings, aliases, and
|
||||
bindings are reserved for later stages. See [cluster-config.md](cluster-config.md).
|
||||
|
||||
## Output formats (`query` command, alias: `read`)
|
||||
|
|
|
|||
|
|
@ -1,17 +1,19 @@
|
|||
# Cluster Config
|
||||
|
||||
**Status:** Stage 1 read-only preview.
|
||||
**Status:** Stage 2A read-only preview.
|
||||
|
||||
Cluster config is the future control-plane configuration surface for a whole
|
||||
OmniGraph deployment. In this stage, OmniGraph can validate a local
|
||||
`cluster.yaml` folder and produce a deterministic read-only plan. It does not
|
||||
apply changes, acquire locks, open graph roots, start servers, or write state.
|
||||
`cluster.yaml` folder, produce a deterministic read-only plan, and inspect the
|
||||
local JSON state ledger. It does not apply changes, open graph roots, scan live
|
||||
cluster state, start servers, or write graph resources.
|
||||
|
||||
## Commands
|
||||
|
||||
```bash
|
||||
omnigraph cluster validate --config ./company-brain
|
||||
omnigraph cluster plan --config ./company-brain --json
|
||||
omnigraph cluster status --config ./company-brain --json
|
||||
```
|
||||
|
||||
`--config` points at a directory, not a file. The directory must contain
|
||||
|
|
@ -19,7 +21,7 @@ omnigraph cluster plan --config ./company-brain --json
|
|||
|
||||
## Supported `cluster.yaml`
|
||||
|
||||
Stage 1 accepts only the read-only resource subset:
|
||||
Stage 2A accepts only the read-only resource subset:
|
||||
|
||||
```yaml
|
||||
version: 1
|
||||
|
|
@ -43,10 +45,12 @@ policies:
|
|||
applies_to: [knowledge]
|
||||
```
|
||||
|
||||
`metadata.name` is a display label. `state.lock` is parsed for forward
|
||||
compatibility, but no lock is acquired in this read-only stage. `state.backend`
|
||||
may be omitted or set to `cluster`; external state backends are reserved for a
|
||||
later stage.
|
||||
`metadata.name` is a display label. `state.backend` may be omitted or set to
|
||||
`cluster`; external state backends are reserved for a later stage. `state.lock`
|
||||
defaults to `true`. When enabled, `cluster plan` briefly acquires
|
||||
`<config-dir>/__cluster/lock.json` while it reads state, then removes it before
|
||||
returning. `cluster status` never acquires the lock; it only reports whether one
|
||||
is present.
|
||||
|
||||
## Validation
|
||||
|
||||
|
|
@ -78,6 +82,7 @@ resource is planned as a create. If present, the file must use this shape:
|
|||
```json
|
||||
{
|
||||
"version": 1,
|
||||
"state_revision": 0,
|
||||
"applied_revision": {
|
||||
"config_digest": "...",
|
||||
"resources": {
|
||||
|
|
@ -86,10 +91,34 @@ resource is planned as a create. If present, the file must use this shape:
|
|||
"query.knowledge.find_experts": { "digest": "..." },
|
||||
"policy.base": { "digest": "..." }
|
||||
}
|
||||
}
|
||||
},
|
||||
"resource_statuses": {
|
||||
"graph.knowledge": {
|
||||
"status": "applied",
|
||||
"conditions": [],
|
||||
"message": "optional status detail"
|
||||
}
|
||||
},
|
||||
"approval_records": {},
|
||||
"recovery_records": {},
|
||||
"observations": {}
|
||||
}
|
||||
```
|
||||
|
||||
`state_revision`, `resource_statuses`, `approval_records`, `recovery_records`,
|
||||
and `observations` are optional so older Stage 1 state fixtures keep working.
|
||||
Missing `state_revision` is treated as `0`. Resource status values are
|
||||
`pending`, `planned`, `applying`, `applied`, `drifted`, `blocked`, or `error`.
|
||||
|
||||
Plan output compares desired resource digests against state resource digests
|
||||
and reports `create`, `update`, and `delete` changes. The command never writes
|
||||
`state.json`; apply and locking are later-stage work.
|
||||
and reports `create`, `update`, and `delete` changes. It also reports the state
|
||||
CAS (`sha256:<digest>`), state revision, and lock id used for the read. The
|
||||
command never writes `state.json`; apply, refresh, import, and live drift scans
|
||||
are later-stage work.
|
||||
|
||||
## Status
|
||||
|
||||
`cluster status` reads the same local JSON state ledger and prints what the
|
||||
ledger says is deployed. It does not validate referenced schema/query/policy
|
||||
files and does not inspect live graphs. Missing `state.json` succeeds with a
|
||||
warning; invalid state JSON or an unsupported state version fails.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue