mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
feat(cluster): graph-create recovery sidecars and sweep
RFC-004 §D2/§D3 for the graph_create kind. RecoverySidecar records intent
under __cluster/recoveries/{ulid}.json; the roll-forward-only sweep runs at
the start of apply/refresh/import under the state lock and classifies each
survivor by observation: root absent -> intent removed (row 1); outcome
already recorded -> retired (row 2); create completed but state stale ->
ledger rolled forward with a recovery_records audit entry (row 4); partial
root -> Error/graph_create_incomplete, kept, never auto-deleted (row 5);
unexpected schema -> Drifted/actual_applied_state_pending, kept (row 6).
Sweep mutations ride the command's existing CAS write; completed sidecars
are deleted only after that write lands. Read-only status/plan warn
(cluster_recovery_pending) without acting. The apply payload gate now counts
only payload-phase errors so kept-sidecar diagnostics don't abort the run
before their statuses persist.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
6fbf09d5c9
commit
bf8cc7a753
1 changed file with 548 additions and 6 deletions
|
|
@ -24,6 +24,7 @@ pub const CLUSTER_STATE_DIR: &str = "__cluster";
|
|||
pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json";
|
||||
pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json";
|
||||
pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
|
||||
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
|
@ -415,11 +416,53 @@ struct StateLockFile {
|
|||
pid: u32,
|
||||
}
|
||||
|
||||
/// Recovery-intent record for a graph-moving apply operation (RFC-004 §D2).
|
||||
/// Written under the state lock before the engine call that can create or
|
||||
/// move a graph manifest; deleted only after the cluster state CAS that
|
||||
/// records the outcome lands. The sweep (§D3) classifies survivors.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct RecoverySidecar {
|
||||
schema_version: u32,
|
||||
operation_id: String,
|
||||
started_at: String,
|
||||
#[serde(default)]
|
||||
actor: Option<String>,
|
||||
kind: RecoverySidecarKind,
|
||||
graph_id: String,
|
||||
graph_uri: String,
|
||||
#[serde(default)]
|
||||
observed_manifest_version: Option<u64>,
|
||||
#[serde(default)]
|
||||
expected_manifest_version: Option<u64>,
|
||||
desired_schema_digest: String,
|
||||
#[serde(default)]
|
||||
state_cas_base: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum RecoverySidecarKind {
|
||||
GraphCreate,
|
||||
// SchemaApply and GraphDelete arrive with stages 4B/4C.
|
||||
}
|
||||
|
||||
/// What a recovery sweep hands back to the command that ran it.
#[derive(Debug, Default)]
struct SweepOutcome {
    /// Ids of graphs whose sidecar was kept (rows 5/6). Graph-moving work
    /// for these graphs stays blocked until the operator repairs and
    /// re-observes.
    pending_graphs: BTreeSet<String>,
    /// Paths of sidecars whose outcome is recorded (rows 2/4). They are
    /// deleted only after the command's state write lands, so a CAS failure
    /// leaves them in place to be swept again.
    completed_sidecars: Vec<PathBuf>,
}
|
||||
|
||||
/// Filesystem locations of the cluster state kept beside a config directory.
#[derive(Debug)]
struct LocalStateBackend {
    /// Cluster state directory — presumably `<config_dir>/__cluster`; the
    /// assignment is outside the visible code, TODO confirm.
    state_dir: PathBuf,
    /// State ledger file: `<config_dir>/__cluster/state.json`.
    state_path: PathBuf,
    /// State lock file: `<config_dir>/__cluster/lock.json`.
    lock_path: PathBuf,
    /// Directory holding recovery sidecars: `<config_dir>/__cluster/recoveries`.
    recoveries_dir: PathBuf,
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -513,6 +556,10 @@ pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
|||
None
|
||||
};
|
||||
|
||||
// Plan is read-only: pending sidecars are reported, never acted on
|
||||
// (RFC-004 open question 3 keeps read-only commands warn-only).
|
||||
warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics);
|
||||
|
||||
let mut prior_resources = BTreeMap::new();
|
||||
if !has_errors(&diagnostics) {
|
||||
match backend.read_state(&mut observations) {
|
||||
|
|
@ -656,7 +703,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
}
|
||||
};
|
||||
let expected_cas = snapshot.state_cas;
|
||||
let Some(state) = snapshot.state else {
|
||||
let Some(mut state) = snapshot.state else {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"state_missing",
|
||||
CLUSTER_STATE_FILE,
|
||||
|
|
@ -672,9 +719,16 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
);
|
||||
};
|
||||
|
||||
// Snapshot the as-read state BEFORE the sweep so sweep mutations count as
|
||||
// changes for the final dirty check and get persisted by the state CAS.
|
||||
let before_value =
|
||||
serde_json::to_value(&state).expect("cluster state must serialize deterministically");
|
||||
let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;
|
||||
|
||||
let prior_resources = state_resource_digests(&state);
|
||||
let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
|
||||
classify_changes(&mut changes, &desired.dependencies);
|
||||
let _ = &sweep.pending_graphs; // consumed by the graph-create executor (4A C4)
|
||||
|
||||
// Defensive invariant: nothing the approval gate covers may be executable.
|
||||
// Today approvals only cover graph/schema deletes (always deferred); this
|
||||
|
|
@ -723,6 +777,9 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
|
||||
// Payload phase: content-addressed writes before the state CAS. Any
|
||||
// failure aborts before state moves; blobs already written are inert.
|
||||
// Gate on payload-phase errors only — sweep errors (e.g. a kept row-5
|
||||
// sidecar) must not abort the run, or their statuses would never persist.
|
||||
let errors_before_payloads = count_errors(&diagnostics);
|
||||
let source_paths: BTreeMap<&str, &str> = desired
|
||||
.resources
|
||||
.iter()
|
||||
|
|
@ -761,7 +818,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
diagnostics.push(diagnostic);
|
||||
}
|
||||
}
|
||||
if has_errors(&diagnostics) {
|
||||
if count_errors(&diagnostics) > errors_before_payloads {
|
||||
return early_return(
|
||||
display_path(&desired.config_dir),
|
||||
Some(desired.config_digest),
|
||||
|
|
@ -788,9 +845,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
}
|
||||
|
||||
// State mutation. Apply owns query/policy statuses only; graph/schema
|
||||
// statuses belong to refresh/import observation and must not be clobbered.
|
||||
let before_value =
|
||||
serde_json::to_value(&state).expect("cluster state must serialize deterministically");
|
||||
// statuses belong to refresh/import observation and must not be clobbered
|
||||
// (the sweep above is the one exception: it owns recovery statuses).
|
||||
let mut new_state = state.clone();
|
||||
for change in &changes {
|
||||
match change.disposition {
|
||||
|
|
@ -855,6 +911,13 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
}
|
||||
}
|
||||
}
|
||||
// Completed (rows 2/4) sweep sidecars are deleted only once their outcome
|
||||
// is durably recorded; on a failed write they stay and re-sweep next run.
|
||||
if !state_write_failed {
|
||||
for sidecar_path in &sweep.completed_sidecars {
|
||||
let _ = fs::remove_file(sidecar_path);
|
||||
}
|
||||
}
|
||||
// On a failed state write, report the statuses that are actually on disk
|
||||
// (the pre-apply snapshot), not the in-memory mutations that were never
|
||||
// persisted — automation reading `resource_statuses` independently of `ok`
|
||||
|
|
@ -902,6 +965,7 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
|
|||
let backend = LocalStateBackend::new(&parsed.config_dir);
|
||||
let mut observations = backend.observations();
|
||||
backend.observe_lock(&mut observations, &mut diagnostics);
|
||||
warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics);
|
||||
|
||||
let mut resource_digests = BTreeMap::new();
|
||||
let mut resource_statuses = BTreeMap::new();
|
||||
|
|
@ -1107,6 +1171,11 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
|
|||
(StateSyncOperation::Import, None) => initial_import_state(&desired),
|
||||
};
|
||||
|
||||
// Recovery sweep first (RFC-004 §D3): classify any interrupted graph
|
||||
// operation before observation/verification so a rolled-forward outcome
|
||||
// is what those passes see.
|
||||
let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;
|
||||
|
||||
// Catalog payload verification must run BEFORE graph observation: removing
|
||||
// a drifted query digest first means the live-graph composite recompute
|
||||
// below already excludes it, so the persisted graph.<id> composite stays
|
||||
|
|
@ -1177,7 +1246,13 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
|
|||
}
|
||||
|
||||
match backend.write_state(&state, expected_cas.as_deref(), &mut observations) {
|
||||
Ok(()) => {}
|
||||
Ok(()) => {
|
||||
// Completed sweep sidecars are deleted only after their outcome
|
||||
// is durably recorded; on failure they stay and re-sweep.
|
||||
for sidecar_path in &sweep.completed_sidecars {
|
||||
let _ = fs::remove_file(sidecar_path);
|
||||
}
|
||||
}
|
||||
Err(diagnostic) => diagnostics.push(diagnostic),
|
||||
}
|
||||
|
||||
|
|
@ -1307,10 +1382,104 @@ impl LocalStateBackend {
|
|||
Self {
|
||||
state_path: config_dir.join(CLUSTER_STATE_FILE),
|
||||
lock_path: config_dir.join(CLUSTER_LOCK_FILE),
|
||||
recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR),
|
||||
state_dir,
|
||||
}
|
||||
}
|
||||
|
||||
/// List recovery sidecars in ULID (filename) order. Unparseable files are
|
||||
/// reported as warnings and skipped — they stay on disk for the operator.
|
||||
fn list_recovery_sidecars(
|
||||
&self,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
) -> Vec<(PathBuf, RecoverySidecar)> {
|
||||
let mut paths = Vec::new();
|
||||
match fs::read_dir(&self.recoveries_dir) {
|
||||
Ok(entries) => {
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if path.extension().is_some_and(|ext| ext == "json") {
|
||||
paths.push(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
Err(err) => {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"recovery_sidecar_read_error",
|
||||
CLUSTER_RECOVERIES_DIR,
|
||||
format!("could not list recovery sidecars: {err}"),
|
||||
));
|
||||
}
|
||||
}
|
||||
paths.sort();
|
||||
let mut sidecars = Vec::new();
|
||||
for path in paths {
|
||||
match fs::read_to_string(&path)
|
||||
.map_err(|err| err.to_string())
|
||||
.and_then(|text| {
|
||||
serde_json::from_str::<RecoverySidecar>(&text).map_err(|err| err.to_string())
|
||||
}) {
|
||||
Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)),
|
||||
Ok(sidecar) => diagnostics.push(Diagnostic::warning(
|
||||
"unsupported_recovery_sidecar_version",
|
||||
display_path(&path),
|
||||
format!(
|
||||
"unsupported recovery sidecar version {}; leaving it in place",
|
||||
sidecar.schema_version
|
||||
),
|
||||
)),
|
||||
Err(err) => diagnostics.push(Diagnostic::warning(
|
||||
"invalid_recovery_sidecar",
|
||||
display_path(&path),
|
||||
format!("could not parse recovery sidecar ({err}); leaving it in place"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
sidecars
|
||||
}
|
||||
|
||||
/// Atomically write (or rewrite) a recovery sidecar; returns its path.
|
||||
fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result<PathBuf, Diagnostic> {
|
||||
fs::create_dir_all(&self.recoveries_dir).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"recovery_sidecar_write_error",
|
||||
CLUSTER_RECOVERIES_DIR,
|
||||
format!("could not create recoveries directory: {err}"),
|
||||
)
|
||||
})?;
|
||||
let target = self
|
||||
.recoveries_dir
|
||||
.join(format!("{}.json", sidecar.operation_id));
|
||||
let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"recovery_sidecar_write_error",
|
||||
display_path(&target),
|
||||
format!("could not encode recovery sidecar: {err}"),
|
||||
)
|
||||
})?;
|
||||
payload.push('\n');
|
||||
let tmp_path = self
|
||||
.recoveries_dir
|
||||
.join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new()));
|
||||
fs::write(&tmp_path, payload.as_bytes()).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"recovery_sidecar_write_error",
|
||||
display_path(&tmp_path),
|
||||
format!("could not write recovery sidecar: {err}"),
|
||||
)
|
||||
})?;
|
||||
if let Err(err) = fs::rename(&tmp_path, &target) {
|
||||
let _ = fs::remove_file(&tmp_path);
|
||||
return Err(Diagnostic::error(
|
||||
"recovery_sidecar_write_error",
|
||||
display_path(&target),
|
||||
format!("could not move recovery sidecar into place: {err}"),
|
||||
));
|
||||
}
|
||||
Ok(target)
|
||||
}
|
||||
|
||||
fn observations(&self) -> StateObservations {
|
||||
StateObservations {
|
||||
state_path: display_path(&self.state_path),
|
||||
|
|
@ -1701,6 +1870,169 @@ fn initial_import_state(desired: &DesiredCluster) -> ClusterState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Recovery sweep (RFC-004 §D3): runs at the start of every state-mutating
|
||||
/// cluster command, under the state lock, before the command's own work.
|
||||
/// Roll-forward-only — the engine's own sidecars make each graph-level
|
||||
/// operation atomic within the graph, so the cluster never rolls a graph
|
||||
/// back; it converges the ledger to observable reality or refuses loudly.
|
||||
/// Mutations ride the calling command's CAS-checked state write; completed
|
||||
/// sidecars are deleted only after that write lands.
|
||||
async fn sweep_recovery_sidecars(
|
||||
backend: &LocalStateBackend,
|
||||
state: &mut ClusterState,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
) -> SweepOutcome {
|
||||
let mut outcome = SweepOutcome::default();
|
||||
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) {
|
||||
match sidecar.kind {
|
||||
RecoverySidecarKind::GraphCreate => {
|
||||
sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
outcome
|
||||
}
|
||||
|
||||
async fn sweep_graph_create_sidecar(
|
||||
path: PathBuf,
|
||||
sidecar: RecoverySidecar,
|
||||
state: &mut ClusterState,
|
||||
diagnostics: &mut Vec<Diagnostic>,
|
||||
outcome: &mut SweepOutcome,
|
||||
) {
|
||||
let graph_address = graph_address(&sidecar.graph_id);
|
||||
let schema_addr = schema_address(&sidecar.graph_id);
|
||||
let graph_path = PathBuf::from(&sidecar.graph_uri);
|
||||
|
||||
// Row 1: nothing moved — the init never landed. The sidecar is pure
|
||||
// intent; remove it and let the command's own plan re-propose the create.
|
||||
if !graph_path.exists() {
|
||||
let _ = fs::remove_file(&path);
|
||||
return;
|
||||
}
|
||||
|
||||
match Omnigraph::open_read_only(&sidecar.graph_uri).await {
|
||||
Ok(db) => {
|
||||
let live_digest = sha256_hex(db.schema_source().as_bytes());
|
||||
let recorded = state
|
||||
.applied_revision
|
||||
.resources
|
||||
.get(&schema_addr)
|
||||
.map(|resource| resource.digest.clone());
|
||||
if recorded.as_deref() == Some(live_digest.as_str()) {
|
||||
// Row 2: crash fell between the state CAS and sidecar delete.
|
||||
outcome.completed_sidecars.push(path);
|
||||
} else if live_digest == sidecar.desired_schema_digest {
|
||||
// Row 4: the create completed on the graph; roll the cluster
|
||||
// state forward to observable reality.
|
||||
state.applied_revision.resources.insert(
|
||||
schema_addr.clone(),
|
||||
StateResource {
|
||||
digest: live_digest.clone(),
|
||||
},
|
||||
);
|
||||
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
|
||||
let composite =
|
||||
graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests));
|
||||
state
|
||||
.applied_revision
|
||||
.resources
|
||||
.insert(graph_address.clone(), StateResource { digest: composite });
|
||||
set_resource_status_applied(state, &graph_address);
|
||||
set_resource_status_applied(state, &schema_addr);
|
||||
state.recovery_records.insert(
|
||||
sidecar.operation_id.clone(),
|
||||
json!({
|
||||
"kind": "graph_create",
|
||||
"graph_id": sidecar.graph_id,
|
||||
"outcome": "rolled_forward",
|
||||
"recovered_at": now_rfc3339(),
|
||||
"actor": sidecar.actor,
|
||||
}),
|
||||
);
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"cluster_recovery_rolled_forward",
|
||||
graph_address.clone(),
|
||||
"an interrupted graph create had completed on the graph; cluster state was rolled forward to match",
|
||||
));
|
||||
outcome.completed_sidecars.push(path);
|
||||
} else {
|
||||
// Row 6: the graph moved to something the sidecar did not
|
||||
// intend. Refuse to guess; require refresh + operator re-plan.
|
||||
set_resource_status(
|
||||
state,
|
||||
&graph_address,
|
||||
ResourceLifecycleStatus::Drifted,
|
||||
"actual_applied_state_pending",
|
||||
"graph state does not match the interrupted operation; run `cluster refresh` and re-plan",
|
||||
);
|
||||
set_resource_status(
|
||||
state,
|
||||
&schema_addr,
|
||||
ResourceLifecycleStatus::Drifted,
|
||||
"actual_applied_state_pending",
|
||||
"graph state does not match the interrupted operation; run `cluster refresh` and re-plan",
|
||||
);
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"cluster_recovery_pending",
|
||||
graph_address.clone(),
|
||||
"an interrupted graph create left unexpected graph state; graph-moving work is blocked until repaired",
|
||||
));
|
||||
outcome.pending_graphs.insert(sidecar.graph_id.clone());
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// Row 5: partial root (the engine's documented init gap). Never
|
||||
// auto-delete — reconciler deletes are the same data-loss class
|
||||
// as human deletes; the operator removes the root explicitly.
|
||||
set_resource_status(
|
||||
state,
|
||||
&graph_address,
|
||||
ResourceLifecycleStatus::Error,
|
||||
"graph_create_incomplete",
|
||||
"graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`",
|
||||
);
|
||||
set_resource_status(
|
||||
state,
|
||||
&schema_addr,
|
||||
ResourceLifecycleStatus::Error,
|
||||
"graph_create_incomplete",
|
||||
"graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`",
|
||||
);
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"graph_create_incomplete",
|
||||
graph_address.clone(),
|
||||
format!(
|
||||
"graph root '{}' exists but cannot be opened ({err}); remove the graph root and re-run `cluster apply`",
|
||||
sidecar.graph_uri
|
||||
),
|
||||
));
|
||||
outcome.pending_graphs.insert(sidecar.graph_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read-only commands report pending sidecars without acting on them.
|
||||
fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec<Diagnostic>) {
|
||||
let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
|
||||
let Ok(entries) = fs::read_dir(&recoveries_dir) else {
|
||||
return;
|
||||
};
|
||||
let mut names: Vec<String> = entries
|
||||
.flatten()
|
||||
.filter(|entry| entry.path().extension().is_some_and(|ext| ext == "json"))
|
||||
.map(|entry| entry.file_name().to_string_lossy().into_owned())
|
||||
.collect();
|
||||
names.sort();
|
||||
for name in names {
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"cluster_recovery_pending",
|
||||
format!("{CLUSTER_RECOVERIES_DIR}/{name}"),
|
||||
"a recovery sidecar from an interrupted apply is pending; the next state-mutating command will classify it",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize {
|
||||
let mut graph_error_count = 0;
|
||||
for graph in &desired.graphs {
|
||||
|
|
@ -2868,6 +3200,13 @@ fn has_errors(diagnostics: &[Diagnostic]) -> bool {
|
|||
.any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error)
|
||||
}
|
||||
|
||||
fn count_errors(diagnostics: &[Diagnostic]) -> usize {
|
||||
diagnostics
|
||||
.iter()
|
||||
.filter(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error)
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Render a path as an owned `String` using the path's `Display` adapter.
fn display_path(path: &Path) -> String {
    let rendered = path.display();
    rendered.to_string()
}
|
||||
|
|
@ -4621,6 +4960,209 @@ graphs:
|
|||
);
|
||||
}
|
||||
|
||||
// ---- recovery sidecars + sweep (Stage 4A) ----
|
||||
|
||||
fn derived_graph_uri(config_dir: &Path, graph_id: &str) -> String {
|
||||
display_path(
|
||||
&config_dir
|
||||
.join(CLUSTER_GRAPHS_DIR)
|
||||
.join(format!("{graph_id}.omni")),
|
||||
)
|
||||
}
|
||||
|
||||
fn write_create_sidecar(
|
||||
config_dir: &Path,
|
||||
graph_id: &str,
|
||||
desired_schema_digest: &str,
|
||||
operation_id: &str,
|
||||
) -> PathBuf {
|
||||
let dir = config_dir.join(CLUSTER_RECOVERIES_DIR);
|
||||
fs::create_dir_all(&dir).unwrap();
|
||||
let path = dir.join(format!("{operation_id}.json"));
|
||||
fs::write(
|
||||
&path,
|
||||
serde_json::to_string_pretty(&json!({
|
||||
"schema_version": 1,
|
||||
"operation_id": operation_id,
|
||||
"started_at": "1970-01-01T00:00:00Z",
|
||||
"kind": "graph_create",
|
||||
"graph_id": graph_id,
|
||||
"graph_uri": derived_graph_uri(config_dir, graph_id),
|
||||
"desired_schema_digest": desired_schema_digest,
|
||||
}))
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
path
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_removes_sidecar_when_root_absent() {
|
||||
let dir = fixture();
|
||||
write_applyable_state(dir.path());
|
||||
let sidecar = write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01ROW1");
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
// Row 1: nothing moved; intent removed, run proceeds normally.
|
||||
assert!(!sidecar.exists());
|
||||
assert!(out.converged);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_rolls_forward_completed_create() {
|
||||
let dir = fixture();
|
||||
init_derived_graph(dir.path()).await;
|
||||
write_state_resources(dir.path(), &[]); // state predates the create
|
||||
let desired = validate_config_dir(dir.path());
|
||||
let schema_digest = desired.resource_digests["schema.knowledge"].clone();
|
||||
let sidecar = write_create_sidecar(dir.path(), "knowledge", &schema_digest, "01ROW4");
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
|
||||
);
|
||||
// Row 4: ledger converged to observable reality, audit recorded,
|
||||
// sidecar retired after the CAS landed.
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(
|
||||
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
|
||||
schema_digest
|
||||
);
|
||||
assert!(
|
||||
state["recovery_records"]
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.values()
|
||||
.any(|record| record["outcome"] == "rolled_forward"
|
||||
&& record["graph_id"] == "knowledge")
|
||||
);
|
||||
assert!(!sidecar.exists());
|
||||
// With the graph rolled forward, the same run converges the catalog.
|
||||
assert!(out.converged, "{out:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_completes_already_recorded_create() {
|
||||
let dir = fixture();
|
||||
init_derived_graph(dir.path()).await;
|
||||
write_applyable_state(dir.path()); // state already records graph+schema
|
||||
let desired = validate_config_dir(dir.path());
|
||||
let sidecar = write_create_sidecar(
|
||||
dir.path(),
|
||||
"knowledge",
|
||||
&desired.resource_digests["schema.knowledge"],
|
||||
"01ROW2",
|
||||
);
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
// Row 2: outcome was already durable; no audit entry, sidecar retired.
|
||||
assert!(!sidecar.exists());
|
||||
let state = read_state_json(dir.path());
|
||||
assert!(
|
||||
state["recovery_records"]
|
||||
.as_object()
|
||||
.is_none_or(|records| records.is_empty()),
|
||||
"{state}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_keeps_sidecar_for_incomplete_root() {
|
||||
let dir = fixture();
|
||||
write_applyable_state(dir.path());
|
||||
// A root that exists but cannot be opened: the engine's partial-init gap.
|
||||
let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni");
|
||||
fs::create_dir_all(&root).unwrap();
|
||||
fs::write(root.join("_schema.pg"), "junk").unwrap();
|
||||
let sidecar = write_create_sidecar(dir.path(), "knowledge", "whatever", "01ROW5");
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(!out.ok);
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "graph_create_incomplete")
|
||||
);
|
||||
// Row 5: never auto-delete; sidecar and root stay for the operator,
|
||||
// and the Error status is persisted by the run's state write.
|
||||
assert!(sidecar.exists());
|
||||
assert!(root.exists());
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error");
|
||||
assert!(
|
||||
state["resource_statuses"]["graph.knowledge"]["conditions"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|condition| condition == "graph_create_incomplete")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sweep_flags_unexpected_schema_as_pending() {
|
||||
let dir = fixture();
|
||||
write_state_resources(dir.path(), &[]);
|
||||
// Live graph exists with a schema the sidecar never intended.
|
||||
let graph_dir = dir.path().join(CLUSTER_GRAPHS_DIR);
|
||||
fs::create_dir_all(&graph_dir).unwrap();
|
||||
Omnigraph::init(
|
||||
&derived_graph_uri(dir.path(), "knowledge"),
|
||||
"\nnode Other {\n name: String @key\n}\n",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let desired = validate_config_dir(dir.path());
|
||||
let sidecar = write_create_sidecar(
|
||||
dir.path(),
|
||||
"knowledge",
|
||||
&desired.resource_digests["schema.knowledge"],
|
||||
"01ROW6",
|
||||
);
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics); // warning, not error
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "cluster_recovery_pending")
|
||||
);
|
||||
// Row 6: refuse to guess; sidecar kept, Drifted persisted.
|
||||
assert!(sidecar.exists());
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(
|
||||
state["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"drifted"
|
||||
);
|
||||
assert!(
|
||||
state["resource_statuses"]["graph.knowledge"]["conditions"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|condition| condition == "actual_applied_state_pending")
|
||||
);
|
||||
}
|
||||
|
||||
/// `status` is read-only: a pending sidecar produces a warning and the
/// command still succeeds.
#[test]
fn status_warns_on_pending_recovery_sidecar() {
    let workspace = fixture();
    let config_dir = workspace.path();
    write_applyable_state(config_dir);
    write_create_sidecar(config_dir, "knowledge", "irrelevant", "01STATUS");

    let output = status_config_dir(config_dir);

    assert!(output.ok, "{:?}", output.diagnostics);
    let warned = output.diagnostics.iter().any(|diagnostic| {
        diagnostic.code == "cluster_recovery_pending"
            && diagnostic.severity == DiagnosticSeverity::Warning
    });
    assert!(warned);
}
|
||||
|
||||
#[test]
|
||||
fn plan_annotates_apply_dispositions() {
|
||||
let dir = fixture();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue