feat(cluster): schema-apply recovery sidecar kind and sweep

Add RecoverySidecarKind::SchemaApply with digest-based sweep classification
(robust to unrelated manifest movement; version pins stay forensic):
- ledger consistent with the live graph -> sidecar retired (RFC-004 rows 1+2);
- live digest matches the intended schema but cluster state is stale -> roll
  forward with composite recompute and a recovery_records audit entry (row 3);
- unverifiable or unexpected digests -> pending: sidecar kept and graph-moving
  work blocked (rows 1-unopenable/6).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 13:05:42 +03:00
parent ca63a9340b
commit 0571c05ebb

View file

@ -450,7 +450,8 @@ struct RecoverySidecar {
// Kind tag of a recovery sidecar. Variant names are (de)serialized in
// snake_case, so `SchemaApply` appears on disk as `schema_apply`.
#[serde(rename_all = "snake_case")]
enum RecoverySidecarKind {
// Swept by `sweep_graph_create_sidecar`.
GraphCreate,
// Swept by `sweep_schema_apply_sidecar`.
SchemaApply,
// GraphDelete arrives with stage 4C.
}
#[derive(Debug, Default)]
@ -2090,6 +2091,9 @@ async fn sweep_recovery_sidecars(
RecoverySidecarKind::GraphCreate => {
sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
}
RecoverySidecarKind::SchemaApply => {
sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
}
}
}
outcome
@ -2214,6 +2218,102 @@ async fn sweep_graph_create_sidecar(
}
}
/// Sweeps a single `SchemaApply` recovery sidecar, classifying it by digest.
///
/// The graph at `sidecar.graph_uri` is opened read-only and `sha256_hex` of its
/// schema source (the "live digest") is compared with the digest recorded in
/// `state` for the graph's schema address and with
/// `sidecar.desired_schema_digest`. Classification is digest-based so it stays
/// robust to unrelated manifest movement; the sidecar's version pins stay
/// forensic.
///
/// Outcomes:
/// * graph does not open — a `cluster_recovery_pending` warning is pushed and
///   the graph id is added to `outcome.pending_graphs`; `path` is not retired.
/// * recorded digest equals the live digest — `path` is pushed onto
///   `outcome.completed_sidecars` and nothing else changes.
/// * live digest equals the sidecar's desired digest — `state` is rolled
///   forward (schema digest, recomputed composite graph digest, both statuses
///   applied, a `recovery_records` entry keyed by the operation id), a
///   `cluster_recovery_rolled_forward` warning is pushed and `path` is retired.
/// * anything else — both resources are marked `Drifted`, a
///   `cluster_recovery_pending` warning is pushed and the graph id is added to
///   `outcome.pending_graphs`; `path` is not retired.
async fn sweep_schema_apply_sidecar(
    path: PathBuf,
    sidecar: RecoverySidecar,
    state: &mut ClusterState,
    diagnostics: &mut Vec<Diagnostic>,
    outcome: &mut SweepOutcome,
) {
    let graph_addr = graph_address(&sidecar.graph_id);
    let schema_addr = schema_address(&sidecar.graph_id);

    // Hash what is actually on the graph right now. The handle is only needed
    // for the digest, so it is dropped as soon as the match arm finishes.
    let opened = Omnigraph::open_read_only(&sidecar.graph_uri).await;
    let live_digest = match opened {
        Err(err) => {
            // Without the live schema the interrupted operation cannot be
            // classified, so keep the sidecar and block rather than guess.
            diagnostics.push(Diagnostic::warning(
                "cluster_recovery_pending",
                graph_addr.clone(),
                format!(
                    "an interrupted schema apply cannot be verified (graph '{}' did not open: {err}); graph-moving work is blocked until repaired",
                    sidecar.graph_uri
                ),
            ));
            outcome.pending_graphs.insert(sidecar.graph_id.clone());
            return;
        }
        Ok(db) => sha256_hex(db.schema_source().as_bytes()),
    };

    // Rows 1+2: the ledger already agrees with the live graph (the apply never
    // landed, or it landed and was recorded). The sidecar is stale intent.
    let ledger_matches_live = state
        .applied_revision
        .resources
        .get(&schema_addr)
        .is_some_and(|resource| resource.digest.as_str() == live_digest.as_str());
    if ledger_matches_live {
        outcome.completed_sidecars.push(path);
        return;
    }

    // Row 6: the live schema is neither what the ledger recorded nor what the
    // interrupted operation intended.
    if live_digest != sidecar.desired_schema_digest {
        let reason = "actual_applied_state_pending";
        let hint =
            "graph state does not match the interrupted operation; run `cluster refresh` and re-plan";
        set_resource_status(
            state,
            &graph_addr,
            ResourceLifecycleStatus::Drifted,
            reason,
            hint,
        );
        set_resource_status(
            state,
            &schema_addr,
            ResourceLifecycleStatus::Drifted,
            reason,
            hint,
        );
        diagnostics.push(Diagnostic::warning(
            "cluster_recovery_pending",
            graph_addr.clone(),
            "an interrupted schema apply left unexpected graph state; graph-moving work is blocked until repaired",
        ));
        outcome.pending_graphs.insert(sidecar.graph_id.clone());
        return;
    }

    // RFC-004 §D3 row 3: the apply finished on the graph but the ledger never
    // caught up. Record the schema digest first so the composite graph digest
    // below is computed against the updated state.
    let schema_entry = StateResource {
        digest: live_digest.clone(),
    };
    state
        .applied_revision
        .resources
        .insert(schema_addr.clone(), schema_entry);

    let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
    let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests));
    state
        .applied_revision
        .resources
        .insert(graph_addr.clone(), StateResource { digest: composite });

    set_resource_status_applied(state, &graph_addr);
    set_resource_status_applied(state, &schema_addr);

    // Audit trail for the roll-forward, keyed by the interrupted operation.
    let audit_entry = json!({
        "kind": "schema_apply",
        "graph_id": sidecar.graph_id,
        "outcome": "rolled_forward",
        "recovered_at": now_rfc3339(),
        "actor": sidecar.actor,
    });
    state
        .recovery_records
        .insert(sidecar.operation_id.clone(), audit_entry);

    diagnostics.push(Diagnostic::warning(
        "cluster_recovery_rolled_forward",
        graph_addr.clone(),
        "an interrupted schema apply had completed on the graph; cluster state was rolled forward to match",
    ));
    outcome.completed_sidecars.push(path);
}
/// Read-only commands report pending sidecars without acting on them.
fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec<Diagnostic>) {
let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
@ -5613,6 +5713,135 @@ graphs:
);
}
/// Test helper: writes a `schema_apply` recovery sidecar for `graph_id` into
/// the recoveries directory under `config_dir` and returns the file's path.
///
/// The file is named `<operation_id>.json`, records `desired_schema_digest`
/// as the intended schema digest, and points `graph_uri` at the derived graph
/// URI for `graph_id`. Panics if the directory or file cannot be written.
fn write_schema_apply_sidecar(
    config_dir: &Path,
    graph_id: &str,
    desired_schema_digest: &str,
    operation_id: &str,
) -> PathBuf {
    let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
    fs::create_dir_all(&recoveries_dir).unwrap();

    let payload = json!({
        "schema_version": 1,
        "operation_id": operation_id,
        "started_at": "1970-01-01T00:00:00Z",
        "kind": "schema_apply",
        "graph_id": graph_id,
        "graph_uri": derived_graph_uri(config_dir, graph_id),
        "desired_schema_digest": desired_schema_digest,
    });
    let rendered = serde_json::to_string_pretty(&payload).unwrap();

    let sidecar_path = recoveries_dir.join(format!("{operation_id}.json"));
    fs::write(&sidecar_path, rendered).unwrap();
    sidecar_path
}
/// Schema source used by the roll-forward test as the "already applied"
/// target: a `Person` node with a keyed `name` plus optional `age` and `bio`.
const SCHEMA_V2: &str = "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n";
/// Rows 1+2: when the recorded schema digest already equals the live one, the
/// sidecar is deleted and no recovery record is written.
#[tokio::test]
async fn sweep_retires_schema_sidecar_when_ledger_consistent() {
    let dir = fixture();
    let config_dir = dir.path();
    init_derived_graph(config_dir).await;
    // Recorded state digest == live digest.
    write_applyable_state(config_dir);
    let sidecar = write_schema_apply_sidecar(config_dir, "knowledge", "never-applied", "01SROW1");

    let out = apply_config_dir(config_dir).await;

    assert!(out.ok, "{:?}", out.diagnostics);
    assert!(!sidecar.exists());
    let state = read_state_json(config_dir);
    let no_recovery_records = state["recovery_records"]
        .as_object()
        .is_none_or(|entries| entries.is_empty());
    assert!(no_recovery_records);
}
/// Row 3: the schema apply landed on the graph but the ledger is stale, so the
/// sweep rolls state forward, writes an audit record and retires the sidecar.
#[tokio::test]
async fn sweep_rolls_forward_completed_schema_apply() {
    let dir = fixture();
    let config_dir = dir.path();
    init_derived_graph(config_dir).await;
    write_applyable_state(config_dir);

    // Simulate the schema apply having completed on the graph out-of-process.
    let graph_uri = derived_graph_uri(config_dir, "knowledge");
    let db = Omnigraph::open(&graph_uri).await.unwrap();
    db.apply_schema(SCHEMA_V2).await.unwrap();

    // The desired config matches the graph, and the sidecar records that intent.
    fs::write(config_dir.join("people.pg"), SCHEMA_V2).unwrap();
    let desired = validate_config_dir(config_dir);
    let v2_digest = desired.resource_digests["schema.knowledge"].clone();
    let sidecar = write_schema_apply_sidecar(config_dir, "knowledge", &v2_digest, "01SROW3");

    let out = apply_config_dir(config_dir).await;

    assert!(out.ok, "{:?}", out.diagnostics);
    let rolled_forward_reported = out
        .diagnostics
        .iter()
        .any(|entry| entry.code == "cluster_recovery_rolled_forward");
    assert!(rolled_forward_reported);
    assert!(!sidecar.exists());

    let state = read_state_json(config_dir);
    let recorded_digest = &state["applied_revision"]["resources"]["schema.knowledge"]["digest"];
    assert_eq!(*recorded_digest, v2_digest);

    let audit_recorded = state["recovery_records"]
        .as_object()
        .unwrap()
        .values()
        .any(|entry| entry["kind"] == "schema_apply" && entry["outcome"] == "rolled_forward");
    assert!(audit_recorded);
    assert!(out.converged, "{out:?}");
}
/// Row 6: the live digest matches neither the ledger nor the sidecar's intent,
/// so the sidecar is kept, a pending warning is raised and the schema resource
/// is marked drifted.
#[tokio::test]
async fn sweep_flags_unexpected_schema_apply_state_as_pending() {
    let dir = fixture();
    let config_dir = dir.path();
    // Live schema = v1.
    init_derived_graph(config_dir).await;
    write_state_resources(config_dir, &[("schema.knowledge", "stale-digest")]);
    // The sidecar intended a digest that is neither live nor recorded.
    let sidecar =
        write_schema_apply_sidecar(config_dir, "knowledge", "intended-digest", "01SROW6");

    let out = apply_config_dir(config_dir).await;

    // Warnings only, so the run still counts as ok.
    assert!(out.ok, "{:?}", out.diagnostics);
    let pending_reported = out
        .diagnostics
        .iter()
        .any(|entry| entry.code == "cluster_recovery_pending");
    assert!(pending_reported);
    assert!(sidecar.exists());

    let state = read_state_json(config_dir);
    let schema_status = &state["resource_statuses"]["schema.knowledge"]["status"];
    assert_eq!(*schema_status, "drifted");
}
/// Row 1 (unopenable): when the graph root exists but cannot be opened, the
/// sweep cannot verify anything, so it warns and leaves the sidecar in place.
#[tokio::test]
async fn sweep_keeps_schema_sidecar_for_unopenable_root() {
    let dir = fixture();
    let config_dir = dir.path();
    write_applyable_state(config_dir);
    // An empty directory at the graph root: it exists, but it will not open.
    let graph_root = config_dir.join(CLUSTER_GRAPHS_DIR).join("knowledge.omni");
    fs::create_dir_all(&graph_root).unwrap();
    let sidecar = write_schema_apply_sidecar(config_dir, "knowledge", "whatever", "01SROWX");

    let out = apply_config_dir(config_dir).await;

    // The "cannot verify" condition is reported as a warning, not a failure.
    assert!(out.ok, "{:?}", out.diagnostics);
    let pending_reported = out
        .diagnostics
        .iter()
        .any(|entry| entry.code == "cluster_recovery_pending");
    assert!(pending_reported);
    assert!(sidecar.exists());
}
#[test]
fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();