From 0b84b1adc3ec274be26fd00e286a7c8af564ccf5 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 15:30:33 +0300 Subject: [PATCH] feat(cluster): record policy applies_to bindings in the applied revision MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice 5A of RFC-005: the state ledger becomes serving-sufficient for the Phase-5 server boot. StateResource gains an optional applies_to (normalized typed refs: cluster | graph.), written by apply for every applied policy create/update from the desired config's validated bindings. The hole this closes: applies_to is not part of the policy file digest, so a binding-only edit previously produced NO plan change at all (a 4C e2e even asserted that — the gap, not a contract). Binding changes are now first-class: a post-diff pass emits an Update with equal before/after digests and a binding_change marker (visible in plan/apply JSON and human output as [bindings]), classification/execution treat it as an ordinary catalog-tier applied change (payload skips naturally — the blob is unchanged), and convergence requires zero binding divergence, so stale bindings can never report converged. Pre-5A ledger entries (no bindings recorded) surface as the same backfill Update; one apply heals them, exactly the remedy RFC-005's boot-error path names. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 10 +- crates/omnigraph-cli/tests/cli.rs | 7 +- crates/omnigraph-cluster/src/lib.rs | 220 +++++++++++++++++++++++++++- 3 files changed, 225 insertions(+), 12 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 8593ef3..673adb7 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -815,7 +815,8 @@ fn print_cluster_plan_human(output: &PlanOutput) { output.approvals_required.len() ); for change in &output.changes { - println!(" {:?} {}", change.operation, change.resource); + let bindings = if change.binding_change { " [bindings]" } else { "" }; + println!(" {:?} {}{bindings}", change.operation, change.resource); if let Some(migration) = &change.migration { if !migration.supported { println!(" migration UNSUPPORTED:"); @@ -862,16 +863,17 @@ fn print_cluster_apply_human(output: &ApplyOutput) { fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) { for change in changes { + let bindings = if change.binding_change { " [bindings]" } else { "" }; match (&change.disposition, change.reason.as_deref()) { (Some(disposition), Some(reason)) => println!( - " {:?} {} [{disposition:?}: {reason}]", + " {:?} {}{bindings} [{disposition:?}: {reason}]", change.operation, change.resource ), (Some(disposition), None) => println!( - " {:?} {} [{disposition:?}]", + " {:?} {}{bindings} [{disposition:?}]", change.operation, change.resource ), - _ => println!(" {:?} {}", change.operation, change.resource), + _ => println!(" {:?} {}{bindings}", change.operation, change.resource), } } if changes.is_empty() { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 336f19e..e4590f6 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1451,9 +1451,10 @@ policies: change_for(&mixed, "query.knowledge.find_person")["disposition"], "applied" ); - // policy.shared's applies_to narrowed, but its FILE digest is unchanged - // — applies_to lives in cluster.yaml (the config digest), so it is not a - // resource change. + // 5A: policy.shared's applies_to narrowed with an unchanged file digest + // — now a first-class binding change, applied in the same run. + assert_eq!(change_for(&mixed, "policy.shared")["binding_change"], true); + assert_eq!(change_for(&mixed, "policy.shared")["disposition"], "applied"); assert_eq!( change_for(&mixed, "graph.knowledge")["disposition"], "derived" diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index f67d8f7..af2ef81 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -196,6 +196,11 @@ pub struct PlanChange { pub disposition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub reason: Option, + /// True for a policy change whose file digest is unchanged but whose + /// `applies_to` bindings differ from the applied revision (including the + /// pre-5A backfill case). + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub binding_change: bool, /// For schema updates: the engine's migration plan against the live /// graph (RFC-004 §D7's data-aware preview). Absent when the preview is /// unavailable (warning `schema_preview_unavailable`). @@ -347,6 +352,8 @@ struct DesiredCluster { resource_digests: BTreeMap, resources: Vec, dependencies: Vec, + /// `policy.` address -> normalized applies_to refs. + policy_bindings: BTreeMap>, } #[derive(Debug, Clone)] @@ -457,6 +464,13 @@ struct AppliedRevisionState { #[serde(deny_unknown_fields)] struct StateResource { digest: String, + /// Policy resources only: the applied `applies_to` bindings, normalized + /// to typed refs (`cluster` | `graph.`). Recorded so the state + /// ledger is serving-sufficient for the Phase-5 server boot (RFC-005 + /// §D3). Absent on pre-5A entries (backfilled by the next apply) and on + /// non-policy resources. + #[serde(default, skip_serializing_if = "Option::is_none")] + applies_to: Option>, } #[derive(Debug, Serialize, Deserialize)] @@ -623,11 +637,13 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics); let mut prior_resources = BTreeMap::new(); + let mut prior_state: Option = None; if !has_errors(&diagnostics) { match backend.read_state(&mut observations) { Ok(snapshot) => { if let Some(state) = snapshot.state { prior_resources = state_resource_digests(&state); + prior_state = Some(state); } } Err(diagnostic) => diagnostics.push(diagnostic), @@ -639,6 +655,9 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } else { diff_resources(&prior_resources, &desired.resource_digests) }; + if !has_errors(&diagnostics) { + append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired); + } // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. let artifacts = backend.list_approval_artifacts(&mut diagnostics); @@ -850,6 +869,7 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); + append_policy_binding_changes(&mut changes, Some(&state), &desired); let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); let approved = approved_resources( &approval_artifacts, @@ -1429,6 +1449,12 @@ pub async fn apply_config_dir_with_options( .after_digest .clone() .expect("create/update always carries an after digest"), + // Policies record their applied bindings so the + // ledger is serving-sufficient (RFC-005 §D3). + applies_to: desired + .policy_bindings + .get(&change.resource) + .cloned(), }, ); set_resource_status_applied(&mut new_state, &change.resource); @@ -1467,10 +1493,11 @@ pub async fn apply_config_dir_with_options( } recompute_state_graph_digests(&mut new_state, &desired); - let residual = diff_resources( + let mut residual = diff_resources( &state_resource_digests(&new_state), &desired.resource_digests, ); + append_policy_binding_changes(&mut residual, Some(&new_state), &desired); let converged = residual.is_empty(); if converged { new_state.applied_revision.config_digest = Some(desired.config_digest.clone()); @@ -2741,6 +2768,7 @@ async fn sweep_graph_create_sidecar( schema_addr.clone(), StateResource { digest: live_digest.clone(), + applies_to: None, }, ); let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); @@ -2749,7 +2777,7 @@ async fn sweep_graph_create_sidecar( state .applied_revision .resources - .insert(graph_address.clone(), StateResource { digest: composite }); + .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); set_resource_status_applied(state, &graph_address); set_resource_status_applied(state, &schema_addr); state.recovery_records.insert( @@ -2869,6 +2897,7 @@ async fn sweep_schema_apply_sidecar( schema_addr.clone(), StateResource { digest: live_digest.clone(), + applies_to: None, }, ); let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); @@ -2876,7 +2905,7 @@ async fn sweep_schema_apply_sidecar( state .applied_revision .resources - .insert(graph_address.clone(), StateResource { digest: composite }); + .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); set_resource_status_applied(state, &graph_address); set_resource_status_applied(state, &schema_addr); state.recovery_records.insert( @@ -3109,6 +3138,7 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt schema_address.clone(), StateResource { digest: observation.schema_digest.clone(), + applies_to: None, }, ); let query_digests = state_query_digests_for_graph(state, &graph.id); @@ -3121,6 +3151,7 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt graph_address.clone(), StateResource { digest: graph_digest_value, + applies_to: None, }, ); state.observations.insert( @@ -3455,6 +3486,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { ); } + let mut policy_bindings: BTreeMap> = BTreeMap::new(); for (policy_name, policy) in &raw.policies { validate_id( "policy name", @@ -3471,10 +3503,14 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { } let policy_address = policy_address(policy_name); + let mut normalized_bindings: Vec = Vec::new(); for (idx, target) in policy.applies_to.iter().enumerate() { match normalize_policy_target(target) { - PolicyTarget::Cluster => {} + PolicyTarget::Cluster => { + normalized_bindings.push("cluster".to_string()); + } PolicyTarget::Graph(graph_id) => { + normalized_bindings.push(graph_address(&graph_id)); if raw.graphs.contains_key(&graph_id) { dependencies.insert(Dependency { from: policy_address.clone(), @@ -3498,6 +3534,10 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { } } + normalized_bindings.sort(); + normalized_bindings.dedup(); + policy_bindings.insert(policy_address.clone(), normalized_bindings); + let policy_path = resolve_config_path(&config_dir, &policy.file); match fs::read(&policy_path) { Ok(bytes) => { @@ -3551,6 +3591,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { resource_digests, resources: resource_list, dependencies, + policy_bindings, }), diagnostics, config_dir, @@ -3614,6 +3655,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + binding_change: false, migration: None, }), Some(before) if before != after => changes.push(PlanChange { @@ -3623,6 +3665,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + binding_change: false, migration: None, }), Some(_) => {} @@ -3637,6 +3680,7 @@ fn diff_resources( after_digest: None, disposition: None, reason: None, + binding_change: false, migration: None, }); } @@ -3645,6 +3689,43 @@ fn diff_resources( changes } +/// Binding-only policy changes: the file digest is unchanged (so +/// `diff_resources` saw nothing) but the applied `applies_to` differs from +/// the desired bindings — including the pre-5A case where the state entry +/// has no bindings recorded yet. These are first-class plan changes: without +/// this pass a binding edit would silently rot or silently converge. +fn append_policy_binding_changes( + changes: &mut Vec, + prior_state: Option<&ClusterState>, + desired: &DesiredCluster, +) { + let Some(state) = prior_state else { + return; // no state: everything is already a Create carrying bindings + }; + for (address, desired_bindings) in &desired.policy_bindings { + if changes.iter().any(|change| &change.resource == address) { + continue; // content change already covers it + } + let Some(entry) = state.applied_revision.resources.get(address) else { + continue; // not applied yet: the Create covers it + }; + if entry.applies_to.as_ref() == Some(desired_bindings) { + continue; + } + changes.push(PlanChange { + resource: address.clone(), + operation: PlanOperation::Update, + before_digest: Some(entry.digest.clone()), + after_digest: Some(entry.digest.clone()), + disposition: None, + reason: None, + binding_change: true, + migration: None, + }); + } + changes.sort_by(|a, b| a.resource.cmp(&b.resource)); +} + fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) -> Vec { changes .iter() @@ -4163,7 +4244,7 @@ fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredClus state .applied_revision .resources - .insert(graph_address, StateResource { digest }); + .insert(graph_address, StateResource { digest, applies_to: None }); } } @@ -7062,6 +7143,135 @@ graphs: assert!(out.converged, "{out:?}"); } + // ---- policy bindings in the applied revision (5A) ---- + + #[tokio::test] + async fn apply_records_policy_bindings() { + let dir = fixture(); + write_applyable_state(dir.path()); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{:?}", out.diagnostics); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["graph.knowledge"]), + "{state}" + ); + // Non-policy entries carry no bindings field at all. + assert!( + state["applied_revision"]["resources"]["query.knowledge.find_person"] + .get("applies_to") + .is_none() + ); + } + + #[tokio::test] + async fn binding_change_is_a_visible_plan_change() { + let dir = fixture(); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + // Edit ONLY applies_to: the policy file digest is unchanged. + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +metadata: + name: test +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [cluster, knowledge] +"#, + ) + .unwrap(); + + let plan = plan_config_dir(dir.path()).await; + let change = plan + .changes + .iter() + .find(|change| change.resource == "policy.base") + .expect("binding change must be visible in plan"); + assert!(change.binding_change); + assert_eq!(change.operation, PlanOperation::Update); + assert_eq!(change.before_digest, change.after_digest); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{out:?}"); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["cluster", "graph.knowledge"]) + ); + // Idempotent: a second run sees no changes. + let again = apply_config_dir(dir.path()).await; + assert!(again.changes.is_empty() && !again.state_written, "{again:?}"); + } + + #[tokio::test] + async fn pre_5a_state_backfills_bindings() { + let dir = fixture(); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + // Strip the bindings from the state entry (a pre-5A ledger). + let mut state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(), + ) + .unwrap(); + state["applied_revision"]["resources"]["policy.base"] + .as_object_mut() + .unwrap() + .remove("applies_to"); + fs::write( + dir.path().join(CLUSTER_STATE_FILE), + serde_json::to_string_pretty(&state).unwrap(), + ) + .unwrap(); + + let plan = plan_config_dir(dir.path()).await; + assert!( + plan.changes + .iter() + .any(|change| change.resource == "policy.base" && change.binding_change), + "{plan:?}" + ); + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{out:?}"); + let healed = read_state_json(dir.path()); + assert_eq!( + healed["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["graph.knowledge"]) + ); + } + + #[tokio::test] + async fn bindings_survive_refresh() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + + let refresh = refresh_config_dir(dir.path()).await; + assert!(refresh.ok, "{:?}", refresh.diagnostics); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["graph.knowledge"]) + ); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture();