mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
feat(cluster): execute graph creates in cluster apply
Stage 4A (RFC-004 §D1/§D5): graph.<id> Create — and its paired schema Create, which the init carries — classify Applied and execute first in the run, sequentially and sidecar-fenced: sidecar written before Omnigraph::init at the derived root, rewritten with the post-init manifest pin, deleted only after the final state CAS lands. Dependent queries and policies no longer block on a graph create in the same plan — creates run first, so they apply in the same run; a create failure demotes them to blocked (dependency_not_applied) and stops further graph-moving work (loud partials), with the sidecar left for the sweep to classify. Graphs with a kept recovery sidecar (rows 5/6) classify Blocked/cluster_recovery_pending, and the sweep's Drifted/Error statuses are never clobbered by a generic Blocked. Schema source is re-read and digest-verified under the lock before the init (the write_resource_payload TOCTOU posture). Plan previews the same dispositions. e2e fallout updated: a fresh multi-graph config now converges in one apply; a destroyed root is re-created as an EMPTY graph by the next apply (declarative convergence — visible in plan, called out in docs); the new cluster_e2e_declared_graph_created_by_apply pins the no-manual-init flow. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
bf8cc7a753
commit
c3007369cd
2 changed files with 519 additions and 102 deletions
|
|
@ -1285,7 +1285,7 @@ node Person {
|
|||
/// Disaster input fails closed: a destroyed graph root drifts the ledger,
|
||||
/// the plan proposes deferred creates, and apply moves nothing.
|
||||
#[test]
|
||||
fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() {
|
||||
fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
|
|
@ -1327,15 +1327,20 @@ fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() {
|
|||
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create");
|
||||
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred");
|
||||
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred");
|
||||
// Stage 4A: the re-create is executable and the plan says so — nothing
|
||||
// hidden about converging a destroyed root back to an EMPTY graph (the
|
||||
// data was already lost; this is declarative convergence, RFC-004 §D1).
|
||||
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "applied");
|
||||
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "applied");
|
||||
// Converged-then-destroyed: query/policy are already in state at the
|
||||
// desired digests, so they are not changes at all.
|
||||
assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}");
|
||||
|
||||
let disaster_apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(disaster_apply["applied_count"], 0, "{disaster_apply}");
|
||||
assert_eq!(disaster_apply["converged"], false, "{disaster_apply}");
|
||||
let recreate = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(recreate["ok"], true, "{recreate}");
|
||||
assert_eq!(recreate["converged"], true, "{recreate}");
|
||||
// The empty graph is back on disk; catalog state survived throughout.
|
||||
assert!(temp.path().join("graphs/knowledge.omni").exists());
|
||||
let state: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
|
||||
)
|
||||
|
|
@ -1352,59 +1357,84 @@ fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() {
|
|||
);
|
||||
}
|
||||
|
||||
/// The disposition matrix as a system: one apply over two graphs (one live,
|
||||
/// one not yet created) plus graph-spanning and cluster-scoped policies must
|
||||
/// produce all four dispositions at once — then converge after the second
|
||||
/// graph appears.
|
||||
/// The disposition matrix as a system under Stage 4A: a fresh multi-graph
|
||||
/// config converges in ONE apply (both graphs created, spanning and
|
||||
/// cluster-scoped policies applied), and a later mixed run — schema update
|
||||
/// (deferred), its dependent query (blocked), an independent query update
|
||||
/// (applied), its composite (derived) — shows all four dispositions at once
|
||||
/// before the graph-plane schema apply closes the loop.
|
||||
#[test]
|
||||
fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_multi_graph_cluster_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path()); // knowledge only
|
||||
// No manual init: Stage 4A creates both graphs.
|
||||
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["ok"], true, "{apply}");
|
||||
assert_eq!(apply["converged"], false, "{apply}");
|
||||
assert_eq!(apply["applied_count"], 2, "{apply}");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied");
|
||||
assert_eq!(
|
||||
change_for(&apply, "query.knowledge.find_person")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&apply, "policy.cluster_wide")["disposition"],
|
||||
change_for(&apply, "graph.engineering")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&apply, "query.engineering.find_service")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
// The graph-spanning and cluster-scoped policies ride the same run.
|
||||
assert_eq!(change_for(&apply, "policy.shared")["disposition"], "applied");
|
||||
assert_eq!(
|
||||
change_for(&apply, "policy.cluster_wide")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert!(temp.path().join("graphs/knowledge.omni").exists());
|
||||
assert!(temp.path().join("graphs/engineering.omni").exists());
|
||||
|
||||
// Mixed run: a knowledge schema update (4B territory — deferred) gates
|
||||
// its query update (blocked), while an engineering query update is
|
||||
// independent (applied) and re-derives its composite.
|
||||
fs::write(
|
||||
temp.path().join("people.pg"),
|
||||
"\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("people.gq"),
|
||||
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("services.gq"),
|
||||
"\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mixed = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(mixed["ok"], true, "{mixed}");
|
||||
assert_eq!(mixed["converged"], false, "{mixed}");
|
||||
assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred");
|
||||
assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred");
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.knowledge.find_person")["disposition"],
|
||||
"blocked"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&apply, "query.engineering.find_service")["reason"],
|
||||
"dependency_missing"
|
||||
);
|
||||
// One missing dependency graph blocks the whole spanning policy.
|
||||
assert_eq!(change_for(&apply, "policy.shared")["disposition"], "blocked");
|
||||
assert_eq!(
|
||||
change_for(&apply, "graph.engineering")["disposition"],
|
||||
"deferred"
|
||||
change_for(&mixed, "query.knowledge.find_person")["reason"],
|
||||
"dependency_not_applied"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&apply, "schema.engineering")["disposition"],
|
||||
"deferred"
|
||||
change_for(&mixed, "query.engineering.find_service")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&apply, "graph.knowledge")["disposition"],
|
||||
change_for(&mixed, "graph.engineering")["disposition"],
|
||||
"derived"
|
||||
);
|
||||
assert_eq!(
|
||||
apply["resource_statuses"]["policy.shared"]["status"],
|
||||
"blocked"
|
||||
);
|
||||
// Deterministic ordering: changes sorted by resource address.
|
||||
let order: Vec<&str> = apply["changes"]
|
||||
let order: Vec<&str> = mixed["changes"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
|
|
@ -1412,21 +1442,22 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
|
|||
.collect();
|
||||
let mut sorted = order.clone();
|
||||
sorted.sort_unstable();
|
||||
assert_eq!(order, sorted, "{apply}");
|
||||
assert_eq!(order, sorted, "{mixed}");
|
||||
|
||||
// The second graph appears; refresh observes it; apply converges.
|
||||
init_named_cluster_graph(temp.path(), "engineering", "services.pg");
|
||||
// The graph-plane tool applies the schema; refresh observes; converge.
|
||||
output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg(temp.path().join("graphs/knowledge.omni"))
|
||||
.arg("--schema")
|
||||
.arg(temp.path().join("people.pg"))
|
||||
.arg("--json"),
|
||||
);
|
||||
let refresh = cluster_json(temp.path(), "refresh");
|
||||
assert_eq!(refresh["ok"], true, "{refresh}");
|
||||
|
||||
let converge = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(converge["ok"], true, "{converge}");
|
||||
assert_eq!(converge["converged"], true, "{converge}");
|
||||
assert_eq!(
|
||||
change_for(&converge, "query.engineering.find_service")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(change_for(&converge, "policy.shared")["disposition"], "applied");
|
||||
|
||||
let final_plan = cluster_json(temp.path(), "plan");
|
||||
assert!(
|
||||
|
|
@ -1435,6 +1466,39 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
|
|||
);
|
||||
}
|
||||
|
||||
/// Stage 4A headline: a declared graph is created by `cluster apply` itself —
|
||||
/// no manual `omnigraph init` anywhere in the flow.
|
||||
#[test]
|
||||
fn cluster_e2e_declared_graph_created_by_apply() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["ok"], true, "{apply}");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied");
|
||||
assert!(temp.path().join("graphs/knowledge.omni").exists());
|
||||
|
||||
// The created graph is a real graph: the per-graph CLI can open it.
|
||||
let snapshot = output_success(
|
||||
cli()
|
||||
.arg("snapshot")
|
||||
.arg(temp.path().join("graphs/knowledge.omni")),
|
||||
);
|
||||
assert!(!stdout_string(&snapshot).is_empty());
|
||||
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
assert!(plan["changes"].as_array().unwrap().is_empty(), "{plan}");
|
||||
let status = cluster_json(temp.path(), "status");
|
||||
assert_eq!(
|
||||
status["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"applied"
|
||||
);
|
||||
}
|
||||
|
||||
/// Catalog payload drift self-heals across the command surface: status warns
|
||||
/// read-only, refresh persists the drift and drops the digest, apply
|
||||
/// republishes the blob, status comes back clean.
|
||||
|
|
|
|||
|
|
@ -577,7 +577,9 @@ pub fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
|
|||
} else {
|
||||
diff_resources(&prior_resources, &desired.resource_digests)
|
||||
};
|
||||
classify_changes(&mut changes, &desired.dependencies);
|
||||
// Plan previews dispositions without sweeping; a pending recovery is
|
||||
// surfaced as the cluster_recovery_pending warning above instead.
|
||||
classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new());
|
||||
let blast_radius = compute_blast_radius(&changes, &desired.dependencies);
|
||||
let approvals_required = compute_approvals(&changes);
|
||||
let ok = !has_errors(&diagnostics);
|
||||
|
|
@ -727,8 +729,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
|
||||
let prior_resources = state_resource_digests(&state);
|
||||
let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
|
||||
classify_changes(&mut changes, &desired.dependencies);
|
||||
let _ = &sweep.pending_graphs; // consumed by the graph-create executor (4A C4)
|
||||
classify_changes(&mut changes, &desired.dependencies, &sweep.pending_graphs);
|
||||
|
||||
// Defensive invariant: nothing the approval gate covers may be executable.
|
||||
// Today approvals only cover graph/schema deletes (always deferred); this
|
||||
|
|
@ -756,6 +757,169 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
);
|
||||
}
|
||||
|
||||
// Graph creates execute first (RFC-004 §D5), sequentially, sidecar-fenced:
|
||||
// sidecar written before the init, rewritten with the post-init manifest
|
||||
// version, deleted only after the final state CAS lands. A failure stops
|
||||
// further graph-moving work and demotes that graph's dependents.
|
||||
let source_paths: BTreeMap<&str, &str> = desired
|
||||
.resources
|
||||
.iter()
|
||||
.filter_map(|resource| {
|
||||
resource
|
||||
.path
|
||||
.as_deref()
|
||||
.map(|path| (resource.address.as_str(), path))
|
||||
})
|
||||
.collect();
|
||||
let graph_creates_to_run: Vec<String> = changes
|
||||
.iter()
|
||||
.filter(|change| {
|
||||
change.disposition == Some(ApplyDisposition::Applied)
|
||||
&& change.operation == PlanOperation::Create
|
||||
&& matches!(resource_kind(&change.resource), ResourceKind::Graph(_))
|
||||
})
|
||||
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
|
||||
.collect();
|
||||
let mut completed_create_sidecars: Vec<PathBuf> = Vec::new();
|
||||
let mut failed_graphs: BTreeSet<String> = BTreeSet::new();
|
||||
let mut creates_aborted = false;
|
||||
for graph_id in &graph_creates_to_run {
|
||||
if creates_aborted {
|
||||
// A prior create failed: stop graph-moving work (loud partials).
|
||||
diagnostics.push(Diagnostic::warning(
|
||||
"graph_create_skipped",
|
||||
graph_address(graph_id),
|
||||
"skipped after an earlier graph create failed in this run",
|
||||
));
|
||||
failed_graphs.insert(graph_id.clone());
|
||||
continue;
|
||||
}
|
||||
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let graph_uri = display_path(
|
||||
&desired
|
||||
.config_dir
|
||||
.join(CLUSTER_GRAPHS_DIR)
|
||||
.join(format!("{graph_id}.omni")),
|
||||
);
|
||||
let mut sidecar = RecoverySidecar {
|
||||
schema_version: 1,
|
||||
operation_id: Ulid::new().to_string(),
|
||||
started_at: now_rfc3339(),
|
||||
actor: None,
|
||||
kind: RecoverySidecarKind::GraphCreate,
|
||||
graph_id: graph_id.clone(),
|
||||
graph_uri: graph_uri.clone(),
|
||||
observed_manifest_version: None,
|
||||
expected_manifest_version: None,
|
||||
desired_schema_digest: desired_graph.schema_digest.clone(),
|
||||
state_cas_base: expected_cas.clone(),
|
||||
};
|
||||
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
|
||||
Ok(path) => path,
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
failed_graphs.insert(graph_id.clone());
|
||||
creates_aborted = true;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_create") {
|
||||
// Simulated crash before the init: the sidecar stays for the
|
||||
// sweep (row 1: root absent -> intent removed next run).
|
||||
diagnostics.push(diagnostic);
|
||||
failed_graphs.insert(graph_id.clone());
|
||||
creates_aborted = true;
|
||||
continue;
|
||||
}
|
||||
// Re-read + re-verify the schema source under the lock — the same
|
||||
// TOCTOU posture as write_resource_payload.
|
||||
let schema_source = source_paths
|
||||
.get(schema_address(graph_id).as_str())
|
||||
.ok_or_else(|| {
|
||||
Diagnostic::error(
|
||||
"graph_create_failed",
|
||||
graph_address(graph_id),
|
||||
"no schema source recorded for graph",
|
||||
)
|
||||
})
|
||||
.and_then(|path| {
|
||||
fs::read_to_string(Path::new(path)).map_err(|err| {
|
||||
Diagnostic::error(
|
||||
"graph_create_failed",
|
||||
graph_address(graph_id),
|
||||
format!("could not read schema source '{path}': {err}"),
|
||||
)
|
||||
})
|
||||
})
|
||||
.and_then(|source| {
|
||||
if sha256_hex(source.as_bytes()) == desired_graph.schema_digest {
|
||||
Ok(source)
|
||||
} else {
|
||||
Err(Diagnostic::error(
|
||||
"resource_content_changed",
|
||||
schema_address(graph_id),
|
||||
"schema source changed while apply was running; re-run `cluster apply`",
|
||||
))
|
||||
}
|
||||
});
|
||||
let schema_source = match schema_source {
|
||||
Ok(source) => source,
|
||||
Err(diagnostic) => {
|
||||
diagnostics.push(diagnostic);
|
||||
let _ = fs::remove_file(&sidecar_path); // nothing moved
|
||||
failed_graphs.insert(graph_id.clone());
|
||||
creates_aborted = true;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match Omnigraph::init(&graph_uri, &schema_source).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
diagnostics.push(Diagnostic::error(
|
||||
"graph_create_failed",
|
||||
graph_address(graph_id),
|
||||
format!("could not initialize graph at '{graph_uri}': {err}"),
|
||||
));
|
||||
// The sidecar stays: the sweep classifies whether the failed
|
||||
// init left a partial root (row 5) or nothing (row 1).
|
||||
failed_graphs.insert(graph_id.clone());
|
||||
creates_aborted = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// Record the post-init pin in the sidecar (best effort — a failure
|
||||
// here leaves expected = null and the sweep classifies by digest).
|
||||
if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await {
|
||||
if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await {
|
||||
sidecar.expected_manifest_version = Some(snapshot.version());
|
||||
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) {
|
||||
diagnostics.push(diagnostic);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Crash point: the graph exists, the cluster state does not record it
|
||||
// yet. A failure here must acknowledge nothing; the next run's sweep
|
||||
// rolls the ledger forward (row 4).
|
||||
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_create") {
|
||||
diagnostics.push(diagnostic);
|
||||
return early_return(
|
||||
display_path(&desired.config_dir),
|
||||
Some(desired.config_digest),
|
||||
observations,
|
||||
changes,
|
||||
state.resource_statuses,
|
||||
diagnostics,
|
||||
);
|
||||
}
|
||||
completed_create_sidecars.push(sidecar_path);
|
||||
}
|
||||
if !failed_graphs.is_empty() {
|
||||
demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
|
||||
}
|
||||
|
||||
for change in &changes {
|
||||
match change.disposition {
|
||||
Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning(
|
||||
|
|
@ -780,16 +944,6 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
// Gate on payload-phase errors only — sweep errors (e.g. a kept row-5
|
||||
// sidecar) must not abort the run, or their statuses would never persist.
|
||||
let errors_before_payloads = count_errors(&diagnostics);
|
||||
let source_paths: BTreeMap<&str, &str> = desired
|
||||
.resources
|
||||
.iter()
|
||||
.filter_map(|resource| {
|
||||
resource
|
||||
.path
|
||||
.as_deref()
|
||||
.map(|path| (resource.address.as_str(), path))
|
||||
})
|
||||
.collect();
|
||||
for change in &changes {
|
||||
if change.disposition != Some(ApplyDisposition::Applied)
|
||||
|| change.operation == PlanOperation::Delete
|
||||
|
|
@ -869,13 +1023,17 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
}
|
||||
},
|
||||
Some(ApplyDisposition::Blocked) => {
|
||||
set_resource_status(
|
||||
&mut new_state,
|
||||
&change.resource,
|
||||
ResourceLifecycleStatus::Blocked,
|
||||
change.reason.as_deref().unwrap_or("dependency_not_applied"),
|
||||
"waiting on an unapplied or missing dependency",
|
||||
);
|
||||
// The sweep owns recovery statuses (Drifted/Error with their
|
||||
// conditions); a generic Blocked must not clobber them.
|
||||
if change.reason.as_deref() != Some("cluster_recovery_pending") {
|
||||
set_resource_status(
|
||||
&mut new_state,
|
||||
&change.resource,
|
||||
ResourceLifecycleStatus::Blocked,
|
||||
change.reason.as_deref().unwrap_or("dependency_not_applied"),
|
||||
"waiting on an unapplied or missing dependency",
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
|
@ -914,7 +1072,11 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
|
|||
// Completed (rows 2/4) sweep sidecars are deleted only once their outcome
|
||||
// is durably recorded; on a failed write they stay and re-sweep next run.
|
||||
if !state_write_failed {
|
||||
for sidecar_path in &sweep.completed_sidecars {
|
||||
for sidecar_path in sweep
|
||||
.completed_sidecars
|
||||
.iter()
|
||||
.chain(completed_create_sidecars.iter())
|
||||
{
|
||||
let _ = fs::remove_file(sidecar_path);
|
||||
}
|
||||
}
|
||||
|
|
@ -2669,15 +2831,27 @@ fn resource_kind(address: &str) -> ResourceKind {
|
|||
/// it. Stage 3A executes only query/policy catalog writes; graph/schema
|
||||
/// movement is a later phase, and `graph.<id>` composite updates whose schema
|
||||
/// component is unchanged converge automatically once query digests land.
|
||||
fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) {
|
||||
let mut schema_changed = BTreeSet::new();
|
||||
fn classify_changes(
|
||||
changes: &mut [PlanChange],
|
||||
dependencies: &[Dependency],
|
||||
pending_recovery: &BTreeSet<String>,
|
||||
) {
|
||||
let mut schema_creates = BTreeSet::new();
|
||||
let mut schema_pending = BTreeSet::new();
|
||||
let mut graph_creates = BTreeSet::new();
|
||||
let mut graph_deletes = BTreeSet::new();
|
||||
for change in changes.iter() {
|
||||
match resource_kind(&change.resource) {
|
||||
ResourceKind::Schema(graph) => {
|
||||
schema_changed.insert(graph);
|
||||
}
|
||||
ResourceKind::Schema(graph) => match change.operation {
|
||||
PlanOperation::Create => {
|
||||
schema_creates.insert(graph);
|
||||
}
|
||||
// Schema updates (4B) and deletes (4C) are still pending in
|
||||
// this stage and block dependents.
|
||||
_ => {
|
||||
schema_pending.insert(graph);
|
||||
}
|
||||
},
|
||||
ResourceKind::Graph(graph) => match change.operation {
|
||||
PlanOperation::Create => {
|
||||
graph_creates.insert(graph);
|
||||
|
|
@ -2690,12 +2864,38 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) {
|
|||
_ => {}
|
||||
}
|
||||
}
|
||||
// A schema Create is satisfied by its paired graph create (the init
|
||||
// carries the schema); a standalone schema Create stays pending.
|
||||
for graph in &schema_creates {
|
||||
if !graph_creates.contains(graph) {
|
||||
schema_pending.insert(graph.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for change in changes.iter_mut() {
|
||||
let (disposition, reason) = match resource_kind(&change.resource) {
|
||||
ResourceKind::Schema(_) => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||
ResourceKind::Schema(graph) => match change.operation {
|
||||
PlanOperation::Create
|
||||
if graph_creates.contains(&graph)
|
||||
&& !pending_recovery.contains(&graph) =>
|
||||
{
|
||||
// Applied with the graph create — the init carries it.
|
||||
(ApplyDisposition::Applied, None)
|
||||
}
|
||||
PlanOperation::Create if graph_creates.contains(&graph) => {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
}
|
||||
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||
},
|
||||
ResourceKind::Graph(graph) => match change.operation {
|
||||
PlanOperation::Update if !schema_changed.contains(&graph) => {
|
||||
PlanOperation::Create => {
|
||||
if pending_recovery.contains(&graph) {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
} else {
|
||||
(ApplyDisposition::Applied, None)
|
||||
}
|
||||
}
|
||||
PlanOperation::Update if !schema_pending.contains(&graph) => {
|
||||
(ApplyDisposition::Derived, None)
|
||||
}
|
||||
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
|
||||
|
|
@ -2712,16 +2912,16 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) {
|
|||
}
|
||||
}
|
||||
PlanOperation::Create | PlanOperation::Update => {
|
||||
// A missing graph is the more fundamental blocker than a
|
||||
// pending schema change, so check it first.
|
||||
if graph_creates.contains(&graph) {
|
||||
(ApplyDisposition::Blocked, Some("dependency_missing"))
|
||||
} else if schema_changed.contains(&graph) {
|
||||
if pending_recovery.contains(&graph) {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
} else if schema_pending.contains(&graph) {
|
||||
(
|
||||
ApplyDisposition::Blocked,
|
||||
Some("dependency_not_applied"),
|
||||
)
|
||||
} else {
|
||||
// A graph create in the same plan no longer blocks:
|
||||
// creates execute first in the same apply run.
|
||||
(ApplyDisposition::Applied, None)
|
||||
}
|
||||
}
|
||||
|
|
@ -2729,15 +2929,15 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) {
|
|||
ResourceKind::Policy(_) => match change.operation {
|
||||
PlanOperation::Delete => (ApplyDisposition::Applied, None),
|
||||
PlanOperation::Create | PlanOperation::Update => {
|
||||
let blocked_dep = dependencies.iter().any(|dep| {
|
||||
let blocked_pending = dependencies.iter().any(|dep| {
|
||||
dep.from == change.resource
|
||||
&& dep
|
||||
.to
|
||||
.strip_prefix("graph.")
|
||||
.is_some_and(|graph| graph_creates.contains(graph))
|
||||
.is_some_and(|graph| pending_recovery.contains(graph))
|
||||
});
|
||||
if blocked_dep {
|
||||
(ApplyDisposition::Blocked, Some("dependency_missing"))
|
||||
if blocked_pending {
|
||||
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
|
||||
} else {
|
||||
(ApplyDisposition::Applied, None)
|
||||
}
|
||||
|
|
@ -2752,6 +2952,46 @@ fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) {
|
|||
}
|
||||
}
|
||||
|
||||
/// After a graph create fails mid-run, every change that depended on that
|
||||
/// graph (its schema, its queries, policies referencing it) flips from
|
||||
/// Applied to Blocked so the output and the persisted statuses tell the
|
||||
/// truth about what this run actually executed.
|
||||
fn demote_dependents_of_failed_graphs(
|
||||
changes: &mut [PlanChange],
|
||||
failed: &BTreeSet<String>,
|
||||
dependencies: &[Dependency],
|
||||
) {
|
||||
for change in changes.iter_mut() {
|
||||
if change.disposition != Some(ApplyDisposition::Applied) {
|
||||
continue;
|
||||
}
|
||||
let demote_reason = match resource_kind(&change.resource) {
|
||||
ResourceKind::Graph(graph) if failed.contains(&graph) => Some("graph_create_failed"),
|
||||
ResourceKind::Schema(graph) if failed.contains(&graph) => {
|
||||
Some("dependency_not_applied")
|
||||
}
|
||||
ResourceKind::Query { graph, .. } if failed.contains(&graph) => {
|
||||
Some("dependency_not_applied")
|
||||
}
|
||||
ResourceKind::Policy(_) => {
|
||||
let blocked = dependencies.iter().any(|dep| {
|
||||
dep.from == change.resource
|
||||
&& dep
|
||||
.to
|
||||
.strip_prefix("graph.")
|
||||
.is_some_and(|graph| failed.contains(graph))
|
||||
});
|
||||
blocked.then_some("dependency_not_applied")
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
if let Some(reason) = demote_reason {
|
||||
change.disposition = Some(ApplyDisposition::Blocked);
|
||||
change.reason = Some(reason.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Content-addressed catalog path for an applied resource payload. Extensions
|
||||
/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name,
|
||||
/// so the catalog layout cannot drift with operator file conventions.
|
||||
|
|
@ -4525,45 +4765,117 @@ graphs:
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_blocks_resources_of_uncreated_graph() {
|
||||
async fn apply_creates_graph_and_unblocks_dependents() {
|
||||
let dir = fixture();
|
||||
write_state_resources(dir.path(), &[]);
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(out.ok, "{:?}", out.diagnostics);
|
||||
assert_eq!(out.applied_count, 0);
|
||||
assert!(!out.converged);
|
||||
assert!(out.converged, "{out:?}");
|
||||
let by_resource: BTreeMap<&str, &PlanChange> = out
|
||||
.changes
|
||||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
// Stage 4A: the create executes, and its dependents apply in-run.
|
||||
assert_eq!(
|
||||
by_resource["graph.knowledge"].disposition,
|
||||
Some(ApplyDisposition::Deferred)
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["schema.knowledge"].disposition,
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.knowledge.find_person"].disposition,
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["policy.base"].disposition,
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
// The graph exists on disk and opens; state records everything.
|
||||
let graph_uri = derived_graph_uri(dir.path(), "knowledge");
|
||||
let db = Omnigraph::open_read_only(&graph_uri).await.unwrap();
|
||||
let desired = validate_config_dir(dir.path());
|
||||
assert_eq!(
|
||||
sha256_hex(db.schema_source().as_bytes()),
|
||||
desired.resource_digests["schema.knowledge"]
|
||||
);
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(
|
||||
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
|
||||
desired.resource_digests["schema.knowledge"]
|
||||
);
|
||||
assert_eq!(
|
||||
state["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"applied"
|
||||
);
|
||||
// The create's sidecar was retired after the state CAS landed.
|
||||
assert!(
|
||||
!dir.path().join(CLUSTER_RECOVERIES_DIR).exists()
|
||||
|| fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR))
|
||||
.unwrap()
|
||||
.next()
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_create_failure_blocks_dependents_and_keeps_sidecar() {
|
||||
let dir = fixture();
|
||||
write_state_resources(dir.path(), &[]);
|
||||
// Make the init fail its strict preflight: a junk _schema.pg already
|
||||
// sits at the derived root (the engine refuses to overwrite it).
|
||||
let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni");
|
||||
fs::create_dir_all(&root).unwrap();
|
||||
fs::write(root.join("_schema.pg"), "junk").unwrap();
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(!out.ok);
|
||||
assert!(
|
||||
out.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.code == "graph_create_failed")
|
||||
);
|
||||
let by_resource: BTreeMap<&str, &PlanChange> = out
|
||||
.changes
|
||||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
// Dependents are demoted: the run tells the truth about what executed.
|
||||
assert_eq!(
|
||||
by_resource["graph.knowledge"].disposition,
|
||||
Some(ApplyDisposition::Blocked)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.knowledge.find_person"].disposition,
|
||||
Some(ApplyDisposition::Blocked)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.knowledge.find_person"].reason.as_deref(),
|
||||
Some("dependency_missing")
|
||||
Some("dependency_not_applied")
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["policy.base"].reason.as_deref(),
|
||||
Some("dependency_missing")
|
||||
by_resource["policy.base"].disposition,
|
||||
Some(ApplyDisposition::Blocked)
|
||||
);
|
||||
// Statuses for blocked resources are recorded (state changed), but no
|
||||
// resource digests moved.
|
||||
assert!(!out.converged);
|
||||
// The sidecar stays for the sweep to classify next run.
|
||||
assert!(
|
||||
fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR))
|
||||
.unwrap()
|
||||
.next()
|
||||
.is_some()
|
||||
);
|
||||
// No graph digests moved.
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(state["state_revision"], 2);
|
||||
assert!(
|
||||
state["applied_revision"]["resources"]
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.is_empty()
|
||||
);
|
||||
assert_eq!(
|
||||
state["resource_statuses"]["policy.base"]["status"],
|
||||
"blocked"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -5147,6 +5459,47 @@ graphs:
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_blocks_create_while_recovery_pending() {
|
||||
let dir = fixture();
|
||||
write_state_resources(dir.path(), &[]);
|
||||
// A kept (row 5) sidecar: partial root that cannot be opened.
|
||||
let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni");
|
||||
fs::create_dir_all(&root).unwrap();
|
||||
fs::write(root.join("_schema.pg"), "junk").unwrap();
|
||||
let sidecar = write_create_sidecar(dir.path(), "knowledge", "whatever", "01PEND");
|
||||
|
||||
let out = apply_config_dir(dir.path()).await;
|
||||
assert!(!out.ok); // row 5 is an error condition
|
||||
let by_resource: BTreeMap<&str, &PlanChange> = out
|
||||
.changes
|
||||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
// The pending recovery blocks the create and its dependents; the
|
||||
// executor never attempts the init.
|
||||
assert_eq!(
|
||||
by_resource["graph.knowledge"].disposition,
|
||||
Some(ApplyDisposition::Blocked)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["graph.knowledge"].reason.as_deref(),
|
||||
Some("cluster_recovery_pending")
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.knowledge.find_person"].reason.as_deref(),
|
||||
Some("cluster_recovery_pending")
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["policy.base"].reason.as_deref(),
|
||||
Some("cluster_recovery_pending")
|
||||
);
|
||||
assert!(sidecar.exists());
|
||||
// The sweep's Error status is what persists — not a generic Blocked.
|
||||
let state = read_state_json(dir.path());
|
||||
assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_warns_on_pending_recovery_sidecar() {
|
||||
let dir = fixture();
|
||||
|
|
@ -5173,23 +5526,23 @@ graphs:
|
|||
.iter()
|
||||
.map(|change| (change.resource.as_str(), change))
|
||||
.collect();
|
||||
// Empty state: graph/schema creates are deferred, query/policy blocked
|
||||
// on the uncreated graph — and plan says so before apply runs.
|
||||
// Stage 4A: graph/schema creates are executable, and dependents ride
|
||||
// the same run — plan previews exactly that.
|
||||
assert_eq!(
|
||||
by_resource["graph.knowledge"].disposition,
|
||||
Some(ApplyDisposition::Deferred)
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["schema.knowledge"].disposition,
|
||||
Some(ApplyDisposition::Deferred)
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["query.knowledge.find_person"].disposition,
|
||||
Some(ApplyDisposition::Blocked)
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
assert_eq!(
|
||||
by_resource["policy.base"].reason.as_deref(),
|
||||
Some("dependency_missing")
|
||||
by_resource["policy.base"].disposition,
|
||||
Some(ApplyDisposition::Applied)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue