diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 37db77f..08c1fab 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -3558,7 +3558,7 @@ async fn main() -> Result<()> { finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { - let output = apply_config_dir(config); + let output = apply_config_dir(config).await; finish_cluster_apply(&output, json)?; } ClusterCommand::Status { config, json } => { diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 660f34c..56513ca 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -561,7 +561,7 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { /// state is the publish point: a failure after payload writes leaves inert /// digest-named blobs and no success acknowledgement; re-running apply is the /// repair. -pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { +pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); @@ -3932,10 +3932,10 @@ graphs: .join(format!("{digest}.yaml")) } - #[test] - fn apply_without_state_fails_with_state_missing() { + #[tokio::test] + async fn apply_without_state_fails_with_state_missing() { let dir = fixture(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!( out.diagnostics @@ -3948,8 +3948,8 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn apply_writes_payloads_state_and_statuses() { + #[tokio::test] + async fn apply_writes_payloads_state_and_statuses() { let dir = fixture(); write_applyable_state(dir.path()); let desired = validate_config_dir(dir.path()); @@ -3965,7 +3965,7 @@ graphs: .unwrap() .clone(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(out.applied_count, 2); assert_eq!(out.deferred_count, 0); @@ -4011,8 +4011,8 @@ graphs: out.desired_revision.config_digest.clone().unwrap() } - #[test] - fn apply_update_changes_query_digest_and_keeps_old_blob() { + #[tokio::test] + async fn apply_update_changes_query_digest_and_keeps_old_blob() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -4035,7 +4035,7 @@ graphs: fs::create_dir_all(old_blob.parent().unwrap()).unwrap(); fs::write(&old_blob, "old query source").unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); let new_digest = desired .resource_digests @@ -4050,8 +4050,8 @@ graphs: assert!(query_payload_path(dir.path(), new_digest).exists()); } - #[test] - fn apply_deletes_removed_resources_but_keeps_blobs() { + #[tokio::test] + async fn apply_deletes_removed_resources_but_keeps_blobs() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -4080,7 +4080,7 @@ graphs: fs::create_dir_all(stale_blob.parent().unwrap()).unwrap(); fs::write(&stale_blob, "old policy").unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.converged); let state = read_state_json(dir.path()); @@ -4109,8 +4109,8 @@ graphs: assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); } - #[test] - fn apply_defers_schema_change_and_blocks_dependent_query() { + #[tokio::test] + async fn apply_defers_schema_change_and_blocks_dependent_query() { let dir = fixture(); write_applyable_state(dir.path()); // Change the schema after seeding state: schema.knowledge now differs. @@ -4120,7 +4120,7 @@ graphs: ) .unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.converged); let by_resource: BTreeMap<&str, &PlanChange> = out @@ -4185,12 +4185,12 @@ graphs: ); } - #[test] - fn apply_blocks_resources_of_uncreated_graph() { + #[tokio::test] + async fn apply_blocks_resources_of_uncreated_graph() { let dir = fixture(); write_state_resources(dir.path(), &[]); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(out.applied_count, 0); assert!(!out.converged); @@ -4227,8 +4227,8 @@ graphs: ); } - #[test] - fn apply_does_not_delete_subtree_of_deleted_graph() { + #[tokio::test] + async fn apply_does_not_delete_subtree_of_deleted_graph() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -4249,7 +4249,7 @@ graphs: ], ); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.converged); let by_resource: BTreeMap<&str, &PlanChange> = out @@ -4276,17 +4276,17 @@ graphs: assert_eq!(resources["query.old.q"]["digest"], "5555"); } - #[test] - fn apply_is_idempotent() { + #[tokio::test] + async fn apply_is_idempotent() { let dir = fixture(); write_applyable_state(dir.path()); - let first = apply_config_dir(dir.path()); + let first = apply_config_dir(dir.path()).await; assert!(first.ok, "{:?}", first.diagnostics); assert!(first.state_written); let state_after_first = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); - let second = apply_config_dir(dir.path()); + let second = apply_config_dir(dir.path()).await; assert!(second.ok, "{:?}", second.diagnostics); assert!(second.changes.is_empty()); assert_eq!(second.applied_count, 0); @@ -4297,13 +4297,13 @@ graphs: assert_eq!(second.state_observations.state_revision, 2); } - #[test] - fn apply_respects_held_lock() { + #[tokio::test] + async fn apply_respects_held_lock() { let dir = fixture(); write_applyable_state(dir.path()); write_lock_file(dir.path(), "held-lock", "plan"); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!( out.diagnostics @@ -4317,8 +4317,8 @@ graphs: assert_eq!(state["state_revision"], 1); } - #[test] - fn apply_state_lock_false_bypasses_with_warning() { + #[tokio::test] + async fn apply_state_lock_false_bypasses_with_warning() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), @@ -4338,7 +4338,7 @@ graphs: .unwrap(); write_applyable_state(dir.path()); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(out.state_written); assert!(!out.state_observations.lock_acquired); @@ -4350,8 +4350,8 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn apply_skips_existing_payload_blob() { + #[tokio::test] + async fn apply_skips_existing_payload_blob() { let dir = fixture(); write_applyable_state(dir.path()); let desired = validate_config_dir(dir.path()); @@ -4366,13 +4366,13 @@ graphs: fs::create_dir_all(blob.parent().unwrap()).unwrap(); fs::write(&blob, "pre-existing").unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(fs::read_to_string(&blob).unwrap(), "pre-existing"); } - #[test] - fn apply_invalid_config_fails_before_lock() { + #[tokio::test] + async fn apply_invalid_config_fails_before_lock() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), @@ -4380,7 +4380,7 @@ graphs: ) .unwrap(); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); // Config errors bail before the lock or any state directory exists. assert!(!dir.path().join(CLUSTER_STATE_DIR).exists()); @@ -4391,8 +4391,8 @@ graphs: /// mutations (phantom `applied` entries would mislead automation that /// reads `resource_statuses` independently of `ok`). #[cfg(unix)] - #[test] - fn apply_state_write_failure_reports_persisted_statuses() { + #[tokio::test] + async fn apply_state_write_failure_reports_persisted_statuses() { use std::os::unix::fs::PermissionsExt; let dir = fixture(); @@ -4435,7 +4435,7 @@ graphs: return; } - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap(); assert!(!out.ok); @@ -4459,9 +4459,9 @@ graphs: // ---- catalog payload verification (Stage 3B) ---- /// Converge a fixture dir and return the query blob path. - fn converge_fixture(config_dir: &Path) -> std::path::PathBuf { + async fn converge_fixture(config_dir: &Path) -> std::path::PathBuf { write_applyable_state(config_dir); - let out = apply_config_dir(config_dir); + let out = apply_config_dir(config_dir).await; assert!(out.ok && out.converged, "{:?}", out.diagnostics); let desired = validate_config_dir(config_dir); query_payload_path( @@ -4473,10 +4473,10 @@ graphs: ) } - #[test] - fn status_reports_missing_payload_read_only() { + #[tokio::test] + async fn status_reports_missing_payload_read_only() { let dir = fixture(); - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); fs::remove_file(&blob).unwrap(); @@ -4501,7 +4501,7 @@ graphs: async fn refresh_removes_digest_and_drifts_on_missing_payload() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; fs::remove_file(&blob).unwrap(); let out = refresh_config_dir(dir.path()).await; @@ -4527,7 +4527,7 @@ graphs: async fn refresh_drifts_on_corrupted_payload() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; fs::write(&blob, "corrupted content").unwrap(); let out = refresh_config_dir(dir.path()).await; @@ -4547,7 +4547,7 @@ graphs: async fn refresh_flags_unreadable_payload_as_error() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; // A same-named directory yields a non-NotFound IO error portably. fs::remove_file(&blob).unwrap(); fs::create_dir(&blob).unwrap(); @@ -4575,7 +4575,7 @@ graphs: async fn payload_drift_self_heals_through_refresh_plan_apply() { let dir = fixture(); init_derived_graph(dir.path()).await; - let blob = converge_fixture(dir.path()); + let blob = converge_fixture(dir.path()).await; let original = fs::read_to_string(&blob).unwrap(); fs::remove_file(&blob).unwrap(); @@ -4591,7 +4591,7 @@ graphs: assert_eq!(query_change.operation, PlanOperation::Create); assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied)); - let apply = apply_config_dir(dir.path()); + let apply = apply_config_dir(dir.path()).await; assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); assert_eq!(fs::read_to_string(&blob).unwrap(), original); diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index db7b82d..743f1fe 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -99,14 +99,14 @@ fn query_blob(config_dir: &Path, digests: &BTreeMap) -> PathBuf .join(format!("{}.gq", digests["query.knowledge.find_person"])) } -#[test] -fn failpoint_wiring_returns_injected_diagnostic() { +#[tokio::test] +async fn failpoint_wiring_returns_injected_diagnostic() { let scenario = FailScenario::setup(); let dir = fixture(); seed_applyable_state(dir.path()); let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.diagnostics.iter().any(|diagnostic| { diagnostic.code == "injected_failpoint" @@ -121,8 +121,8 @@ fn failpoint_wiring_returns_injected_diagnostic() { /// Crash between the payload phase and the state write: blobs are on disk, /// state.json is byte-identical, nothing is acknowledged — and a plain re-run /// repairs by trusting the existing content-addressed blobs. -#[test] -fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { +#[tokio::test] +async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { let scenario = FailScenario::setup(); let dir = fixture(); let digests = seed_applyable_state(dir.path()); @@ -130,7 +130,7 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { { let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; assert!(!out.ok); assert!(!out.state_written); assert!(!out.converged); @@ -149,7 +149,7 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { } // The repair is a plain re-run: existing blobs are trusted by digest. - let recovered = apply_config_dir(dir.path()); + let recovered = apply_config_dir(dir.path()).await; assert!(recovered.ok, "{:?}", recovered.diagnostics); assert!(recovered.converged); assert!(recovered.state_written); @@ -163,8 +163,8 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { /// A concurrent writer mutating state.json between apply's read and its write /// (possible under `state.lock: false`) must surface `state_cas_mismatch`, /// acknowledge nothing, and leave the concurrent writer's state on disk. -#[test] -fn apply_cas_race_surfaces_state_cas_mismatch() { +#[tokio::test] +async fn apply_cas_race_surfaces_state_cas_mismatch() { let scenario = FailScenario::setup(); let dir = fixture(); let digests = seed_applyable_state(dir.path()); @@ -182,7 +182,7 @@ fn apply_cas_race_surfaces_state_cas_mismatch() { fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); }); - let out = apply_config_dir(dir.path()); + let out = apply_config_dir(dir.path()).await; drop(failpoint); assert!(!out.ok); @@ -212,7 +212,7 @@ fn apply_cas_race_surfaces_state_cas_mismatch() { assert!(query_blob(dir.path(), &digests).exists()); // Recovery is a plain re-run against the rewritten state. - let recovered = apply_config_dir(dir.path()); + let recovered = apply_config_dir(dir.path()).await; assert!(recovered.ok, "{:?}", recovered.diagnostics); assert!(recovered.converged); scenario.teardown();