diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 8593ef3..673adb7 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -815,7 +815,8 @@ fn print_cluster_plan_human(output: &PlanOutput) { output.approvals_required.len() ); for change in &output.changes { - println!(" {:?} {}", change.operation, change.resource); + let bindings = if change.binding_change { " [bindings]" } else { "" }; + println!(" {:?} {}{bindings}", change.operation, change.resource); if let Some(migration) = &change.migration { if !migration.supported { println!(" migration UNSUPPORTED:"); @@ -862,16 +863,17 @@ fn print_cluster_apply_human(output: &ApplyOutput) { fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) { for change in changes { + let bindings = if change.binding_change { " [bindings]" } else { "" }; match (&change.disposition, change.reason.as_deref()) { (Some(disposition), Some(reason)) => println!( - " {:?} {} [{disposition:?}: {reason}]", + " {:?} {}{bindings} [{disposition:?}: {reason}]", change.operation, change.resource ), (Some(disposition), None) => println!( - " {:?} {} [{disposition:?}]", + " {:?} {}{bindings} [{disposition:?}]", change.operation, change.resource ), - _ => println!(" {:?} {}", change.operation, change.resource), + _ => println!(" {:?} {}{bindings}", change.operation, change.resource), } } if changes.is_empty() { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 336f19e..e4590f6 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1451,9 +1451,10 @@ policies: change_for(&mixed, "query.knowledge.find_person")["disposition"], "applied" ); - // policy.shared's applies_to narrowed, but its FILE digest is unchanged - // — applies_to lives in cluster.yaml (the config digest), so it is not a - // resource change. 
+ // 5A: policy.shared's applies_to narrowed with an unchanged file digest + // — now a first-class binding change, applied in the same run. + assert_eq!(change_for(&mixed, "policy.shared")["binding_change"], true); + assert_eq!(change_for(&mixed, "policy.shared")["disposition"], "applied"); assert_eq!( change_for(&mixed, "graph.knowledge")["disposition"], "derived" diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index f67d8f7..af2ef81 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -196,6 +196,11 @@ pub struct PlanChange { pub disposition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub reason: Option, + /// True for a policy change whose file digest is unchanged but whose + /// `applies_to` bindings differ from the applied revision (including the + /// pre-5A backfill case). + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub binding_change: bool, /// For schema updates: the engine's migration plan against the live /// graph (RFC-004 §D7's data-aware preview). Absent when the preview is /// unavailable (warning `schema_preview_unavailable`). @@ -347,6 +352,8 @@ struct DesiredCluster { resource_digests: BTreeMap, resources: Vec, dependencies: Vec, + /// `policy.` address -> normalized applies_to refs. + policy_bindings: BTreeMap>, } #[derive(Debug, Clone)] @@ -457,6 +464,13 @@ struct AppliedRevisionState { #[serde(deny_unknown_fields)] struct StateResource { digest: String, + /// Policy resources only: the applied `applies_to` bindings, normalized + /// to typed refs (`cluster` | `graph.`). Recorded so the state + /// ledger is serving-sufficient for the Phase-5 server boot (RFC-005 + /// §D3). Absent on pre-5A entries (backfilled by the next apply) and on + /// non-policy resources. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + applies_to: Option>, } #[derive(Debug, Serialize, Deserialize)] @@ -623,11 +637,13 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics); let mut prior_resources = BTreeMap::new(); + let mut prior_state: Option = None; if !has_errors(&diagnostics) { match backend.read_state(&mut observations) { Ok(snapshot) => { if let Some(state) = snapshot.state { prior_resources = state_resource_digests(&state); + prior_state = Some(state); } } Err(diagnostic) => diagnostics.push(diagnostic), @@ -639,6 +655,9 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } else { diff_resources(&prior_resources, &desired.resource_digests) }; + if !has_errors(&diagnostics) { + append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired); + } // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. let artifacts = backend.list_approval_artifacts(&mut diagnostics); @@ -850,6 +869,7 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); + append_policy_binding_changes(&mut changes, Some(&state), &desired); let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); let approved = approved_resources( &approval_artifacts, @@ -1429,6 +1449,12 @@ pub async fn apply_config_dir_with_options( .after_digest .clone() .expect("create/update always carries an after digest"), + // Policies record their applied bindings so the + // ledger is serving-sufficient (RFC-005 §D3). 
+ applies_to: desired + .policy_bindings + .get(&change.resource) + .cloned(), }, ); set_resource_status_applied(&mut new_state, &change.resource); @@ -1467,10 +1493,11 @@ pub async fn apply_config_dir_with_options( } recompute_state_graph_digests(&mut new_state, &desired); - let residual = diff_resources( + let mut residual = diff_resources( &state_resource_digests(&new_state), &desired.resource_digests, ); + append_policy_binding_changes(&mut residual, Some(&new_state), &desired); let converged = residual.is_empty(); if converged { new_state.applied_revision.config_digest = Some(desired.config_digest.clone()); @@ -2741,6 +2768,7 @@ async fn sweep_graph_create_sidecar( schema_addr.clone(), StateResource { digest: live_digest.clone(), + applies_to: None, }, ); let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); @@ -2749,7 +2777,7 @@ async fn sweep_graph_create_sidecar( state .applied_revision .resources - .insert(graph_address.clone(), StateResource { digest: composite }); + .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); set_resource_status_applied(state, &graph_address); set_resource_status_applied(state, &schema_addr); state.recovery_records.insert( @@ -2869,6 +2897,7 @@ async fn sweep_schema_apply_sidecar( schema_addr.clone(), StateResource { digest: live_digest.clone(), + applies_to: None, }, ); let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); @@ -2876,7 +2905,7 @@ async fn sweep_schema_apply_sidecar( state .applied_revision .resources - .insert(graph_address.clone(), StateResource { digest: composite }); + .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); set_resource_status_applied(state, &graph_address); set_resource_status_applied(state, &schema_addr); state.recovery_records.insert( @@ -3109,6 +3138,7 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt schema_address.clone(), StateResource { 
digest: observation.schema_digest.clone(), + applies_to: None, }, ); let query_digests = state_query_digests_for_graph(state, &graph.id); @@ -3121,6 +3151,7 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt graph_address.clone(), StateResource { digest: graph_digest_value, + applies_to: None, }, ); state.observations.insert( @@ -3455,6 +3486,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { ); } + let mut policy_bindings: BTreeMap> = BTreeMap::new(); for (policy_name, policy) in &raw.policies { validate_id( "policy name", @@ -3471,10 +3503,14 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { } let policy_address = policy_address(policy_name); + let mut normalized_bindings: Vec = Vec::new(); for (idx, target) in policy.applies_to.iter().enumerate() { match normalize_policy_target(target) { - PolicyTarget::Cluster => {} + PolicyTarget::Cluster => { + normalized_bindings.push("cluster".to_string()); + } PolicyTarget::Graph(graph_id) => { + normalized_bindings.push(graph_address(&graph_id)); if raw.graphs.contains_key(&graph_id) { dependencies.insert(Dependency { from: policy_address.clone(), @@ -3498,6 +3534,10 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { } } + normalized_bindings.sort(); + normalized_bindings.dedup(); + policy_bindings.insert(policy_address.clone(), normalized_bindings); + let policy_path = resolve_config_path(&config_dir, &policy.file); match fs::read(&policy_path) { Ok(bytes) => { @@ -3551,6 +3591,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { resource_digests, resources: resource_list, dependencies, + policy_bindings, }), diagnostics, config_dir, @@ -3614,6 +3655,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + binding_change: false, migration: None, }), Some(before) if before != after => changes.push(PlanChange { @@ -3623,6 +3665,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + binding_change: 
false, migration: None, }), Some(_) => {} @@ -3637,6 +3680,7 @@ fn diff_resources( after_digest: None, disposition: None, reason: None, + binding_change: false, migration: None, }); } @@ -3645,6 +3689,43 @@ fn diff_resources( changes } +/// Binding-only policy changes: the file digest is unchanged (so +/// `diff_resources` saw nothing) but the applied `applies_to` differs from +/// the desired bindings — including the pre-5A case where the state entry +/// has no bindings recorded yet. These are first-class plan changes: without +/// this pass a binding edit would silently rot or silently converge. +fn append_policy_binding_changes( + changes: &mut Vec, + prior_state: Option<&ClusterState>, + desired: &DesiredCluster, +) { + let Some(state) = prior_state else { + return; // no state: everything is already a Create carrying bindings + }; + for (address, desired_bindings) in &desired.policy_bindings { + if changes.iter().any(|change| &change.resource == address) { + continue; // content change already covers it + } + let Some(entry) = state.applied_revision.resources.get(address) else { + continue; // not applied yet: the Create covers it + }; + if entry.applies_to.as_ref() == Some(desired_bindings) { + continue; + } + changes.push(PlanChange { + resource: address.clone(), + operation: PlanOperation::Update, + before_digest: Some(entry.digest.clone()), + after_digest: Some(entry.digest.clone()), + disposition: None, + reason: None, + binding_change: true, + migration: None, + }); + } + changes.sort_by(|a, b| a.resource.cmp(&b.resource)); +} + fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) -> Vec { changes .iter() @@ -4163,7 +4244,7 @@ fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredClus state .applied_revision .resources - .insert(graph_address, StateResource { digest }); + .insert(graph_address, StateResource { digest, applies_to: None }); } } @@ -7062,6 +7143,135 @@ graphs: assert!(out.converged, "{out:?}"); 
} + // ---- policy bindings in the applied revision (5A) ---- + + #[tokio::test] + async fn apply_records_policy_bindings() { + let dir = fixture(); + write_applyable_state(dir.path()); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{:?}", out.diagnostics); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["graph.knowledge"]), + "{state}" + ); + // Non-policy entries carry no bindings field at all. + assert!( + state["applied_revision"]["resources"]["query.knowledge.find_person"] + .get("applies_to") + .is_none() + ); + } + + #[tokio::test] + async fn binding_change_is_a_visible_plan_change() { + let dir = fixture(); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + // Edit ONLY applies_to: the policy file digest is unchanged. + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +metadata: + name: test +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [cluster, knowledge] +"#, + ) + .unwrap(); + + let plan = plan_config_dir(dir.path()).await; + let change = plan + .changes + .iter() + .find(|change| change.resource == "policy.base") + .expect("binding change must be visible in plan"); + assert!(change.binding_change); + assert_eq!(change.operation, PlanOperation::Update); + assert_eq!(change.before_digest, change.after_digest); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{out:?}"); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["cluster", "graph.knowledge"]) + ); + // Idempotent: a second run sees no changes. 
+ let again = apply_config_dir(dir.path()).await; + assert!(again.changes.is_empty() && !again.state_written, "{again:?}"); + } + + #[tokio::test] + async fn pre_5a_state_backfills_bindings() { + let dir = fixture(); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + // Strip the bindings from the state entry (a pre-5A ledger). + let mut state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(), + ) + .unwrap(); + state["applied_revision"]["resources"]["policy.base"] + .as_object_mut() + .unwrap() + .remove("applies_to"); + fs::write( + dir.path().join(CLUSTER_STATE_FILE), + serde_json::to_string_pretty(&state).unwrap(), + ) + .unwrap(); + + let plan = plan_config_dir(dir.path()).await; + assert!( + plan.changes + .iter() + .any(|change| change.resource == "policy.base" && change.binding_change), + "{plan:?}" + ); + let out = apply_config_dir(dir.path()).await; + assert!(out.ok && out.converged, "{out:?}"); + let healed = read_state_json(dir.path()); + assert_eq!( + healed["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["graph.knowledge"]) + ); + } + + #[tokio::test] + async fn bindings_survive_refresh() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + + let refresh = refresh_config_dir(dir.path()).await; + assert!(refresh.ok, "{:?}", refresh.diagnostics); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["policy.base"]["applies_to"], + serde_json::json!(["graph.knowledge"]) + ); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 2302b13..a7a6cb3 100644 --- a/docs/dev/testing.md +++ 
b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), and Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows) | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph 
creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 2df26be..284bfbf 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -115,7 +115,10 @@ resource is planned as a create. If present, the file must use this shape: "graph.knowledge": { "digest": "..." }, "schema.knowledge": { "digest": "..." }, "query.knowledge.find_experts": { "digest": "..." }, - "policy.base": { "digest": "..." } + "policy.base": { + "digest": "...", + "applies_to": ["cluster", "graph.knowledge"] + } } }, "resource_statuses": { @@ -147,6 +150,14 @@ writes `state.json` and does not scan live graphs. Use explicit `cluster refresh` / `cluster import` when the state ledger should be updated from live observations. Live drift scans during plan are later-stage work. +Policy entries additionally record their applied `applies_to` bindings as +normalized typed refs — the state ledger is serving-sufficient for the +future server-boot stage. A change to `applies_to` alone (the policy file +digest unchanged) appears in the plan as an Update marked `binding_change` +(human output: `[bindings]`), applies like any catalog change, and counts +toward convergence; ledgers written before this field existed are backfilled +by the next apply. 
+ Each plan change carries a `disposition` field — an honest preview of what `cluster apply` will do with it in this stage: `applied` (executes), `derived` (a `graph.<id>` composite-digest update that converges automatically once its