From b313075476a5c49d747b425b6bc6bd245258a93e Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:02:12 +0300 Subject: [PATCH] refactor(cluster): make plan_config_dir async Mechanical conversion ahead of Stage 4B (plan will preview schema migrations against live graphs): signature, CLI dispatch, and test callers. Zero behavior change. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 2 +- crates/omnigraph-cluster/src/lib.rs | 74 ++++++++++++++--------------- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 08c1fab..7d09f36 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -3554,7 +3554,7 @@ async fn main() -> Result<()> { finish_cluster_validate(&output, json)?; } ClusterCommand::Plan { config, json } => { - let output = plan_config_dir(config); + let output = plan_config_dir(config).await; finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 863691c..7f92a67 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -499,7 +499,7 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { } } -pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { +pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); @@ -3681,10 +3681,10 @@ graphs: ); } - #[test] - fn missing_state_plans_creates() { + #[tokio::test] + async fn missing_state_plans_creates() { let dir = fixture(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.state_observations.state_found); assert!(!out.state_observations.locked); @@ -3698,10 +3698,10 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn config_digest_ignores_yaml_comments_and_formatting() { + #[tokio::test] + async fn config_digest_ignores_yaml_comments_and_formatting() { let dir = fixture(); - let first = plan_config_dir(dir.path()); + let first = plan_config_dir(dir.path()).await; assert!(first.ok, "{:?}", first.diagnostics); fs::write( @@ -3724,7 +3724,7 @@ policies: ) .unwrap(); - let second = plan_config_dir(dir.path()); + let second = plan_config_dir(dir.path()).await; assert!(second.ok, "{:?}", second.diagnostics); assert_eq!( first.desired_revision.config_digest, @@ -3732,10 +3732,10 @@ policies: ); } - #[test] - fn existing_state_plans_update_and_delete_deterministically() { + #[tokio::test] + async fn existing_state_plans_update_and_delete_deterministically() { let dir = fixture(); - let first = plan_config_dir(dir.path()); + let first = plan_config_dir(dir.path()).await; let state_dir = dir.path().join("__cluster"); fs::create_dir_all(&state_dir).unwrap(); fs::write( @@ -3755,7 +3755,7 @@ policies: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); let rendered: Vec<_> = out .changes @@ -3773,8 +3773,8 @@ policies: ); } - #[test] - fn old_minimal_state_json_still_plans_with_default_revision() { + #[tokio::test] + async fn old_minimal_state_json_still_plans_with_default_revision() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -3792,7 +3792,7 @@ policies: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(out.state_observations.state_revision, 0); assert!(out.state_observations.state_cas.is_some()); @@ -4018,12 +4018,12 @@ graphs: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn plan_succeeds_after_force_unlock() { + #[tokio::test] + async fn plan_succeeds_after_force_unlock() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); - let locked = plan_config_dir(dir.path()); + let locked = plan_config_dir(dir.path()).await; assert!(!locked.ok); assert!( locked @@ -4035,12 +4035,12 @@ graphs: let unlocked = force_unlock_config_dir(dir.path(), "held-lock"); assert!(unlocked.ok, "{:?}", unlocked.diagnostics); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); } - #[test] - fn plan_reports_state_cas_revision_and_removes_lock() { + #[tokio::test] + async fn plan_reports_state_cas_revision_and_removes_lock() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -4056,7 +4056,7 @@ graphs: }"#; fs::write(state_dir.join("state.json"), state).unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(out.state_observations.state_revision, 7); assert_eq!( @@ -4073,8 +4073,8 @@ graphs: ); } - #[test] - fn existing_lock_makes_plan_fail() { + #[tokio::test] + async fn existing_lock_makes_plan_fail() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -4090,7 +4090,7 @@ graphs: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.state_observations.locked); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -4111,8 +4111,8 @@ graphs: })); } - #[test] - fn state_lock_false_bypasses_lock_with_warning() { + #[tokio::test] + async fn state_lock_false_bypasses_lock_with_warning() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), @@ -4128,7 +4128,7 @@ graphs: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.state_observations.locked); assert!(!out.state_observations.lock_acquired); @@ -4153,15 +4153,15 @@ graphs: assert_eq!(out.diagnostics[0].code, "unsupported_state_backend"); } - #[test] - fn external_state_backend_plan_rejected() { + #[tokio::test] + async fn external_state_backend_plan_rejected() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), "version: 1\nstate:\n backend: s3://bucket/state\ngraphs: {}\n", ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(!out.ok); assert!( out.diagnostics @@ -4304,7 +4304,7 @@ graphs: assert!(!out.resource_digests.contains_key("graph.knowledge")); assert_eq!(out.observations["graph.knowledge"]["exists"], false); - let plan = plan_config_dir(dir.path()); + let plan = plan_config_dir(dir.path()).await; assert!(plan.ok, "{:?}", plan.diagnostics); assert!(plan.changes.iter().any(|change| { change.resource == "graph.knowledge" && change.operation == PlanOperation::Create @@ -4342,7 +4342,7 @@ graphs: false ); - let plan = plan_config_dir(dir.path()); + let plan = plan_config_dir(dir.path()).await; assert!(plan.ok, "{:?}", plan.diagnostics); assert!(plan.changes.iter().any(|change| { change.resource == "schema.knowledge" && change.operation == PlanOperation::Update @@ -5233,7 +5233,7 @@ graphs: let refresh = refresh_config_dir(dir.path()).await; assert!(refresh.ok, "{:?}", refresh.diagnostics); - let plan = plan_config_dir(dir.path()); + let plan = plan_config_dir(dir.path()).await; let query_change = plan .changes .iter() @@ -5516,10 +5516,10 @@ graphs: ); } - #[test] - fn plan_annotates_apply_dispositions() { + #[tokio::test] + async fn plan_annotates_apply_dispositions() { let dir = fixture(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); let by_resource: BTreeMap<&str, &PlanChange> = out .changes