refactor(cluster): make plan_config_dir async

Mechanical conversion ahead of Stage 4B (plan will preview schema migrations
against live graphs): signature, CLI dispatch, and test callers. Zero
behavior change.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 13:02:12 +03:00
parent e6921157cc
commit b313075476
2 changed files with 38 additions and 38 deletions

View file

@ -3554,7 +3554,7 @@ async fn main() -> Result<()> {
finish_cluster_validate(&output, json)?;
}
ClusterCommand::Plan { config, json } => {
let output = plan_config_dir(config);
let output = plan_config_dir(config).await;
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Apply { config, json } => {

View file

@ -499,7 +499,7 @@ pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
}
}
pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
@ -3681,10 +3681,10 @@ graphs:
);
}
#[test]
fn missing_state_plans_creates() {
#[tokio::test]
async fn missing_state_plans_creates() {
let dir = fixture();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.state_observations.state_found);
assert!(!out.state_observations.locked);
@ -3698,10 +3698,10 @@ graphs:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn config_digest_ignores_yaml_comments_and_formatting() {
#[tokio::test]
async fn config_digest_ignores_yaml_comments_and_formatting() {
let dir = fixture();
let first = plan_config_dir(dir.path());
let first = plan_config_dir(dir.path()).await;
assert!(first.ok, "{:?}", first.diagnostics);
fs::write(
@ -3724,7 +3724,7 @@ policies:
)
.unwrap();
let second = plan_config_dir(dir.path());
let second = plan_config_dir(dir.path()).await;
assert!(second.ok, "{:?}", second.diagnostics);
assert_eq!(
first.desired_revision.config_digest,
@ -3732,10 +3732,10 @@ policies:
);
}
#[test]
fn existing_state_plans_update_and_delete_deterministically() {
#[tokio::test]
async fn existing_state_plans_update_and_delete_deterministically() {
let dir = fixture();
let first = plan_config_dir(dir.path());
let first = plan_config_dir(dir.path()).await;
let state_dir = dir.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
@ -3755,7 +3755,7 @@ policies:
)
.unwrap();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let rendered: Vec<_> = out
.changes
@ -3773,8 +3773,8 @@ policies:
);
}
#[test]
fn old_minimal_state_json_still_plans_with_default_revision() {
#[tokio::test]
async fn old_minimal_state_json_still_plans_with_default_revision() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -3792,7 +3792,7 @@ policies:
)
.unwrap();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert_eq!(out.state_observations.state_revision, 0);
assert!(out.state_observations.state_cas.is_some());
@ -4018,12 +4018,12 @@ graphs:
assert!(dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn plan_succeeds_after_force_unlock() {
#[tokio::test]
async fn plan_succeeds_after_force_unlock() {
let dir = fixture();
write_lock_file(dir.path(), "held-lock", "plan");
let locked = plan_config_dir(dir.path());
let locked = plan_config_dir(dir.path()).await;
assert!(!locked.ok);
assert!(
locked
@ -4035,12 +4035,12 @@ graphs:
let unlocked = force_unlock_config_dir(dir.path(), "held-lock");
assert!(unlocked.ok, "{:?}", unlocked.diagnostics);
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
}
#[test]
fn plan_reports_state_cas_revision_and_removes_lock() {
#[tokio::test]
async fn plan_reports_state_cas_revision_and_removes_lock() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -4056,7 +4056,7 @@ graphs:
}"#;
fs::write(state_dir.join("state.json"), state).unwrap();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert_eq!(out.state_observations.state_revision, 7);
assert_eq!(
@ -4073,8 +4073,8 @@ graphs:
);
}
#[test]
fn existing_lock_makes_plan_fail() {
#[tokio::test]
async fn existing_lock_makes_plan_fail() {
let dir = fixture();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::create_dir_all(&state_dir).unwrap();
@ -4090,7 +4090,7 @@ graphs:
)
.unwrap();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.state_observations.locked);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
@ -4111,8 +4111,8 @@ graphs:
}));
}
#[test]
fn state_lock_false_bypasses_lock_with_warning() {
#[tokio::test]
async fn state_lock_false_bypasses_lock_with_warning() {
let dir = fixture();
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
@ -4128,7 +4128,7 @@ graphs:
)
.unwrap();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.state_observations.locked);
assert!(!out.state_observations.lock_acquired);
@ -4153,15 +4153,15 @@ graphs:
assert_eq!(out.diagnostics[0].code, "unsupported_state_backend");
}
#[test]
fn external_state_backend_plan_rejected() {
#[tokio::test]
async fn external_state_backend_plan_rejected() {
let dir = fixture();
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
"version: 1\nstate:\n backend: s3://bucket/state\ngraphs: {}\n",
)
.unwrap();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
out.diagnostics
@ -4304,7 +4304,7 @@ graphs:
assert!(!out.resource_digests.contains_key("graph.knowledge"));
assert_eq!(out.observations["graph.knowledge"]["exists"], false);
let plan = plan_config_dir(dir.path());
let plan = plan_config_dir(dir.path()).await;
assert!(plan.ok, "{:?}", plan.diagnostics);
assert!(plan.changes.iter().any(|change| {
change.resource == "graph.knowledge" && change.operation == PlanOperation::Create
@ -4342,7 +4342,7 @@ graphs:
false
);
let plan = plan_config_dir(dir.path());
let plan = plan_config_dir(dir.path()).await;
assert!(plan.ok, "{:?}", plan.diagnostics);
assert!(plan.changes.iter().any(|change| {
change.resource == "schema.knowledge" && change.operation == PlanOperation::Update
@ -5233,7 +5233,7 @@ graphs:
let refresh = refresh_config_dir(dir.path()).await;
assert!(refresh.ok, "{:?}", refresh.diagnostics);
let plan = plan_config_dir(dir.path());
let plan = plan_config_dir(dir.path()).await;
let query_change = plan
.changes
.iter()
@ -5516,10 +5516,10 @@ graphs:
);
}
#[test]
fn plan_annotates_apply_dispositions() {
#[tokio::test]
async fn plan_annotates_apply_dispositions() {
let dir = fixture();
let out = plan_config_dir(dir.path());
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let by_resource: BTreeMap<&str, &PlanChange> = out
.changes