mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
- Crash before the removal: root intact, approval file unconsumed, sidecar survives, no ack; the next run retires the stale intent (row 8) and the still-approved delete completes in the same run. - Crash after the removal, before the state CAS: root gone, ledger byte-identical, the sidecar carries the approval id; the next run's sweep rolls the tombstone forward, consumes the approval, audits the recovery, and converges (row 7b). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
593 lines
21 KiB
Rust
593 lines
21 KiB
Rust
//! Fault-injection tests for the cluster apply protocol.
//!
//! These live in an integration binary (not in-source) deliberately: the `fail`
//! crate's registry is process-global, so a configured `cluster_apply.*`
//! action would fire inside any concurrently running normal apply test in the
//! lib-test process. A separate binary isolates the registry by construction —
//! same reason the engine keeps its failpoint suite in `tests/failpoints.rs`.
|
|
|
|
#![cfg(feature = "failpoints")]
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
|
|
use fail::FailScenario;
|
|
use omnigraph_cluster::failpoints::ScopedFailPoint;
|
|
use omnigraph::db::Omnigraph;
|
|
use omnigraph_cluster::{
|
|
ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir,
|
|
validate_config_dir,
|
|
};
|
|
use tempfile::tempdir;
|
|
|
|
/// Baseline graph schema used by the fixture: one `Person` node keyed by
/// `name`, with an optional `age`.
const SCHEMA: &str = r#"
node Person {
    name: String @key
    age: I32?
}
"#;
|
|
|
|
/// Stored query used by the fixture: `find_person` matches a `Person` by
/// `name` and returns its `name` and `age`.
const QUERY: &str = r#"
query find_person($name: String) {
    match { $p: Person { name: $name } }
    return { $p.name, $p.age }
}
"#;
|
|
|
|
fn fixture() -> tempfile::TempDir {
|
|
let dir = tempdir().unwrap();
|
|
fs::write(dir.path().join("people.pg"), SCHEMA).unwrap();
|
|
fs::write(dir.path().join("people.gq"), QUERY).unwrap();
|
|
fs::write(dir.path().join("base.policy.yaml"), "rules: []\n").unwrap();
|
|
fs::write(
|
|
dir.path().join("cluster.yaml"),
|
|
r#"
|
|
version: 1
|
|
state:
|
|
backend: cluster
|
|
lock: true
|
|
graphs:
|
|
knowledge:
|
|
schema: ./people.pg
|
|
queries:
|
|
find_person:
|
|
file: ./people.gq
|
|
policies:
|
|
base:
|
|
file: ./base.policy.yaml
|
|
applies_to: [knowledge]
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
dir
|
|
}
|
|
|
|
/// Seed a state.json where the graph/schema digests match desired, so query
|
|
/// and policy changes are applicable. Digests are borrowed from the public
|
|
/// validate output; the graph composite is a placeholder that apply converges
|
|
/// as a Derived update.
|
|
fn seed_applyable_state(config_dir: &Path) -> BTreeMap<String, String> {
|
|
let validate = validate_config_dir(config_dir);
|
|
assert!(validate.ok, "{:?}", validate.diagnostics);
|
|
let schema_digest = validate.resource_digests["schema.knowledge"].clone();
|
|
let state_dir = config_dir.join("__cluster");
|
|
fs::create_dir_all(&state_dir).unwrap();
|
|
fs::write(
|
|
state_dir.join("state.json"),
|
|
format!(
|
|
r#"{{
|
|
"version": 1,
|
|
"state_revision": 1,
|
|
"applied_revision": {{
|
|
"resources": {{
|
|
"graph.knowledge": {{ "digest": "seed" }},
|
|
"schema.knowledge": {{ "digest": "{schema_digest}" }}
|
|
}}
|
|
}}
|
|
}}
|
|
"#
|
|
),
|
|
)
|
|
.unwrap();
|
|
validate.resource_digests
|
|
}
|
|
|
|
/// Path of the cluster ledger file, `__cluster/state.json`, under `config_dir`.
fn state_path(config_dir: &Path) -> PathBuf {
    let mut ledger = config_dir.to_path_buf();
    ledger.push("__cluster/state.json");
    ledger
}
|
|
|
|
/// Path of the content-addressed blob for the `find_person` query.
///
/// The file name is `<digest>.gq`, where the digest is looked up under the
/// `query.knowledge.find_person` key of `digests`. Panics if that key is
/// absent.
fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf {
    let blob_dir = config_dir.join("__cluster/resources/query/knowledge/find_person");
    let digest = &digests["query.knowledge.find_person"];
    blob_dir.join(format!("{digest}.gq"))
}
|
|
|
|
#[tokio::test]
|
|
async fn failpoint_wiring_returns_injected_diagnostic() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
seed_applyable_state(dir.path());
|
|
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(out.diagnostics.iter().any(|diagnostic| {
|
|
diagnostic.code == "injected_failpoint"
|
|
&& diagnostic
|
|
.message
|
|
.contains("cluster_apply.after_payload_phase")
|
|
}));
|
|
drop(_failpoint);
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Crash between the payload phase and the state write: blobs are on disk,
|
|
/// state.json is byte-identical, nothing is acknowledged — and a plain re-run
|
|
/// repairs by trusting the existing content-addressed blobs.
|
|
#[tokio::test]
|
|
async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
let digests = seed_applyable_state(dir.path());
|
|
let state_before = fs::read(state_path(dir.path())).unwrap();
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(!out.state_written);
|
|
assert!(!out.converged);
|
|
assert_eq!(out.applied_count, 0);
|
|
// Persisted pre-apply snapshot: no phantom Applied statuses.
|
|
assert!(
|
|
!out.resource_statuses
|
|
.contains_key("query.knowledge.find_person"),
|
|
"{:?}",
|
|
out.resource_statuses
|
|
);
|
|
// State has not moved; payloads are inert on disk; the lock released.
|
|
assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);
|
|
assert!(query_blob(dir.path(), &digests).exists());
|
|
assert!(!dir.path().join("__cluster/lock.json").exists());
|
|
}
|
|
|
|
// The repair is a plain re-run: existing blobs are trusted by digest.
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(recovered.converged);
|
|
assert!(recovered.state_written);
|
|
assert_eq!(
|
|
recovered.resource_statuses["query.knowledge.find_person"].status,
|
|
omnigraph_cluster::ResourceLifecycleStatus::Applied
|
|
);
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// A concurrent writer mutating state.json between apply's read and its write
|
|
/// (possible under `state.lock: false`) must surface `state_cas_mismatch`,
|
|
/// acknowledge nothing, and leave the concurrent writer's state on disk.
|
|
#[tokio::test]
|
|
async fn apply_cas_race_surfaces_state_cas_mismatch() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
let digests = seed_applyable_state(dir.path());
|
|
|
|
// Simulate the concurrent writer at the exact race window: rewrite
|
|
// state.json (valid JSON, graph/schema digests preserved, revision 99)
|
|
// after apply read it but before apply writes. RAII-guarded so a panic
|
|
// inside apply cannot leak the callback into the global registry.
|
|
let race_path = state_path(dir.path());
|
|
let failpoint =
|
|
ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || {
|
|
let mut state: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap();
|
|
state["state_revision"] = serde_json::json!(99);
|
|
fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap();
|
|
});
|
|
|
|
let out = apply_config_dir(dir.path()).await;
|
|
drop(failpoint);
|
|
|
|
assert!(!out.ok);
|
|
assert!(!out.state_written);
|
|
assert!(
|
|
out.diagnostics
|
|
.iter()
|
|
.any(|diagnostic| diagnostic.code == "state_cas_mismatch"),
|
|
"{:?}",
|
|
out.diagnostics
|
|
);
|
|
// Persisted snapshot, not the unwritten in-memory mutations.
|
|
assert!(
|
|
!out.resource_statuses
|
|
.contains_key("query.knowledge.find_person")
|
|
);
|
|
// The concurrent writer's state is what's on disk; apply's mutation never landed.
|
|
let state: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap();
|
|
assert_eq!(state["state_revision"], 99);
|
|
assert!(
|
|
state["applied_revision"]["resources"]
|
|
.get("query.knowledge.find_person")
|
|
.is_none()
|
|
);
|
|
// Blobs written before the race are inert.
|
|
assert!(query_blob(dir.path(), &digests).exists());
|
|
|
|
// Recovery is a plain re-run against the rewritten state.
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(recovered.converged);
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Writes a `__cluster/state.json` at revision 1 with an empty applied
/// resource map, creating the `__cluster` directory if needed. Panics on any
/// I/O error.
fn seed_empty_state(config_dir: &Path) {
    const EMPTY_STATE: &str = r#"{
  "version": 1,
  "state_revision": 1,
  "applied_revision": { "resources": {} }
}
"#;

    let cluster_dir = config_dir.join("__cluster");
    fs::create_dir_all(&cluster_dir).unwrap();
    fs::write(cluster_dir.join("state.json"), EMPTY_STATE).unwrap();
}
|
|
|
|
/// Lists the recovery sidecar files under `__cluster/recoveries`.
///
/// Returns the paths of all entries with a `.json` extension, sorted. A
/// missing directory means "no sidecars" and yields an empty vector.
///
/// # Panics
///
/// Panics on any other I/O error (listing the directory or reading an entry).
/// Treating such errors as "empty" would let `is_empty()` assertions in the
/// tests pass for the wrong reason.
fn recovery_sidecars(config_dir: &Path) -> Vec<PathBuf> {
    let recoveries = config_dir.join("__cluster/recoveries");
    let entries = match fs::read_dir(&recoveries) {
        Ok(entries) => entries,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Vec::new(),
        Err(err) => panic!("failed to list {}: {err}", recoveries.display()),
    };

    let mut paths: Vec<PathBuf> = entries
        .map(|entry| {
            entry
                .unwrap_or_else(|err| {
                    panic!("failed to read entry in {}: {err}", recoveries.display())
                })
                .path()
        })
        .filter(|path| path.extension().is_some_and(|ext| ext == "json"))
        .collect();
    paths.sort();
    paths
}
|
|
|
|
/// Crash before the init: the create-intent sidecar survives, nothing moved.
|
|
/// The next run's sweep removes the intent (row 1) and the same run creates
|
|
/// the graph and converges.
|
|
#[tokio::test]
|
|
async fn create_crash_before_init_recovers_via_sweep() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
seed_empty_state(dir.path());
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(out.diagnostics.iter().any(|diagnostic| {
|
|
diagnostic.code == "injected_failpoint"
|
|
&& diagnostic
|
|
.message
|
|
.contains("cluster_apply.before_graph_create")
|
|
}));
|
|
assert_eq!(recovery_sidecars(dir.path()).len(), 1);
|
|
assert!(!dir.path().join("graphs/knowledge.omni").exists());
|
|
// No resource digest moved.
|
|
let state: serde_json::Value = serde_json::from_str(
|
|
&fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(),
|
|
)
|
|
.unwrap();
|
|
assert!(
|
|
state["applied_revision"]["resources"]
|
|
.as_object()
|
|
.unwrap()
|
|
.is_empty()
|
|
);
|
|
}
|
|
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(recovered.converged);
|
|
assert!(dir.path().join("graphs/knowledge.omni").exists());
|
|
assert!(recovery_sidecars(dir.path()).is_empty());
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Crash after the init but before the state CAS: the graph exists, the
|
|
/// ledger is stale, nothing was acknowledged. The next run's sweep rolls the
|
|
/// ledger forward (row 4) with an audit entry, and the run converges.
|
|
#[tokio::test]
|
|
async fn create_crash_after_init_rolls_state_forward() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
seed_empty_state(dir.path());
|
|
let state_before = fs::read(dir.path().join("__cluster/state.json")).unwrap();
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(!out.state_written);
|
|
// The graph exists; the cluster state is byte-identical (no ack).
|
|
assert!(dir.path().join("graphs/knowledge.omni").exists());
|
|
assert_eq!(
|
|
fs::read(dir.path().join("__cluster/state.json")).unwrap(),
|
|
state_before
|
|
);
|
|
// The sidecar carries the post-init manifest pin.
|
|
let sidecars = recovery_sidecars(dir.path());
|
|
assert_eq!(sidecars.len(), 1);
|
|
let sidecar: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
|
|
assert!(
|
|
sidecar["expected_manifest_version"].is_number(),
|
|
"{sidecar}"
|
|
);
|
|
}
|
|
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(
|
|
recovered
|
|
.diagnostics
|
|
.iter()
|
|
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
|
|
);
|
|
assert!(recovered.converged);
|
|
assert!(recovery_sidecars(dir.path()).is_empty());
|
|
let state: serde_json::Value = serde_json::from_str(
|
|
&fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(),
|
|
)
|
|
.unwrap();
|
|
assert!(
|
|
state["recovery_records"]
|
|
.as_object()
|
|
.unwrap()
|
|
.values()
|
|
.any(|record| record["outcome"] == "rolled_forward")
|
|
);
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Second schema revision: the same `Person` node with an added optional
/// `bio` field. The schema tests write it over `people.pg` to trigger a
/// schema apply.
const SCHEMA_V2: &str = r#"
node Person {
    name: String @key
    age: I32?
    bio: String?
}
"#;
|
|
|
|
async fn converge_with_live_graph(dir: &Path) {
|
|
let graph_dir = dir.join("graphs");
|
|
fs::create_dir_all(&graph_dir).unwrap();
|
|
Omnigraph::init(
|
|
graph_dir.join("knowledge.omni").to_string_lossy().as_ref(),
|
|
SCHEMA,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
seed_applyable_state(dir);
|
|
let out = apply_config_dir(dir).await;
|
|
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
|
|
}
|
|
|
|
async fn live_schema_digest(dir: &Path) -> String {
|
|
let uri = dir.join("graphs/knowledge.omni");
|
|
let db = Omnigraph::open_read_only(uri.to_string_lossy().as_ref())
|
|
.await
|
|
.unwrap();
|
|
use sha2::{Digest, Sha256};
|
|
let digest = Sha256::digest(db.schema_source().as_bytes());
|
|
digest.iter().map(|byte| format!("{byte:02x}")).collect()
|
|
}
|
|
|
|
/// Crash before the engine schema apply: sidecar (with actor) survives, the
|
|
/// live schema and ledger are untouched; the next run's sweep retires the
|
|
/// stale intent and the same run applies and converges.
|
|
#[tokio::test]
|
|
async fn schema_crash_before_apply_recovers_via_sweep() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
converge_with_live_graph(dir.path()).await;
|
|
let pre_digest = live_schema_digest(dir.path()).await;
|
|
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return");
|
|
let out = apply_config_dir_with_options(
|
|
dir.path(),
|
|
ApplyOptions {
|
|
actor: Some("test-actor".to_string()),
|
|
},
|
|
)
|
|
.await;
|
|
assert!(!out.ok);
|
|
assert_eq!(out.actor.as_deref(), Some("test-actor"));
|
|
let sidecars = recovery_sidecars(dir.path());
|
|
assert_eq!(sidecars.len(), 1);
|
|
let sidecar: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
|
|
assert_eq!(sidecar["kind"], "schema_apply");
|
|
assert_eq!(sidecar["actor"], "test-actor");
|
|
// Nothing moved.
|
|
assert_eq!(live_schema_digest(dir.path()).await, pre_digest);
|
|
}
|
|
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(recovered.converged);
|
|
assert!(recovery_sidecars(dir.path()).is_empty());
|
|
assert_ne!(live_schema_digest(dir.path()).await, pre_digest);
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Crash after the engine schema apply, before the state CAS: the manifest
|
|
/// moved, the ledger is stale, nothing acknowledged; the next run's sweep
|
|
/// rolls the ledger forward with an audit entry and the run converges.
|
|
#[tokio::test]
|
|
async fn schema_crash_after_apply_rolls_state_forward() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
converge_with_live_graph(dir.path()).await;
|
|
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
|
|
let state_before = fs::read(state_path(dir.path())).unwrap();
|
|
let desired = validate_config_dir(dir.path());
|
|
let v2_digest = desired.resource_digests["schema.knowledge"].clone();
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(!out.state_written);
|
|
// The live schema moved; the ledger is byte-identical (no ack).
|
|
assert_eq!(live_schema_digest(dir.path()).await, v2_digest);
|
|
assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);
|
|
let sidecars = recovery_sidecars(dir.path());
|
|
assert_eq!(sidecars.len(), 1);
|
|
let sidecar: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
|
|
assert!(sidecar["expected_manifest_version"].is_number(), "{sidecar}");
|
|
}
|
|
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(
|
|
recovered
|
|
.diagnostics
|
|
.iter()
|
|
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
|
|
);
|
|
assert!(recovered.converged);
|
|
assert!(recovery_sidecars(dir.path()).is_empty());
|
|
let state: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap();
|
|
assert_eq!(
|
|
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
|
|
v2_digest
|
|
);
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Seed: converged state + a stale `old` graph subtree with a real root and
|
|
/// a valid approval for its delete. Returns the approval id.
|
|
async fn seed_approved_delete(dir: &Path) -> String {
|
|
let digests = seed_applyable_state(dir);
|
|
let graph_digest = digests["graph.knowledge"].clone();
|
|
let schema_digest = digests["schema.knowledge"].clone();
|
|
let state_dir = dir.join("__cluster");
|
|
fs::write(
|
|
state_dir.join("state.json"),
|
|
format!(
|
|
r#"{{
|
|
"version": 1,
|
|
"state_revision": 1,
|
|
"applied_revision": {{
|
|
"resources": {{
|
|
"graph.knowledge": {{ "digest": "{graph_digest}" }},
|
|
"schema.knowledge": {{ "digest": "{schema_digest}" }},
|
|
"graph.old": {{ "digest": "3333" }},
|
|
"schema.old": {{ "digest": "4444" }}
|
|
}}
|
|
}}
|
|
}}
|
|
"#
|
|
),
|
|
)
|
|
.unwrap();
|
|
let root = dir.join("graphs/old.omni");
|
|
fs::create_dir_all(&root).unwrap();
|
|
fs::write(root.join("_schema.pg"), "stale").unwrap();
|
|
let approved = approve_config_dir(dir, "graph.old", "test-actor").await;
|
|
assert!(approved.ok, "{:?}", approved.diagnostics);
|
|
approved.approval_id.unwrap()
|
|
}
|
|
|
|
/// Crash before the removal: root intact, approval unconsumed, no ack; the
|
|
/// next run retires the stale intent (row 8) and the still-approved delete
|
|
/// completes in the same run.
|
|
#[tokio::test]
|
|
async fn delete_crash_before_removal_reproposes() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
let approval_id = seed_approved_delete(dir.path()).await;
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(dir.path().join("graphs/old.omni").exists());
|
|
assert_eq!(recovery_sidecars(dir.path()).len(), 1);
|
|
// The approval is untouched (file unconsumed).
|
|
let artifact: serde_json::Value = serde_json::from_str(
|
|
&fs::read_to_string(
|
|
dir.path()
|
|
.join("__cluster/approvals")
|
|
.join(format!("{approval_id}.json")),
|
|
)
|
|
.unwrap(),
|
|
)
|
|
.unwrap();
|
|
assert!(artifact["consumed_at"].is_null());
|
|
}
|
|
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(
|
|
recovered
|
|
.diagnostics
|
|
.iter()
|
|
.any(|diagnostic| diagnostic.code == "graph_delete_incomplete")
|
|
);
|
|
assert!(recovered.converged);
|
|
assert!(!dir.path().join("graphs/old.omni").exists());
|
|
assert!(recovery_sidecars(dir.path()).is_empty());
|
|
scenario.teardown();
|
|
}
|
|
|
|
/// Crash after the removal, before the state CAS: root gone, ledger stale,
|
|
/// nothing acknowledged; the next run's sweep rolls the tombstone forward,
|
|
/// consumes the approval the sidecar carries, and audits the recovery.
|
|
#[tokio::test]
|
|
async fn delete_crash_after_removal_rolls_forward() {
|
|
let scenario = FailScenario::setup();
|
|
let dir = fixture();
|
|
let approval_id = seed_approved_delete(dir.path()).await;
|
|
let state_before = fs::read(state_path(dir.path())).unwrap();
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return");
|
|
let out = apply_config_dir(dir.path()).await;
|
|
assert!(!out.ok);
|
|
assert!(!out.state_written);
|
|
assert!(!dir.path().join("graphs/old.omni").exists());
|
|
assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);
|
|
let sidecars = recovery_sidecars(dir.path());
|
|
assert_eq!(sidecars.len(), 1);
|
|
let sidecar: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
|
|
assert_eq!(sidecar["approval_id"], approval_id.as_str());
|
|
}
|
|
|
|
let recovered = apply_config_dir(dir.path()).await;
|
|
assert!(recovered.ok, "{:?}", recovered.diagnostics);
|
|
assert!(
|
|
recovered
|
|
.diagnostics
|
|
.iter()
|
|
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
|
|
);
|
|
assert!(recovered.converged);
|
|
let state: serde_json::Value =
|
|
serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap();
|
|
assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone");
|
|
assert!(state["approval_records"][&approval_id]["consumed_at"].is_string());
|
|
assert!(
|
|
state["recovery_records"]
|
|
.as_object()
|
|
.unwrap()
|
|
.values()
|
|
.any(|record| record["kind"] == "graph_delete")
|
|
);
|
|
scenario.teardown();
|
|
}
|