feat(cluster): pub read-only serving-snapshot API

RFC-005 §D2/§D4: read_serving_snapshot reads the applied revision as
everything a server needs to boot — graphs at derived roots, stored-query
sources read from the content-addressed catalog and re-hashed against the
recorded digests, policy blob paths with their applied applies_to bindings.
All-or-nothing: missing state, pending recovery sidecars, missing/tampered
blobs, pre-5A entries without bindings, and an empty graph set each refuse
the snapshot with a remedy; no partial serving. Lock-free by design — the
state file is replaced atomically, so the read is a consistent
point-in-time ledger.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 17:39:26 +03:00
parent bed36a8423
commit f5b43164b8

View file

@ -1699,6 +1699,191 @@ pub async fn approve_config_dir(
}
}
/// One graph in a serving snapshot: its id and on-disk root.
#[derive(Debug, Clone)]
pub struct ServingGraph {
pub graph_id: String,
pub root: PathBuf,
}
/// One stored query: its graph binding, registry name, and verified source.
#[derive(Debug, Clone)]
pub struct ServingQuery {
pub graph_id: String,
pub name: String,
pub source: String,
}
/// One policy bundle: its verified catalog blob path and applied bindings
/// (normalized typed refs: `cluster` | `graph.<id>`).
#[derive(Debug, Clone)]
pub struct ServingPolicy {
pub name: String,
pub blob_path: PathBuf,
pub applies_to: Vec<String>,
}
/// Everything a server needs to boot from the cluster catalog (RFC-005 §D2).
#[derive(Debug, Clone)]
pub struct ServingSnapshot {
pub graphs: Vec<ServingGraph>,
pub queries: Vec<ServingQuery>,
pub policies: Vec<ServingPolicy>,
}
/// Read the applied revision as a serving snapshot — the read-only loader for
/// the Phase-5 server boot. All-or-nothing per RFC-005 §D4: every readiness
/// failure is collected and the whole snapshot refused; no partial serving.
/// Takes no lock: the state file is replaced atomically, so this reads a
/// consistent point-in-time ledger.
pub fn read_serving_snapshot(config_dir: impl AsRef<Path>) -> Result<ServingSnapshot, Vec<Diagnostic>> {
let config_dir = config_dir.as_ref().to_path_buf();
let backend = LocalStateBackend::new(&config_dir);
let mut diagnostics: Vec<Diagnostic> = Vec::new();
// A ledger a sweep is about to rewrite must not start serving.
let sidecars = backend.list_recovery_sidecars(&mut diagnostics);
if !sidecars.is_empty() {
diagnostics.push(Diagnostic::error(
"cluster_recovery_pending",
CLUSTER_RECOVERIES_DIR,
format!(
"{} interrupted operation(s) await recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry",
sidecars.len()
),
));
}
let mut observations = backend.observations();
let state = match backend.read_state(&mut observations) {
Ok(snapshot) => match snapshot.state {
Some(state) => Some(state),
None => {
diagnostics.push(Diagnostic::error(
"cluster_state_missing",
CLUSTER_STATE_FILE,
"no cluster state ledger; run `cluster import` and `cluster apply` first",
));
None
}
},
Err(diagnostic) => {
diagnostics.push(diagnostic);
None
}
};
let Some(state) = state else {
return Err(diagnostics);
};
let mut graphs = Vec::new();
let mut queries = Vec::new();
let mut policies = Vec::new();
for (address, entry) in &state.applied_revision.resources {
match resource_kind(address) {
ResourceKind::Graph(graph_id) => {
graphs.push(ServingGraph {
root: config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
graph_id,
});
}
ResourceKind::Schema(_) => {}
kind @ ResourceKind::Query { .. } => {
let ResourceKind::Query { graph, name } = &kind else {
unreachable!()
};
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
Ok(source) => queries.push(ServingQuery {
graph_id: graph.clone(),
name: name.clone(),
source,
}),
Err(diagnostic) => diagnostics.push(diagnostic),
}
}
kind @ ResourceKind::Policy(_) => {
let ResourceKind::Policy(name) = &kind else {
unreachable!()
};
let Some(applies_to) = entry.applies_to.clone() else {
diagnostics.push(Diagnostic::error(
"policy_bindings_missing",
address.clone(),
"no applied applies_to bindings recorded (ledger predates binding metadata); re-run `cluster apply` to backfill",
));
continue;
};
match read_verified_payload(&config_dir, &kind, &entry.digest, address) {
Ok(_) => policies.push(ServingPolicy {
name: name.clone(),
blob_path: payload_path(&config_dir, &kind, &entry.digest)
.expect("policy kind always has a payload path"),
applies_to,
}),
Err(diagnostic) => diagnostics.push(diagnostic),
}
}
ResourceKind::Unknown => {}
}
}
if graphs.is_empty() {
diagnostics.push(Diagnostic::error(
"cluster_empty",
CLUSTER_STATE_FILE,
"the applied revision records no graphs; apply a cluster with at least one graph before serving from it",
));
}
if has_errors(&diagnostics) {
return Err(diagnostics);
}
Ok(ServingSnapshot {
graphs,
queries,
policies,
})
}
/// Read a catalog blob and verify it against the recorded digest.
fn read_verified_payload(
config_dir: &Path,
kind: &ResourceKind,
digest: &str,
address: &str,
) -> Result<String, Diagnostic> {
let path = payload_path(config_dir, kind, digest)
.expect("query/policy kinds always have a payload path");
let bytes = fs::read(&path).map_err(|err| {
Diagnostic::error(
"catalog_payload_missing",
address,
format!(
"catalog blob '{}' unreadable ({err}); run `cluster refresh` then `cluster apply`, and restart",
display_path(&path)
),
)
})?;
if sha256_hex(&bytes) != digest {
return Err(Diagnostic::error(
"catalog_payload_digest_mismatch",
address,
format!(
"catalog blob '{}' does not match its recorded digest; run `cluster refresh` then `cluster apply`, and restart",
display_path(&path)
),
));
}
String::from_utf8(bytes).map_err(|err| {
Diagnostic::error(
"catalog_payload_invalid",
address,
format!("catalog blob is not valid UTF-8: {err}"),
)
})
}
pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
@ -7272,6 +7457,109 @@ policies:
);
}
// ---- serving snapshot (5B read-only loader) ----
#[tokio::test]
async fn serving_snapshot_reads_converged_cluster() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
let snapshot = read_serving_snapshot(dir.path()).expect("converged cluster must serve");
assert_eq!(snapshot.graphs.len(), 1);
assert_eq!(snapshot.graphs[0].graph_id, "knowledge");
assert!(snapshot.graphs[0].root.ends_with("graphs/knowledge.omni"));
assert_eq!(snapshot.queries.len(), 1);
assert_eq!(snapshot.queries[0].name, "find_person");
assert!(snapshot.queries[0].source.contains("query find_person"));
assert_eq!(snapshot.policies.len(), 1);
assert_eq!(snapshot.policies[0].applies_to, vec!["graph.knowledge"]);
assert!(snapshot.policies[0].blob_path.exists());
}
#[test]
fn serving_snapshot_refuses_missing_state() {
let dir = fixture();
let err = read_serving_snapshot(dir.path()).unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_state_missing"),
"{err:?}"
);
}
#[tokio::test]
async fn serving_snapshot_refuses_pending_recovery() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
apply_config_dir(dir.path()).await;
write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SERVE");
let err = read_serving_snapshot(dir.path()).unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_recovery_pending"),
"{err:?}"
);
}
#[tokio::test]
async fn serving_snapshot_refuses_tampered_blob_and_stripped_bindings() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
apply_config_dir(dir.path()).await;
// Tamper with the query blob...
let snapshot = read_serving_snapshot(dir.path()).unwrap();
let desired = validate_config_dir(dir.path());
let query_digest = &desired.resource_digests["query.knowledge.find_person"];
let blob = dir
.path()
.join(CLUSTER_RESOURCES_DIR)
.join("query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
fs::write(&blob, "tampered").unwrap();
// ...and strip the policy bindings (pre-5A ledger).
let mut state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(),
)
.unwrap();
state["applied_revision"]["resources"]["policy.base"]
.as_object_mut()
.unwrap()
.remove("applies_to");
fs::write(
dir.path().join(CLUSTER_STATE_FILE),
serde_json::to_string_pretty(&state).unwrap(),
)
.unwrap();
let err = read_serving_snapshot(dir.path()).unwrap_err();
assert!(
err.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_digest_mismatch"),
"{err:?}"
);
assert!(
err.iter().any(|diagnostic| diagnostic.code == "policy_bindings_missing"),
"{err:?}"
);
let _ = snapshot; // the pre-tamper read succeeded
}
#[test]
fn serving_snapshot_refuses_empty_cluster() {
let dir = fixture();
write_state_resources(dir.path(), &[]); // state exists, no graphs
let err = read_serving_snapshot(dir.path()).unwrap_err();
assert!(
err.iter().any(|diagnostic| diagnostic.code == "cluster_empty"),
"{err:?}"
);
}
#[test]
fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();