refactor(cluster): make apply_config_dir async

Mechanical conversion ahead of Stage 4A graph create (which calls the async
Omnigraph::init from inside apply): the fn signature, the CLI dispatch arm,
and every test caller (#[test] -> #[tokio::test]). Zero behavior change; all
60 lib tests and 3 failpoint tests green before and after.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 04:43:38 +03:00
parent 26b26999fd
commit 6fbf09d5c9
3 changed files with 63 additions and 63 deletions

View file

@ -3558,7 +3558,7 @@ async fn main() -> Result<()> {
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Apply { config, json } => {
let output = apply_config_dir(config);
let output = apply_config_dir(config).await;
finish_cluster_apply(&output, json)?;
}
ClusterCommand::Status { config, json } => {

View file

@ -561,7 +561,7 @@ pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
/// state is the publish point: a failure after payload writes leaves inert
/// digest-named blobs and no success acknowledgement; re-running apply is the
/// repair.
pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
@ -3932,10 +3932,10 @@ graphs:
.join(format!("{digest}.yaml"))
}
#[test]
fn apply_without_state_fails_with_state_missing() {
#[tokio::test]
async fn apply_without_state_fails_with_state_missing() {
let dir = fixture();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
out.diagnostics
@ -3948,8 +3948,8 @@ graphs:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn apply_writes_payloads_state_and_statuses() {
#[tokio::test]
async fn apply_writes_payloads_state_and_statuses() {
let dir = fixture();
write_applyable_state(dir.path());
let desired = validate_config_dir(dir.path());
@ -3965,7 +3965,7 @@ graphs:
.unwrap()
.clone();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert_eq!(out.applied_count, 2);
assert_eq!(out.deferred_count, 0);
@ -4011,8 +4011,8 @@ graphs:
out.desired_revision.config_digest.clone().unwrap()
}
#[test]
fn apply_update_changes_query_digest_and_keeps_old_blob() {
#[tokio::test]
async fn apply_update_changes_query_digest_and_keeps_old_blob() {
let dir = fixture();
let desired = validate_config_dir(dir.path());
let schema_digest = desired
@ -4035,7 +4035,7 @@ graphs:
fs::create_dir_all(old_blob.parent().unwrap()).unwrap();
fs::write(&old_blob, "old query source").unwrap();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let new_digest = desired
.resource_digests
@ -4050,8 +4050,8 @@ graphs:
assert!(query_payload_path(dir.path(), new_digest).exists());
}
#[test]
fn apply_deletes_removed_resources_but_keeps_blobs() {
#[tokio::test]
async fn apply_deletes_removed_resources_but_keeps_blobs() {
let dir = fixture();
let desired = validate_config_dir(dir.path());
let schema_digest = desired
@ -4080,7 +4080,7 @@ graphs:
fs::create_dir_all(stale_blob.parent().unwrap()).unwrap();
fs::write(&stale_blob, "old policy").unwrap();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.converged);
let state = read_state_json(dir.path());
@ -4109,8 +4109,8 @@ graphs:
assert_eq!(resources["graph.knowledge"]["digest"], expected_composite);
}
#[test]
fn apply_defers_schema_change_and_blocks_dependent_query() {
#[tokio::test]
async fn apply_defers_schema_change_and_blocks_dependent_query() {
let dir = fixture();
write_applyable_state(dir.path());
// Change the schema after seeding state: schema.knowledge now differs.
@ -4120,7 +4120,7 @@ graphs:
)
.unwrap();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.converged);
let by_resource: BTreeMap<&str, &PlanChange> = out
@ -4185,12 +4185,12 @@ graphs:
);
}
#[test]
fn apply_blocks_resources_of_uncreated_graph() {
#[tokio::test]
async fn apply_blocks_resources_of_uncreated_graph() {
let dir = fixture();
write_state_resources(dir.path(), &[]);
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert_eq!(out.applied_count, 0);
assert!(!out.converged);
@ -4227,8 +4227,8 @@ graphs:
);
}
#[test]
fn apply_does_not_delete_subtree_of_deleted_graph() {
#[tokio::test]
async fn apply_does_not_delete_subtree_of_deleted_graph() {
let dir = fixture();
let desired = validate_config_dir(dir.path());
let schema_digest = desired
@ -4249,7 +4249,7 @@ graphs:
],
);
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.converged);
let by_resource: BTreeMap<&str, &PlanChange> = out
@ -4276,17 +4276,17 @@ graphs:
assert_eq!(resources["query.old.q"]["digest"], "5555");
}
#[test]
fn apply_is_idempotent() {
#[tokio::test]
async fn apply_is_idempotent() {
let dir = fixture();
write_applyable_state(dir.path());
let first = apply_config_dir(dir.path());
let first = apply_config_dir(dir.path()).await;
assert!(first.ok, "{:?}", first.diagnostics);
assert!(first.state_written);
let state_after_first = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap();
let second = apply_config_dir(dir.path());
let second = apply_config_dir(dir.path()).await;
assert!(second.ok, "{:?}", second.diagnostics);
assert!(second.changes.is_empty());
assert_eq!(second.applied_count, 0);
@ -4297,13 +4297,13 @@ graphs:
assert_eq!(second.state_observations.state_revision, 2);
}
#[test]
fn apply_respects_held_lock() {
#[tokio::test]
async fn apply_respects_held_lock() {
let dir = fixture();
write_applyable_state(dir.path());
write_lock_file(dir.path(), "held-lock", "plan");
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
out.diagnostics
@ -4317,8 +4317,8 @@ graphs:
assert_eq!(state["state_revision"], 1);
}
#[test]
fn apply_state_lock_false_bypasses_with_warning() {
#[tokio::test]
async fn apply_state_lock_false_bypasses_with_warning() {
let dir = fixture();
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
@ -4338,7 +4338,7 @@ graphs:
.unwrap();
write_applyable_state(dir.path());
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.state_written);
assert!(!out.state_observations.lock_acquired);
@ -4350,8 +4350,8 @@ graphs:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn apply_skips_existing_payload_blob() {
#[tokio::test]
async fn apply_skips_existing_payload_blob() {
let dir = fixture();
write_applyable_state(dir.path());
let desired = validate_config_dir(dir.path());
@ -4366,13 +4366,13 @@ graphs:
fs::create_dir_all(blob.parent().unwrap()).unwrap();
fs::write(&blob, "pre-existing").unwrap();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert_eq!(fs::read_to_string(&blob).unwrap(), "pre-existing");
}
#[test]
fn apply_invalid_config_fails_before_lock() {
#[tokio::test]
async fn apply_invalid_config_fails_before_lock() {
let dir = fixture();
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
@ -4380,7 +4380,7 @@ graphs:
)
.unwrap();
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
// Config errors bail before the lock or any state directory exists.
assert!(!dir.path().join(CLUSTER_STATE_DIR).exists());
@ -4391,8 +4391,8 @@ graphs:
/// mutations (phantom `applied` entries would mislead automation that
/// reads `resource_statuses` independently of `ok`).
#[cfg(unix)]
#[test]
fn apply_state_write_failure_reports_persisted_statuses() {
#[tokio::test]
async fn apply_state_write_failure_reports_persisted_statuses() {
use std::os::unix::fs::PermissionsExt;
let dir = fixture();
@ -4435,7 +4435,7 @@ graphs:
return;
}
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap();
assert!(!out.ok);
@ -4459,9 +4459,9 @@ graphs:
// ---- catalog payload verification (Stage 3B) ----
/// Converge a fixture dir and return the query blob path.
fn converge_fixture(config_dir: &Path) -> std::path::PathBuf {
async fn converge_fixture(config_dir: &Path) -> std::path::PathBuf {
write_applyable_state(config_dir);
let out = apply_config_dir(config_dir);
let out = apply_config_dir(config_dir).await;
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
let desired = validate_config_dir(config_dir);
query_payload_path(
@ -4473,10 +4473,10 @@ graphs:
)
}
#[test]
fn status_reports_missing_payload_read_only() {
#[tokio::test]
async fn status_reports_missing_payload_read_only() {
let dir = fixture();
let blob = converge_fixture(dir.path());
let blob = converge_fixture(dir.path()).await;
let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap();
fs::remove_file(&blob).unwrap();
@ -4501,7 +4501,7 @@ graphs:
async fn refresh_removes_digest_and_drifts_on_missing_payload() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
let blob = converge_fixture(dir.path()).await;
fs::remove_file(&blob).unwrap();
let out = refresh_config_dir(dir.path()).await;
@ -4527,7 +4527,7 @@ graphs:
async fn refresh_drifts_on_corrupted_payload() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
let blob = converge_fixture(dir.path()).await;
fs::write(&blob, "corrupted content").unwrap();
let out = refresh_config_dir(dir.path()).await;
@ -4547,7 +4547,7 @@ graphs:
async fn refresh_flags_unreadable_payload_as_error() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
let blob = converge_fixture(dir.path()).await;
// A same-named directory yields a non-NotFound IO error portably.
fs::remove_file(&blob).unwrap();
fs::create_dir(&blob).unwrap();
@ -4575,7 +4575,7 @@ graphs:
async fn payload_drift_self_heals_through_refresh_plan_apply() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
let blob = converge_fixture(dir.path()).await;
let original = fs::read_to_string(&blob).unwrap();
fs::remove_file(&blob).unwrap();
@ -4591,7 +4591,7 @@ graphs:
assert_eq!(query_change.operation, PlanOperation::Create);
assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied));
let apply = apply_config_dir(dir.path());
let apply = apply_config_dir(dir.path()).await;
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
assert_eq!(fs::read_to_string(&blob).unwrap(), original);

View file

@ -99,14 +99,14 @@ fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf
.join(format!("{}.gq", digests["query.knowledge.find_person"]))
}
#[test]
fn failpoint_wiring_returns_injected_diagnostic() {
#[tokio::test]
async fn failpoint_wiring_returns_injected_diagnostic() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_applyable_state(dir.path());
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "injected_failpoint"
@ -121,8 +121,8 @@ fn failpoint_wiring_returns_injected_diagnostic() {
/// Crash between the payload phase and the state write: blobs are on disk,
/// state.json is byte-identical, nothing is acknowledged — and a plain re-run
/// repairs by trusting the existing content-addressed blobs.
#[test]
fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
#[tokio::test]
async fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
let scenario = FailScenario::setup();
let dir = fixture();
let digests = seed_applyable_state(dir.path());
@ -130,7 +130,7 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
assert!(!out.converged);
@ -149,7 +149,7 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
}
// The repair is a plain re-run: existing blobs are trusted by digest.
let recovered = apply_config_dir(dir.path());
let recovered = apply_config_dir(dir.path()).await;
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(recovered.converged);
assert!(recovered.state_written);
@ -163,8 +163,8 @@ fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
/// A concurrent writer mutating state.json between apply's read and its write
/// (possible under `state.lock: false`) must surface `state_cas_mismatch`,
/// acknowledge nothing, and leave the concurrent writer's state on disk.
#[test]
fn apply_cas_race_surfaces_state_cas_mismatch() {
#[tokio::test]
async fn apply_cas_race_surfaces_state_cas_mismatch() {
let scenario = FailScenario::setup();
let dir = fixture();
let digests = seed_applyable_state(dir.path());
@ -182,7 +182,7 @@ fn apply_cas_race_surfaces_state_cas_mismatch() {
fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap();
});
let out = apply_config_dir(dir.path());
let out = apply_config_dir(dir.path()).await;
drop(failpoint);
assert!(!out.ok);
@ -212,7 +212,7 @@ fn apply_cas_race_surfaces_state_cas_mismatch() {
assert!(query_blob(dir.path(), &digests).exists());
// Recovery is a plain re-run against the rewritten state.
let recovered = apply_config_dir(dir.path());
let recovered = apply_config_dir(dir.path()).await;
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(recovered.converged);
scenario.teardown();