Merge pull request #175 from ModernRelay/feat/cluster-policy-bindings-5a

feat(cluster): Slice 5A — policy applies_to bindings in the applied revision
This commit is contained in:
Andrew Altshuler 2026-06-10 16:57:26 +03:00 committed by GitHub
commit bed36a8423
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 238 additions and 14 deletions

View file

@ -815,7 +815,8 @@ fn print_cluster_plan_human(output: &PlanOutput) {
output.approvals_required.len()
);
for change in &output.changes {
println!(" {:?} {}", change.operation, change.resource);
let bindings = if change.binding_change { " [bindings]" } else { "" };
println!(" {:?} {}{bindings}", change.operation, change.resource);
if let Some(migration) = &change.migration {
if !migration.supported {
println!(" migration UNSUPPORTED:");
@ -862,16 +863,17 @@ fn print_cluster_apply_human(output: &ApplyOutput) {
fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) {
for change in changes {
let bindings = if change.binding_change { " [bindings]" } else { "" };
match (&change.disposition, change.reason.as_deref()) {
(Some(disposition), Some(reason)) => println!(
" {:?} {} [{disposition:?}: {reason}]",
" {:?} {}{bindings} [{disposition:?}: {reason}]",
change.operation, change.resource
),
(Some(disposition), None) => println!(
" {:?} {} [{disposition:?}]",
" {:?} {}{bindings} [{disposition:?}]",
change.operation, change.resource
),
_ => println!(" {:?} {}", change.operation, change.resource),
_ => println!(" {:?} {}{bindings}", change.operation, change.resource),
}
}
if changes.is_empty() {

View file

@ -1451,9 +1451,10 @@ policies:
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"applied"
);
// policy.shared's applies_to narrowed, but its FILE digest is unchanged
// — applies_to lives in cluster.yaml (the config digest), so it is not a
// resource change.
// 5A: policy.shared's applies_to narrowed with an unchanged file digest
// — now a first-class binding change, applied in the same run.
assert_eq!(change_for(&mixed, "policy.shared")["binding_change"], true);
assert_eq!(change_for(&mixed, "policy.shared")["disposition"], "applied");
assert_eq!(
change_for(&mixed, "graph.knowledge")["disposition"],
"derived"

View file

@ -196,6 +196,11 @@ pub struct PlanChange {
pub disposition: Option<ApplyDisposition>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
/// True for a policy change whose file digest is unchanged but whose
/// `applies_to` bindings differ from the applied revision (including the
/// pre-5A backfill case).
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub binding_change: bool,
/// For schema updates: the engine's migration plan against the live
/// graph (RFC-004 §D7's data-aware preview). Absent when the preview is
/// unavailable (warning `schema_preview_unavailable`).
@ -347,6 +352,8 @@ struct DesiredCluster {
resource_digests: BTreeMap<String, String>,
resources: Vec<ResourceSummary>,
dependencies: Vec<Dependency>,
/// `policy.<name>` address -> normalized applies_to refs.
policy_bindings: BTreeMap<String, Vec<String>>,
}
#[derive(Debug, Clone)]
@ -457,6 +464,13 @@ struct AppliedRevisionState {
#[serde(deny_unknown_fields)]
struct StateResource {
digest: String,
/// Policy resources only: the applied `applies_to` bindings, normalized
/// to typed refs (`cluster` | `graph.<id>`). Recorded so the state
/// ledger is serving-sufficient for the Phase-5 server boot (RFC-005
/// §D3). Absent on pre-5A entries (backfilled by the next apply) and on
/// non-policy resources.
#[serde(default, skip_serializing_if = "Option::is_none")]
applies_to: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize)]
@ -623,11 +637,13 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics);
let mut prior_resources = BTreeMap::new();
let mut prior_state: Option<ClusterState> = None;
if !has_errors(&diagnostics) {
match backend.read_state(&mut observations) {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
prior_resources = state_resource_digests(&state);
prior_state = Some(state);
}
}
Err(diagnostic) => diagnostics.push(diagnostic),
@ -639,6 +655,9 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
} else {
diff_resources(&prior_resources, &desired.resource_digests)
};
if !has_errors(&diagnostics) {
append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired);
}
// Plan previews dispositions without sweeping; a pending recovery is
// surfaced as the cluster_recovery_pending warning above instead.
let artifacts = backend.list_approval_artifacts(&mut diagnostics);
@ -850,6 +869,7 @@ pub async fn apply_config_dir_with_options(
let prior_resources = state_resource_digests(&state);
let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
append_policy_binding_changes(&mut changes, Some(&state), &desired);
let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics);
let approved = approved_resources(
&approval_artifacts,
@ -1429,6 +1449,12 @@ pub async fn apply_config_dir_with_options(
.after_digest
.clone()
.expect("create/update always carries an after digest"),
// Policies record their applied bindings so the
// ledger is serving-sufficient (RFC-005 §D3).
applies_to: desired
.policy_bindings
.get(&change.resource)
.cloned(),
},
);
set_resource_status_applied(&mut new_state, &change.resource);
@ -1467,10 +1493,11 @@ pub async fn apply_config_dir_with_options(
}
recompute_state_graph_digests(&mut new_state, &desired);
let residual = diff_resources(
let mut residual = diff_resources(
&state_resource_digests(&new_state),
&desired.resource_digests,
);
append_policy_binding_changes(&mut residual, Some(&new_state), &desired);
let converged = residual.is_empty();
if converged {
new_state.applied_revision.config_digest = Some(desired.config_digest.clone());
@ -2741,6 +2768,7 @@ async fn sweep_graph_create_sidecar(
schema_addr.clone(),
StateResource {
digest: live_digest.clone(),
applies_to: None,
},
);
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
@ -2749,7 +2777,7 @@ async fn sweep_graph_create_sidecar(
state
.applied_revision
.resources
.insert(graph_address.clone(), StateResource { digest: composite });
.insert(graph_address.clone(), StateResource { digest: composite, applies_to: None });
set_resource_status_applied(state, &graph_address);
set_resource_status_applied(state, &schema_addr);
state.recovery_records.insert(
@ -2869,6 +2897,7 @@ async fn sweep_schema_apply_sidecar(
schema_addr.clone(),
StateResource {
digest: live_digest.clone(),
applies_to: None,
},
);
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
@ -2876,7 +2905,7 @@ async fn sweep_schema_apply_sidecar(
state
.applied_revision
.resources
.insert(graph_address.clone(), StateResource { digest: composite });
.insert(graph_address.clone(), StateResource { digest: composite, applies_to: None });
set_resource_status_applied(state, &graph_address);
set_resource_status_applied(state, &schema_addr);
state.recovery_records.insert(
@ -3109,6 +3138,7 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt
schema_address.clone(),
StateResource {
digest: observation.schema_digest.clone(),
applies_to: None,
},
);
let query_digests = state_query_digests_for_graph(state, &graph.id);
@ -3121,6 +3151,7 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt
graph_address.clone(),
StateResource {
digest: graph_digest_value,
applies_to: None,
},
);
state.observations.insert(
@ -3455,6 +3486,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
);
}
let mut policy_bindings: BTreeMap<String, Vec<String>> = BTreeMap::new();
for (policy_name, policy) in &raw.policies {
validate_id(
"policy name",
@ -3471,10 +3503,14 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
}
let policy_address = policy_address(policy_name);
let mut normalized_bindings: Vec<String> = Vec::new();
for (idx, target) in policy.applies_to.iter().enumerate() {
match normalize_policy_target(target) {
PolicyTarget::Cluster => {}
PolicyTarget::Cluster => {
normalized_bindings.push("cluster".to_string());
}
PolicyTarget::Graph(graph_id) => {
normalized_bindings.push(graph_address(&graph_id));
if raw.graphs.contains_key(&graph_id) {
dependencies.insert(Dependency {
from: policy_address.clone(),
@ -3498,6 +3534,10 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
}
}
normalized_bindings.sort();
normalized_bindings.dedup();
policy_bindings.insert(policy_address.clone(), normalized_bindings);
let policy_path = resolve_config_path(&config_dir, &policy.file);
match fs::read(&policy_path) {
Ok(bytes) => {
@ -3551,6 +3591,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
resource_digests,
resources: resource_list,
dependencies,
policy_bindings,
}),
diagnostics,
config_dir,
@ -3614,6 +3655,7 @@ fn diff_resources(
after_digest: Some(after.clone()),
disposition: None,
reason: None,
binding_change: false,
migration: None,
}),
Some(before) if before != after => changes.push(PlanChange {
@ -3623,6 +3665,7 @@ fn diff_resources(
after_digest: Some(after.clone()),
disposition: None,
reason: None,
binding_change: false,
migration: None,
}),
Some(_) => {}
@ -3637,6 +3680,7 @@ fn diff_resources(
after_digest: None,
disposition: None,
reason: None,
binding_change: false,
migration: None,
});
}
@ -3645,6 +3689,43 @@ fn diff_resources(
changes
}
/// Adds an `Update` plan change for every policy whose `applies_to` bindings
/// differ from what the prior state recorded while no other change for that
/// policy is already in `changes`.
///
/// A policy is skipped when:
/// - `changes` already holds an entry for its address,
/// - the prior state has no entry for it, or
/// - the recorded bindings equal the desired ones.
///
/// A state entry with no recorded bindings (`applies_to` is `None`, the
/// pre-5A ledger shape) never equals the desired bindings, so it always yields
/// a change. Each emitted change reuses the entry's digest as both
/// `before_digest` and `after_digest` and sets `binding_change`.
///
/// Does nothing when `prior_state` is `None`. Otherwise `changes` is re-sorted
/// by resource address before returning, even if nothing was appended.
fn append_policy_binding_changes(
    changes: &mut Vec<PlanChange>,
    prior_state: Option<&ClusterState>,
    desired: &DesiredCluster,
) {
    let Some(state) = prior_state else {
        // Nothing recorded to compare the desired bindings against.
        return;
    };
    let applied = &state.applied_revision.resources;

    // Gather first, append afterwards: policy addresses are unique map keys,
    // so a freshly built change can never shadow a later address.
    let binding_updates: Vec<PlanChange> = desired
        .policy_bindings
        .iter()
        .filter_map(|(address, wanted)| {
            let already_planned = changes.iter().any(|existing| &existing.resource == address);
            if already_planned {
                return None;
            }
            // No state entry means the policy has not been applied yet.
            let entry = applied.get(address)?;
            if entry.applies_to.as_ref() == Some(wanted) {
                return None;
            }
            Some(PlanChange {
                resource: address.clone(),
                operation: PlanOperation::Update,
                before_digest: Some(entry.digest.clone()),
                after_digest: Some(entry.digest.clone()),
                disposition: None,
                reason: None,
                binding_change: true,
                migration: None,
            })
        })
        .collect();

    changes.extend(binding_updates);
    changes.sort_by(|left, right| left.resource.cmp(&right.resource));
}
fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) -> Vec<BlastRadius> {
changes
.iter()
@ -4163,7 +4244,7 @@ fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredClus
state
.applied_revision
.resources
.insert(graph_address, StateResource { digest });
.insert(graph_address, StateResource { digest, applies_to: None });
}
}
@ -7062,6 +7143,135 @@ graphs:
assert!(out.converged, "{out:?}");
}
// ---- policy bindings in the applied revision (5A) ----
/// After a converging apply, the policy entry in the state file carries its
/// `applies_to` bindings, while a query entry has no such key.
#[tokio::test]
async fn apply_records_policy_bindings() {
    let dir = fixture();
    write_applyable_state(dir.path());

    let out = apply_config_dir(dir.path()).await;
    assert!(out.ok && out.converged, "{:?}", out.diagnostics);

    let state = read_state_json(dir.path());
    let resources = &state["applied_revision"]["resources"];

    let expected_bindings = serde_json::json!(["graph.knowledge"]);
    assert_eq!(
        resources["policy.base"]["applies_to"],
        expected_bindings,
        "{state}"
    );

    // The key must be absent on a non-policy entry, not merely null.
    let query_bindings = resources["query.knowledge.find_person"].get("applies_to");
    assert!(query_bindings.is_none());
}
#[tokio::test]
async fn binding_change_is_a_visible_plan_change() {
let dir = fixture();
write_applyable_state(dir.path());
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
// Edit ONLY applies_to: the policy file digest is unchanged.
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
r#"
version: 1
metadata:
name: test
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [cluster, knowledge]
"#,
)
.unwrap();
let plan = plan_config_dir(dir.path()).await;
let change = plan
.changes
.iter()
.find(|change| change.resource == "policy.base")
.expect("binding change must be visible in plan");
assert!(change.binding_change);
assert_eq!(change.operation, PlanOperation::Update);
assert_eq!(change.before_digest, change.after_digest);
let out = apply_config_dir(dir.path()).await;
assert!(out.ok && out.converged, "{out:?}");
let state = read_state_json(dir.path());
assert_eq!(
state["applied_revision"]["resources"]["policy.base"]["applies_to"],
serde_json::json!(["cluster", "graph.knowledge"])
);
// Idempotent: a second run sees no changes.
let again = apply_config_dir(dir.path()).await;
assert!(again.changes.is_empty() && !again.state_written, "{again:?}");
}
#[tokio::test]
async fn pre_5a_state_backfills_bindings() {
let dir = fixture();
write_applyable_state(dir.path());
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
// Strip the bindings from the state entry (a pre-5A ledger).
let mut state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(),
)
.unwrap();
state["applied_revision"]["resources"]["policy.base"]
.as_object_mut()
.unwrap()
.remove("applies_to");
fs::write(
dir.path().join(CLUSTER_STATE_FILE),
serde_json::to_string_pretty(&state).unwrap(),
)
.unwrap();
let plan = plan_config_dir(dir.path()).await;
assert!(
plan.changes
.iter()
.any(|change| change.resource == "policy.base" && change.binding_change),
"{plan:?}"
);
let out = apply_config_dir(dir.path()).await;
assert!(out.ok && out.converged, "{out:?}");
let healed = read_state_json(dir.path());
assert_eq!(
healed["applied_revision"]["resources"]["policy.base"]["applies_to"],
serde_json::json!(["graph.knowledge"])
);
}
#[tokio::test]
async fn bindings_survive_refresh() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
let refresh = refresh_config_dir(dir.path()).await;
assert!(refresh.ok, "{:?}", refresh.diagnostics);
let state = read_state_json(dir.path());
assert_eq!(
state["applied_revision"]["resources"]["policy.base"]["applies_to"],
serde_json::json!(["graph.knowledge"])
);
}
#[test]
fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();

View file

@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), and Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows) |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |

View file

@ -115,7 +115,10 @@ resource is planned as a create. If present, the file must use this shape:
"graph.knowledge": { "digest": "..." },
"schema.knowledge": { "digest": "..." },
"query.knowledge.find_experts": { "digest": "..." },
"policy.base": { "digest": "..." }
"policy.base": {
"digest": "...",
"applies_to": ["cluster", "graph.knowledge"]
}
}
},
"resource_statuses": {
@ -147,6 +150,14 @@ writes `state.json` and does not scan live graphs. Use explicit
`cluster refresh` / `cluster import` when the state ledger should be updated
from live observations. Live drift scans during plan are later-stage work.
Policy entries additionally record their applied `applies_to` bindings as
normalized typed refs (`cluster` or `graph.<id>`), so that the state ledger
alone carries enough information for the future server-boot stage. A change
to `applies_to` alone (with the policy file digest unchanged) appears in the
plan as an Update marked `binding_change` (human output: `[bindings]`),
applies like any catalog change, and counts toward convergence. Ledgers
written before this field existed are backfilled by the next apply.
Each plan change carries a `disposition` field — an honest preview of what
`cluster apply` will do with it in this stage: `applied` (executes), `derived`
(a `graph.<id>` composite-digest update that converges automatically once its