mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-07-03 02:51:04 +02:00
Merge pull request #188 from ModernRelay/refactor/cluster-modularize
refactor(cluster): modularize lib.rs — pure code movement
This commit is contained in:
commit
2f58fc47fa
8 changed files with 6048 additions and 5985 deletions
881
crates/omnigraph-cluster/src/config.rs
Normal file
881
crates/omnigraph-cluster/src/config.rs
Normal file
|
|
@ -0,0 +1,881 @@
|
||||||
|
//! Declared-configuration loading: cluster.yaml parsing, query
|
||||||
|
//! discovery, source digesting, validation (moved verbatim from lib.rs
|
||||||
|
//! in the modularization). Reads the operator's WORKING TREE — stored
|
||||||
|
//! state never lives here (see store.rs).
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// How a graph declares its stored queries. Terraform-style: the `.gq`
|
||||||
|
/// files ARE the declaration — point at them (or a directory) and every
|
||||||
|
/// `query <name>` they contain is discovered. The explicit name->file map
|
||||||
|
/// remains for fine-grained control.
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
pub(crate) enum QueriesDecl {
|
||||||
|
/// `queries: ./queries/` — a directory (top-level `*.gq`, sorted) or a
|
||||||
|
/// single `.gq` file; every declaration inside is registered.
|
||||||
|
Discover(PathBuf),
|
||||||
|
/// `queries: [./queries/, ./extra.gq]` — several directories/files.
|
||||||
|
DiscoverMany(Vec<PathBuf>),
|
||||||
|
/// `queries: { name: { file: ... } }` — explicit registry.
|
||||||
|
Explicit(BTreeMap<String, QueryConfig>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for QueriesDecl {
|
||||||
|
fn default() -> Self {
|
||||||
|
QueriesDecl::Explicit(BTreeMap::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Expand a graph's query declaration into the canonical name->file map.
|
||||||
|
/// Discovery reads and parses each `.gq`; unreadable or unparseable files
|
||||||
|
/// and duplicate query names are loud validation errors — a declaration the
|
||||||
|
/// tool cannot enumerate is broken, not partially usable.
|
||||||
|
pub(crate) fn resolve_query_decls(
|
||||||
|
config_dir: &Path,
|
||||||
|
graph_id: &str,
|
||||||
|
decl: &QueriesDecl,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) -> (BTreeMap<String, QueryConfig>, BTreeMap<PathBuf, String>) {
|
||||||
|
let paths: Vec<PathBuf> = match decl {
|
||||||
|
QueriesDecl::Explicit(map) => {
|
||||||
|
return (
|
||||||
|
map.iter()
|
||||||
|
.map(|(name, config)| {
|
||||||
|
(name.clone(), QueryConfig { file: config.file.clone() })
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
BTreeMap::new(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
QueriesDecl::Discover(path) => vec![path.clone()],
|
||||||
|
QueriesDecl::DiscoverMany(paths) => paths.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut files: Vec<(PathBuf, PathBuf)> = Vec::new(); // (declared-relative, resolved)
|
||||||
|
for declared in &paths {
|
||||||
|
let resolved = resolve_config_path(config_dir, declared);
|
||||||
|
if resolved.is_dir() {
|
||||||
|
let mut entries: Vec<PathBuf> = match fs::read_dir(&resolved) {
|
||||||
|
Ok(read) => read
|
||||||
|
.flatten()
|
||||||
|
.map(|entry| entry.path())
|
||||||
|
.filter(|path| path.extension().is_some_and(|ext| ext == "gq"))
|
||||||
|
.collect(),
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"query_dir_unreadable",
|
||||||
|
format!("graphs.{graph_id}.queries"),
|
||||||
|
format!("could not list query directory '{}': {err}", resolved.display()),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
entries.sort();
|
||||||
|
if entries.is_empty() {
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"query_dir_empty",
|
||||||
|
format!("graphs.{graph_id}.queries"),
|
||||||
|
format!("query directory '{}' contains no .gq files", resolved.display()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
for path in entries {
|
||||||
|
let relative = declared.join(path.file_name().expect("dir entries have names"));
|
||||||
|
files.push((relative, path));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
files.push((declared.clone(), resolved));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut registry: BTreeMap<String, QueryConfig> = BTreeMap::new();
|
||||||
|
let mut origin: BTreeMap<String, PathBuf> = BTreeMap::new();
|
||||||
|
// Content read once at discovery and handed to the caller — the per-query
|
||||||
|
// digest/typecheck pass reuses it instead of re-reading (no N+1 reads, no
|
||||||
|
// window for the file to change between enumeration and validation).
|
||||||
|
let mut contents: BTreeMap<PathBuf, String> = BTreeMap::new();
|
||||||
|
for (declared, resolved) in files {
|
||||||
|
let source = match fs::read_to_string(&resolved) {
|
||||||
|
Ok(source) => source,
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"query_file_missing",
|
||||||
|
format!("graphs.{graph_id}.queries"),
|
||||||
|
format!("could not read query file '{}': {err}", resolved.display()),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let parsed = match parse_query(&source) {
|
||||||
|
Ok(parsed) => parsed,
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"query_parse_error",
|
||||||
|
format!("graphs.{graph_id}.queries"),
|
||||||
|
format!("'{}' does not parse: {err}", resolved.display()),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for query_decl in &parsed.queries {
|
||||||
|
let name = query_decl.name.clone();
|
||||||
|
if let Some(previous) = origin.get(&name) {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"duplicate_query_name",
|
||||||
|
format!("graphs.{graph_id}.queries.{name}"),
|
||||||
|
format!(
|
||||||
|
"query '{name}' is declared in both '{}' and '{}'",
|
||||||
|
previous.display(),
|
||||||
|
declared.display()
|
||||||
|
),
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
origin.insert(name.clone(), declared.clone());
|
||||||
|
registry.insert(name, QueryConfig { file: declared.clone() });
|
||||||
|
}
|
||||||
|
contents.insert(declared, source);
|
||||||
|
}
|
||||||
|
(registry, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn parse_cluster_config(config_dir: &Path) -> ParsedConfig {
|
||||||
|
let config_dir = config_dir.to_path_buf();
|
||||||
|
let config_file = config_dir.join(CLUSTER_CONFIG_FILE);
|
||||||
|
let mut diagnostics = Vec::new();
|
||||||
|
|
||||||
|
if !config_dir.is_dir() {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"config_dir_not_found",
|
||||||
|
display_path(&config_dir),
|
||||||
|
"`--config` must point at a directory containing cluster.yaml",
|
||||||
|
));
|
||||||
|
return ParsedConfig {
|
||||||
|
raw: None,
|
||||||
|
diagnostics,
|
||||||
|
config_dir,
|
||||||
|
config_file,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let text = match fs::read_to_string(&config_file) {
|
||||||
|
Ok(text) => text,
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"cluster_config_read_error",
|
||||||
|
CLUSTER_CONFIG_FILE,
|
||||||
|
format!("could not read cluster.yaml: {err}"),
|
||||||
|
));
|
||||||
|
return ParsedConfig {
|
||||||
|
raw: None,
|
||||||
|
diagnostics,
|
||||||
|
config_dir,
|
||||||
|
config_file,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
diagnostics.extend(duplicate_key_diagnostics(&text));
|
||||||
|
diagnostics.extend(future_field_diagnostics(&text));
|
||||||
|
if has_errors(&diagnostics) {
|
||||||
|
return ParsedConfig {
|
||||||
|
raw: None,
|
||||||
|
diagnostics,
|
||||||
|
config_dir,
|
||||||
|
config_file,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let raw = match serde_yaml::from_str::<RawClusterConfig>(&text) {
|
||||||
|
Ok(raw) => Some(raw),
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"invalid_cluster_yaml",
|
||||||
|
CLUSTER_CONFIG_FILE,
|
||||||
|
format!("could not parse cluster.yaml: {err}"),
|
||||||
|
));
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ParsedConfig {
|
||||||
|
raw,
|
||||||
|
diagnostics,
|
||||||
|
config_dir,
|
||||||
|
config_file,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn validate_cluster_header(
|
||||||
|
raw: &RawClusterConfig,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) -> ClusterSettings {
|
||||||
|
if raw.version != 1 {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"unsupported_cluster_config_version",
|
||||||
|
"version",
|
||||||
|
format!(
|
||||||
|
"unsupported cluster config version {}; this build supports version 1",
|
||||||
|
raw.version
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if let Some(name) = raw.metadata.name.as_deref() {
|
||||||
|
if name.trim().is_empty() {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"empty_metadata_name",
|
||||||
|
"metadata.name",
|
||||||
|
"metadata.name must not be empty when provided",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(backend) = raw.state.backend.as_deref() {
|
||||||
|
if backend != "cluster" {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"unsupported_state_backend",
|
||||||
|
"state.backend",
|
||||||
|
"Stage 2C supports only omitted state.backend or `cluster`",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ClusterSettings {
|
||||||
|
state_lock: raw.state.lock.unwrap_or(true),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
pub(crate) fn state_resource_digests(state: &ClusterState) -> BTreeMap<String, String> {
|
||||||
|
state
|
||||||
|
.applied_revision
|
||||||
|
.resources
|
||||||
|
.iter()
|
||||||
|
.map(|(address, resource)| (address.clone(), resource.digest.clone()))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState {
|
||||||
|
ClusterState {
|
||||||
|
version: 1,
|
||||||
|
state_revision: 0,
|
||||||
|
applied_revision: AppliedRevisionState {
|
||||||
|
config_digest: Some(desired.config_digest.clone()),
|
||||||
|
resources: BTreeMap::new(),
|
||||||
|
},
|
||||||
|
resource_statuses: BTreeMap::new(),
|
||||||
|
approval_records: BTreeMap::new(),
|
||||||
|
recovery_records: BTreeMap::new(),
|
||||||
|
observations: BTreeMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub(crate) async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize {
|
||||||
|
let mut graph_error_count = 0;
|
||||||
|
for graph in &desired.graphs {
|
||||||
|
let graph_address = graph_address(&graph.id);
|
||||||
|
let schema_address = schema_address(&graph.id);
|
||||||
|
let graph_path = desired
|
||||||
|
.config_dir
|
||||||
|
.join(CLUSTER_GRAPHS_DIR)
|
||||||
|
.join(format!("{}.omni", graph.id));
|
||||||
|
let graph_uri = display_path(&graph_path);
|
||||||
|
let observed_at = now_rfc3339();
|
||||||
|
|
||||||
|
if !graph_path.exists() {
|
||||||
|
state.applied_revision.resources.remove(&graph_address);
|
||||||
|
state.applied_revision.resources.remove(&schema_address);
|
||||||
|
state.observations.insert(
|
||||||
|
graph_address.clone(),
|
||||||
|
graph_observation_json(GraphObservationJson {
|
||||||
|
address: &graph_address,
|
||||||
|
graph_uri: &graph_uri,
|
||||||
|
observed_at: &observed_at,
|
||||||
|
exists: false,
|
||||||
|
manifest_version: None,
|
||||||
|
schema_digest: None,
|
||||||
|
desired_schema_digest: &graph.schema_digest,
|
||||||
|
schema_matches_desired: Some(false),
|
||||||
|
error: Some("derived graph root is missing"),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&graph_address,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"graph_missing",
|
||||||
|
"derived graph root is missing",
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&schema_address,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"graph_missing",
|
||||||
|
"derived graph root is missing",
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match observe_live_graph(&graph_uri).await {
|
||||||
|
Ok(observation) => {
|
||||||
|
let schema_matches = observation.schema_digest == graph.schema_digest;
|
||||||
|
state.applied_revision.resources.insert(
|
||||||
|
schema_address.clone(),
|
||||||
|
StateResource {
|
||||||
|
digest: observation.schema_digest.clone(),
|
||||||
|
applies_to: None,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let query_digests = state_query_digests_for_graph(state, &graph.id);
|
||||||
|
let graph_digest_value = graph_digest(
|
||||||
|
&graph.id,
|
||||||
|
Some(&observation.schema_digest),
|
||||||
|
Some(&query_digests),
|
||||||
|
);
|
||||||
|
state.applied_revision.resources.insert(
|
||||||
|
graph_address.clone(),
|
||||||
|
StateResource {
|
||||||
|
digest: graph_digest_value,
|
||||||
|
applies_to: None,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
state.observations.insert(
|
||||||
|
graph_address.clone(),
|
||||||
|
graph_observation_json(GraphObservationJson {
|
||||||
|
address: &graph_address,
|
||||||
|
graph_uri: &graph_uri,
|
||||||
|
observed_at: &observed_at,
|
||||||
|
exists: true,
|
||||||
|
manifest_version: Some(observation.manifest_version),
|
||||||
|
schema_digest: Some(observation.schema_digest.as_str()),
|
||||||
|
desired_schema_digest: &graph.schema_digest,
|
||||||
|
schema_matches_desired: Some(schema_matches),
|
||||||
|
error: None,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
if schema_matches {
|
||||||
|
set_resource_status_applied(state, &graph_address);
|
||||||
|
set_resource_status_applied(state, &schema_address);
|
||||||
|
} else {
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&graph_address,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"schema_mismatch",
|
||||||
|
"live schema digest differs from desired schema digest",
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&schema_address,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"schema_mismatch",
|
||||||
|
"live schema digest differs from desired schema digest",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
graph_error_count += 1;
|
||||||
|
state.observations.insert(
|
||||||
|
graph_address.clone(),
|
||||||
|
graph_observation_json(GraphObservationJson {
|
||||||
|
address: &graph_address,
|
||||||
|
graph_uri: &graph_uri,
|
||||||
|
observed_at: &observed_at,
|
||||||
|
exists: true,
|
||||||
|
manifest_version: None,
|
||||||
|
schema_digest: None,
|
||||||
|
desired_schema_digest: &graph.schema_digest,
|
||||||
|
schema_matches_desired: None,
|
||||||
|
error: Some(error.as_str()),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&graph_address,
|
||||||
|
ResourceLifecycleStatus::Error,
|
||||||
|
"graph_observation_error",
|
||||||
|
error.as_str(),
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&schema_address,
|
||||||
|
ResourceLifecycleStatus::Error,
|
||||||
|
"graph_observation_error",
|
||||||
|
error.as_str(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
graph_error_count
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a
|
||||||
|
/// desired schema against the live graph, computed read-only (no lock).
|
||||||
|
pub(crate) async fn preview_schema_migration(
|
||||||
|
graph_uri: &str,
|
||||||
|
schema_path: &str,
|
||||||
|
) -> Result<SchemaMigrationPlan, String> {
|
||||||
|
let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?;
|
||||||
|
let db = Omnigraph::open_read_only(graph_uri)
|
||||||
|
.await
|
||||||
|
.map_err(|err| err.to_string())?;
|
||||||
|
let preview = db
|
||||||
|
.preview_schema_apply_with_options(&source, SchemaApplyOptions::default())
|
||||||
|
.await
|
||||||
|
.map_err(|err| err.to_string())?;
|
||||||
|
Ok(preview.plan)
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LiveGraphObservation {
|
||||||
|
manifest_version: u64,
|
||||||
|
schema_digest: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn observe_live_graph(graph_uri: &str) -> Result<LiveGraphObservation, String> {
|
||||||
|
let db = Omnigraph::open_read_only(graph_uri)
|
||||||
|
.await
|
||||||
|
.map_err(|err| err.to_string())?;
|
||||||
|
let snapshot = db
|
||||||
|
.snapshot_of(ReadTarget::branch("main"))
|
||||||
|
.await
|
||||||
|
.map_err(|err| err.to_string())?;
|
||||||
|
let schema_source = db.schema_source();
|
||||||
|
Ok(LiveGraphObservation {
|
||||||
|
manifest_version: snapshot.version(),
|
||||||
|
schema_digest: sha256_hex(schema_source.as_bytes()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
struct GraphObservationJson<'a> {
|
||||||
|
address: &'a str,
|
||||||
|
graph_uri: &'a str,
|
||||||
|
observed_at: &'a str,
|
||||||
|
exists: bool,
|
||||||
|
manifest_version: Option<u64>,
|
||||||
|
schema_digest: Option<&'a str>,
|
||||||
|
desired_schema_digest: &'a str,
|
||||||
|
schema_matches_desired: Option<bool>,
|
||||||
|
error: Option<&'a str>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn graph_observation_json(observation: GraphObservationJson<'_>) -> serde_json::Value {
|
||||||
|
json!({
|
||||||
|
"kind": "graph",
|
||||||
|
"address": observation.address,
|
||||||
|
"graph_uri": observation.graph_uri,
|
||||||
|
"observed_at": observation.observed_at,
|
||||||
|
"exists": observation.exists,
|
||||||
|
"manifest_version": observation.manifest_version,
|
||||||
|
"schema_digest": observation.schema_digest,
|
||||||
|
"desired_schema_digest": observation.desired_schema_digest,
|
||||||
|
"schema_matches_desired": observation.schema_matches_desired,
|
||||||
|
"error": observation.error,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
|
||||||
|
let parsed = parse_cluster_config(config_dir);
|
||||||
|
let config_dir = parsed.config_dir;
|
||||||
|
let config_file = parsed.config_file;
|
||||||
|
let mut diagnostics = parsed.diagnostics;
|
||||||
|
let Some(raw) = parsed.raw else {
|
||||||
|
return LoadOutcome {
|
||||||
|
desired: None,
|
||||||
|
diagnostics,
|
||||||
|
config_dir,
|
||||||
|
config_file,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
let settings = validate_cluster_header(&raw, &mut diagnostics);
|
||||||
|
|
||||||
|
let mut resources = BTreeMap::new();
|
||||||
|
let mut dependencies = BTreeSet::new();
|
||||||
|
let mut graph_query_digests: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
|
||||||
|
let mut graph_schema_digests: BTreeMap<String, String> = BTreeMap::new();
|
||||||
|
|
||||||
|
for (graph_id, graph) in &raw.graphs {
|
||||||
|
validate_id(
|
||||||
|
"graph id",
|
||||||
|
&format!("graphs.{graph_id}"),
|
||||||
|
graph_id,
|
||||||
|
&mut diagnostics,
|
||||||
|
);
|
||||||
|
let graph_address = graph_address(graph_id);
|
||||||
|
let schema_address = schema_address(graph_id);
|
||||||
|
dependencies.insert(Dependency {
|
||||||
|
from: schema_address.clone(),
|
||||||
|
to: graph_address.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let schema_path = resolve_config_path(&config_dir, &graph.schema);
|
||||||
|
let schema_source = match fs::read_to_string(&schema_path) {
|
||||||
|
Ok(source) => {
|
||||||
|
let digest = sha256_hex(source.as_bytes());
|
||||||
|
graph_schema_digests.insert(graph_id.clone(), digest.clone());
|
||||||
|
resources.insert(
|
||||||
|
schema_address.clone(),
|
||||||
|
ResourceSummary {
|
||||||
|
address: schema_address.clone(),
|
||||||
|
kind: "schema".to_string(),
|
||||||
|
digest,
|
||||||
|
path: Some(display_path(&schema_path)),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
Some(source)
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"schema_file_missing",
|
||||||
|
format!("graphs.{graph_id}.schema"),
|
||||||
|
format!(
|
||||||
|
"could not read schema file '{}': {err}",
|
||||||
|
schema_path.display()
|
||||||
|
),
|
||||||
|
));
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let catalog = schema_source.and_then(|source| match parse_schema(&source) {
|
||||||
|
Ok(schema) => match build_catalog(&schema) {
|
||||||
|
Ok(catalog) => Some(catalog),
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"schema_catalog_error",
|
||||||
|
format!("graphs.{graph_id}.schema"),
|
||||||
|
err.to_string(),
|
||||||
|
));
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"schema_parse_error",
|
||||||
|
format!("graphs.{graph_id}.schema"),
|
||||||
|
err.to_string(),
|
||||||
|
));
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let (graph_queries, query_contents) =
|
||||||
|
resolve_query_decls(&config_dir, graph_id, &graph.queries, &mut diagnostics);
|
||||||
|
for (query_name, query) in &graph_queries {
|
||||||
|
validate_id(
|
||||||
|
"query name",
|
||||||
|
&format!("graphs.{graph_id}.queries.{query_name}"),
|
||||||
|
query_name,
|
||||||
|
&mut diagnostics,
|
||||||
|
);
|
||||||
|
let query_address = query_address(graph_id, query_name);
|
||||||
|
dependencies.insert(Dependency {
|
||||||
|
from: query_address.clone(),
|
||||||
|
to: graph_address.clone(),
|
||||||
|
});
|
||||||
|
dependencies.insert(Dependency {
|
||||||
|
from: query_address.clone(),
|
||||||
|
to: schema_address.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let query_path = resolve_config_path(&config_dir, &query.file);
|
||||||
|
let source = match query_contents.get(&query.file) {
|
||||||
|
Some(cached) => Ok(cached.clone()),
|
||||||
|
None => fs::read_to_string(&query_path),
|
||||||
|
};
|
||||||
|
match source {
|
||||||
|
Ok(source) => {
|
||||||
|
let digest = sha256_hex(source.as_bytes());
|
||||||
|
graph_query_digests
|
||||||
|
.entry(graph_id.clone())
|
||||||
|
.or_default()
|
||||||
|
.insert(query_name.clone(), digest.clone());
|
||||||
|
resources.insert(
|
||||||
|
query_address.clone(),
|
||||||
|
ResourceSummary {
|
||||||
|
address: query_address,
|
||||||
|
kind: "query".to_string(),
|
||||||
|
digest,
|
||||||
|
path: Some(display_path(&query_path)),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
validate_query_source(
|
||||||
|
graph_id,
|
||||||
|
query_name,
|
||||||
|
&source,
|
||||||
|
catalog.as_ref(),
|
||||||
|
&mut diagnostics,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::error(
|
||||||
|
"query_file_missing",
|
||||||
|
format!("graphs.{graph_id}.queries.{query_name}.file"),
|
||||||
|
format!(
|
||||||
|
"could not read query file '{}': {err}",
|
||||||
|
query_path.display()
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for graph_id in raw.graphs.keys() {
|
||||||
|
let digest = graph_digest(
|
||||||
|
graph_id,
|
||||||
|
graph_schema_digests.get(graph_id),
|
||||||
|
graph_query_digests.get(graph_id),
|
||||||
|
);
|
||||||
|
resources.insert(
|
||||||
|
graph_address(graph_id),
|
||||||
|
ResourceSummary {
|
||||||
|
address: graph_address(graph_id),
|
||||||
|
kind: "graph".to_string(),
|
||||||
|
digest,
|
||||||
|
path: None,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut policy_bindings: BTreeMap<String, Vec<String>> = BTreeMap::new();
|
||||||
|
for (policy_name, policy) in &raw.policies {
|
||||||
|
validate_id(
|
||||||
|
"policy name",
|
||||||
|
&format!("policies.{policy_name}"),
|
||||||
|
policy_name,
|
||||||
|
&mut diagnostics,
|
||||||
|
);
|
||||||
|
if policy.applies_to.is_empty() {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"policy_missing_applies_to",
|
||||||
|
format!("policies.{policy_name}.applies_to"),
|
||||||
|
"policy.applies_to must name `cluster` or at least one graph",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let policy_address = policy_address(policy_name);
|
||||||
|
let mut normalized_bindings: Vec<String> = Vec::new();
|
||||||
|
for (idx, target) in policy.applies_to.iter().enumerate() {
|
||||||
|
match normalize_policy_target(target) {
|
||||||
|
PolicyTarget::Cluster => {
|
||||||
|
normalized_bindings.push("cluster".to_string());
|
||||||
|
}
|
||||||
|
PolicyTarget::Graph(graph_id) => {
|
||||||
|
normalized_bindings.push(graph_address(&graph_id));
|
||||||
|
if raw.graphs.contains_key(&graph_id) {
|
||||||
|
dependencies.insert(Dependency {
|
||||||
|
from: policy_address.clone(),
|
||||||
|
to: graph_address(&graph_id),
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"dangling_graph_reference",
|
||||||
|
format!("policies.{policy_name}.applies_to[{idx}]"),
|
||||||
|
format!(
|
||||||
|
"policy references graph `{graph_id}`, but no graph with that id is declared"
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PolicyTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error(
|
||||||
|
"wrong_kind_reference",
|
||||||
|
format!("policies.{policy_name}.applies_to[{idx}]"),
|
||||||
|
format!("policy applies_to expects graph refs or `cluster`, got `{kind}`"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
normalized_bindings.sort();
|
||||||
|
normalized_bindings.dedup();
|
||||||
|
policy_bindings.insert(policy_address.clone(), normalized_bindings);
|
||||||
|
|
||||||
|
let policy_path = resolve_config_path(&config_dir, &policy.file);
|
||||||
|
match fs::read(&policy_path) {
|
||||||
|
Ok(bytes) => {
|
||||||
|
resources.insert(
|
||||||
|
policy_address.clone(),
|
||||||
|
ResourceSummary {
|
||||||
|
address: policy_address,
|
||||||
|
kind: "policy".to_string(),
|
||||||
|
digest: sha256_hex(&bytes),
|
||||||
|
path: Some(display_path(&policy_path)),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::error(
|
||||||
|
"policy_file_missing",
|
||||||
|
format!("policies.{policy_name}.file"),
|
||||||
|
format!(
|
||||||
|
"could not read policy file '{}': {err}",
|
||||||
|
policy_path.display()
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut resource_digests = BTreeMap::new();
|
||||||
|
let mut resource_list = Vec::new();
|
||||||
|
for (address, resource) in resources {
|
||||||
|
resource_digests.insert(address, resource.digest.clone());
|
||||||
|
resource_list.push(resource);
|
||||||
|
}
|
||||||
|
let dependencies: Vec<_> = dependencies.into_iter().collect();
|
||||||
|
let graphs = raw
|
||||||
|
.graphs
|
||||||
|
.keys()
|
||||||
|
.map(|graph_id| DesiredGraph {
|
||||||
|
id: graph_id.clone(),
|
||||||
|
schema_digest: graph_schema_digests
|
||||||
|
.get(graph_id)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_default(),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let config_digest = desired_config_digest(&raw, &resource_digests);
|
||||||
|
|
||||||
|
LoadOutcome {
|
||||||
|
desired: Some(DesiredCluster {
|
||||||
|
config_dir: config_dir.clone(),
|
||||||
|
config_digest,
|
||||||
|
state_lock: settings.state_lock,
|
||||||
|
graphs,
|
||||||
|
resource_digests,
|
||||||
|
resources: resource_list,
|
||||||
|
dependencies,
|
||||||
|
policy_bindings,
|
||||||
|
}),
|
||||||
|
diagnostics,
|
||||||
|
config_dir,
|
||||||
|
config_file,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn validate_query_source(
|
||||||
|
graph_id: &str,
|
||||||
|
query_name: &str,
|
||||||
|
source: &str,
|
||||||
|
catalog: Option<&omnigraph_compiler::catalog::Catalog>,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) {
|
||||||
|
let path = format!("graphs.{graph_id}.queries.{query_name}");
|
||||||
|
match parse_query(source) {
|
||||||
|
Ok(query_file) => {
|
||||||
|
let Some(query_decl) = query_file.queries.iter().find(|q| q.name == query_name) else {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"query_key_mismatch",
|
||||||
|
path,
|
||||||
|
format!("no `query {query_name}` declaration found in the referenced .gq file"),
|
||||||
|
));
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if let Some(catalog) = catalog {
|
||||||
|
if let Err(err) = typecheck_query_decl(catalog, query_decl) {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"query_typecheck_error",
|
||||||
|
format!("graphs.{graph_id}.queries.{query_name}"),
|
||||||
|
err.to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"query_typecheck_skipped",
|
||||||
|
format!("graphs.{graph_id}.queries.{query_name}"),
|
||||||
|
"query parsed, but type-check was skipped because the graph schema is invalid",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::error(
|
||||||
|
"query_parse_error",
|
||||||
|
path,
|
||||||
|
err.to_string(),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn future_field_diagnostics(text: &str) -> Vec<Diagnostic> {
|
||||||
|
let Ok(value) = serde_yaml::from_str::<serde_yaml::Value>(text) else {
|
||||||
|
return Vec::new();
|
||||||
|
};
|
||||||
|
let Some(mapping) = value.as_mapping() else {
|
||||||
|
return Vec::new();
|
||||||
|
};
|
||||||
|
let future_fields = [
|
||||||
|
"apply",
|
||||||
|
"env_file",
|
||||||
|
"providers",
|
||||||
|
"pipelines",
|
||||||
|
"embeddings",
|
||||||
|
"ui",
|
||||||
|
"aliases",
|
||||||
|
"bindings",
|
||||||
|
];
|
||||||
|
mapping
|
||||||
|
.keys()
|
||||||
|
.filter_map(|key| key.as_str())
|
||||||
|
.filter(|key| future_fields.contains(key))
|
||||||
|
.map(|key| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"future_phase_field",
|
||||||
|
key,
|
||||||
|
format!("`{key}` is reserved for a later cluster-control phase"),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn validate_id(kind: &str, path: &str, value: &str, diagnostics: &mut Vec<Diagnostic>) {
|
||||||
|
let mut chars = value.chars();
|
||||||
|
let valid = chars
|
||||||
|
.next()
|
||||||
|
.is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_')
|
||||||
|
&& chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-');
|
||||||
|
if !valid {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"invalid_resource_id",
|
||||||
|
path,
|
||||||
|
format!("{kind} `{value}` must start with a letter or `_` and contain only ASCII letters, digits, `_`, or `-`"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum PolicyTarget {
|
||||||
|
Cluster,
|
||||||
|
Graph(String),
|
||||||
|
WrongKind(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn normalize_policy_target(value: &str) -> PolicyTarget {
|
||||||
|
if value == "cluster" {
|
||||||
|
PolicyTarget::Cluster
|
||||||
|
} else if let Some(graph_id) = value.strip_prefix("graph.") {
|
||||||
|
PolicyTarget::Graph(graph_id.to_string())
|
||||||
|
} else if value.contains('.') {
|
||||||
|
PolicyTarget::WrongKind(value.to_string())
|
||||||
|
} else {
|
||||||
|
PolicyTarget::Graph(value.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn graph_address(graph_id: &str) -> String {
|
||||||
|
format!("graph.{graph_id}")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn schema_address(graph_id: &str) -> String {
|
||||||
|
format!("schema.{graph_id}")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn query_address(graph_id: &str, query_name: &str) -> String {
|
||||||
|
format!("query.{graph_id}.{query_name}")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn policy_address(policy_name: &str) -> String {
|
||||||
|
format!("policy.{policy_name}")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn resolve_config_path(config_dir: &Path, path: &Path) -> PathBuf {
|
||||||
|
if path.is_absolute() {
|
||||||
|
path.to_path_buf()
|
||||||
|
} else {
|
||||||
|
config_dir.join(path)
|
||||||
|
}
|
||||||
|
}
|
||||||
420
crates/omnigraph-cluster/src/diff.rs
Normal file
420
crates/omnigraph-cluster/src/diff.rs
Normal file
|
|
@ -0,0 +1,420 @@
|
||||||
|
//! Plan/apply classification: resource diffing, dispositions, approval
|
||||||
|
//! gating, demotion (moved verbatim from lib.rs in the modularization).
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
pub(crate) fn diff_resources(
|
||||||
|
prior: &BTreeMap<String, String>,
|
||||||
|
desired: &BTreeMap<String, String>,
|
||||||
|
) -> Vec<PlanChange> {
|
||||||
|
let mut changes = Vec::new();
|
||||||
|
for (address, after) in desired {
|
||||||
|
match prior.get(address) {
|
||||||
|
None => changes.push(PlanChange {
|
||||||
|
resource: address.clone(),
|
||||||
|
operation: PlanOperation::Create,
|
||||||
|
before_digest: None,
|
||||||
|
after_digest: Some(after.clone()),
|
||||||
|
disposition: None,
|
||||||
|
reason: None,
|
||||||
|
binding_change: false,
|
||||||
|
migration: None,
|
||||||
|
}),
|
||||||
|
Some(before) if before != after => changes.push(PlanChange {
|
||||||
|
resource: address.clone(),
|
||||||
|
operation: PlanOperation::Update,
|
||||||
|
before_digest: Some(before.clone()),
|
||||||
|
after_digest: Some(after.clone()),
|
||||||
|
disposition: None,
|
||||||
|
reason: None,
|
||||||
|
binding_change: false,
|
||||||
|
migration: None,
|
||||||
|
}),
|
||||||
|
Some(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (address, before) in prior {
|
||||||
|
if !desired.contains_key(address) {
|
||||||
|
changes.push(PlanChange {
|
||||||
|
resource: address.clone(),
|
||||||
|
operation: PlanOperation::Delete,
|
||||||
|
before_digest: Some(before.clone()),
|
||||||
|
after_digest: None,
|
||||||
|
disposition: None,
|
||||||
|
reason: None,
|
||||||
|
binding_change: false,
|
||||||
|
migration: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
changes.sort_by(|a, b| a.resource.cmp(&b.resource));
|
||||||
|
changes
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Binding-only policy changes: the file digest is unchanged (so
|
||||||
|
/// `diff_resources` saw nothing) but the applied `applies_to` differs from
|
||||||
|
/// the desired bindings — including the pre-5A case where the state entry
|
||||||
|
/// has no bindings recorded yet. These are first-class plan changes: without
|
||||||
|
/// this pass a binding edit would silently rot or silently converge.
|
||||||
|
pub(crate) fn append_policy_binding_changes(
|
||||||
|
changes: &mut Vec<PlanChange>,
|
||||||
|
prior_state: Option<&ClusterState>,
|
||||||
|
desired: &DesiredCluster,
|
||||||
|
) {
|
||||||
|
let Some(state) = prior_state else {
|
||||||
|
return; // no state: everything is already a Create carrying bindings
|
||||||
|
};
|
||||||
|
for (address, desired_bindings) in &desired.policy_bindings {
|
||||||
|
if changes.iter().any(|change| &change.resource == address) {
|
||||||
|
continue; // content change already covers it
|
||||||
|
}
|
||||||
|
let Some(entry) = state.applied_revision.resources.get(address) else {
|
||||||
|
continue; // not applied yet: the Create covers it
|
||||||
|
};
|
||||||
|
if entry.applies_to.as_ref() == Some(desired_bindings) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
changes.push(PlanChange {
|
||||||
|
resource: address.clone(),
|
||||||
|
operation: PlanOperation::Update,
|
||||||
|
before_digest: Some(entry.digest.clone()),
|
||||||
|
after_digest: Some(entry.digest.clone()),
|
||||||
|
disposition: None,
|
||||||
|
reason: None,
|
||||||
|
binding_change: true,
|
||||||
|
migration: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
changes.sort_by(|a, b| a.resource.cmp(&b.resource));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn compute_blast_radius(
|
||||||
|
changes: &[PlanChange],
|
||||||
|
dependencies: &[Dependency],
|
||||||
|
) -> Vec<BlastRadius> {
|
||||||
|
changes
|
||||||
|
.iter()
|
||||||
|
.filter_map(|change| {
|
||||||
|
let affected: Vec<_> = dependencies
|
||||||
|
.iter()
|
||||||
|
.filter_map(|dep| (dep.to == change.resource).then_some(dep.from.clone()))
|
||||||
|
.collect();
|
||||||
|
(!affected.is_empty()).then(|| BlastRadius {
|
||||||
|
resource: change.resource.clone(),
|
||||||
|
affected,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn compute_approvals(
|
||||||
|
changes: &[PlanChange],
|
||||||
|
approved: &BTreeSet<String>,
|
||||||
|
) -> Vec<ApprovalRequirement> {
|
||||||
|
// One gate per subtree: the graph.<id> delete carries its schema and
|
||||||
|
// queries, so a schema delete whose graph is also deleted is not listed.
|
||||||
|
let graph_deletes: BTreeSet<String> = changes
|
||||||
|
.iter()
|
||||||
|
.filter(|change| change.operation == PlanOperation::Delete)
|
||||||
|
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
|
||||||
|
.collect();
|
||||||
|
changes
|
||||||
|
.iter()
|
||||||
|
.filter_map(|change| {
|
||||||
|
if change.operation != PlanOperation::Delete {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let gated = match resource_kind(&change.resource) {
|
||||||
|
ResourceKind::Graph(_) => true,
|
||||||
|
ResourceKind::Schema(graph) => !graph_deletes.contains(&graph),
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
gated.then(|| ApprovalRequirement {
|
||||||
|
resource: change.resource.clone(),
|
||||||
|
reason: "delete may remove deployed graph or schema definition".to_string(),
|
||||||
|
satisfied: approved.contains(&change.resource),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resources with a valid (digest-matching, unconsumed) pending approval.
|
||||||
|
/// Near-misses — an artifact for the same resource whose bound digests no
|
||||||
|
/// longer match — warn as `approval_stale` and never authorize anything.
|
||||||
|
pub(crate) fn approved_resources(
|
||||||
|
artifacts: &[(PathBuf, ApprovalArtifact)],
|
||||||
|
changes: &[PlanChange],
|
||||||
|
config_digest: &str,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) -> BTreeSet<String> {
|
||||||
|
let mut approved = BTreeSet::new();
|
||||||
|
for change in changes {
|
||||||
|
let candidates: Vec<&ApprovalArtifact> = artifacts
|
||||||
|
.iter()
|
||||||
|
.map(|(_, artifact)| artifact)
|
||||||
|
.filter(|artifact| artifact.consumed_at.is_none() && artifact.resource == change.resource)
|
||||||
|
.collect();
|
||||||
|
if candidates.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let matched = candidates.iter().any(|artifact| {
|
||||||
|
artifact.bound_config_digest == config_digest
|
||||||
|
&& artifact.bound_before_digest == change.before_digest
|
||||||
|
&& artifact.bound_after_digest == change.after_digest
|
||||||
|
});
|
||||||
|
if matched {
|
||||||
|
approved.insert(change.resource.clone());
|
||||||
|
} else {
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"approval_stale",
|
||||||
|
change.resource.clone(),
|
||||||
|
"an approval artifact exists but its bound digests no longer match the plan; re-run `cluster approve`",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
approved
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
pub(crate) enum ResourceKind {
|
||||||
|
Graph(String),
|
||||||
|
Schema(String),
|
||||||
|
Query { graph: String, name: String },
|
||||||
|
Policy(String),
|
||||||
|
Unknown,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn resource_kind(address: &str) -> ResourceKind {
|
||||||
|
if let Some(graph) = address.strip_prefix("graph.") {
|
||||||
|
ResourceKind::Graph(graph.to_string())
|
||||||
|
} else if let Some(graph) = address.strip_prefix("schema.") {
|
||||||
|
ResourceKind::Schema(graph.to_string())
|
||||||
|
} else if let Some(rest) = address.strip_prefix("query.") {
|
||||||
|
match rest.split_once('.') {
|
||||||
|
Some((graph, name)) => ResourceKind::Query {
|
||||||
|
graph: graph.to_string(),
|
||||||
|
name: name.to_string(),
|
||||||
|
},
|
||||||
|
None => ResourceKind::Unknown,
|
||||||
|
}
|
||||||
|
} else if let Some(name) = address.strip_prefix("policy.") {
|
||||||
|
ResourceKind::Policy(name.to_string())
|
||||||
|
} else {
|
||||||
|
ResourceKind::Unknown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Classify every planned change with the disposition config-only apply gives
|
||||||
|
/// it. Stage 3A executes only query/policy catalog writes; graph/schema
|
||||||
|
/// movement is a later phase, and `graph.<id>` composite updates whose schema
|
||||||
|
/// component is unchanged converge automatically once query digests land.
|
||||||
|
pub(crate) fn classify_changes(
|
||||||
|
changes: &mut [PlanChange],
|
||||||
|
dependencies: &[Dependency],
|
||||||
|
pending_recovery: &BTreeSet<String>,
|
||||||
|
approved: &BTreeSet<String>,
|
||||||
|
) {
|
||||||
|
let mut schema_creates = BTreeSet::new();
|
||||||
|
let mut schema_pending = BTreeSet::new();
|
||||||
|
let mut graph_creates = BTreeSet::new();
|
||||||
|
let mut graph_deletes = BTreeSet::new();
|
||||||
|
for change in changes.iter() {
|
||||||
|
match resource_kind(&change.resource) {
|
||||||
|
ResourceKind::Schema(graph) => match change.operation {
|
||||||
|
PlanOperation::Create => {
|
||||||
|
schema_creates.insert(graph);
|
||||||
|
}
|
||||||
|
// Schema updates execute in-run before catalog writes (4B)
|
||||||
|
// and no longer block dependents; deletes (4C) still do.
|
||||||
|
PlanOperation::Update => {}
|
||||||
|
PlanOperation::Delete => {
|
||||||
|
schema_pending.insert(graph);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ResourceKind::Graph(graph) => match change.operation {
|
||||||
|
PlanOperation::Create => {
|
||||||
|
graph_creates.insert(graph);
|
||||||
|
}
|
||||||
|
PlanOperation::Delete => {
|
||||||
|
graph_deletes.insert(graph);
|
||||||
|
}
|
||||||
|
PlanOperation::Update => {}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// A schema Create is satisfied by its paired graph create (the init
|
||||||
|
// carries the schema); a standalone schema Create stays pending.
|
||||||
|
for graph in &schema_creates {
|
||||||
|
if !graph_creates.contains(graph) {
|
||||||
|
schema_pending.insert(graph.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Subtree deletes ride the approved graph delete.
|
||||||
|
let rides_approved_delete = |graph: &str| {
|
||||||
|
graph_deletes.contains(graph)
|
||||||
|
&& approved.contains(&graph_address(graph))
|
||||||
|
&& !pending_recovery.contains(graph)
|
||||||
|
};
|
||||||
|
|
||||||
|
for change in changes.iter_mut() {
|
||||||
|
let (disposition, reason) = match resource_kind(&change.resource) {
|
||||||
|
ResourceKind::Schema(graph) => match change.operation {
|
||||||
|
PlanOperation::Create
|
||||||
|
if graph_creates.contains(&graph)
|
||||||
|
&& !pending_recovery.contains(&graph) =>
|
||||||
|
{
|
||||||
|
// Applied with the graph create — the init carries it.
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
}
|
||||||
|
PlanOperation::Update if !pending_recovery.contains(&graph) => {
|
||||||
|
// Stage 4B: schema updates execute via the engine's
|
||||||
|
// schema apply (soft drops only; allow_data_loss is 4C).
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
}
|
||||||
|
PlanOperation::Create | PlanOperation::Update => {
|
||||||
|
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||||
|
}
|
||||||
|
PlanOperation::Delete if graph_deletes.contains(&graph) => {
|
||||||
|
if rides_approved_delete(&graph) {
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
} else if pending_recovery.contains(&graph) {
|
||||||
|
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||||
|
} else {
|
||||||
|
(ApplyDisposition::Blocked, Some("approval_required"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||||
|
},
|
||||||
|
ResourceKind::Graph(graph) => match change.operation {
|
||||||
|
PlanOperation::Create => {
|
||||||
|
if pending_recovery.contains(&graph) {
|
||||||
|
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||||
|
} else {
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PlanOperation::Update if !schema_pending.contains(&graph) => {
|
||||||
|
(ApplyDisposition::Derived, None)
|
||||||
|
}
|
||||||
|
// Stage 4C: an approved graph delete executes (the
|
||||||
|
// irreversible tier — gated by a digest-bound artifact).
|
||||||
|
PlanOperation::Delete => {
|
||||||
|
if pending_recovery.contains(&graph) {
|
||||||
|
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||||
|
} else if rides_approved_delete(&graph) {
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
} else {
|
||||||
|
(ApplyDisposition::Blocked, Some("approval_required"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||||
|
},
|
||||||
|
ResourceKind::Query { graph, .. } => match change.operation {
|
||||||
|
PlanOperation::Delete => {
|
||||||
|
if rides_approved_delete(&graph) {
|
||||||
|
// Tombstoned with the approved graph delete.
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
} else if graph_deletes.contains(&graph) {
|
||||||
|
(ApplyDisposition::Blocked, Some("approval_required"))
|
||||||
|
} else {
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PlanOperation::Create | PlanOperation::Update => {
|
||||||
|
if pending_recovery.contains(&graph) {
|
||||||
|
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||||
|
} else if schema_pending.contains(&graph) {
|
||||||
|
(
|
||||||
|
ApplyDisposition::Blocked,
|
||||||
|
Some("dependency_not_applied"),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// A graph create in the same plan no longer blocks:
|
||||||
|
// creates execute first in the same apply run.
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ResourceKind::Policy(_) => match change.operation {
|
||||||
|
PlanOperation::Delete => (ApplyDisposition::Applied, None),
|
||||||
|
PlanOperation::Create | PlanOperation::Update => {
|
||||||
|
let blocked_pending = dependencies.iter().any(|dep| {
|
||||||
|
dep.from == change.resource
|
||||||
|
&& dep
|
||||||
|
.to
|
||||||
|
.strip_prefix("graph.")
|
||||||
|
.is_some_and(|graph| pending_recovery.contains(graph))
|
||||||
|
});
|
||||||
|
if blocked_pending {
|
||||||
|
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||||
|
} else {
|
||||||
|
(ApplyDisposition::Applied, None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ResourceKind::Unknown => {
|
||||||
|
(ApplyDisposition::Deferred, Some("apply_unsupported_kind"))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
change.disposition = Some(disposition);
|
||||||
|
change.reason = reason.map(str::to_string);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub(crate) enum FailedGraphOrigin {
|
||||||
|
GraphCreate,
|
||||||
|
SchemaApply,
|
||||||
|
GraphDelete,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// After a graph-moving operation fails mid-run, every change that depended
|
||||||
|
/// on that graph flips from Applied to Blocked so the output and the
|
||||||
|
/// persisted statuses tell the truth about what this run actually executed.
|
||||||
|
/// The originating change carries the failure code; dependents carry
|
||||||
|
/// `dependency_not_applied`.
|
||||||
|
pub(crate) fn demote_dependents_of_failed_graphs(
|
||||||
|
changes: &mut [PlanChange],
|
||||||
|
failed: &BTreeMap<String, FailedGraphOrigin>,
|
||||||
|
dependencies: &[Dependency],
|
||||||
|
) {
|
||||||
|
for change in changes.iter_mut() {
|
||||||
|
if change.disposition != Some(ApplyDisposition::Applied) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let demote_reason = match resource_kind(&change.resource) {
|
||||||
|
ResourceKind::Graph(graph) => match failed.get(&graph) {
|
||||||
|
Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"),
|
||||||
|
Some(FailedGraphOrigin::GraphDelete) => Some("graph_delete_failed"),
|
||||||
|
Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"),
|
||||||
|
None => None,
|
||||||
|
},
|
||||||
|
ResourceKind::Schema(graph) => match failed.get(&graph) {
|
||||||
|
Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"),
|
||||||
|
Some(FailedGraphOrigin::GraphCreate) | Some(FailedGraphOrigin::GraphDelete) => {
|
||||||
|
Some("dependency_not_applied")
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
},
|
||||||
|
ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => {
|
||||||
|
Some("dependency_not_applied")
|
||||||
|
}
|
||||||
|
ResourceKind::Policy(_) => {
|
||||||
|
let blocked = dependencies.iter().any(|dep| {
|
||||||
|
dep.from == change.resource
|
||||||
|
&& dep
|
||||||
|
.to
|
||||||
|
.strip_prefix("graph.")
|
||||||
|
.is_some_and(|graph| failed.contains_key(graph))
|
||||||
|
});
|
||||||
|
blocked.then_some("dependency_not_applied")
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
if let Some(reason) = demote_reason {
|
||||||
|
change.disposition = Some(ApplyDisposition::Blocked);
|
||||||
|
change.reason = Some(reason.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load diff
189
crates/omnigraph-cluster/src/serve.rs
Normal file
189
crates/omnigraph-cluster/src/serve.rs
Normal file
|
|
@ -0,0 +1,189 @@
|
||||||
|
//! Phase-5 serving snapshot: the read-only loader a `--cluster` server
|
||||||
|
//! boots from (moved verbatim from lib.rs in the modularization).
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// One graph in a serving snapshot: its id and on-disk root.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ServingGraph {
|
||||||
|
pub graph_id: String,
|
||||||
|
pub root: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One stored query: its graph binding, registry name, and verified source.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ServingQuery {
|
||||||
|
pub graph_id: String,
|
||||||
|
pub name: String,
|
||||||
|
pub source: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One policy bundle: its verified catalog blob path and applied bindings
|
||||||
|
/// (normalized typed refs: `cluster` | `graph.<id>`).
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ServingPolicy {
|
||||||
|
pub name: String,
|
||||||
|
pub blob_path: PathBuf,
|
||||||
|
pub applies_to: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Everything a server needs to boot from the cluster catalog (RFC-005 §D2).
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ServingSnapshot {
|
||||||
|
pub graphs: Vec<ServingGraph>,
|
||||||
|
pub queries: Vec<ServingQuery>,
|
||||||
|
pub policies: Vec<ServingPolicy>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the applied revision as a serving snapshot — the read-only loader for
|
||||||
|
/// the Phase-5 server boot. All-or-nothing per RFC-005 §D4: every readiness
|
||||||
|
/// failure is collected and the whole snapshot refused; no partial serving.
|
||||||
|
/// Takes no lock: the state file is replaced atomically, so this reads a
|
||||||
|
/// consistent point-in-time ledger.
|
||||||
|
pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnapshot, Vec<Diagnostic>> {
|
||||||
|
let config_dir = config_dir.as_ref().to_path_buf();
|
||||||
|
let backend = LocalStateBackend::new(&config_dir);
|
||||||
|
let mut diagnostics: Vec<Diagnostic> = Vec::new();
|
||||||
|
|
||||||
|
// A ledger a sweep is about to rewrite must not start serving.
|
||||||
|
let sidecars = backend.list_recovery_sidecars(&mut diagnostics);
|
||||||
|
if !sidecars.is_empty() {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"cluster_recovery_pending",
|
||||||
|
CLUSTER_RECOVERIES_DIR,
|
||||||
|
format!(
|
||||||
|
"{} interrupted operation(s) await recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry",
|
||||||
|
sidecars.len()
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut observations = backend.observations();
|
||||||
|
let state = match backend.read_state(&mut observations) {
|
||||||
|
Ok(snapshot) => match snapshot.state {
|
||||||
|
Some(state) => Some(state),
|
||||||
|
None => {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"cluster_state_missing",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
"no cluster state ledger; run `cluster import` and `cluster apply` first",
|
||||||
|
));
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(diagnostic) => {
|
||||||
|
diagnostics.push(diagnostic);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let Some(state) = state else {
|
||||||
|
return Err(diagnostics);
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut graphs = Vec::new();
|
||||||
|
let mut queries = Vec::new();
|
||||||
|
let mut policies = Vec::new();
|
||||||
|
for (address, entry) in &state.applied_revision.resources {
|
||||||
|
match resource_kind(address) {
|
||||||
|
ResourceKind::Graph(graph_id) => {
|
||||||
|
graphs.push(ServingGraph {
|
||||||
|
root: config_dir
|
||||||
|
.join(CLUSTER_GRAPHS_DIR)
|
||||||
|
.join(format!("{graph_id}.omni")),
|
||||||
|
graph_id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ResourceKind::Schema(_) => {}
|
||||||
|
kind @ ResourceKind::Query { .. } => {
|
||||||
|
let ResourceKind::Query { graph, name } = &kind else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
|
||||||
|
Ok(source) => queries.push(ServingQuery {
|
||||||
|
graph_id: graph.clone(),
|
||||||
|
name: name.clone(),
|
||||||
|
source,
|
||||||
|
}),
|
||||||
|
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
kind @ ResourceKind::Policy(_) => {
|
||||||
|
let ResourceKind::Policy(name) = &kind else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
let Some(applies_to) = entry.applies_to.clone() else {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"policy_bindings_missing",
|
||||||
|
address.clone(),
|
||||||
|
"no applied applies_to bindings recorded (ledger predates binding metadata); re-run `cluster apply` to backfill",
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
|
||||||
|
Ok(_) => policies.push(ServingPolicy {
|
||||||
|
name: name.clone(),
|
||||||
|
blob_path: payload_path(&config_dir, &kind, &entry.digest)
|
||||||
|
.expect("policy kind always has a payload path"),
|
||||||
|
applies_to,
|
||||||
|
}),
|
||||||
|
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ResourceKind::Unknown => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if graphs.is_empty() {
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"cluster_empty",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
"the applied revision records no graphs; apply a cluster with at least one graph before serving from it",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if has_errors(&diagnostics) {
|
||||||
|
return Err(diagnostics);
|
||||||
|
}
|
||||||
|
Ok(ServingSnapshot {
|
||||||
|
graphs,
|
||||||
|
queries,
|
||||||
|
policies,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a catalog blob and verify it against the recorded digest.
|
||||||
|
pub(crate) fn read_verified_payload(
|
||||||
|
config_dir: &Path,
|
||||||
|
kind: &ResourceKind,
|
||||||
|
digest: &str,
|
||||||
|
address: &str,
|
||||||
|
) -> Result<String, Diagnostic> {
|
||||||
|
let path = payload_path(config_dir, kind, digest)
|
||||||
|
.expect("query/policy kinds always have a payload path");
|
||||||
|
let bytes = fs::read(&path).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"catalog_payload_missing",
|
||||||
|
address,
|
||||||
|
format!(
|
||||||
|
"catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart",
|
||||||
|
display_path(&path)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
if sha256_hex(&bytes) != digest {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"catalog_payload_digest_mismatch",
|
||||||
|
address,
|
||||||
|
format!(
|
||||||
|
"catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart",
|
||||||
|
display_path(&path)
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
String::from_utf8(bytes).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"catalog_payload_invalid",
|
||||||
|
address,
|
||||||
|
format!("catalog blob is not valid UTF-8: {err}"),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
588
crates/omnigraph-cluster/src/store.rs
Normal file
588
crates/omnigraph-cluster/src/store.rs
Normal file
|
|
@ -0,0 +1,588 @@
|
||||||
|
//! The cluster's storage backend: state ledger, lock, recovery
|
||||||
|
//! sidecars, approval artifacts (moved verbatim from lib.rs in the
|
||||||
|
//! modularization). The object-storage port (RFC-006) lands here as a
|
||||||
|
//! follow-up — this module is the single home for stored-state I/O.
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct LocalStateBackend {
|
||||||
|
state_dir: PathBuf,
|
||||||
|
state_path: PathBuf,
|
||||||
|
lock_path: PathBuf,
|
||||||
|
recoveries_dir: PathBuf,
|
||||||
|
approvals_dir: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct StateSnapshot {
|
||||||
|
pub(crate) state: Option<ClusterState>,
|
||||||
|
pub(crate) state_cas: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct StateLockGuard {
|
||||||
|
path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LocalStateBackend {
|
||||||
|
pub(crate) fn new(config_dir: &Path) -> Self {
|
||||||
|
let state_dir = config_dir.join(CLUSTER_STATE_DIR);
|
||||||
|
Self {
|
||||||
|
state_path: config_dir.join(CLUSTER_STATE_FILE),
|
||||||
|
lock_path: config_dir.join(CLUSTER_LOCK_FILE),
|
||||||
|
recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR),
|
||||||
|
approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR),
|
||||||
|
state_dir,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List approval artifacts in ULID (filename) order; unparseable files
|
||||||
|
/// warn and stay on disk for the operator.
|
||||||
|
pub(crate) fn list_approval_artifacts(
|
||||||
|
&self,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) -> Vec<(PathBuf, ApprovalArtifact)> {
|
||||||
|
let mut paths = Vec::new();
|
||||||
|
match fs::read_dir(&self.approvals_dir) {
|
||||||
|
Ok(entries) => {
|
||||||
|
for entry in entries.flatten() {
|
||||||
|
let path = entry.path();
|
||||||
|
if path.extension().is_some_and(|ext| ext == "json") {
|
||||||
|
paths.push(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"approval_read_error",
|
||||||
|
CLUSTER_APPROVALS_DIR,
|
||||||
|
format!("could not list approval artifacts: {err}"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
paths.sort();
|
||||||
|
let mut artifacts = Vec::new();
|
||||||
|
for path in paths {
|
||||||
|
match fs::read_to_string(&path)
|
||||||
|
.map_err(|err| err.to_string())
|
||||||
|
.and_then(|text| {
|
||||||
|
serde_json::from_str::<ApprovalArtifact>(&text).map_err(|err| err.to_string())
|
||||||
|
}) {
|
||||||
|
Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)),
|
||||||
|
Ok(artifact) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"unsupported_approval_version",
|
||||||
|
display_path(&path),
|
||||||
|
format!(
|
||||||
|
"unsupported approval artifact version {}; leaving it in place",
|
||||||
|
artifact.schema_version
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"invalid_approval_artifact",
|
||||||
|
display_path(&path),
|
||||||
|
format!("could not parse approval artifact ({err}); leaving it in place"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
artifacts
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomically write (or rewrite, e.g. on consumption) an approval artifact.
|
||||||
|
pub(crate) fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result<PathBuf, Diagnostic> {
|
||||||
|
fs::create_dir_all(&self.approvals_dir).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"approval_write_error",
|
||||||
|
CLUSTER_APPROVALS_DIR,
|
||||||
|
format!("could not create approvals directory: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let target = self
|
||||||
|
.approvals_dir
|
||||||
|
.join(format!("{}.json", artifact.approval_id));
|
||||||
|
let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"approval_write_error",
|
||||||
|
display_path(&target),
|
||||||
|
format!("could not encode approval artifact: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
payload.push('\n');
|
||||||
|
let tmp_path = self
|
||||||
|
.approvals_dir
|
||||||
|
.join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new()));
|
||||||
|
fs::write(&tmp_path, payload.as_bytes()).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"approval_write_error",
|
||||||
|
display_path(&tmp_path),
|
||||||
|
format!("could not write approval artifact: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
if let Err(err) = fs::rename(&tmp_path, &target) {
|
||||||
|
let _ = fs::remove_file(&tmp_path);
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"approval_write_error",
|
||||||
|
display_path(&target),
|
||||||
|
format!("could not move approval artifact into place: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(target)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List recovery sidecars in ULID (filename) order. Unparseable files are
|
||||||
|
/// reported as warnings and skipped — they stay on disk for the operator.
|
||||||
|
pub(crate) fn list_recovery_sidecars(
|
||||||
|
&self,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) -> Vec<(PathBuf, RecoverySidecar)> {
|
||||||
|
let mut paths = Vec::new();
|
||||||
|
match fs::read_dir(&self.recoveries_dir) {
|
||||||
|
Ok(entries) => {
|
||||||
|
for entry in entries.flatten() {
|
||||||
|
let path = entry.path();
|
||||||
|
if path.extension().is_some_and(|ext| ext == "json") {
|
||||||
|
paths.push(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||||
|
Err(err) => {
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"recovery_sidecar_read_error",
|
||||||
|
CLUSTER_RECOVERIES_DIR,
|
||||||
|
format!("could not list recovery sidecars: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
paths.sort();
|
||||||
|
let mut sidecars = Vec::new();
|
||||||
|
for path in paths {
|
||||||
|
match fs::read_to_string(&path)
|
||||||
|
.map_err(|err| err.to_string())
|
||||||
|
.and_then(|text| {
|
||||||
|
serde_json::from_str::<RecoverySidecar>(&text).map_err(|err| err.to_string())
|
||||||
|
}) {
|
||||||
|
Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)),
|
||||||
|
Ok(sidecar) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"unsupported_recovery_sidecar_version",
|
||||||
|
display_path(&path),
|
||||||
|
format!(
|
||||||
|
"unsupported recovery sidecar version {}; leaving it in place",
|
||||||
|
sidecar.schema_version
|
||||||
|
),
|
||||||
|
)),
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"invalid_recovery_sidecar",
|
||||||
|
display_path(&path),
|
||||||
|
format!("could not parse recovery sidecar ({err}); leaving it in place"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sidecars
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomically write (or rewrite) a recovery sidecar; returns its path.
|
||||||
|
pub(crate) fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result<PathBuf, Diagnostic> {
|
||||||
|
fs::create_dir_all(&self.recoveries_dir).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"recovery_sidecar_write_error",
|
||||||
|
CLUSTER_RECOVERIES_DIR,
|
||||||
|
format!("could not create recoveries directory: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let target = self
|
||||||
|
.recoveries_dir
|
||||||
|
.join(format!("{}.json", sidecar.operation_id));
|
||||||
|
let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"recovery_sidecar_write_error",
|
||||||
|
display_path(&target),
|
||||||
|
format!("could not encode recovery sidecar: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
payload.push('\n');
|
||||||
|
let tmp_path = self
|
||||||
|
.recoveries_dir
|
||||||
|
.join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new()));
|
||||||
|
fs::write(&tmp_path, payload.as_bytes()).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"recovery_sidecar_write_error",
|
||||||
|
display_path(&tmp_path),
|
||||||
|
format!("could not write recovery sidecar: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
if let Err(err) = fs::rename(&tmp_path, &target) {
|
||||||
|
let _ = fs::remove_file(&tmp_path);
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"recovery_sidecar_write_error",
|
||||||
|
display_path(&target),
|
||||||
|
format!("could not move recovery sidecar into place: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(target)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn observations(&self) -> StateObservations {
|
||||||
|
StateObservations {
|
||||||
|
state_path: display_path(&self.state_path),
|
||||||
|
lock_path: display_path(&self.lock_path),
|
||||||
|
state_found: false,
|
||||||
|
applied_config_digest: None,
|
||||||
|
state_revision: 0,
|
||||||
|
state_cas: None,
|
||||||
|
resource_count: 0,
|
||||||
|
locked: false,
|
||||||
|
lock_id: None,
|
||||||
|
lock_acquired: false,
|
||||||
|
acquired_lock_id: None,
|
||||||
|
lock_operation: None,
|
||||||
|
lock_created_at: None,
|
||||||
|
lock_pid: None,
|
||||||
|
lock_age_seconds: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn read_state(
|
||||||
|
&self,
|
||||||
|
observations: &mut StateObservations,
|
||||||
|
) -> Result<StateSnapshot, Diagnostic> {
|
||||||
|
let text = match fs::read_to_string(&self.state_path) {
|
||||||
|
Ok(text) => text,
|
||||||
|
Err(err) if err.kind() == ErrorKind::NotFound => {
|
||||||
|
return Ok(StateSnapshot {
|
||||||
|
state: None,
|
||||||
|
state_cas: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_read_error",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
format!("could not read state file: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
observations.state_found = true;
|
||||||
|
let state_cas = format!("sha256:{}", sha256_hex(text.as_bytes()));
|
||||||
|
observations.state_cas = Some(state_cas.clone());
|
||||||
|
|
||||||
|
let state = serde_json::from_str::<ClusterState>(&text).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"invalid_state_json",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
format!("could not parse state JSON: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if state.version != 1 {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"unsupported_state_version",
|
||||||
|
"state.version",
|
||||||
|
format!(
|
||||||
|
"unsupported cluster state version {}; this build supports version 1",
|
||||||
|
state.version
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
observations.applied_config_digest = state.applied_revision.config_digest.clone();
|
||||||
|
observations.state_revision = state.state_revision;
|
||||||
|
observations.resource_count = state.applied_revision.resources.len();
|
||||||
|
|
||||||
|
Ok(StateSnapshot {
|
||||||
|
state: Some(state),
|
||||||
|
state_cas: Some(state_cas),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn write_state(
|
||||||
|
&self,
|
||||||
|
state: &ClusterState,
|
||||||
|
expected_cas: Option<&str>,
|
||||||
|
observations: &mut StateObservations,
|
||||||
|
) -> Result<(), Diagnostic> {
|
||||||
|
fs::create_dir_all(&self.state_dir).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
CLUSTER_STATE_DIR,
|
||||||
|
format!("could not create cluster state directory: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let current_cas = self.current_state_cas()?;
|
||||||
|
if current_cas.as_deref() != expected_cas {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_cas_mismatch",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
"state.json changed while the command was running; re-run the command against the latest state",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut payload = serde_json::to_string_pretty(state).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
format!("could not encode state JSON: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
payload.push('\n');
|
||||||
|
|
||||||
|
let tmp_path = self
|
||||||
|
.state_dir
|
||||||
|
.join(format!("state.json.tmp.{}", Ulid::new()));
|
||||||
|
let mut file = OpenOptions::new()
|
||||||
|
.write(true)
|
||||||
|
.create_new(true)
|
||||||
|
.open(&tmp_path)
|
||||||
|
.map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
display_path(&tmp_path),
|
||||||
|
format!("could not create temporary state file: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
file.write_all(payload.as_bytes()).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
display_path(&tmp_path),
|
||||||
|
format!("could not write temporary state file: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
file.sync_all().map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
display_path(&tmp_path),
|
||||||
|
format!("could not sync temporary state file: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
drop(file);
|
||||||
|
|
||||||
|
if let Err(err) = fs::rename(&tmp_path, &self.state_path) {
|
||||||
|
let _ = fs::remove_file(&tmp_path);
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
format!("could not replace state.json atomically: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let written = fs::read_to_string(&self.state_path).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_write_error",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
format!("could not read state.json after write: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
observations.state_found = true;
|
||||||
|
observations.applied_config_digest = state.applied_revision.config_digest.clone();
|
||||||
|
observations.state_revision = state.state_revision;
|
||||||
|
observations.state_cas = Some(format!("sha256:{}", sha256_hex(written.as_bytes())));
|
||||||
|
observations.resource_count = state.applied_revision.resources.len();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn current_state_cas(&self) -> Result<Option<String>, Diagnostic> {
|
||||||
|
match fs::read(&self.state_path) {
|
||||||
|
Ok(bytes) => Ok(Some(format!("sha256:{}", sha256_hex(&bytes)))),
|
||||||
|
Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
|
||||||
|
Err(err) => Err(Diagnostic::error(
|
||||||
|
"state_read_error",
|
||||||
|
CLUSTER_STATE_FILE,
|
||||||
|
format!("could not read state file for CAS check: {err}"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn acquire_lock(
|
||||||
|
&self,
|
||||||
|
operation: &str,
|
||||||
|
observations: &mut StateObservations,
|
||||||
|
) -> Result<StateLockGuard, Diagnostic> {
|
||||||
|
fs::create_dir_all(&self.state_dir).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_lock_error",
|
||||||
|
CLUSTER_STATE_DIR,
|
||||||
|
format!("could not create cluster state directory: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let lock_id = Ulid::new().to_string();
|
||||||
|
let lock = StateLockFile {
|
||||||
|
version: 1,
|
||||||
|
lock_id: lock_id.clone(),
|
||||||
|
operation: operation.to_string(),
|
||||||
|
created_at: OffsetDateTime::now_utc()
|
||||||
|
.format(&Rfc3339)
|
||||||
|
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()),
|
||||||
|
pid: process::id(),
|
||||||
|
};
|
||||||
|
let payload = serde_json::to_string_pretty(&lock).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_lock_error",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not encode state lock: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
match OpenOptions::new()
|
||||||
|
.write(true)
|
||||||
|
.create_new(true)
|
||||||
|
.open(&self.lock_path)
|
||||||
|
{
|
||||||
|
Ok(mut file) => {
|
||||||
|
if let Err(err) = file.write_all(payload.as_bytes()) {
|
||||||
|
// No guard exists yet, so clean up the create-new file here
|
||||||
|
// instead of leaving a stale partial lock for the next run.
|
||||||
|
drop(file);
|
||||||
|
let _ = fs::remove_file(&self.lock_path);
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_lock_error",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not write state lock: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
observations.lock_acquired = true;
|
||||||
|
observations.acquired_lock_id = Some(lock_id.clone());
|
||||||
|
Ok(StateLockGuard {
|
||||||
|
path: self.lock_path.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Err(err) if err.kind() == ErrorKind::AlreadyExists => {
|
||||||
|
self.observe_lock_metadata_lossy(observations);
|
||||||
|
Err(Diagnostic::error(
|
||||||
|
"state_lock_held",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
state_lock_held_message(observations),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Err(err) => Err(Diagnostic::error(
|
||||||
|
"state_lock_error",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not acquire state lock: {err}"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn force_unlock(
|
||||||
|
&self,
|
||||||
|
requested_lock_id: &str,
|
||||||
|
observations: &mut StateObservations,
|
||||||
|
) -> Result<(), Diagnostic> {
|
||||||
|
let text = match fs::read_to_string(&self.lock_path) {
|
||||||
|
Ok(text) => text,
|
||||||
|
Err(err) if err.kind() == ErrorKind::NotFound => {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_lock_missing",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
"cluster state lock is not present; nothing was unlocked",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_lock_read_error",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not read state lock: {err}"),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
observations.locked = true;
|
||||||
|
let lock = parse_lock_file_for_unlock(&text)?;
|
||||||
|
observations.observe_lock_metadata(&lock);
|
||||||
|
|
||||||
|
if lock.lock_id != requested_lock_id {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"state_lock_id_mismatch",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!(
|
||||||
|
"cluster state lock id is {}; refusing to unlock with requested id {requested_lock_id}",
|
||||||
|
lock.lock_id
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
fs::remove_file(&self.lock_path).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"state_unlock_error",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not remove state lock: {err}"),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn observe_lock(
|
||||||
|
&self,
|
||||||
|
observations: &mut StateObservations,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) {
|
||||||
|
if self.lock_path.exists() {
|
||||||
|
observations.locked = true;
|
||||||
|
match fs::read_to_string(&self.lock_path) {
|
||||||
|
Ok(text) => match serde_json::from_str::<StateLockFile>(&text) {
|
||||||
|
Ok(lock) if lock.version == 1 => {
|
||||||
|
observations.observe_lock_metadata(&lock);
|
||||||
|
}
|
||||||
|
Ok(lock) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"unsupported_state_lock_version",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("unsupported cluster state lock version {}", lock.version),
|
||||||
|
)),
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"invalid_state_lock",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not parse state lock: {err}"),
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||||
|
"state_lock_read_error",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not read state lock: {err}"),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn observe_lock_metadata_lossy(&self, observations: &mut StateObservations) {
|
||||||
|
observations.locked = true;
|
||||||
|
if let Ok(text) = fs::read_to_string(&self.lock_path) {
|
||||||
|
if let Ok(lock) = serde_json::from_str::<StateLockFile>(&text) {
|
||||||
|
if lock.version == 1 {
|
||||||
|
observations.observe_lock_metadata(&lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for StateLockGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let _ = fs::remove_file(&self.path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn parse_lock_file_for_unlock(text: &str) -> Result<StateLockFile, Diagnostic> {
|
||||||
|
let lock = serde_json::from_str::<StateLockFile>(text).map_err(|err| {
|
||||||
|
Diagnostic::error(
|
||||||
|
"invalid_state_lock",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("could not parse state lock: {err}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
if lock.version != 1 {
|
||||||
|
return Err(Diagnostic::error(
|
||||||
|
"unsupported_state_lock_version",
|
||||||
|
CLUSTER_LOCK_FILE,
|
||||||
|
format!("unsupported cluster state lock version {}", lock.version),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(lock)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn state_lock_held_message(observations: &StateObservations) -> String {
|
||||||
|
match observations.lock_id.as_deref() {
|
||||||
|
Some(lock_id) => format!(
|
||||||
|
"cluster state lock already exists (lock id {lock_id}); run `omnigraph cluster force-unlock {lock_id}` only after confirming no cluster operation is active"
|
||||||
|
),
|
||||||
|
None => "cluster state lock already exists; remove it only after confirming no cluster operation is active".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
386
crates/omnigraph-cluster/src/sweep.rs
Normal file
386
crates/omnigraph-cluster/src/sweep.rs
Normal file
|
|
@ -0,0 +1,386 @@
|
||||||
|
//! The recovery sweep: RFC-004's roll-forward-only sidecar
|
||||||
|
//! classification (moved verbatim from lib.rs in the modularization).
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// Recovery sweep (RFC-004 §D3): runs at the start of every state-mutating
|
||||||
|
/// cluster command, under the state lock, before the command's own work.
|
||||||
|
/// Roll-forward-only — the engine's own sidecars make each graph-level
|
||||||
|
/// operation atomic within the graph, so the cluster never rolls a graph
|
||||||
|
/// back; it converges the ledger to observable reality or refuses loudly.
|
||||||
|
/// Mutations ride the calling command's CAS-checked state write; completed
|
||||||
|
/// sidecars are deleted only after that write lands.
|
||||||
|
pub(crate) async fn sweep_recovery_sidecars(
|
||||||
|
backend: &LocalStateBackend,
|
||||||
|
state: &mut ClusterState,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
) -> SweepOutcome {
|
||||||
|
let mut outcome = SweepOutcome::default();
|
||||||
|
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) {
|
||||||
|
match sidecar.kind {
|
||||||
|
RecoverySidecarKind::GraphCreate => {
|
||||||
|
sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
|
||||||
|
}
|
||||||
|
RecoverySidecarKind::SchemaApply => {
|
||||||
|
sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
|
||||||
|
}
|
||||||
|
RecoverySidecarKind::GraphDelete => {
|
||||||
|
sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
outcome
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn sweep_graph_create_sidecar(
|
||||||
|
path: PathBuf,
|
||||||
|
sidecar: RecoverySidecar,
|
||||||
|
state: &mut ClusterState,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
outcome: &mut SweepOutcome,
|
||||||
|
) {
|
||||||
|
let graph_address = graph_address(&sidecar.graph_id);
|
||||||
|
let schema_addr = schema_address(&sidecar.graph_id);
|
||||||
|
let graph_path = PathBuf::from(&sidecar.graph_uri);
|
||||||
|
|
||||||
|
// Row 1: nothing moved — the init never landed. The sidecar is pure
|
||||||
|
// intent; remove it and let the command's own plan re-propose the create.
|
||||||
|
if !graph_path.exists() {
|
||||||
|
let _ = fs::remove_file(&path);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match Omnigraph::open_read_only(&sidecar.graph_uri).await {
|
||||||
|
Ok(db) => {
|
||||||
|
let live_digest = sha256_hex(db.schema_source().as_bytes());
|
||||||
|
let recorded = state
|
||||||
|
.applied_revision
|
||||||
|
.resources
|
||||||
|
.get(&schema_addr)
|
||||||
|
.map(|resource| resource.digest.clone());
|
||||||
|
if recorded.as_deref() == Some(live_digest.as_str()) {
|
||||||
|
// Row 2: crash fell between the state CAS and sidecar delete.
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
} else if live_digest == sidecar.desired_schema_digest {
|
||||||
|
// Row 4: the create completed on the graph; roll the cluster
|
||||||
|
// state forward to observable reality.
|
||||||
|
state.applied_revision.resources.insert(
|
||||||
|
schema_addr.clone(),
|
||||||
|
StateResource {
|
||||||
|
digest: live_digest.clone(),
|
||||||
|
applies_to: None,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
|
||||||
|
let composite =
|
||||||
|
graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests));
|
||||||
|
state
|
||||||
|
.applied_revision
|
||||||
|
.resources
|
||||||
|
.insert(graph_address.clone(), StateResource { digest: composite, applies_to: None });
|
||||||
|
set_resource_status_applied(state, &graph_address);
|
||||||
|
set_resource_status_applied(state, &schema_addr);
|
||||||
|
state.recovery_records.insert(
|
||||||
|
sidecar.operation_id.clone(),
|
||||||
|
json!({
|
||||||
|
"kind": "graph_create",
|
||||||
|
"graph_id": sidecar.graph_id,
|
||||||
|
"outcome": "rolled_forward",
|
||||||
|
"recovered_at": now_rfc3339(),
|
||||||
|
"actor": sidecar.actor,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_rolled_forward",
|
||||||
|
graph_address.clone(),
|
||||||
|
"an interrupted graph create had completed on the graph; cluster state was rolled forward to match",
|
||||||
|
));
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
} else {
|
||||||
|
// Row 6: the graph moved to something the sidecar did not
|
||||||
|
// intend. Refuse to guess; require refresh + operator re-plan.
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&graph_address,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"actual_applied_state_pending",
|
||||||
|
"graph state does not match the interrupted operation; run `cluster refresh` and re-plan",
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&schema_addr,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"actual_applied_state_pending",
|
||||||
|
"graph state does not match the interrupted operation; run `cluster refresh` and re-plan",
|
||||||
|
);
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_pending",
|
||||||
|
graph_address.clone(),
|
||||||
|
"an interrupted graph create left unexpected graph state; graph-moving work is blocked until repaired",
|
||||||
|
));
|
||||||
|
outcome.pending_graphs.insert(sidecar.graph_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
// Row 5: partial root (the engine's documented init gap). Never
|
||||||
|
// auto-delete — reconciler deletes are the same data-loss class
|
||||||
|
// as human deletes; the operator removes the root explicitly.
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&graph_address,
|
||||||
|
ResourceLifecycleStatus::Error,
|
||||||
|
"graph_create_incomplete",
|
||||||
|
"graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`",
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&schema_addr,
|
||||||
|
ResourceLifecycleStatus::Error,
|
||||||
|
"graph_create_incomplete",
|
||||||
|
"graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`",
|
||||||
|
);
|
||||||
|
diagnostics.push(Diagnostic::error(
|
||||||
|
"graph_create_incomplete",
|
||||||
|
graph_address.clone(),
|
||||||
|
format!(
|
||||||
|
"graph root '{}' exists but cannot be opened ({err}); remove the graph root and re-run `cluster apply`",
|
||||||
|
sidecar.graph_uri
|
||||||
|
),
|
||||||
|
));
|
||||||
|
outcome.pending_graphs.insert(sidecar.graph_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn sweep_schema_apply_sidecar(
|
||||||
|
path: PathBuf,
|
||||||
|
sidecar: RecoverySidecar,
|
||||||
|
state: &mut ClusterState,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
outcome: &mut SweepOutcome,
|
||||||
|
) {
|
||||||
|
let graph_address = graph_address(&sidecar.graph_id);
|
||||||
|
let schema_addr = schema_address(&sidecar.graph_id);
|
||||||
|
|
||||||
|
// Digest-based classification: robust to unrelated manifest movement;
|
||||||
|
// the sidecar's version pins stay forensic.
|
||||||
|
let live_digest = match Omnigraph::open_read_only(&sidecar.graph_uri).await {
|
||||||
|
Ok(db) => sha256_hex(db.schema_source().as_bytes()),
|
||||||
|
Err(err) => {
|
||||||
|
// Cannot verify the interrupted operation — refuse to guess.
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_pending",
|
||||||
|
graph_address.clone(),
|
||||||
|
format!(
|
||||||
|
"an interrupted schema apply cannot be verified (graph '{}' did not open: {err}); graph-moving work is blocked until repaired",
|
||||||
|
sidecar.graph_uri
|
||||||
|
),
|
||||||
|
));
|
||||||
|
outcome.pending_graphs.insert(sidecar.graph_id.clone());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let recorded = state
|
||||||
|
.applied_revision
|
||||||
|
.resources
|
||||||
|
.get(&schema_addr)
|
||||||
|
.map(|resource| resource.digest.clone());
|
||||||
|
if recorded.as_deref() == Some(live_digest.as_str()) {
|
||||||
|
// Ledger consistent with the live graph (the apply never landed, or
|
||||||
|
// landed and was recorded): the sidecar is stale intent — retire it.
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
} else if live_digest == sidecar.desired_schema_digest {
|
||||||
|
// RFC-004 §D3 row 3: the schema apply completed on the graph; roll
|
||||||
|
// the cluster state forward to observable reality.
|
||||||
|
state.applied_revision.resources.insert(
|
||||||
|
schema_addr.clone(),
|
||||||
|
StateResource {
|
||||||
|
digest: live_digest.clone(),
|
||||||
|
applies_to: None,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
|
||||||
|
let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests));
|
||||||
|
state
|
||||||
|
.applied_revision
|
||||||
|
.resources
|
||||||
|
.insert(graph_address.clone(), StateResource { digest: composite, applies_to: None });
|
||||||
|
set_resource_status_applied(state, &graph_address);
|
||||||
|
set_resource_status_applied(state, &schema_addr);
|
||||||
|
state.recovery_records.insert(
|
||||||
|
sidecar.operation_id.clone(),
|
||||||
|
json!({
|
||||||
|
"kind": "schema_apply",
|
||||||
|
"graph_id": sidecar.graph_id,
|
||||||
|
"outcome": "rolled_forward",
|
||||||
|
"recovered_at": now_rfc3339(),
|
||||||
|
"actor": sidecar.actor,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_rolled_forward",
|
||||||
|
graph_address.clone(),
|
||||||
|
"an interrupted schema apply had completed on the graph; cluster state was rolled forward to match",
|
||||||
|
));
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
} else {
|
||||||
|
// Row 6: live schema is neither the recorded nor the desired digest.
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&graph_address,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"actual_applied_state_pending",
|
||||||
|
"graph state does not match the interrupted operation; run `cluster refresh` and re-plan",
|
||||||
|
);
|
||||||
|
set_resource_status(
|
||||||
|
state,
|
||||||
|
&schema_addr,
|
||||||
|
ResourceLifecycleStatus::Drifted,
|
||||||
|
"actual_applied_state_pending",
|
||||||
|
"graph state does not match the interrupted operation; run `cluster refresh` and re-plan",
|
||||||
|
);
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_pending",
|
||||||
|
graph_address.clone(),
|
||||||
|
"an interrupted schema apply left unexpected graph state; graph-moving work is blocked until repaired",
|
||||||
|
));
|
||||||
|
outcome.pending_graphs.insert(sidecar.graph_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn sweep_graph_delete_sidecar(
|
||||||
|
path: PathBuf,
|
||||||
|
sidecar: RecoverySidecar,
|
||||||
|
state: &mut ClusterState,
|
||||||
|
diagnostics: &mut Vec<Diagnostic>,
|
||||||
|
outcome: &mut SweepOutcome,
|
||||||
|
) {
|
||||||
|
let graph_address = graph_address(&sidecar.graph_id);
|
||||||
|
let root = PathBuf::from(&sidecar.graph_uri);
|
||||||
|
|
||||||
|
if root.exists() {
|
||||||
|
// Row 8: the delete never completed. Prefix removal is idempotent and
|
||||||
|
// works on partial roots, so the repair is simply the re-proposed,
|
||||||
|
// still-approved delete on a later run — retire the stale intent.
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"graph_delete_incomplete",
|
||||||
|
graph_address,
|
||||||
|
"a previous graph delete did not complete; it will be re-proposed by plan and can be retried under its approval",
|
||||||
|
));
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !state.applied_revision.resources.contains_key(&graph_address) {
|
||||||
|
// Row 7: already tombstoned (or never recorded); crash fell between
|
||||||
|
// the state CAS and sidecar delete.
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Row 7b: the root is gone, the ledger is stale — roll forward the
|
||||||
|
// tombstone, consume the approval the sidecar carries, audit.
|
||||||
|
tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref());
|
||||||
|
state.recovery_records.insert(
|
||||||
|
sidecar.operation_id.clone(),
|
||||||
|
json!({
|
||||||
|
"kind": "graph_delete",
|
||||||
|
"graph_id": sidecar.graph_id,
|
||||||
|
"outcome": "rolled_forward",
|
||||||
|
"recovered_at": now_rfc3339(),
|
||||||
|
"actor": sidecar.actor,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
if let Some(approval_id) = &sidecar.approval_id {
|
||||||
|
record_approval_consumed(state, approval_id, &sidecar.operation_id);
|
||||||
|
outcome.consumed_approvals.push(approval_id.clone());
|
||||||
|
}
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_rolled_forward",
|
||||||
|
graph_address,
|
||||||
|
"an interrupted graph delete had completed on disk; cluster state was rolled forward to match",
|
||||||
|
));
|
||||||
|
outcome.completed_sidecars.push(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a graph's subtree (graph, schema, queries) from the ledger and
|
||||||
|
/// leave a tombstone observation. Idempotent.
|
||||||
|
pub(crate) fn tombstone_graph_subtree(
|
||||||
|
state: &mut ClusterState,
|
||||||
|
graph_id: &str,
|
||||||
|
approval_id: Option<&str>,
|
||||||
|
actor: Option<&str>,
|
||||||
|
) {
|
||||||
|
let graph_addr = graph_address(graph_id);
|
||||||
|
let schema_addr = schema_address(graph_id);
|
||||||
|
let query_prefix = format!("query.{graph_id}.");
|
||||||
|
state.applied_revision.resources.remove(&graph_addr);
|
||||||
|
state.applied_revision.resources.remove(&schema_addr);
|
||||||
|
state
|
||||||
|
.applied_revision
|
||||||
|
.resources
|
||||||
|
.retain(|address, _| !address.starts_with(&query_prefix));
|
||||||
|
state.resource_statuses.remove(&graph_addr);
|
||||||
|
state.resource_statuses.remove(&schema_addr);
|
||||||
|
state
|
||||||
|
.resource_statuses
|
||||||
|
.retain(|address, _| !address.starts_with(&query_prefix));
|
||||||
|
state.observations.insert(
|
||||||
|
graph_addr,
|
||||||
|
json!({
|
||||||
|
"kind": "tombstone",
|
||||||
|
"deleted_at": now_rfc3339(),
|
||||||
|
"approval_id": approval_id,
|
||||||
|
"actor": actor,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record approval consumption in the state ledger. The artifact FILE is
|
||||||
|
/// rewritten with consumed_at only after the state write lands, so a failed
|
||||||
|
/// CAS leaves the approval valid for the retry.
|
||||||
|
pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) {
|
||||||
|
state.approval_records.insert(
|
||||||
|
approval_id.to_string(),
|
||||||
|
json!({
|
||||||
|
"consumed_at": now_rfc3339(),
|
||||||
|
"consumed_by_operation": operation_id,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark approval artifact files consumed on disk (post-CAS).
|
||||||
|
pub(crate) fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) {
|
||||||
|
if approval_ids.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut sink = Vec::new();
|
||||||
|
for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) {
|
||||||
|
if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() {
|
||||||
|
artifact.consumed_at = Some(now_rfc3339());
|
||||||
|
let _ = backend.write_approval_artifact(&artifact);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read-only commands report pending sidecars without acting on them.
|
||||||
|
pub(crate) fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec<Diagnostic>) {
|
||||||
|
let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
|
||||||
|
let Ok(entries) = fs::read_dir(&recoveries_dir) else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let mut names: Vec<String> = entries
|
||||||
|
.flatten()
|
||||||
|
.filter(|entry| entry.path().extension().is_some_and(|ext| ext == "json"))
|
||||||
|
.map(|entry| entry.file_name().to_string_lossy().into_owned())
|
||||||
|
.collect();
|
||||||
|
names.sort();
|
||||||
|
for name in names {
|
||||||
|
diagnostics.push(Diagnostic::warning(
|
||||||
|
"cluster_recovery_pending",
|
||||||
|
format!("{CLUSTER_RECOVERIES_DIR}/{name}"),
|
||||||
|
"a recovery sidecar from an interrupted apply is pending; the next state-mutating command will classify it",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
3019
crates/omnigraph-cluster/src/tests.rs
Normal file
3019
crates/omnigraph-cluster/src/tests.rs
Normal file
File diff suppressed because it is too large
Load diff
510
crates/omnigraph-cluster/src/types.rs
Normal file
510
crates/omnigraph-cluster/src/types.rs
Normal file
|
|
@ -0,0 +1,510 @@
|
||||||
|
//! Public output/diagnostic types and internal state/sidecar/approval
|
||||||
|
//! models (moved verbatim from lib.rs in the modularization).
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum DiagnosticSeverity {
|
||||||
|
Error,
|
||||||
|
Warning,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||||
|
pub struct Diagnostic {
|
||||||
|
pub code: String,
|
||||||
|
pub severity: DiagnosticSeverity,
|
||||||
|
pub path: String,
|
||||||
|
pub message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Diagnostic {
|
||||||
|
pub(crate) fn error(code: impl Into<String>, path: impl Into<String>, message: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
code: code.into(),
|
||||||
|
severity: DiagnosticSeverity::Error,
|
||||||
|
path: path.into(),
|
||||||
|
message: message.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn warning(
|
||||||
|
code: impl Into<String>,
|
||||||
|
path: impl Into<String>,
|
||||||
|
message: impl Into<String>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
code: code.into(),
|
||||||
|
severity: DiagnosticSeverity::Warning,
|
||||||
|
path: path.into(),
|
||||||
|
message: message.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||||
|
pub struct ResourceSummary {
|
||||||
|
pub address: String,
|
||||||
|
pub kind: String,
|
||||||
|
pub digest: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub path: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct Dependency {
|
||||||
|
pub from: String,
|
||||||
|
pub to: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct ValidateOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub config_dir: String,
|
||||||
|
pub config_file: String,
|
||||||
|
pub resource_digests: BTreeMap<String, String>,
|
||||||
|
pub resources: Vec<ResourceSummary>,
|
||||||
|
pub dependencies: Vec<Dependency>,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct DesiredRevision {
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub config_digest: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct StateObservations {
|
||||||
|
pub state_path: String,
|
||||||
|
pub lock_path: String,
|
||||||
|
pub state_found: bool,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub applied_config_digest: Option<String>,
|
||||||
|
pub state_revision: u64,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub state_cas: Option<String>,
|
||||||
|
pub resource_count: usize,
|
||||||
|
pub locked: bool,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub lock_id: Option<String>,
|
||||||
|
pub lock_acquired: bool,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub acquired_lock_id: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub lock_operation: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub lock_created_at: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub lock_pid: Option<u32>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub lock_age_seconds: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StateObservations {
|
||||||
|
pub(crate) fn observe_lock_metadata(&mut self, lock: &StateLockFile) {
|
||||||
|
self.locked = true;
|
||||||
|
self.lock_id = Some(lock.lock_id.clone());
|
||||||
|
self.lock_operation = Some(lock.operation.clone());
|
||||||
|
self.lock_created_at = Some(lock.created_at.clone());
|
||||||
|
self.lock_pid = Some(lock.pid);
|
||||||
|
self.lock_age_seconds = lock_age_seconds(&lock.created_at);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum ResourceLifecycleStatus {
|
||||||
|
Pending,
|
||||||
|
Planned,
|
||||||
|
Applying,
|
||||||
|
Applied,
|
||||||
|
Drifted,
|
||||||
|
Blocked,
|
||||||
|
Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub struct ResourceStatusRecord {
|
||||||
|
pub status: ResourceLifecycleStatus,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub conditions: Vec<String>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub message: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum PlanOperation {
|
||||||
|
Create,
|
||||||
|
Update,
|
||||||
|
Delete,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// How `cluster apply` treats a planned change in the current stage.
|
||||||
|
///
|
||||||
|
/// `Applied` changes execute (config-only query/policy catalog writes).
|
||||||
|
/// `Derived` marks a `graph.<id>` composite-digest update that converges
|
||||||
|
/// automatically once its applied query digests land in state. `Deferred`
|
||||||
|
/// changes need a later phase (graph/schema lifecycle or schema content).
|
||||||
|
/// `Blocked` query/policy changes are gated by an unapplied or missing
|
||||||
|
/// dependency.
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum ApplyDisposition {
|
||||||
|
Applied,
|
||||||
|
Derived,
|
||||||
|
Deferred,
|
||||||
|
Blocked,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||||
|
pub struct PlanChange {
|
||||||
|
pub resource: String,
|
||||||
|
pub operation: PlanOperation,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub before_digest: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub after_digest: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub disposition: Option<ApplyDisposition>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub reason: Option<String>,
|
||||||
|
/// True for a policy change whose file digest is unchanged but whose
|
||||||
|
/// `applies_to` bindings differ from the applied revision (including the
|
||||||
|
/// pre-5A backfill case).
|
||||||
|
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||||
|
pub binding_change: bool,
|
||||||
|
/// For schema updates: the engine's migration plan against the live
|
||||||
|
/// graph (RFC-004 §D7's data-aware preview). Absent when the preview is
|
||||||
|
/// unavailable (warning `schema_preview_unavailable`).
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub migration: Option<SchemaMigrationPlan>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||||
|
pub struct BlastRadius {
|
||||||
|
pub resource: String,
|
||||||
|
pub affected: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
|
||||||
|
pub struct ApprovalRequirement {
|
||||||
|
pub resource: String,
|
||||||
|
pub reason: String,
|
||||||
|
/// True when a valid (digest-matching, unconsumed) approval artifact is
|
||||||
|
/// pending for this change.
|
||||||
|
pub satisfied: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct PlanOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub config_dir: String,
|
||||||
|
pub desired_revision: DesiredRevision,
|
||||||
|
pub resource_digests: BTreeMap<String, String>,
|
||||||
|
pub dependencies: Vec<Dependency>,
|
||||||
|
pub state_observations: StateObservations,
|
||||||
|
pub changes: Vec<PlanChange>,
|
||||||
|
pub blast_radius: Vec<BlastRadius>,
|
||||||
|
pub approvals_required: Vec<ApprovalRequirement>,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct StatusOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub config_dir: String,
|
||||||
|
pub state_observations: StateObservations,
|
||||||
|
pub resource_digests: BTreeMap<String, String>,
|
||||||
|
pub resource_statuses: BTreeMap<String, ResourceStatusRecord>,
|
||||||
|
pub observations: BTreeMap<String, serde_json::Value>,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum StateSyncOperation {
|
||||||
|
Refresh,
|
||||||
|
Import,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct StateSyncOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub operation: StateSyncOperation,
|
||||||
|
pub config_dir: String,
|
||||||
|
pub state_observations: StateObservations,
|
||||||
|
pub resource_digests: BTreeMap<String, String>,
|
||||||
|
pub resource_statuses: BTreeMap<String, ResourceStatusRecord>,
|
||||||
|
pub observations: BTreeMap<String, serde_json::Value>,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct ForceUnlockOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub config_dir: String,
|
||||||
|
pub state_observations: StateObservations,
|
||||||
|
pub lock_removed: bool,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Output of config-only `cluster apply`. "Applied" means recorded in the
|
||||||
|
/// local cluster catalog (`__cluster/`); nothing applied here serves traffic —
|
||||||
|
/// the server still boots from `omnigraph.yaml` until the server-boot stage.
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct ApplyOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub config_dir: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub actor: Option<String>,
|
||||||
|
pub desired_revision: DesiredRevision,
|
||||||
|
pub state_observations: StateObservations,
|
||||||
|
/// Every planned change, with `disposition`/`reason` always populated.
|
||||||
|
pub changes: Vec<PlanChange>,
|
||||||
|
pub applied_count: usize,
|
||||||
|
/// Deferred + Blocked changes (Derived composite updates count as neither).
|
||||||
|
pub deferred_count: usize,
|
||||||
|
/// True when state matches the desired revision after this apply.
|
||||||
|
pub converged: bool,
|
||||||
|
/// False for a no-op re-apply: state bytes (and revision) were left untouched.
|
||||||
|
pub state_written: bool,
|
||||||
|
/// The statuses as persisted: post-apply on success, the pre-apply on-disk
|
||||||
|
/// snapshot when the state write fails (never unpersisted in-memory state).
|
||||||
|
pub resource_statuses: BTreeMap<String, ResourceStatusRecord>,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A digest-bound human approval for an irreversible operation (RFC-004
|
||||||
|
/// §D4). Written by `cluster approve`, consumed by apply. The file is never
|
||||||
|
/// deleted on consumption — it is rewritten with `consumed_at` and also
|
||||||
|
/// summarized into the state ledger's `approval_records`, so the audit fact
|
||||||
|
/// survives the loss of either store (axiom 11).
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct ApprovalArtifact {
|
||||||
|
pub(crate) schema_version: u32,
|
||||||
|
pub(crate) approval_id: String,
|
||||||
|
pub(crate) resource: String,
|
||||||
|
pub(crate) operation: String,
|
||||||
|
pub(crate) reason: String,
|
||||||
|
pub(crate) bound_config_digest: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) bound_before_digest: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) bound_after_digest: Option<String>,
|
||||||
|
pub(crate) approved_by: String,
|
||||||
|
pub(crate) created_at: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) consumed_at: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) consumed_by_operation: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct ApproveOutput {
|
||||||
|
pub ok: bool,
|
||||||
|
pub config_dir: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub approval_id: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub resource: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub operation: Option<PlanOperation>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub approved_by: Option<String>,
|
||||||
|
pub diagnostics: Vec<Diagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) struct DesiredCluster {
|
||||||
|
pub(crate) config_dir: PathBuf,
|
||||||
|
pub(crate) config_digest: String,
|
||||||
|
pub(crate) state_lock: bool,
|
||||||
|
pub(crate) graphs: Vec<DesiredGraph>,
|
||||||
|
pub(crate) resource_digests: BTreeMap<String, String>,
|
||||||
|
pub(crate) resources: Vec<ResourceSummary>,
|
||||||
|
pub(crate) dependencies: Vec<Dependency>,
|
||||||
|
/// `policy.<name>` address -> normalized applies_to refs.
|
||||||
|
pub(crate) policy_bindings: BTreeMap<String, Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) struct DesiredGraph {
|
||||||
|
pub(crate) id: String,
|
||||||
|
pub(crate) schema_digest: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct ParsedConfig {
|
||||||
|
pub(crate) raw: Option<RawClusterConfig>,
|
||||||
|
pub(crate) diagnostics: Vec<Diagnostic>,
|
||||||
|
pub(crate) config_dir: PathBuf,
|
||||||
|
pub(crate) config_file: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub(crate) struct ClusterSettings {
|
||||||
|
pub(crate) state_lock: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct LoadOutcome {
|
||||||
|
pub(crate) desired: Option<DesiredCluster>,
|
||||||
|
pub(crate) diagnostics: Vec<Diagnostic>,
|
||||||
|
pub(crate) config_dir: PathBuf,
|
||||||
|
pub(crate) config_file: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct RawClusterConfig {
|
||||||
|
pub(crate) version: u32,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) metadata: Metadata,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) state: StateConfig,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) graphs: BTreeMap<String, GraphConfig>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) policies: BTreeMap<String, PolicyConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct Metadata {
|
||||||
|
pub(crate) name: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct StateConfig {
|
||||||
|
pub(crate) backend: Option<String>,
|
||||||
|
pub(crate) lock: Option<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct GraphConfig {
|
||||||
|
pub(crate) schema: PathBuf,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) queries: QueriesDecl,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// How a graph declares its stored queries. Terraform-style: the `.gq`
|
||||||
|
/// files ARE the declaration — point at them (or a directory) and every
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct QueryConfig {
|
||||||
|
pub(crate) file: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct PolicyConfig {
|
||||||
|
pub(crate) file: PathBuf,
|
||||||
|
pub(crate) applies_to: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stage 2A/2B accept these forward-compatible state sections so existing
|
||||||
|
// ledgers won't churn while approval/recovery semantics are staged later.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct ClusterState {
|
||||||
|
pub(crate) version: u32,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) state_revision: u64,
|
||||||
|
pub(crate) applied_revision: AppliedRevisionState,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) resource_statuses: BTreeMap<String, ResourceStatusRecord>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) approval_records: BTreeMap<String, serde_json::Value>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) recovery_records: BTreeMap<String, serde_json::Value>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) observations: BTreeMap<String, serde_json::Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct AppliedRevisionState {
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) config_digest: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) resources: BTreeMap<String, StateResource>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct StateResource {
|
||||||
|
pub(crate) digest: String,
|
||||||
|
/// Policy resources only: the applied `applies_to` bindings, normalized
|
||||||
|
/// to typed refs (`cluster` | `graph.<id>`). Recorded so the state
|
||||||
|
/// ledger is serving-sufficient for the Phase-5 server boot (RFC-005
|
||||||
|
/// §D3). Absent on pre-5A entries (backfilled by the next apply) and on
|
||||||
|
/// non-policy resources.
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub(crate) applies_to: Option<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct StateLockFile {
|
||||||
|
pub(crate) version: u32,
|
||||||
|
pub(crate) lock_id: String,
|
||||||
|
pub(crate) operation: String,
|
||||||
|
pub(crate) created_at: String,
|
||||||
|
pub(crate) pid: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recovery-intent record for a graph-moving apply operation (RFC-004 §D2).
|
||||||
|
/// Written under the state lock before the engine call that can create or
|
||||||
|
/// move a graph manifest; deleted only after the cluster state CAS that
|
||||||
|
/// records the outcome lands. The sweep (§D3) classifies survivors.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub(crate) struct RecoverySidecar {
|
||||||
|
pub(crate) schema_version: u32,
|
||||||
|
pub(crate) operation_id: String,
|
||||||
|
pub(crate) started_at: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) actor: Option<String>,
|
||||||
|
pub(crate) kind: RecoverySidecarKind,
|
||||||
|
pub(crate) graph_id: String,
|
||||||
|
pub(crate) graph_uri: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) observed_manifest_version: Option<u64>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) expected_manifest_version: Option<u64>,
|
||||||
|
pub(crate) desired_schema_digest: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) state_cas_base: Option<String>,
|
||||||
|
/// For graph_delete: the approval this operation consumes; lets a sweep
|
||||||
|
/// roll-forward consume it too.
|
||||||
|
#[serde(default)]
|
||||||
|
pub(crate) approval_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub(crate) enum RecoverySidecarKind {
|
||||||
|
GraphCreate,
|
||||||
|
SchemaApply,
|
||||||
|
GraphDelete,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub(crate) struct SweepOutcome {
|
||||||
|
/// Graphs whose sidecar was kept (rows 5/6): graph-moving work for them
|
||||||
|
/// is blocked until the operator repairs and re-observes.
|
||||||
|
pub(crate) pending_graphs: BTreeSet<String>,
|
||||||
|
/// Sidecars whose outcome is recorded (rows 2/4): deleted only after the
|
||||||
|
/// command's state write lands, so a CAS failure re-sweeps them.
|
||||||
|
pub(crate) completed_sidecars: Vec<PathBuf>,
|
||||||
|
/// Approval artifacts consumed by a roll-forward (delete row 7b): their
|
||||||
|
/// files are rewritten with consumed_at only after the state write lands.
|
||||||
|
pub(crate) consumed_approvals: Vec<String>,
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue